203. Kinesis Streams and DynamoDB Streams
(1) Create a Kinesis Stream
Read the following AWS documentation to understand what a Kinesis stream is.
In your Kinesis console, create a Kinesis stream with 2 shards, then use the AWS CLI to practice the following operations. For more information on the AWS CLI commands for Kinesis Streams, refer to the following AWS documentation:
$ aws kinesis list-streams
$ aws kinesis describe-stream --stream-name [stream-name]
$ for i in `seq 01 99`; do
> aws kinesis put-record --stream-name [stream-name] --data "Test Data $i" --partition-key "$i"
> done
$ aws kinesis get-shard-iterator --stream-name [stream-name] --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON
$ aws kinesis get-records --shard-iterator [shard-iterator] --limit 10
It should be noted that the "Data" you see in the CLI output is base64 encoded. You will need to decode it to view the real data. After you read some data from the shard, you get a NextShardIterator so that you can continue to read from the current position.
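For example, putting the record "Test Data 1" into the stream produces the Data value "VGVzdCBEYXRhIDE=" in the get-records output. A minimal decoding sketch in Java (you can just as easily pipe the value through the base64 command-line tool):

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeRecord
{
    public static void main(String[] args)
    {
        // The Data field as returned by "aws kinesis get-records"
        String encoded = "VGVzdCBEYXRhIDE=";
        String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
        System.out.println(decoded); // prints: Test Data 1
    }
}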
Use the AWS CLI or the AWS console to change the number of shards in the stream. Compare the output of "aws kinesis describe-stream" before and after to see what happens when the number of shards is changed.
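You can also reshard programmatically through the UpdateShardCount API. A minimal sketch with the AWS SDK for Java (the stream name is a placeholder):

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

public class Reshard
{
    public static void main(String[] args)
    {
        AmazonKinesisClient client = new AmazonKinesisClient();
        client.configureRegion(Regions.AP_SOUTHEAST_2);
        // Scale the stream from 2 to 4 shards; UNIFORM_SCALING splits or
        // merges shards so that they cover even ranges of the key space.
        client.updateShardCount(new UpdateShardCountRequest()
                .withStreamName("my-stream")   // placeholder stream name
                .withTargetShardCount(4)
                .withScalingType(ScalingType.UNIFORM_SCALING));
    }
}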
(2) Amazon Kinesis Agent
The Amazon Kinesis Agent is a stand-alone Java software application that offers an easier way to collect and ingest data into Amazon Kinesis services, including Amazon Kinesis Streams and Amazon Kinesis Firehose.
Launch an EC2 instance from the AMI you created for your scalable web application homework. Install the Amazon Kinesis Agent and configure it to push the Apache logs (/var/log/apache2/*) to your Kinesis stream; a sample agent configuration follows. Once you have this working, create a new AMI from the EC2 instance, create a new launch configuration with the new AMI, and update your Auto Scaling group to use the new launch configuration. Create an ELB with at least two web servers behind it.
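A minimal sketch of the agent configuration for this setup (the stream name is a placeholder; the configuration file is typically /etc/aws-kinesis/agent.json, but check the agent documentation for your distribution):

{
  "flows": [
    {
      "filePattern": "/var/log/apache2/*",
      "kinesisStream": "my-stream"
    }
  ]
}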
Generate some test traffic from around the world using free test tools such as the following. Then read from the stream to see what you get.
Again, use the AWS CLI to read from the stream. As you can see, you are now aggregating the Apache logs from multiple web servers to the same Kinesis stream. In other words, the Kinesis stream now becomes your data sink for Apache logs.
(3) Some Programming with Java
Below is demo code that dumps the data in your Kinesis stream in real time. Run this code against your Kinesis stream to see what happens.
import java.io.*;
import java.util.*;
import java.nio.charset.Charset;

import com.amazonaws.*;
import com.amazonaws.auth.*;
import com.amazonaws.auth.profile.*;
import com.amazonaws.regions.*;
import com.amazonaws.services.kinesis.*;
import com.amazonaws.services.kinesis.model.*;

// One ShardReader thread tails a single shard, starting from TRIM_HORIZON.
class ShardReader extends Thread
{
    AmazonKinesisClient client;
    String streamName, shardId;

    public ShardReader(String streamName, String shardId)
    {
        client = new AmazonKinesisClient();
        client.configureRegion(Regions.AP_SOUTHEAST_2);
        this.streamName = streamName;
        this.shardId = shardId;
    }

    public void run()
    {
        try
        {
            // Start reading from the oldest record available in the shard.
            GetShardIteratorResult result1 = client.getShardIterator(streamName, shardId, "TRIM_HORIZON");
            String shardIterator = result1.getShardIterator();
            boolean hasData = true;
            while (hasData)
            {
                GetRecordsResult result2 = client.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
                shardIterator = result2.getNextShardIterator();
                if (shardIterator == null)
                {
                    hasData = false; // A null iterator means the shard has been closed.
                }
                List<Record> records = result2.getRecords();
                if (records.isEmpty())
                {
                    sleep(2000); // No new records yet; back off before polling again.
                }
                else
                {
                    for (Record record : records)
                    {
                        // The record payload is a ByteBuffer; decode it as UTF-8 text.
                        String data = Charset.forName("UTF-8").decode(record.getData()).toString();
                        System.out.println(shardId + "\t" + data);
                    }
                }
            }
        } catch (Exception e)
        {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }
}

public class KinesisDemo
{
    public AmazonKinesisClient client;
    public String streamName;

    public KinesisDemo()
    {
        client = new AmazonKinesisClient();
        client.configureRegion(Regions.AP_SOUTHEAST_2);
        try
        {
            // Read the stream name from a local properties file.
            Properties prop = new Properties();
            InputStream input = new FileInputStream("kinesis.properties");
            prop.load(input);
            streamName = prop.getProperty("streamName");
        } catch (Exception e)
        {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }

    public void run()
    {
        try
        {
            // List the shards in the stream, then start one reader thread per shard.
            DescribeStreamResult result = client.describeStream(streamName);
            StreamDescription description = result.getStreamDescription();
            List<Shard> shards = description.getShards();
            for (Shard shard : shards)
            {
                String shardId = shard.getShardId();
                new ShardReader(streamName, shardId).start();
            }
        } catch (Exception e)
        {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        try
        {
            KinesisDemo demo = new KinesisDemo();
            demo.run();
        } catch (Exception e)
        {
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }
}
As you can see, this code gets the list of shards only once, at start-up. If the number of shards changes while the code is running, the code will not be aware of the new shards in the stream. Also, the code never checkpoints its position, so if something goes wrong during execution (for example, an out-of-memory error), the program has to start from the beginning of the stream again.
The Amazon Kinesis Client Library for Java (Amazon KCL) enables Java developers to easily consume and process data from Amazon Kinesis. KCL provides an easy-to-use programming model for processing data, and it also helps with scale-out and fault-tolerant processing: it discovers new shards automatically and checkpoints its progress, addressing both of the problems described above.
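A minimal sketch of what a KCL 1.x record processor looks like (illustrative only; a KCL Worker, not shown here, creates one processor per shard and drives these callbacks):

import java.nio.charset.Charset;
import java.util.List;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

class LogProcessor implements IRecordProcessor
{
    String shardId;

    public void initialize(String shardId)
    {
        this.shardId = shardId; // Called once, when the worker assigns us a shard.
    }

    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)
    {
        for (Record record : records)
        {
            String data = Charset.forName("UTF-8").decode(record.getData()).toString();
            System.out.println(shardId + "\t" + data);
        }
        try
        {
            // Checkpointing stores our position in a DynamoDB table, so a
            // restarted worker resumes here instead of at the start of the stream.
            checkpointer.checkpoint();
        } catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
    {
        // Called when the shard is closed or the lease is taken by another worker.
    }
}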
What would you do to develop an application that displays the top 10 IP addresses accessing your web site, ordered by the number of HTTP requests, during the past 5 minutes?
A DynamoDB stream is an ordered flow of information about changes to items in an Amazon DynamoDB table. When you enable a stream on a table, DynamoDB captures information about every modification to data items in the table. Whenever an application creates, updates, or deletes items in the table, DynamoDB Streams writes a stream record with the primary key attribute(s) of the items that were modified. DynamoDB Streams works in much the same way as Kinesis Streams. As such, you can adapt the above-mentioned demo code to print out the changes in your DynamoDB table; a sketch follows.
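A minimal sketch of that adaptation, assuming the low-level DynamoDB Streams API in the AWS SDK for Java (the stream ARN is a placeholder for your table's LatestStreamArn; note the model classes come from the dynamodbv2 package, and this sketch walks shards serially and ignores shard pagination):

import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.*;

public class DynamoDBStreamDump
{
    public static void main(String[] args) throws Exception
    {
        AmazonDynamoDBStreamsClient client = new AmazonDynamoDBStreamsClient();
        client.configureRegion(Regions.AP_SOUTHEAST_2);
        String streamArn = "arn:aws:dynamodb:..."; // placeholder stream ARN

        // Same pattern as the Kinesis demo: describe the stream, then walk each shard.
        DescribeStreamResult describe = client.describeStream(
                new DescribeStreamRequest().withStreamArn(streamArn));
        for (Shard shard : describe.getStreamDescription().getShards())
        {
            String iterator = client.getShardIterator(new GetShardIteratorRequest()
                    .withStreamArn(streamArn)
                    .withShardId(shard.getShardId())
                    .withShardIteratorType(ShardIteratorType.TRIM_HORIZON)).getShardIterator();
            while (iterator != null)
            {
                GetRecordsResult result = client.getRecords(
                        new GetRecordsRequest().withShardIterator(iterator));
                for (Record record : result.getRecords())
                {
                    // Each record describes one INSERT, MODIFY, or REMOVE on the table.
                    System.out.println(record.getEventName() + "\t" + record.getDynamodb());
                }
                iterator = result.getNextShardIterator();
                if (result.getRecords().isEmpty()) Thread.sleep(2000);
            }
        }
    }
}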
(4) Lambda Function
AWS Lambda is a serverless compute service that runs your code in response to events and automatically manages the underlying compute resources for you. You can use Kinesis Streams or DynamoDB Streams as triggers to your Lambda function.
Create a Lambda function with the following Python code. Add your Kinesis stream as a trigger to the Lambda function. Look into the CloudWatch logs to observe how the Lambda function executes.
from __future__ import print_function

import base64
import json

print('Loading function')

def lambda_handler(event, context):
    # print("Received event: " + json.dumps(event, indent=2))
    for record in event['Records']:
        # Kinesis data is base64 encoded, so decode it here
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        print("Decoded payload: " + payload)
    return 'Successfully processed {} records.'.format(len(event['Records']))
The following Node.js code can be used to process events from DynamoDB Streams:
'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    // console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach((record) => {
        console.log(record.eventID);
        console.log(record.eventName);
        console.log('DynamoDB Record: %j', record.dynamodb);
    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};
(5) Homework
Write a Lambda function in Java to perform cross-region replication of a DynamoDB table. For example, if you have a DynamoDB table in the ap-southeast-2 region, any updates to the table must be replicated to another table in the us-east-1 region. A possible starting skeleton follows.
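One possible starting skeleton, assuming the aws-lambda-java-events library (2.x, which reuses the dynamodbv2 model classes) and a pre-created replica table; the table name is a placeholder, and error handling, retries, and conflicting writes are left for you to work out:

import java.util.Map;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;

public class CrossRegionReplicator implements RequestHandler<DynamodbEvent, String>
{
    static final String REPLICA_TABLE = "my-table-replica"; // placeholder table name

    // The client points at the destination region, not the source region.
    AmazonDynamoDB replica = AmazonDynamoDBClientBuilder.standard()
            .withRegion(Regions.US_EAST_1).build();

    public String handleRequest(DynamodbEvent event, Context context)
    {
        for (DynamodbStreamRecord record : event.getRecords())
        {
            if ("REMOVE".equals(record.getEventName()))
            {
                // Deletes only carry the key attributes.
                Map<String, AttributeValue> keys = record.getDynamodb().getKeys();
                replica.deleteItem(REPLICA_TABLE, keys);
            }
            else
            {
                // INSERT and MODIFY carry the full item in NewImage (the stream
                // must be enabled with NEW_IMAGE or NEW_AND_OLD_IMAGES).
                Map<String, AttributeValue> item = record.getDynamodb().getNewImage();
                replica.putItem(REPLICA_TABLE, item);
            }
        }
        return "Processed " + event.getRecords().size() + " records.";
    }
}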