# PySpark Basics 1
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, isnull, lower, upper, concat_ws, substring,
    to_date, current_date, datediff, udf,
)
from pyspark.sql.types import StringType
```
Initialize the Spark session:

```python
spark = (
    SparkSession.builder
    .appName("Nulls_Joins_UDF_String_Date")
    .getOrCreate()
)
```
Sample employee data with nulls:

```python
emp_data = [
    (1, "Alice", "HR", 5000, "2023-01-10"),
    (2, "Bob", "IT", None, "2023-01-12"),
    (3, None, "IT", 6000, "2023-01-13"),
    (4, "David", None, 7000, None),
    (5, "Eve", "HR", 4500, "2023-01-09"),
    (6, "Frank", "Finance", None, "2023-01-15"),
    (7, "Grace", "IT", 5500, "2023-01-14"),
    (8, "Heidi", "HR", 4800, "2023-01-08"),
    (9, "Ivan", "Finance", 7500, "2023-01-12"),
    (10, "Judy", None, 6200, "2023-01-10"),
]
emp_columns = ["emp_id", "name", "dept", "salary", "hire_date"]

emp_df = spark.createDataFrame(emp_data, emp_columns)
```
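Before transforming, it can help to confirm what Spark inferred from the tuples; this quick check is not part of the original listing, just a sanity step:

```python
# salary should come back as a nullable long, hire_date as a string
emp_df.printSchema()
emp_df.show(truncate=False)
```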
## 1. Null handling
Fill null values with per-column defaults:

```python
emp_df_filled = emp_df.fillna(
    {"salary": 0, "dept": "Unknown", "name": "Unnamed", "hire_date": "1900-01-01"}
)
```
Drop rows where any column is null:

```python
emp_df_dropped = emp_df.dropna()
```
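`dropna` also accepts `subset` and `how` arguments for finer control; a minimal sketch (the variable name here is illustrative, not from the original):

```python
# Drop only rows missing a salary; nulls in other columns are kept
emp_df_dropped_salary = emp_df.dropna(subset=["salary"])
```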
Filter rows where salary is null:

```python
emp_df_null_salary = emp_df.filter(col("salary").isNull())
```
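`when` and `isnull` are imported above but never exercised; as a small sketch, the same null check can also be expressed as a derived flag column instead of a filter:

```python
# True where salary is null, False otherwise
emp_df_flagged = emp_df.withColumn(
    "salary_missing", when(isnull(col("salary")), True).otherwise(False)
)
```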
## 2. String functions
```python
emp_str_df = (
    emp_df_filled
    .withColumn("name_upper", upper(col("name")))
    .withColumn("dept_lower", lower(col("dept")))
    .withColumn("initial", substring(col("name"), 1, 1))
    .withColumn("emp_full", concat_ws(" - ", col("name"), col("dept")))
)
```
## 3. Date functions
```python
emp_date_df = (
    emp_str_df
    .withColumn("hire_date", to_date("hire_date", "yyyy-MM-dd"))
    .withColumn("days_since_hired", datediff(current_date(), col("hire_date")))
)
```
## 4. UDF: Add suffix to name
```python
def add_suffix(name):
    # Handle null or empty names defensively
    return name + "_emp" if name else "unknown_emp"

add_suffix_udf = udf(add_suffix, StringType())

emp_udf_df = emp_date_df.withColumn("name_with_suffix", add_suffix_udf(col("name")))
```
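Python UDFs serialize rows between the JVM and the Python worker, so built-in functions are generally faster when they can express the same logic; a roughly equivalent built-in version (a sketch, not the approach used above) would be:

```python
from pyspark.sql.functions import concat, lit, coalesce

# Null names fall back to "unknown" before the suffix is appended
emp_no_udf = emp_date_df.withColumn(
    "name_with_suffix",
    concat(coalesce(col("name"), lit("unknown")), lit("_emp")),
)
```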
## 5. Joining with dept metadata
```python
dept_data = [
    ("HR", "Human Resources"),
    ("IT", "Information Tech"),
    ("Finance", "Finance Dept"),
]
dept_columns = ["dept", "dept_fullname"]

dept_df = spark.createDataFrame(dept_data, dept_columns)
```
Various joins on the shared `dept` column:

```python
inner_join_df = emp_udf_df.join(dept_df, on="dept", how="inner")
left_join_df = emp_udf_df.join(dept_df, on="dept", how="left")
outer_join_df = emp_udf_df.join(dept_df, on="dept", how="outer")
```
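If you want only the employees whose department has no metadata row (here, the rows filled with `Unknown`), a left anti join is one option; this sketch is an addition, not part of the original:

```python
# Rows from emp_udf_df with no match in dept_df
unmatched_df = emp_udf_df.join(dept_df, on="dept", how="left_anti")
```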
Show the final DataFrames:

```python
print("\n=== Null Salaries ===")
emp_df_null_salary.show()

print("\n=== String Functions ===")
emp_str_df.select("emp_id", "name", "name_upper", "dept", "dept_lower", "initial", "emp_full").show()

print("\n=== Date Functions + UDF ===")
emp_udf_df.select("emp_id", "name", "hire_date", "days_since_hired", "name_with_suffix").show()

print("\n=== Inner Join ===")
inner_join_df.select("emp_id", "name", "dept", "dept_fullname").show()

print("\n=== Outer Join (to see unmatched rows too) ===")
outer_join_df.select("emp_id", "name", "dept", "dept_fullname").show()
```