EMR Spark Kinesis Quick Test - qyjohn/AWS_Tutorials GitHub Wiki

Data Source

Create a Kinesis data stream with multiple shards, then use the following Python code to write some data into the stream:

import boto3
import uuid
import random
import time
import json

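# Log boto3 debug output to the console (verbose; remove this line for quieter output)
boto3.set_stream_logger(name='boto3')
# Uses the default region from the AWS configuration; it must be the region that holds the stream
client = boto3.client('kinesis')
stream = 'my-stream'
# Write one JSON record per second until interrupted (Ctrl+C)
while True:
    msg_id = str(uuid.uuid1())
    msg_rd = random.randrange(100)  # random integer in [0, 100)
    message = {'message_id': msg_id, 'count': msg_rd}
    # The unique message ID doubles as the partition key, which spreads records across shards
    client.put_record(StreamName=stream, Data=json.dumps(message), PartitionKey=msg_id)
    time.sleep(1)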
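
The writer above uses a fresh UUID as the partition key for every record. According to the Kinesis documentation, the service takes the MD5 hash of the partition key, reads it as a 128-bit integer, and routes the record to the shard whose hash key range contains that value. The following is a minimal sketch of that mapping, assuming the shards split the hash space into equal ranges (a resharded stream may not). The names `hash_key` and `shard_index` are hypothetical helpers, not part of boto3.

```python
import hashlib
import uuid
from collections import Counter

HASH_SPACE = 2 ** 128  # Kinesis hash keys range over 0 .. 2**128 - 1


def hash_key(partition_key: str) -> int:
    """MD5 of the partition key, read as a 128-bit unsigned integer."""
    return int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the shard whose range holds the key (assumes equal-sized ranges)."""
    return hash_key(partition_key) * shard_count // HASH_SPACE


# Simulate 1000 UUID partition keys against a hypothetical 4-shard stream
counts = Counter(shard_index(str(uuid.uuid1()), 4) for _ in range(1000))
print(sorted(counts.items()))
```

With random-looking keys such as UUIDs, the counts should come out roughly even across shards, which is why a unique message ID is a reasonable partition key for a quick test.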

Kinesis Connector for Structured Streaming

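Download the connector JAR (the Scala 2.11 build for Spark 2.4) into /home/hadoop, the path used when starting pyspark:

wget https://repo1.maven.org/maven2/com/qubole/spark/spark-sql-kinesis_2.11/1.2.0_spark-2.4/spark-sql-kinesis_2.11-1.2.0_spark-2.4.jar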
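
The download URL follows the standard Maven repository layout: the group ID with dots turned into slashes, then the artifact ID, the version, and a file named `artifact-version.jar`. The sketch below rebuilds the URL from its coordinates, which may help when looking for a build that matches a different Scala or Spark version. `maven_jar_url` is a hypothetical helper, and whether other versions of this connector are published is not something this page confirms.

```python
def maven_jar_url(group: str, artifact: str, version: str,
                  base: str = 'https://repo1.maven.org/maven2') -> str:
    """Build a JAR URL following the standard Maven repository layout."""
    group_path = group.replace('.', '/')
    return f'{base}/{group_path}/{artifact}/{version}/{artifact}-{version}.jar'


# Coordinates taken from the wget command above:
# the Scala 2.11 build of the connector, version 1.2.0 for Spark 2.4
url = maven_jar_url('com.qubole.spark', 'spark-sql-kinesis_2.11', '1.2.0_spark-2.4')
print(url)
```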

Start pyspark

pyspark --jars /home/hadoop/spark-sql-kinesis_2.11-1.2.0_spark-2.4.jar

Read from Stream

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json

stream = 'my-stream'
region = 'ap-southeast-2'
endpoint = 'https://kinesis.ap-southeast-2.amazonaws.com'

# Read the stream starting from its oldest available record (TRIM_HORIZON)
kinesis = (spark.readStream.format('kinesis')
    .option('streamName', stream)
    .option('region', region)
    .option('endpointUrl', endpoint)
    .option('startingposition', 'TRIM_HORIZON')
    .load())

schema = StructType([StructField("message_id", StringType()), StructField("count", IntegerType())])

# Decode each binary payload, parse it as JSON, and print a single batch to the console
(kinesis.selectExpr('CAST(data AS STRING)')
    .select(from_json('data', schema).alias('data'))
    .select('data.*')
    .writeStream.outputMode('append').format('console')
    .trigger(once=True)
    .start())
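
For intuition, the query above does three things to each record: it casts the binary `data` column to a string, parses that string as JSON against the schema, and flattens the result into `message_id` and `count` columns. The following plain-Python sketch approximates that for a single payload. It is an illustration only, not Spark's implementation: `SCHEMA` and `parse_record` are hypothetical names, and Spark's exact handling of malformed or mistyped values depends on its version and parser mode.

```python
import json

# Hypothetical mirror of the Spark schema: field name -> expected Python type
SCHEMA = {'message_id': str, 'count': int}


def parse_record(data: bytes) -> dict:
    """Decode a raw payload and project it onto SCHEMA; unusable input yields None values."""
    empty = {name: None for name in SCHEMA}
    try:
        obj = json.loads(data.decode('utf-8'))
    except ValueError:  # covers both invalid UTF-8 and invalid JSON
        return empty
    if not isinstance(obj, dict):
        return empty
    row = {}
    for name, expected in SCHEMA.items():
        value = obj.get(name)
        # bool is a subclass of int in Python, so reject it explicitly
        ok = isinstance(value, expected) and not isinstance(value, bool)
        row[name] = value if ok else None
    return row


print(parse_record(b'{"message_id": "abc", "count": 42}'))
# {'message_id': 'abc', 'count': 42}
print(parse_record(b'not json'))
# {'message_id': None, 'count': None}
```

Rows of nulls in the console output are therefore a hint that some records in the stream do not match the expected JSON shape.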