Amazon Kinesis Observer setup guide - snowplow-archive/sauna GitHub Wiki
This observer has not yet been released.
- 1. Overview
- 2. Compatibility
- 3. Amazon Kinesis setup
- 4. Sauna setup
- 4.1 Avro Schema
- 4.2 Example
- 5. Required AWS resources
- 6. Troubleshooting
This observer monitors an Amazon Kinesis stream for arriving records; each record is processed as a Sauna command, which triggers an appropriate responder action.
Note that this Kinesis stream can be homogeneous (all commands trigger a single responder) or heterogeneous (commands trigger different responders).
This observer will be released in Sauna version TBC.
Create a new Kinesis stream using the AWS CLI, for example (using the stream name from the configuration below; substitute your own):

```bash
aws kinesis create-stream \
  --stream-name acme-sauna-prioritycommands \
  --shard-count 1
```
We recommend giving this stream the maximum record lifetime (168 hours at the time of writing):

```bash
aws kinesis increase-stream-retention-period \
  --stream-name acme-sauna-prioritycommands \
  --retention-period-in-hours 168
```
The stream will be created with a single shard. Shards are the unit of throughput in Amazon Kinesis; a single shard supports writes of up to 1 MB (or 1,000 records) per second, which equates to about 500 commands per second (taking a very conservative estimate of 2 KB per command).
If you need to send more commands, you can increase the number of shards, for example to two:

```bash
aws kinesis update-shard-count \
  --stream-name acme-sauna-prioritycommands \
  --target-shard-count 2 \
  --scaling-type UNIFORM_SCALING
```
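The sizing arithmetic above can be sketched as a small shell function. This is a hypothetical helper, not part of Sauna; the per-shard write limits of 1 MiB/s and 1,000 records/s are the standard Kinesis quotas:

```bash
# Hypothetical helper: estimate the number of shards needed for a given
# command rate, given Kinesis per-shard write limits of 1 MiB/s and
# 1,000 records/s.
shards_needed() {
  local cmds_per_sec=$1 bytes_per_cmd=$2
  # ceil(bytes per second / 1 MiB)
  local by_bytes=$(( (cmds_per_sec * bytes_per_cmd + 1048575) / 1048576 ))
  # ceil(records per second / 1,000)
  local by_records=$(( (cmds_per_sec + 999) / 1000 ))
  # the stricter (larger) of the two limits wins
  if [ "$by_bytes" -gt "$by_records" ]; then
    echo "$by_bytes"
  else
    echo "$by_records"
  fi
}

shards_needed 1200 2048  # 1,200 commands/s at 2 KB each
```

At 2 KB per command the byte limit, not the record limit, is usually the binding constraint.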
The Amazon Kinesis Observer must be configured using a self-describing Avro which validates against this Schema:
iglu:com.snowplowanalytics.sauna.observers/AmazonKinesisConfig/avro/1-0-0
We can enable this observer by placing the following Avro configuration into the configuration directory (the filename must have the extension `.avro` or `.json`):
```json
{
  "schema": "iglu:com.snowplowanalytics.sauna.observers/AmazonKinesisConfig/avro/1-0-0",
  "data": {
    "enabled": true,
    "id": "com.acme.MyStreamObserver",
    "parameters": {
      "aws": {
        "accessKeyId": "...",
        "secretAccessKey": "..."
      },
      "kinesis": {
        "region": "us-west-2",
        "streamName": "acme-sauna-prioritycommands",
        "maxRecords": 100,
        "initialPosition": "AT_TIMESTAMP",
        "initialPositionModifiers": {
          "timestamp": "1486998718968"
        }
      }
    }
  }
}
```
Where:

- `id` uniquely identifies this observer, and is used to track which records have already been processed
- `aws` provides the credentials to read the stream
- `kinesis.region` is the AWS region containing your stream
- `kinesis.streamName` is the name of the Kinesis stream
- `kinesis.maxRecords` is the maximum number of records to get from Kinesis per call to `GetRecords`
- `kinesis.initialPosition` only affects the first run of this observer. If set to `LATEST`, the observer will start processing only the most recent records; `TRIM_HORIZON` starts with the oldest records in the stream; and `AT_TIMESTAMP` starts with the earliest record pushed after a specific timestamp
- `kinesis.initialPositionModifiers` contains the timestamp (as the Unix epoch date with precision in milliseconds) used with the `AT_TIMESTAMP` initial position mode. (If `initialPosition` is set to `LATEST` or `TRIM_HORIZON`, `initialPositionModifiers` is ignored and should be set to `null`)
Under the hood, the Amazon Kinesis Observer uses the Kinesis Client Library (KCL), a Java library from AWS that simplifies at-least-once processing of all of a Kinesis stream's records across all shards.
To keep track of progress in processing these records, the KCL creates and manages a DynamoDB table in which it stores checkpoints. This table is named after the observer's `id` field.
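For example, assuming the `id` from the configuration above, you could inspect the checkpoint table with the AWS CLI (illustrative commands; they require the DynamoDB permissions listed in the next section):

```bash
# Inspect the KCL checkpoint table created for this observer;
# the table name matches the observer's "id" from the configuration
aws dynamodb describe-table --table-name com.acme.MyStreamObserver

# List the stored checkpoints (one item per shard lease)
aws dynamodb scan --table-name com.acme.MyStreamObserver
```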
The Kinesis observer implements a stream consumer which, besides retrieving data records, uses an Amazon DynamoDB table to store the application state and uploads metrics to CloudWatch. Therefore, the user whose credentials are provided in the config should have the following permissions:
- Kinesis: `DescribeStream`, `GetRecords`, `GetShardIterator`
- DynamoDB: `CreateTable`, `DescribeTable`, `GetItem`, `PutItem`, `Scan`, `UpdateItem`, `DeleteItem`
- CloudWatch: `PutMetricData`
A policy that grants the above permissions can be found in the IAM section of the Kinesis Developer Guide.
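For reference, a minimal policy granting these permissions might look like the following sketch. The account ID, region, stream name, and table name are illustrative and should be replaced with your own resources:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator"
      ],
      "Resource": "arn:aws:kinesis:us-west-2:123456789012:stream/acme-sauna-prioritycommands"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      "Resource": "arn:aws:dynamodb:us-west-2:123456789012:table/com.acme.MyStreamObserver"
    },
    {
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*"
    }
  ]
}
```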
It is vitally important that the Amazon Kinesis Observer keep track of which records in the stream it has already executed as actions, and which ones are pending. If the observer loses track, then commands could be actioned twice.
As described in Required AWS resources above, the observer checkpoints its progress in a table in DynamoDB. To preserve these checkpoints and prevent commands being actioned twice:
- Make sure the DynamoDB table is not accidentally deleted or truncated
- Avoid changing the value of the `id` field in the configuration. This `id` is used to identify the DynamoDB table, so changing the `id` will effectively restart processing with a new checkpointing table