Spark Streaming

Introduction

In any stream processing system, broadly speaking, there are three steps in processing the data.

  1. Receiving the data: The data is received from sources using Receivers or otherwise.

  2. Transforming the data: The received data is transformed using DStream and RDD transformations.

  3. Pushing out the data: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.


Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.

DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Steps

Initialize the streaming context:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# create a local StreamingContext with two worker threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

Create a DStream that connects to a socket source on localhost port 9999:

lines = ssc.socketTextStream("localhost", 9999)

Perform operations on the stream:


# split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

Start a socket server on port 9999. Go to another terminal and type:



nc -lk 9999

Start the streaming computation:

ssc.start()             # start the computation
ssc.awaitTermination()  # wait for the computation to terminate


Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable distributed dataset.
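
As a small illustration of this RDD-per-batch model (a sketch only, reusing the ssc and lines objects from the quick example above), foreachRDD exposes each batch's RDD directly:

# each batch interval produces one RDD; inspect it with foreachRDD
def inspect_batch(time, rdd):
    print("batch at %s contains %d lines" % (str(time), rdd.count()))

lines.foreachRDD(inspect_batch)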

(Figure: the lines DStream in the ipynb example.)

Streaming sources:

- Basic sources: sources directly available in the StreamingContext API, e.g. file systems and socket connections.

(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

- Advanced sources: sources like Kafka, Flume, Kinesis, etc., available through extra utility classes. (http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking)

# create a text stream from an HDFS path (textFileStream monitors a directory for new files)
ssc.textFileStream('hdfs://localhost:9000/bank.csv')
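
As a minimal sketch (assuming a hypothetical HDFS directory hdfs://localhost:9000/stream_in/ that new CSV files are copied into), a file-based DStream can be consumed like any other DStream:

# watch the (hypothetical) directory for newly arriving CSV files
csv_stream = ssc.textFileStream('hdfs://localhost:9000/stream_in/')

# split each line into fields and print a sample of every batch
rows = csv_stream.map(lambda line: line.split(','))
rows.pprint()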

Transformations on streams

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams

Example: joining two streams

stream1 = ...

stream2 = ...

joinedStream = stream1.join(stream2)
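
A more concrete sketch (assuming two hypothetical socket sources on ports 9999 and 9998 whose lines look like "key value") pairs the streams up by key:

# parse each "key value" line into a (key, value) tuple (assumes every line contains a space)
stream1 = ssc.socketTextStream("localhost", 9999).map(lambda line: tuple(line.split(" ", 1)))
stream2 = ssc.socketTextStream("localhost", 9998).map(lambda line: tuple(line.split(" ", 1)))

# join matches keys that appear in the same batch of both streams
joinedStream = stream1.join(stream2)   # elements look like (key, (value1, value2))
joinedStream.pprint()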

Output operations on DStreams:

(http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams)

dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to use it correctly and efficiently. A common mistake is shown below: the connection object is created at the driver but used inside rdd.foreach, which runs on the workers. The connection would have to be serialized and sent to the workers, which typically fails because connection objects are rarely transferable across machines; connections should instead be created at the worker that uses them.

def sendRecord(rdd):
    connection = createNewConnection()  # WRONG: executed at the driver
    rdd.foreach(lambda record: connection.send(record))  # runs at the workers
    connection.close()

To call it,

dstream.foreachRDD(sendRecord)


Optimize the connections by reusing connection objects across multiple RDDs/batches: one can maintain a static pool of connection objects that can be reused as RDDs of multiple batches are pushed to the external system, further reducing the overhead. The pattern below also sends records per partition (rdd.foreachPartition) instead of opening a connection per record.

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

To call it,

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
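
ConnectionPool above is only a placeholder; one possible minimal sketch of such a lazily initialized, per-executor pool (here built on plain TCP sockets to a hypothetical sink host and port) is:

import socket
from queue import Queue

class ConnectionPool:
    """A static, lazily initialized pool of TCP connections (illustrative only)."""
    _pool = None

    @classmethod
    def getConnection(cls):
        if cls._pool is None:
            cls._pool = Queue()
        if cls._pool.empty():
            # hypothetical external sink; replace with your real host and port
            return socket.create_connection(("sink.example.com", 7000))
        return cls._pool.get()

    @classmethod
    def returnConnection(cls, conn):
        cls._pool.put(conn)

With a raw socket like this, records would need to be encoded to bytes before connection.send(record); a real sink would normally use its own client library instead.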

DataFrame streaming and SQL

http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
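
A minimal sketch of that pattern (assuming the words DStream from the quick example above): each batch's RDD is turned into a DataFrame, registered as a temporary view, and queried with SQL.

from pyspark.sql import Row, SparkSession

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # reuse (or lazily create) a SparkSession from the RDD's SparkContext
        spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()

        # convert the batch RDD of words to a DataFrame and query it
        wordsDataFrame = spark.createDataFrame(rdd.map(lambda w: Row(word=w)))
        wordsDataFrame.createOrReplaceTempView("words")
        spark.sql("select word, count(*) as total from words group by word").show()
    except Exception:
        pass  # e.g. an empty batch cannot be converted to a DataFrame

words.foreachRDD(process)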

Streaming with MLlib

http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations

Streaming linear regression

http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression
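
A minimal sketch of streaming linear regression, assuming a hypothetical HDFS directory of training lines formatted as "label,x1 x2 x3":

from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

# parse "label,x1 x2 x3" lines into LabeledPoints (the line format is an assumption)
def parse_point(line):
    label, features = line.split(',')
    return LabeledPoint(float(label), [float(x) for x in features.split(' ')])

trainingData = ssc.textFileStream('hdfs://localhost:9000/lr/train').map(parse_point)

model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=50)
model.setInitialWeights([0.0, 0.0, 0.0])  # dimension must match the feature vectors
model.trainOn(trainingData)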

Streaming KMeans

http://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means
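
Similarly, a minimal streaming k-means sketch, assuming a hypothetical HDFS directory of training lines of space-separated numbers:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import StreamingKMeans

# each training line is a space-separated feature vector (the format is an assumption)
trainingData = ssc.textFileStream('hdfs://localhost:9000/kmeans/train').map(
    lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

# two clusters over 3-dimensional points, starting from random centers
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
model.trainOn(trainingData)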

Checkpointing

A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures.

When to enable checkpointing:

- usage of stateful transformations (e.g. updateStateByKey or reduceByKeyAndWindow)

- recovery from failures of the driver running the application

Configure checkpointing:

sc = SparkContext(...)  # new context
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...)  # create DStreams
...
ssc.checkpoint(checkpointDirectory)  # set checkpoint directory (must be fault-tolerant, e.g. HDFS)
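
For driver-failure recovery specifically, the guide's getOrCreate pattern can be sketched as follows (checkpoint_dir is a hypothetical fault-tolerant path; the word-count pipeline is the one from the quick example):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpoint_dir = 'hdfs://localhost:9000/checkpoint'  # assumed fault-tolerant directory

def create_context():
    # only called when no checkpoint data exists in checkpoint_dir
    sc = SparkContext("local[2]", "NetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream("localhost", 9999)
    counts = lines.flatMap(lambda l: l.split(" ")) \
                  .map(lambda w: (w, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.checkpoint(checkpoint_dir)
    return ssc

# recreate the context from checkpoint data if it exists, otherwise build it fresh
ssc = StreamingContext.getOrCreate(checkpoint_dir, create_context)
ssc.start()
ssc.awaitTermination()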

To run in a Jupyter notebook, make sure you set

export PYSPARK_PYTHON=/usr/bin/python3

export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

in your .bashrc file to make sure the Python versions used by the driver and the workers are consistent.

References

https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/

http://spark.apache.org/docs/latest/streaming-programming-guide.html