= Spark Streaming example

Welcome to the ram-databricks-lab wiki!

Sample employee events used as streaming input (one JSON record per line):

```json
{"emp_id": 1, "name": "Alice", "department": "HR", "salary": 50000, "event_time": "2025-05-20T10:01:00"}
{"emp_id": 2, "name": "Bob", "department": "Finance", "salary": 60000, "event_time": "2025-05-20T10:02:00"}
{"emp_id": 3, "name": "Charlie", "department": "HR", "salary": 52000, "event_time": "2025-05-20T10:05:00"}
{"emp_id": 4, "name": "David", "department": "IT", "salary": 70000, "event_time": "2025-05-20T10:07:00"}
{"emp_id": 5, "name": "Eva", "department": "Finance", "salary": 62000, "event_time": "2025-05-20T10:09:00"}
{"emp_id": 6, "name": "Frank", "department": "HR", "salary": 51000, "event_time": "2025-05-20T10:12:00"}
{"emp_id": 7, "name": "Grace", "department": "IT", "salary": 73000, "event_time": "2025-05-20T10:15:00"}
{"emp_id": 8, "name": "Helen", "department": "Finance", "salary": 64000, "event_time": "2025-05-20T10:17:00"}
{"emp_id": 9, "name": "Ian", "department": "HR", "salary": 54000, "event_time": "2025-05-20T10:20:00"}
{"emp_id": 10, "name": "Jack", "department": "IT", "salary": 75000, "event_time": "2025-05-20T10:22:00"}
{"emp_id": 11, "name": "Kate", "department": "Finance", "salary": 66000, "event_time": "2025-05-20T10:25:00"}
{"emp_id": 12, "name": "Leo", "department": "HR", "salary": 55000, "event_time": "2025-05-20T10:28:00"}
{"emp_id": 13, "name": "Mona", "department": "IT", "salary": 77000, "event_time": "2025-05-20T10:30:00"}
{"emp_id": 14, "name": "Nina", "department": "Finance", "salary": 68000, "event_time": "2025-05-20T10:32:00"}
{"emp_id": 15, "name": "Oscar", "department": "HR", "salary": 57000, "event_time": "2025-05-20T10:35:00"}
{"emp_id": 16, "name": "Paul", "department": "IT", "salary": 79000, "event_time": "2025-05-20T10:37:00"}
{"emp_id": 17, "name": "Quinn", "department": "Finance", "salary": 70000, "event_time": "2025-05-20T10:40:00"}
{"emp_id": 18, "name": "Rita", "department": "HR", "salary": 58000, "event_time": "2025-05-20T10:43:00"}
{"emp_id": 19, "name": "Steve", "department": "IT", "salary": 80000, "event_time": "2025-05-20T10:45:00"}
{"emp_id": 20, "name": "Tina", "department": "Finance", "salary": 72000, "event_time": "2025-05-20T10:48:00"}
```
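If these records are read as a stream themselves, an explicit schema avoids inference surprises. A minimal sketch, with names and types taken from the records above:

```python
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, TimestampType
)

# Schema matching the sample employee events above; the ISO 8601 strings
# in event_time parse into TimestampType with Spark's default settings.
employee_schema = StructType([
    StructField("emp_id", IntegerType()),
    StructField("name", StringType()),
    StructField("department", StringType()),
    StructField("salary", IntegerType()),
    StructField("event_time", TimestampType()),
])
```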

== Tumbling Window and Hopping Window

A tumbling window splits event time into fixed, non-overlapping intervals, so every event belongs to exactly one window. A hopping (sliding) window has the same fixed length but advances by a smaller slide interval, so consecutive windows overlap and one event can belong to several of them. For example, with 10-minute windows an event at 10:12 falls only in the tumbling window [10:10, 10:20), but with a 5-minute hop it falls in both [10:05, 10:15) and [10:10, 10:20).
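A minimal batch sketch that makes the difference visible (the in-memory row and app name here are assumptions, not part of the lab pipeline):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder.appName("WindowDemo").getOrCreate()

# One event at 10:12, cast from a string to a proper timestamp.
df = (
    spark.createDataFrame([("2025-05-20 10:12:00",)], ["event_time"])
    .withColumn("event_time", col("event_time").cast("timestamp"))
)

# Tumbling: the event lands in exactly one bucket, [10:10, 10:20).
df.groupBy(window("event_time", "10 minutes")).count().show(truncate=False)

# Hopping: 10-minute windows starting every 5 minutes; the same event
# lands in [10:05, 10:15) and [10:10, 10:20), so two rows come back.
df.groupBy(window("event_time", "10 minutes", "5 minutes")).count().show(truncate=False)
```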

Now the streaming pipeline itself. Import the required classes:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, sum  # note: shadows Python's built-in sum in this script
```

Initialize the Spark session:

```python
spark = (
    SparkSession.builder
    .appName("SalesAggregationStream")
    .getOrCreate()
)
```

Set the ADLS input path and the Delta table output path, replacing the placeholders with your container and storage account:

```python
input_path = "abfss://<container>@<storage-account>.dfs.core.windows.net/sales_data/"
output_path = "abfss://<container>@<storage-account>.dfs.core.windows.net/aggregated_sales_delta/"
```

Read the streaming JSON data from ADLS with Auto Loader:

```python
sales_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    # Auto Loader needs a location to track the inferred schema;
    # this particular path is an assumption, not from the original page.
    .option("cloudFiles.schemaLocation", output_path + "/_schema")
    .load(input_path)
)
```

Ensure the required schema: a timestamp column and a sales value. The timestamp column is assumed to be named "transaction_time" and to be in ISO 8601 format:

```python
sales_df = sales_df.withColumn(
    "transaction_time", col("transaction_time").cast("timestamp")
)
```
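If the incoming timestamps used a non-ISO format, the cast above would return nulls. A hedged alternative (the format string here is an assumption) is an explicit `to_timestamp`:

```python
from pyspark.sql.functions import to_timestamp

# Sketch: parse a known custom format explicitly instead of relying on cast().
sales_df = sales_df.withColumn(
    "transaction_time",
    to_timestamp(col("transaction_time"), "yyyy-MM-dd'T'HH:mm:ss")
)
```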

Aggregate sales every 10 minutes, allowing events to arrive up to 15 minutes late:

```python
aggregated_df = (
    sales_df
    .withWatermark("transaction_time", "15 minutes")
    # Tumbling window: fixed 10-minute buckets.
    .groupBy(window(col("transaction_time"), "10 minutes"))
    # Hopping window alternative: 10-minute windows sliding every 5 minutes.
    # .groupBy(window(col("transaction_time"), "10 minutes", "5 minutes"))
    .agg(sum("sales_value").alias("total_sales_value"))
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("total_sales_value"),
    )
)
```
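Before wiring up the Delta sink, it can help to sanity-check the aggregation on the console. A minimal sketch; note that with a watermark and append mode, a window's row only appears after the watermark passes the window's end:

```python
# Debug sink: print each finalized window to the driver's console.
debug_query = (
    aggregated_df.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)
```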

Write the aggregated result to a Delta table:

```python
query = (
    aggregated_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", output_path + "/_checkpoint")
    .start(output_path + "/data")
)

query.awaitTermination()
```
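Once the stream has processed a few files, the sink can be inspected like any other Delta table. A minimal sketch, assuming the same `output_path` as above:

```python
# Read the aggregated Delta output back as a batch DataFrame.
result_df = spark.read.format("delta").load(output_path + "/data")
result_df.orderBy("window_start").show(truncate=False)
```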
