# PySpark Window Functions

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, rank, dense_rank, lag, lead, sum as _sum
from pyspark.sql.window import Window
```

Initialize Spark session

```python
spark = SparkSession.builder \
    .appName("AdvancedWindowFunctions") \
    .getOrCreate()
```

Sample data: Employee records

```python
data = [
    (1, "Alice", "HR", 5000, "2023-01-10"),
    (2, "Bob", "IT", 6000, "2023-01-12"),
    (3, "Charlie", "IT", 6000, "2023-01-13"),
    (4, "David", "Finance", 7000, "2023-01-11"),
    (5, "Eve", "HR", 4500, "2023-01-09"),
    (6, "Frank", "Finance", 7000, "2023-01-15"),
    (7, "Grace", "IT", 5500, "2023-01-14"),
    (8, "Heidi", "HR", 4800, "2023-01-08"),
    (9, "Ivan", "Finance", 7500, "2023-01-12"),
    (10, "Judy", "IT", 6200, "2023-01-10"),
]

columns = ["emp_id", "name", "dept", "salary", "hire_date"]
```

Create DataFrame

```python
df = spark.createDataFrame(data, schema=columns)
```
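
One thing to be aware of: with this schema, `hire_date` is inferred as a plain string. The examples below only order by `salary`, so that is fine here, but if you later want to window by hire date, a minimal sketch of the cast (assuming the `yyyy-MM-dd` format used above; `df_typed` is just an illustrative name):

```python
from pyspark.sql.functions import to_date

# Parse the yyyy-MM-dd strings into a real DateType column
df_typed = df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd"))
```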

Define window by department ordered by salary descending

```python
windowSpec = Window.partitionBy("dept").orderBy(col("salary").desc())
```
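
As a quick reference before the functions are applied: the data contains tied salaries (Bob and Charlie both earn 6000 in IT), which is exactly where `row_number`, `rank`, and `dense_rank` diverge. For the IT partition the expected values are roughly as below; note that which of the two tied rows gets `row_num` 2 versus 3 is not guaranteed.

| name    | salary | row_num | rank | dense_rank |
|---------|--------|---------|------|------------|
| Judy    | 6200   | 1       | 1    | 1          |
| Bob     | 6000   | 2       | 2    | 2          |
| Charlie | 6000   | 3       | 2    | 2          |
| Grace   | 5500   | 4       | 4    | 3          |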

Add window functions

```python
# row_num/rank/dense_rank rank rows within each department; lag/lead look at
# neighboring rows; the explicit rowsBetween frame gives a row-by-row running total
df_with_window = df.withColumn("row_num", row_number().over(windowSpec)) \
    .withColumn("rank", rank().over(windowSpec)) \
    .withColumn("dense_rank", dense_rank().over(windowSpec)) \
    .withColumn("lag_salary", lag("salary", 1).over(windowSpec)) \
    .withColumn("lead_salary", lead("salary", 1).over(windowSpec)) \
    .withColumn("cumulative_salary",
                _sum("salary").over(windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
```

Show results

```python
df_with_window.orderBy("dept", col("salary").desc()).show(truncate=False)
```
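
A common next step with `row_num` is a top-N-per-group filter; a minimal sketch, keeping the two highest-paid employees per department (the cutoff of 2 is arbitrary):

```python
# row_number is unique within each partition, so this keeps exactly two rows per dept
top_earners = df_with_window.filter(col("row_num") <= 2) \
    .select("dept", "name", "salary", "row_num")
top_earners.show(truncate=False)
```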