Develop a simple streaming application using Spark in Databricks

Important Streaming Terminologies

  • "ReadStream": the Entry Point to a steaming Application. It can be a function, or an API, using which the data can be consumed from the external source system. The external source system can be any database, file system or popular streaming services like Apache Kafka or Azure event Hub
  • "WriteStream": the output of a streaming application. It can be a function or a API. Once the data to process is consumed from the source system, and the data is processed, i.e. transformed and the required business aggregation is applied on the data, then the processed data can be written to the target using the "WriteStream" API
  • "CheckPoint": is used for incremental data loading. To handle the incremental data loading, the checkpoint keeps the track of the offset value of the data that gets processed in each of the batches. So that, when a streaming application is executed the next time only the unprocessed data gets processed. When a Spark Streaming application is executed the next time, it will first refer to the checkpoint location to retrieve the offset value. Based on the offset value, Spark understands which is the already processed data and which is the yet to process data. Spark will only pick the unprocessed data. Checkpoint is also used for Fault Tolerance in Spark streaming applications.
  • "Trigger": a trigger controls when the Spark Streaming application processes the incoming data. There are three types of trigger (a short sketch follows this list):
    1. Default: the incoming streaming data is segregated into micro-batches, and the application processes one micro-batch at a time. Only when all the data of one micro-batch has been processed does Spark start processing the next micro-batch.
    2. Fixed interval: processes all the streaming data accumulated during a fixed time interval, e.g. every 1 hour or every 5 minutes.
    3. One time: processes all the accumulated streaming data in one shot and then stops; the one-time trigger is equivalent to batch mode.
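Below is a minimal sketch of the three trigger types, assuming a streaming DataFrame df_stream like the one built with readStream in the sample code further down; the output and checkpoint paths are illustrative placeholders, not part of the original example.

# A minimal sketch of the three trigger types (paths are illustrative placeholders).

# Default trigger: each micro-batch is processed as soon as the previous one finishes.
query_default = df_stream.writeStream.format("parquet") \
    .option("path", "/FileStore/tables/streaming-data/demo-output-default/") \
    .option("checkpointLocation", "/FileStore/tables/streaming-data/demo-checkpoint-default/") \
    .start()

# Fixed interval trigger: process whatever data has accumulated, every 5 minutes.
query_fixed = df_stream.writeStream.format("parquet") \
    .trigger(processingTime="5 minutes") \
    .option("path", "/FileStore/tables/streaming-data/demo-output-fixed/") \
    .option("checkpointLocation", "/FileStore/tables/streaming-data/demo-checkpoint-fixed/") \
    .start()

# One-time trigger: process all available data once, then stop (equivalent to batch mode).
query_once = df_stream.writeStream.format("parquet") \
    .trigger(once=True) \
    .option("path", "/FileStore/tables/streaming-data/demo-output-once/") \
    .option("checkpointLocation", "/FileStore/tables/streaming-data/demo-checkpoint-once/") \
    .start()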

What are the different Output Modes of a Streaming Application?

The output mode of a Spark Streaming application can be configured in the following ways (a short sketch follows the list):

    1. Append: adds only the incremental data of each trigger run to the target. It is applicable only where the existing rows in the target are not expected to change.
    2. Complete: in some cases an aggregation needs to be performed on the data, so the complete, re-computed result is written on each trigger run to keep the aggregated values correct.
    3. Update: updates the existing rows in the target with the new data arriving in each trigger run.
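A short sketch of the three output modes, again assuming the df_stream from the sample code below. Note that the parquet file sink only supports append; complete and update are typically used with aggregations and sinks such as memory, console, or Delta, so the memory/console sinks and the company_counts aggregation here are illustrative assumptions.

# Append: only the new rows of each trigger run are written (used with the file sink below).
append_query = df_stream.writeStream.format("parquet") \
    .outputMode("append") \
    .option("path", "/FileStore/tables/streaming-data/demo-output-append/") \
    .option("checkpointLocation", "/FileStore/tables/streaming-data/demo-checkpoint-append/") \
    .start()

# Complete: the full aggregated result is rewritten on every trigger run.
company_counts = df_stream.groupBy("current_company").count()
complete_query = company_counts.writeStream.format("memory") \
    .queryName("company_counts") \
    .outputMode("complete") \
    .start()

# Update: only the aggregate rows that changed in this trigger run are emitted.
update_query = company_counts.writeStream.format("console") \
    .outputMode("update") \
    .start()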

Standard Architecture of Streaming Application

In the standard architecture of a streaming application, there are four stages (a short end-to-end sketch follows the list):

    1. Ingestion: the source data is consumed from the topics of a streaming service (e.g. Kafka, Event Hubs, Pub/Sub).
    2. Processing: once the source data is consumed, the Databricks engine applies business logic to cleanse, transform, or aggregate it.
    3. Storing: once the transformation is done, the transformed or aggregated data is moved to a storage layer, which can be a relational database (Cloud SQL) or a data warehouse (BigQuery).
    4. Reporting: from the data warehouse, the data flows to reporting/BI tools.
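The sketch below maps these four stages onto Databricks, assuming the person_schema defined in the sample code further down; the Delta table name avg_experience_by_company and the aggregation logic are illustrative assumptions, not part of the original example.

from pyspark.sql.functions import col, avg

# 1. Ingestion: consume raw JSON events as a stream.
raw_events = spark.readStream.format("json") \
    .schema(person_schema) \
    .option("multiLine", "true") \
    .load("/FileStore/tables/streaming-data/incoming-streaming-data/json-files")

# 2. Processing: cleanse and aggregate with ordinary DataFrame transformations.
cleansed = raw_events.filter(col("current_company").isNotNull())
avg_exp = cleansed.groupBy("current_company") \
    .agg(avg("exp_in_current_company").alias("avg_exp_years"))

# 3. Storing: write the aggregated result to a Delta table that downstream tools can query.
storing_query = avg_exp.writeStream.format("delta") \
    .outputMode("complete") \
    .option("checkpointLocation", "/FileStore/tables/streaming-data/demo-checkpoint-avg-exp/") \
    .toTable("avg_experience_by_company")

# 4. Reporting: BI/reporting tools query the avg_experience_by_company table.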

Implementation Overview

    1. Framework to use: pyspark.sql
    2. Define the schema of the incoming streaming data
    3. Raw storage of events: DBFS. For a streaming application on Google Cloud Platform, raw event storage options typically include:
       1. Cloud Storage: frequently used as the landing zone for raw event data before further processing. It supports storing files such as logs, batches of events, or data from streaming platforms in various formats (JSON, CSV, Avro, Parquet).
       2. BigQuery: serverless; acts as both raw storage and an analysis engine, and integrates well with streaming services like Dataflow or Pub/Sub.
       3. Cloud Pub/Sub: a real-time messaging service that can be used to collect and deliver event streams. While not primarily a storage service, it acts as an intermediary buffer for event data that can be ingested by downstream services (like Dataflow) for further processing and storage.
       4. Cloud Firestore: a NoSQL document database that can store event data, especially if the app requires a document-based database with real-time sync.
       5. Cloud Bigtable: a high-performance, wide-column NoSQL database for large-scale, low-latency workloads.
       6. Cloud Spanner: a globally distributed relational database, used when transactional consistency is critical.
    4. Sample code:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Schema of the incoming JSON person events.
person_schema = StructType([
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("current_company", StringType(), False),
    StructField("exp_in_current_company", DoubleType(), False)
])

dbutils.fs.mkdirs("/FileStore/tables/streaming-data/incoming-streaming-data/json-files")
dbutils.fs.mkdirs("/FileStore/tables/streaming-data/output-of-streaming-data/")
dbutils.fs.mkdirs("/FileStore/tables/streaming-data/checkpoint-location")

# Define the streaming source: read multi-line JSON files as they land in the folder.
df_stream = spark.readStream.format("json")\
                            .schema(person_schema)\
                            .option("multiLine", "true")\
                            .load("/FileStore/tables/streaming-data/incoming-streaming-data/json-files")

# On Databricks, display() on a streaming DataFrame starts a continuously refreshing preview.
display(df_stream)

# Start the streaming write. start() returns a StreamingQuery handle (not a DataFrame);
# chaining .awaitTermination() here would block the notebook so the cells below would
# never run. Keep the handle and stop the query explicitly when done.
streaming_query = df_stream.writeStream.format("parquet")\
                                       .outputMode("append")\
                                       .option("path", "/FileStore/tables/streaming-data/output-of-streaming-data/")\
                                       .option("checkpointLocation", "/FileStore/tables/streaming-data/checkpoint-location")\
                                       .start()

dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/")
display(spark.read.format("parquet").load("/FileStore/tables/streaming-data/output-of-streaming-data/*.parquet"))
dbutils.fs.ls("/FileStore/tables/streaming-data/output-of-streaming-data/_spark_metadata/")