File Sink

File Sinks write the results of a streaming query to files at a specified path. There are many types of File Sinks; they share common behavior, but typically vary based on the underlying file format they support.

File Sink implementations include, for example, the JSON and Text sinks used in the examples on this page.
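
For instance, a minimal Text sink might look like the following (the path is illustrative; the available options are covered under Options below):

SAVE STREAM foo
TO TEXT
OPTIONS(
  'path'='/tmp/data'
);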

Features

Output Mode Append

File Sinks run in append output mode: each micro-batch writes its new result rows to the target path as new files, and files written by earlier micro-batches are never modified or rewritten.
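
As a rough sketch (file names are illustrative and assume Spark's usual part-file naming), an append-mode JSON sink at /tmp/data accumulates new part files across micro-batches, alongside a _spark_metadata directory holding the sink's write-ahead-log (governed by the log.* options below):

/tmp/data
├── _spark_metadata
│   ├── 0
│   └── 1
├── part-00000-<uuid>.json
└── part-00001-<uuid>.json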

Partitioning

All File Sinks support Hive-like partitioning when generating files. For example, a user may have the following table:

id  name  year  month  day
0   foo   2018  01     01
1   bar   2018  01     02

The user may wish to partition on year, month, and day. Partitioning on these columns will reduce the amount of data they need to scan when running queries for a specific time range (e.g., partition pruning). To partition on these columns, the user would simply provide SQL as below:

SAVE STREAM foo
TO JSON
PARTITIONED ON(year, month, day)
OPTIONS(
  'path'='/tmp/data'
);

In this case, given our example data, the user would get a directory structure like below:

/tmp/data
└── year=2018
    └── month=01
        ├── day=01
        └── day=02
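
Given this layout, a query that filters on the partition columns only needs to scan the matching directories. For example (a hypothetical downstream query; the table name and its registration are assumed):

SELECT *
FROM data
WHERE year = 2018
  AND month = '01'
  AND day = '01';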

Options

path

The target directory under which the File Sink writes its output files (and any partition subdirectories).
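
For example (restating the target directory used in the partitioning example above):

SAVE STREAM foo
TO JSON
OPTIONS(
  'path'='/tmp/data'
);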

spark.sql.streaming.fileSink.log.cleanupDelay

If spark.sql.streaming.fileSink.log.deletion is enabled, specifies how long old write-ahead-log files are retained before they become eligible for deletion.

Provide values in a format such as 50s, 100ms, or 250us. Note there is no space between the number and its units. Valid units include:

  • us: microseconds
  • ms: milliseconds
  • s: seconds
  • m: minutes
  • min: minutes
  • h: hours
  • d: days

Defaults to 10m.

-- spark.properties: spark.sql.streaming.fileSink.log.cleanupDelay=1h
SAVE STREAM foo
TO TEXT
OPTIONS(...);

spark.sql.streaming.fileSink.log.compactInterval

Specifies how frequently, in micro-batches, the file sink compacts older write-ahead-log files (e.g., with the default of 10, every 10th micro-batch). To prevent creating too many small files within the write-ahead-log, the file sink periodically merges older write-ahead-log entries into a larger file.

Defaults to 10.

-- spark.properties: spark.sql.streaming.fileSink.log.compactInterval=20
SAVE STREAM foo
TO TEXT
OPTIONS(...);

spark.sql.streaming.fileSink.log.deletion

Whether older write-ahead-log files are eligible for deletion. To be deleted, write-ahead-log entries must first have been compacted (per spark.sql.streaming.fileSink.log.compactInterval) and must be older than spark.sql.streaming.fileSink.log.cleanupDelay.

Defaults to true.

-- spark.properties: spark.sql.streaming.fileSink.log.deletion=false
SAVE STREAM foo
TO TEXT
OPTIONS(...);