Sources
Overview
A Source is a fundamental building block for a Jaguar Job. Sources serve data incrementally, in a streaming fashion, to a specific Sink. Between a Source and a Sink, there may optionally be Transforms provided by the user to manipulate data from the Source as desired, prior to feeding output to the Sink. Because data is served incrementally, a Source must have a monotonically increasing notion of progress that can be represented as an Offset. Spark regularly queries each Source to see if any more data is available for processing.
WARNING: Do not confuse our usage of Offset with Kafka's offsets. Internally, an Offset is an arbitrary object which represents a monotonically increasing notion of progress for a specific Source.
If you are familiar with Spark's execution model, a Source is lazy. That is, a Source is meaningless until it is referenced by a Sink. If you are not familiar with Sinks, feel free to read the Sink Guide after finishing this guide.
Sources are defined with the following syntax:
CREATE STREAM foo
FROM JSON;
In the above example, foo is the name of the unique Source we wish to define. All defined sources automatically register a temporary view based on their unique name (e.g., foo in this case), so any operations following the definition of this unique Source can reference foo. Each Source must come from somewhere (e.g., files, Kafka, etc.). The example above specifies we will fetch data from JSON (e.g., JSON files). Specific sources can define a short, human-readable name for reference. Alternatively, we can also reference a specific source by its fully qualified class name. When referencing a fully qualified class name, the class must implement StreamSourceProvider.
CREATE STREAM bar
FROM org.apache.spark.sql.kafka010.KafkaSourceProvider;
You will find most job definitions reference the short, human-readable name rather than the fully qualified class name. If the StreamSourceProvider also implements DataSourceRegister and provides the necessary ServiceLoader metadata in its packaging format (e.g., JAR), the short, human-readable name serves as a less verbose alias for the fully qualified class name.
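For instance, Spark's Kafka provider implements DataSourceRegister and registers the short name kafka, so (assuming the Kafka integration JAR and its ServiceLoader metadata are on the classpath) the definition above could be shortened to:
CREATE STREAM bar
FROM KAFKA;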
Some sources must be configured with a user provided schema, as they do not have enough runtime information to assemble a schema for the underlying table. Other sources optionally accept a user provided schema if the user is only interested in exposing a subset of the underlying data. In either case, schemas can be provided with the following syntax:
CREATE STREAM foo(a int, b string)
FROM JSON;
In the above example, the underlying table foo would have the following schema:
| column_name | data_type | nullable |
| --- | --- | --- |
| a | int | true |
| b | string | true |
Most sources also accept a set of configuration options. In most cases, sources have a set of required options which must be provided for them to function correctly. Options are simply a set of key / value pairs which allow the user to provide specific settings for their use case. Options vary depending on the source, so always reference the documentation for the specific source you are using if you have questions. Options can be provided with the following syntax:
CREATE STREAM foo
FROM JSON
OPTIONS(
'path'='/path/to/json'
);
In the above example, we specify the path option to the JSON source, with a value of /path/to/json. To provide multiple options to a source, simply pass them as a comma-separated list:
CREATE STREAM foo
FROM JSON
OPTIONS(
'path'='/path/to/json',
'dateFormat'='MM-dd-yyyy'
);
Features
Rate Limiting
Certain streaming query sources support optional rate limiting. Rate limiting allows the user to influence how much data a source serves per micro-batch. Typically, rate limiting is advantageous for use cases that require stable end-to-end micro-batch latency. Certain workloads develop a natural ebb and flow which can sometimes be smoothed with rate limiting. Rate limiting can also reduce "risk" in some situations. If a source has a considerable backlog to process, rate limiting will produce many smaller micro-batches rather than a few large micro-batches. Although smaller micro-batches sacrifice some throughput, since a marginally larger percentage of time is spent book-keeping on the driver, they carry less risk. If we were processing one large micro-batch, a recurrent failure might prevent any data from being committed forward to the sink. Smaller micro-batches commit data to the sink more frequently, so incremental progress on the underlying sink is visible to users before the failure occurs.
Since rate limiting is optionally supported based on functionality present in the source's underlying infrastructure, users configure rate limiting via an option on the source. Please refer to each source's documentation to learn more about rate limiting for that specific source.
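As a minimal sketch, assuming options pass through to Spark's Kafka integration (whose maxOffsetsPerTrigger option caps the offsets consumed per micro-batch), and with a placeholder broker address and topic name, rate limiting might look like:
CREATE STREAM foo
FROM org.apache.spark.sql.kafka010.KafkaSourceProvider
OPTIONS(
'kafka.bootstrap.servers'='localhost:9092',
'subscribe'='events',
'maxOffsetsPerTrigger'='10000'
);
Here kafka.bootstrap.servers and subscribe identify the cluster and topic, while maxOffsetsPerTrigger limits each micro-batch to at most 10,000 offsets across all topic partitions.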
Schema
Certain streaming query sources either require a user-defined schema, or allow the user to influence the schema with a user-defined schema. Users can provide schema definitions to sources by specifying a list of column names and their respective data types. Refer to the syntax section as needed. An example source with a user-defined schema follows:
CREATE STREAM foo(a int, b string)
FROM JSON
OPTIONS(...);
Please refer to each source's documentation to learn more about whether schema definitions are required or optionally supported.
Write Ahead Log
TODO: discuss source specific write-ahead-log implementation details
Syntax
CREATE STREAM table-name[ (column datatype, ...) ]
FROM streaming-query-source
[ OPTIONS('key'='value', ...) ]
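Putting it all together, a definition exercising every optional clause:
CREATE STREAM foo(a int, b string)
FROM JSON
OPTIONS(
'path'='/path/to/json',
'dateFormat'='MM-dd-yyyy'
);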
Next Steps
With your newfound knowledge about sources, feel free to look into the specifics of each source implementation. Below is a list of all available sources you can use when creating job definitions.