Sinks
Overview
A sink is the terminal step of a streaming query: it defines where and how the results of a job's streaming query are written, for example to the console, to files on a distributed file system, or to an external data store. Each SAVE STREAM statement in a job definition creates one streaming query that continuously writes its input table to the configured sink.
Syntax
SAVE STREAM [database-name.]table-name
TO [streaming-query-provider]
[IDENTIFIED BY streaming-query-name]
[TRIGGER interval]
[OUTPUT MODE [APPEND | COMPLETE | UPDATE]]
[PARTITIONED ON(column, ...)]
[OPTIONS ('key'='value', ...)]
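For example, a sink combining several of the optional clauses might look like the following (the table name, trigger interval, and option values are illustrative):
SAVE STREAM foo
TO JSON
IDENTIFIED BY foo_json
TRIGGER 1 MINUTE
OUTPUT MODE APPEND
OPTIONS(
'checkpointLocation'='/path/to/write-ahead-log'
);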
Features
Exponential Backoff
Jaguar provides supervised exponential backoff for all streaming queries. If a streaming query fails at runtime, Jaguar will automatically attempt to restart it at a point in the future, using exponential backoff to calculate how long to wait before the restart is attempted. Each streaming query tracks the number of successive restarts and failures since it last successfully processed data for a stable period of time; this count is used as the exponent when calculating the next restart delay. Specifics about the implementation can be found in the How It Works Guide. The tables below show how the calculated delay grows for successive failures (a sketch of the calculation follows the tables):
| number_failures | min_backoff | max_backoff | random_factor | delay |
|---|---|---|---|---|
| 0 | 1s | 1h | 0.0 | 1s |
| 1 | 1s | 1h | 0.0 | 2s |
| 2 | 1s | 1h | 0.0 | 4s |
| 3 | 1s | 1h | 0.0 | 8s |
| 4 | 1s | 1h | 0.0 | 16s |

| number_failures | min_backoff | max_backoff | random_factor | delay |
|---|---|---|---|---|
| 0 | 1s | 1h | 0.1 | 1023ms |
| 1 | 1s | 1h | 0.1 | 2064ms |
| 2 | 1s | 1h | 0.1 | 4094ms |
| 3 | 1s | 1h | 0.1 | 8495ms |
| 4 | 1s | 1h | 0.1 | 16062ms |
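The values above are consistent with the common exponential backoff formula: the delay is min_backoff * 2^number_failures, capped at max_backoff, then multiplied by a random value in [1.0, 1.0 + random_factor). Below is a minimal sketch of that calculation; it is an illustration only, not the actual Jaguar implementation (see the How It Works Guide for the specifics):

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._

// Illustrative sketch of the backoff calculation implied by the tables above;
// not the actual Jaguar implementation.
def nextDelay(
    numberFailures: Int,
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double): FiniteDuration = {
  // exponential growth: min_backoff * 2^number_failures, capped at max_backoff
  val cappedMs =
    math.min(maxBackoff.toMillis.toDouble, minBackoff.toMillis * math.pow(2, numberFailures))
  // entropy in [1.0, 1.0 + random_factor) staggers restarts of queries that failed together
  val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
  (cappedMs * rnd).toLong.millis
}

// nextDelay(3, 1.second, 1.hour, 0.0)  // exactly 8 seconds
// nextDelay(3, 1.second, 1.hour, 0.1)  // somewhere between 8000ms and 8800ms
```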
Query Name
All streaming queries must have a unique name within their job definition. By default, streaming queries will attempt to derive a unique name based on their input. Users can explicitly name their streaming queries if they prefer a particular naming convention. Occasionally, certain job definitions result in streaming queries deriving conflicting query names; in these scenarios, the user must provide a unique name for each query.
In the following example, the user has not provided an explicit query name for their streaming query. In these cases, the name of the table providing data to the streaming query is used as the query name; here, the query name would be foo.
SAVE STREAM foo
TO CONSOLE;
In some cases, the user may wish to save the same table to two different data stores. Here the user must explicitly name each streaming query; otherwise both streaming queries would derive the same query name, foo, and the job would fail because query names must be unique within their job definition. Instead, name each streaming query using the IDENTIFIED BY clause.
SAVE STREAM foo
TO CONSOLE
IDENTIFIED BY foo_console;
SAVE STREAM foo
TO JSON
IDENTIFIED BY foo_json;
Even if the detected query name is unique within a job definition, the user can always choose to provide their own preferred query name using the IDENTIFIED BY clause.
SAVE STREAM foo
TO CONSOLE
IDENTIFIED BY foo_console;
Output Mode
Not all streaming queries support every output mode; please refer to the sink-specific documentation to determine which output modes are supported by a specific sink.
A streaming query's output mode determines which rows of its input table are written to the sink as each micro-batch completes.
Append
Only rows added to the input table since the last micro-batch are written to the sink.
Complete
The entire input table is written to the sink after every micro-batch; this mode is typically used with aggregated results.
Update
Only rows updated in the input table since the last micro-batch are written to the sink.
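For example, assuming the target sink supports it, the output mode can be set explicitly with the OUTPUT MODE clause (the table name below is illustrative):
SAVE STREAM foo
TO CONSOLE
OUTPUT MODE COMPLETE;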
Partitioning
Not all streaming queries support partitioning; those which do will document how the PARTITIONED ON clause is handled.
A streaming query can optionally specify a list of columns to be used for partitioning. The columns provided for partitioning must be defined in the streaming query's input table. Each type of streaming query implements the partitioning clause only if the underlying infrastructure for that type of streaming query has such a concept.
SAVE STREAM foo(a int, b int, c int)
TO JSON
PARTITIONED ON(a, b);
Trigger
A streaming query can optionally specify a trigger. A trigger specifies how frequently a streaming query should attempt to execute micro-batches. By default, no trigger is specified (i.e., data is processed as quickly as possible). When a trigger is specified, streaming queries will schedule micro-batches based on the user-provided interval. Typically, users provide a trigger only where needed; a few example use cases are listed below:
- A job writing to a distributed file system may trigger once an hour to reduce small-file amplification by queueing data longer at the source.
- A job processing a volatile, bursty source may trigger once per minute, combined with source throttling, to ensure consistent batch sizes and end-to-end latency.
- A job processing a considerable historical backlog may trigger once per minute, combined with source throttling, to ensure consistent batch sizes and end-to-end latency.
When no trigger is specified, streaming queries execute micro-batches as quickly as they can be processed, assuming the underlying source(s) have new data available. If a streaming query's underlying source(s) have no new data available, the streaming query will wait up to spark.sql.streaming.pollingDelay (defaults to 10ms) before polling for new data again. This delay is important to prevent streaming queries from polling for new data in a tight loop, potentially overwhelming the source with metadata requests.
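Below is a minimal sketch of how an untriggered streaming query schedules micro-batches; it illustrates the behavior described above and is not the actual Spark/Jaguar scheduling code (hasNewData and runMicroBatch are hypothetical placeholders):

```scala
import scala.concurrent.duration._

// Illustrative loop: process micro-batches as fast as data arrives, but back
// off by the polling delay when the source reports nothing new.
val pollingDelay = 10.millis  // spark.sql.streaming.pollingDelay default

def runUntriggered(hasNewData: () => Boolean, runMicroBatch: () => Unit): Unit = {
  while (true) {
    if (hasNewData()) runMicroBatch()          // new data: execute the next micro-batch immediately
    else Thread.sleep(pollingDelay.toMillis)   // no new data: wait before polling the source again
  }
}
```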
When a trigger is specified, provide values in the format 15 SECONDS, 1 MINUTE, or 2 HOURS, etc. Valid units include:
- MILLISECOND: milliseconds
- MILLISECONDS: milliseconds
- SECOND: seconds
- SECONDS: seconds
- MINUTE: minutes
- MINUTES: minutes
- HOUR: hours
- HOURS: hours
SAVE STREAM foo
TO CONSOLE
TRIGGER 5 MINUTES;
Write Ahead Log
Not all streaming queries support a write-ahead-log; those which do not will explicitly state the lack of support in their feature list.
Most streaming queries leverage a write-ahead-log (WAL) internally. The WAL allows the streaming query to checkpoint incremental state. When a streaming query fails, this state is recovered from the WAL. In most cases, this allows the streaming query to pick up exactly where it left off. In some cases, the streaming query may need to reprocess the most recent micro-batch.
The contents of each streaming query's WAL will vary depending on the DAG being processed by that specific streaming query (e.g., depending on the type of source(s) in the leaf node(s)). However, most WALs share a common structure, with a few prominent subdirectories. It is very important that the WAL be written to a durable, highly available storage location. If a streaming query fails and is restarted elsewhere on a cluster, the WAL must be available so the streaming query can recover its state (if the WAL is unavailable, the streaming query cannot start). For these reasons, the WAL is typically stored in HDFS or a DFS with similar characteristics.
Below are sample contents from a streaming query's WAL; each component is explained in the sections that follow.
[bowdch01@msc02-jag-en-001 ~]$ hdfs dfs -ls /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd
Found 4 items
drwxr-xr-x - pstl hdfs 0 2018-01-11 23:01 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits
-rw-r--r-- 2 pstl hdfs 45 2017-12-08 18:49 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/metadata
drwxr-xr-x - pstl hdfs 0 2018-01-11 23:00 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/offsets
drwxr-xr-x - pstl hdfs 0 2017-12-08 18:49 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/sources
commits
The commits subdirectory contains empty files, named by a monotonically increasing id (the id of each micro-batch executed by this streaming query). When a streaming query recovers, the latest commit file indicates the last successfully processed micro-batch. If the latest offset file matches the latest commit file, the streaming query can safely start processing the next micro-batch. The commits directory is self-compacting; that is, old commit files are phased out as they become less important while new micro-batches are processed. A sketch of this recovery check follows the listing below.
[bowdch01@msc02-jag-en-001 ~]$ hdfs dfs -ls /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 20:46 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/960
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 21:01 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/961
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 21:16 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/962
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 21:31 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/963
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 21:46 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/964
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 22:01 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/965
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 22:17 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/966
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 22:31 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/967
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 22:47 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/968
-rw-r--r-- 2 pstl hdfs 5 2018-01-11 23:01 /user/pstl/write-ahead-logs/kafka_hdfs_archive/kafka_yyyymmdd/commits/969
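The recovery check described above can be approximated by comparing the newest entry in offsets with the newest entry in commits. The sketch below is illustrative only: it reads a local copy of the WAL via java.nio (the real WAL typically lives in HDFS), and the latestBatchId helper and paths are hypothetical.

```scala
import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._
import scala.util.Try

// Return the largest numeric file name in a WAL subdirectory, ignoring
// non-numeric entries such as compaction files.
def latestBatchId(dir: String): Option[Long] =
  Files.list(Paths.get(dir)).iterator().asScala
    .flatMap(p => Try(p.getFileName.toString.toLong).toOption)
    .maxOption

val wal = "/tmp/wal-copy/kafka_yyyymmdd"  // hypothetical local copy of the WAL
(latestBatchId(s"$wal/offsets"), latestBatchId(s"$wal/commits")) match {
  case (Some(planned), Some(committed)) if planned == committed =>
    println(s"micro-batch $planned committed; recovery resumes at micro-batch ${planned + 1}")
  case (Some(planned), _) =>
    println(s"micro-batch $planned was planned but not committed; it will be re-processed")
  case _ =>
    println("empty WAL; the streaming query starts from scratch")
}
```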
metadata
The metadata file contains JSON encoded metadata about the streaming query, namely a unique id for the streaming query that is consistent across restarts. Typically, the metadata file is written only once, the first time a streaming query starts; afterwards it is rarely touched, only being read on recovery to extract the streaming query's id.
{"id":"8d03b1f6-c2d8-43eb-ae21-65ac2b7f58e8"}
offsets
The offsets directory contains JSON encoded files, named by a monotonically increasing id (the id of each micro-batch). The contents of these JSON files vary depending on the type of streaming source(s) this streaming query is processing, since they represent the offset implementation of the underlying streaming source(s). The state in these files describes the frame of data processed in each micro-batch (e.g., offset files 989 and 990 together describe the range of data processed by micro-batch 990). The offsets directory is self-compacting; that is, old offset files are phased out as they become less important while new micro-batches are processed.
v1
{"batchWatermarkMs":0,"batchTimestampMs":1515711600158,"conf":{"spark.sql.shuffle.partitions":"200","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
{"mdn_prod_gdcs_sevone":{"8":5472909926,"11":5473266638,"2":5473295504,"5":5473314495,"4":5473315204,"7":5472544553,"10":5473244264,"1":5473312321,"9":5473098186,"3":5473222379,"6":5472357203,"0":5473297970},"nwk_prod_gdcs_sevone":{"8":6788527896,"11":6788300045,"2":6788539755,"5":6788535792,"4":6788538271,"7":6788532202,"10":6788538223,"1":6788535192,"9":6788538774,"3":6788531994,"6":6788533577,"0":6788508905}}
sources
The sources directory contains a subdirectory for each streaming source required by this streaming query's DAG. Each subdirectory is named 0, 1, 2, etc., based on the ordinal position of the corresponding streaming source in the streaming query's DAG. Within each subdirectory is a file whose name is the id of the micro-batch at which this source first started contributing data to the streaming query's execution plan. The contents of these files are JSON encoded in the same format as the offsets WAL directory (i.e., their contents vary depending on the underlying streaming source).
v1
{"mdn_prod_gdcs_sevone":{"8":238049894,"11":238452150,"2":238453383,"5":238455513,"4":238455351,"7":237738872,"10":238432534,"1":238452231,"9":238238049,"3":238429618,"6":237558524,"0":238436863},"nwk_prod_gdcs_sevone":{"8":299224276,"11":298992074,"2":299225791,"5":299223615,"4":299225249,"7":299224724,"10":299225828,"1":299224454,"9":299225377,"3":299224974,"6":299225528,"0":299203423}}
See checkpointLocation for information about write-ahead-log configuration.
Options
checkpointLocation
Defines where the write ahead log should be stored for this streaming query.
While this is a required setting for most streaming queries, Jaguar will automatically configure the checkpoint location for streaming queries using a convention-based format if spark.sql.streaming.checkpointLocation is set (e.g., ${spark.sql.streaming.checkpointLocation}/$jobId/$queryId).
SAVE STREAM foo
TO JSON
OPTIONS(
'checkpointLocation'='/path/to/write-ahead-log'
);
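Alternatively, if a cluster-wide checkpoint root is set in the Spark configuration, the checkpointLocation option can be omitted and Jaguar derives it by convention. For example, in spark-defaults.conf (the HDFS path below is hypothetical):
spark.sql.streaming.checkpointLocation hdfs:///user/pstl/write-ahead-logs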
pstl.retry.enabled
Defines whether exponential backoff is enabled for this streaming query. If true, we will attempt to restart a failed streaming query based on the calculated exponential backoff delay. If false, a failed streaming query will remain stopped permanently unless user intervention is taken.
If not specified, defaults to spark.pstl.retry.enabled. If spark.pstl.retry.enabled is not specified, defaults to true.
SAVE STREAM foo
TO CONSOLE
OPTIONS(
'pstl.retry.enabled'='false'
);
pstl.retry.max.attempts
Defines the maximum number of times to attempt to restart a failed streaming query. If a streaming query cannot be successfully restarted after the defined number of attempts, the streaming query is considered permanently failed.
Valid values are positive integers up to 2147483647 (Integer.MAX_VALUE).
If not specified, defaults to spark.pstl.retry.max.attempts. If spark.pstl.retry.max.attempts is not specified, defaults to 2147483647.
SAVE STREAM foo
TO CONSOLE
OPTIONS(
'pstl.retry.max.attempts'='3'
);
pstl.retry.max.backoff
Defines the maximum amount of time to wait before attempting to restart a failed streaming query.
Provide values in the format 50s, 100ms, or 250us, etc. Note there is no space between the number and units. Valid units include:
- us: microseconds
- ms: milliseconds
- s: seconds
- m: minutes
- min: minutes
- h: hours
- d: days
If not specified, defaults to spark.pstl.retry.max.backoff. If spark.pstl.retry.max.backoff is not specified, defaults to 1h.
SAVE STREAM foo
TO CONSOLE
OPTIONS(
'pstl.retry.max.backoff'='1d'
);
pstl.retry.min.backoff
Defines the minimum amount of time to wait before attempting to restart a failed streaming query.
Provide values in the format 50s, 100ms, or 250us, etc. Note there is no space between the number and units. Valid units include:
- us: microseconds
- ms: milliseconds
- s: seconds
- m: minutes
- min: minutes
- h: hours
- d: days
If not specified, defaults to spark.pstl.retry.min.backoff. If spark.pstl.retry.min.backoff is not specified, defaults to 1s.
SAVE STREAM foo
TO CONSOLE
OPTIONS(
'pstl.retry.min.backoff'='1m'
);
pstl.retry.random.factor
Defines the amount of entropy to introduce when calculating how long to wait before restarting a failed streaming query. While the minimum and maximum wait times are bounded by the minimum and maximum backoff, the random factor adds a degree of randomness to the calculation. This randomness helps ensure that if multiple streaming queries fail at exactly the same point in time, due to the same underlying cause, their attempted restarts are slightly staggered to prevent bursty restarts. You can think of the random factor as an additive percent multiplier applied to the calculated delay. Valid values include any floating point number greater than or equal to zero.
If not specified, defaults to spark.pstl.retry.random.factor. If spark.pstl.retry.random.factor is not specified, defaults to 0.
SAVE STREAM foo
TO CONSOLE
OPTIONS(
'pstl.retry.random.factor'='0.2'
);
Next Steps
Feel free to look into the specifics of each sink. Below is a list of all available sinks you can use when creating job definitions: