PDP 26 (Ingestion Watermarks)
Introduction
This proposal introduces watermarks to Pravega in the ingestion time domain. A watermark is a coarse-grained indication of when event data was written to the stream, made available to readers to perform time-aware processing. A key goal is to support event time processing in Apache Flink, which works best with connector-provided watermarks.
Event time processing is considered an important/strategic feature to support. Existing solutions that defer watermark handling to the Flink application (ref) are inadequate, because:
- the reader group may process stream events out of order (with respect to the ingestion time of the event), complicating the watermark assigner.
- watermark assigners do not have access to the Flink state backend, which further limits the logic they can implement.
- the reader group may rebalance segment assignment, leading to major jumps (forward or backwards) in time; Flink requires that the watermark be strictly ascending for a given subtask.
Watermarks effectively encapsulate system internals, e.g. regarding the processing order of the segments that constitute a stream. Though Pravega guarantees the ordering of events for a given routing key, a total temporal ordering is not guaranteed. Events with older timestamps may arrive after those with newer timestamps, especially in historical stream processing scenarios. Thus an event time clock may not be based solely on event timestamps; the clock is best regulated with a watermark.
For more information on the concept of event time, see Streaming 101 by Tyler Akidau and the Flink documentation.
Status: Under discussion
Table of Contents
- Introduction
- Concepts
- Proposed Changes
- Prototype
Concepts
Ingestion Time
Temporal Operations
Certain operations on a segment (e.g. creates, appends, merges) constitute temporal operations whose time of occurrence is significant. The timestamp of the temporal operation is drawn from the so-called ingestion time domain, defined as the real-world time on the segment server at the time of occurrence.
Watermarking
The significance of the time of occurrence of a temporal operation is that it serves as a marker for the progression of time as a stream is written. A reader may then use such time information to advance a watermark. Since streams are an append-only structure, a watermark with timestamp X indicates that no event shall follow with an ingestion time before or at time X. Watermarks are useful for stream processing applications where the temporal order of events is significant.
The phrase "(exclusive) minimum ingestion time" is sometimes used to describe the watermark. Implementation-wise, this implies that recorded timestamps should have 1L subtracted to formulate a watermark (e.g. to accommodate numerous operations occurring within one millisecond).
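The subtraction can be sketched as follows. This is a minimal illustration, assuming timestamps are epoch milliseconds held in a long; the class and method names are hypothetical and not part of the proposal.

```java
// Hypothetical helper; the names here are assumptions for illustration only.
final class WatermarkMath {
    private WatermarkMath() {}

    /** Converts a recorded operation timestamp (ms) into an exclusive-minimum watermark. */
    static long toWatermark(long recordedTimestampMillis) {
        // Subtract 1 so that further operations within the same millisecond
        // still have an ingestion time strictly after the watermark.
        return recordedTimestampMillis - 1L;
    }

    /** True if an event with the given ingestion time may still follow the watermark. */
    static boolean mayFollow(long watermark, long ingestionTimeMillis) {
        return ingestionTimeMillis > watermark;
    }
}
```

With a recorded timestamp of 1000, the watermark is 999, so a second append that also lands at 1000 does not violate the watermark.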
Watermark Coordination
A reader group coordinates its reads to process each stream event exactly once and to formulate a group-level watermark. The group-level watermark indicates the exclusive minimum ingestion time of any subsequent event across the entire group. Processors should use the group-level watermark, rather than the segment-level watermark, to advance the event time clock. This approach ensures that segment assignment may change dynamically without causing the watermark to jump back on any reader instance (a situation that Flink does not support).
Stream Idleness
It is important to advance the watermark even when no writes occur on a given segment, i.e. in tail reads. Otherwise, events may remain buffered indefinitely within the processor as it awaits the further passage of time.
Event Time
The concept of event time refers to the time when a given event occurred rather than when the event was committed to a Pravega stream. Stream processing frameworks now provide a sophisticated programming model based on the event time domain. For example, time-based aggregations may be produced based on the advancement of the event time clock (as opposed to the processing time clock or wall-clock on the processor). This capability is seen as the foundation for a unified programming model for historical and real-time stream processing with correct (rather than speculative) results.
The event time of a given event is typically stored as a component of the event data and is opaque to Pravega.
Relating Event Time to Ingestion Time
In concept, the ingestion time of an event is wholly unrelated to its event time, since event data is opaque. In practical applications the two are likely closely related, with the delta for a given event attributable to a combination of buffering and transmission time. For example, an IoT sensor might produce sensor readings that are sent to Pravega within a few minutes.
In such a scenario, a stream processor may easily drive an event-time clock using the ingestion time watermarks encountered in the course of reading the stream. A concrete approach is to subtract an application-specific event-time lag from the ingestion timestamp, thus asserting an upper bound on the buffering and transmission time of the event. For example, a lag time of five minutes would set the event time clock to 11:55 when the ingestion time clock reaches 12:00, thus asserting that all events prior to 11:55 have been read. Late-record processing logic could compensate for erroneous assertions.
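The lag arithmetic above can be sketched as a one-line adjustment. This is an illustrative sketch only, assuming millisecond timestamps; the class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper; the names here are assumptions for illustration only.
final class EventTimeLag {
    private EventTimeLag() {}

    /** Derives an event-time watermark by subtracting an application-specific lag. */
    static long eventTimeWatermark(long ingestionWatermarkMillis, Duration maxLag) {
        return ingestionWatermarkMillis - maxLag.toMillis();
    }
}
```

For instance, with times expressed as milliseconds since midnight, an ingestion watermark of 43,200,000 (12:00) and a lag of five minutes give 42,900,000 (11:55).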
Proposed Changes
Overview
The basic idea is to record the time of last occurrence of a segment operation (segment creation, append, merge) as a metadata attribute, to convey that information as a watermark upon segment read, to track it in the reader group state, and to emit a group-wide minimum watermark.
For example: In the diagram below, the stream consists of two segments. The reader has encountered a watermark of 12:00 on segment 1, and 12:01 on segment 2. We can therefore be certain that no subsequent event (in any segment) will occur before 12:00.
[Diagram. Labels recoverable from the original figure: the Operation Log receives create, append, and merge operations and issues updateAttribute (LAST_WRITE_TIME). Segment 1 holds events A, B, C, D with watermarks W = 11:55, W = 12:00, and W = 12:02. Segment 2 holds events A, B, C, .. with watermark W = 12:01. The Event Reader (W=12:00) reads from the segments and performs updateGroupState against the Reader Group State, which holds S1(O=2,W=12:00) and S2(O=3,W=12:01) and is connected to a checkpoint (chkpt). The resulting read sequence is: W(11:55), 1A, 2A, 1B, 2B, 2C, W(12:00), 1C, 1D, W(12:01), ..]
Specific changes are described in the next sections.
Protocol
SegmentRead
Enhance the SegmentRead response to include a watermark indicating the (exclusive) minimum ingestion time of any subsequent data. Subsequent means any data at an offset beyond the range covered by the SegmentRead or in a successive segment.
Note that SegmentRead may be returned with a zero length to communicate a watermark without associated data. This is useful for watermarking of tail reads.
StreamSegmentInfo
Enhance the StreamSegmentInfo response to include the segment creation time (in the ingestion time domain).
Note that StreamSegmentInfo contains a lastModified field that is unrelated to this proposal. Consider changing its semantics to be based on the ingestion clock.
Client
EventRead Interface
Return a watermark via new methods on the EventRead interface: isWatermark() and getWatermark().
Provide a new property isEvent() to improve the forward compatibility of client code.
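A consumer written against these methods might look like the sketch below. It is only an illustration: EventReadSketch is a hypothetical, simplified stand-in rather than the actual Pravega EventRead interface, and only the method names isEvent(), isWatermark(), and getWatermark() come from this proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the proposed EventRead additions.
interface EventReadSketch<T> {
    boolean isEvent();
    boolean isWatermark();
    T getEvent();          // meaningful only when isEvent() is true
    long getWatermark();   // meaningful only when isWatermark() is true
}

// Hypothetical factory used to build reads for the example.
final class Reads {
    private Reads() {}

    static <T> EventReadSketch<T> event(T value) {
        return new EventReadSketch<T>() {
            public boolean isEvent() { return true; }
            public boolean isWatermark() { return false; }
            public T getEvent() { return value; }
            public long getWatermark() { throw new IllegalStateException("not a watermark"); }
        };
    }

    static <T> EventReadSketch<T> watermark(long timestamp) {
        return new EventReadSketch<T>() {
            public boolean isEvent() { return false; }
            public boolean isWatermark() { return true; }
            public T getEvent() { throw new IllegalStateException("not an event"); }
            public long getWatermark() { return timestamp; }
        };
    }
}

// Dispatches on the read type; anything that is neither an event nor a
// watermark is ignored, which keeps the loop tolerant of future read types.
final class WatermarkAwareConsumer {
    final List<String> events = new ArrayList<>();
    long currentWatermark = Long.MIN_VALUE;

    void accept(EventReadSketch<String> read) {
        if (read.isWatermark()) {
            currentWatermark = Math.max(currentWatermark, read.getWatermark());
        } else if (read.isEvent()) {
            events.add(read.getEvent());
        }
    }
}
```

Checking isEvent() explicitly, rather than assuming every non-watermark read carries an event, is the forward-compatibility benefit mentioned above.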
Time Domain Configuration
For backwards compatibility reasons, the client should opt in to watermark reads. Otherwise a client that isn't watermark-aware may erroneously treat a watermark as an event (since isEvent was not previously available).
Provide a reader configuration option for this purpose, expressing the 'time domain' that the client wishes to operate in. Expose two domains:
- ProcessingTime, reflecting current behavior (default value).
- IngestionTime, reflecting new behavior (watermarks).
SegmentInputStream
Enhance the SegmentInputStreamImpl to expose the current watermark, to be read after a call to readEventData. The semantics are that the watermark pertains to the most recent event (or EndOfSegmentException), and indicates the (exclusive) minimum ingestion time of any subsequent event.
Note that watermarks obtained via SegmentRead responses are understood to indicate the watermark after the read response is fully consumed. Since the response data is unaligned with respect to event data, the watermark must be internally buffered.
Segment Metadata Client
Enhance the metadata client to provide the creation time for a given segment. This information shall be used by the reader group manager to ascertain the minimum watermark of a segment (see Reader Group Manager section).
Reader Group State
Track the overall reader group watermark in reader group state:
- Store the per-segment watermark for all incomplete segments (i.e. all of unassigned, assigned, future).
- Calculate the global watermark as the minimum of all per-segment watermarks.
- Initialize watermarks from ReaderGroupStateInit
- Update the watermarks from CheckpointReader update (w/ updated watermarks for given position data)
- Update the watermarks from SegmentCompleted update (w/ successive watermarks)
Sort the unassigned segments by watermark (ascending) to prioritize the oldest stream data.
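The bookkeeping described above can be sketched as a map of per-segment watermarks with a minimum taken across it. This is a simplified illustration and not the actual reader group state implementation; the class name, method names, and the use of String segment identifiers are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-segment watermark tracking in reader group state.
final class GroupWatermarkState {
    private final Map<String, Long> segmentWatermarks = new HashMap<>();

    /** Records a watermark for an incomplete segment, never moving it backwards. */
    void update(String segment, long watermark) {
        segmentWatermarks.merge(segment, watermark, Long::max);
    }

    /** Replaces a completed segment with its successors and their watermarks. */
    void completeSegment(String completed, Map<String, Long> successors) {
        segmentWatermarks.remove(completed);
        successors.forEach(this::update);
    }

    /** The group-level watermark: the minimum over all incomplete segments. */
    long groupWatermark() {
        return segmentWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE); // assumption: no segments means no progress
    }
}
```

With the values from the overview example, S1 at 12:00 and S2 at 12:01, the group watermark is 12:00.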
Reader Group Manager
Use the stream metadata client to ascertain the watermark at a given position. Use the information to update the reader group state as mentioned above.
Use the creation time of the segment as a short-term alternative to obtain a pessimistic watermark. An improvement would be to use the ReadIndex to obtain a more accurate watermark.
EventReaderImpl
Track the watermarks obtained from segment input streams and checkpoint the watermarks.
Track the latest watermark and return the watermark at event read time whenever the value is increased due to a group state change.
Mock Segment Streams
Enhance the mock classes (e.g. MockSegmentIoStreams) to support watermarks based on a synthetic clock. This is to facilitate unit testing of the reader classes (e.g. to verify watermark progression).
Contracts
Segment Metadata Attributes
Introduce a LAST_WRITE_TIME attribute with which to record the timestamp of the most recent temporal operation.
Consider renaming this attribute to LAST_OP_TIME.
ReadResultEntry
Introduce a watermark property with implementation-specific semantics. For FutureReadResultEntry, the watermark indicates the watermark of the data immediately preceding the future read range, for use with tail reads (see below). For EndOfStreamSegmentReadResultEntry, the watermark indicates the watermark of the data at the tail of the segment.
ReadResultEntryContents
Introduce a watermark property indicating the watermark of subsequent (potentially future) content.
Segment Store
Time handling is encapsulated in the segment store; details follow.
Ingestion Time Clock
Introduce a clock to authoritatively determine the current time in the ingestion time domain. Construct the clock in the service builder, and provide it to the operation log and other components that process temporal operations.
Operation Processor
The operation processor serializes updates to a given segment and shall be responsible for assigning timestamps to temporal operations (using the ingestion time clock). A good reason to perform this function in the processor is to ensure that the timestamp monotonically increases. Details follow.
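One way to guarantee that assigned timestamps never move backwards is to clamp each clock reading to the last value handed out. The sketch below illustrates that idea only; the class name and the use of a LongSupplier as the ingestion time clock are assumptions, and the sketch yields non-decreasing (not strictly increasing) timestamps.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: assigns non-decreasing timestamps to temporal operations.
final class MonotonicTimestamper {
    private final LongSupplier clock; // stand-in for the ingestion time clock
    private long last = Long.MIN_VALUE;

    MonotonicTimestamper(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns the clock reading, or the previous timestamp if the clock went backwards. */
    synchronized long next() {
        last = Math.max(last, clock.getAsLong());
        return last;
    }
}
```

Because several operations may receive the same millisecond value, this pairs naturally with the exclusive-minimum convention for watermarks.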
Appends
An append shall cause an update to the LAST_WRITE_TIME attribute.
Transaction Merges
A transaction merge is semantically equivalent to a single, large append at the merge point. The timestamps of the operations that produced the transaction segment are irrelevant; record a new timestamp for LAST_WRITE_TIME when the merge is accepted.
Attribute Updates
An attribute update shall cause an update to the LAST_WRITE_TIME attribute. The watermark advancer (discussed below) shall use a trivial attribute update operation (discussed below).
Segment Container
The segment container is responsible for segment creation (including transaction segments). Segment creation is not handled by the operation log.
Initialize the CREATION_TIME and LAST_WRITE_TIME attributes for a new segment (using the ingestion time clock).
Watermark Advancer
The watermark must advance on any segment being read, even when no writes are forthcoming. We'll refer to such segments as 'idle', not to be confused with 'inactive' which relates to eviction (see below). A helper service shall be introduced to periodically drive this process.
Open Issue: How should the watermark of inactive segments be handled? Ideally the watermark would be advanced as soon as the segment is reactivated.
Configuration: Max Watermark Lag
Provide a container-level configuration option to control the maximum lag on the watermark of active segments. For example, a configured value of one minute indicates that the watermark should be advanced on any segment that has been idle for at least a minute. The choice of value will determine how much overhead is incurred for idle segments. Suggested default value: 10 seconds.
Configuration: Polling Period
Provide a container-level configuration option to control the rate at which active segments are evaluated for idleness. Suggested value: 1 second.
Identifying Candidates
The advancer shall examine the container's active segments to identify segments whose watermark should be advanced (i.e. 'idle' segments). The criteria shall be:
- non-transaction segments,
- that are not sealed, and
- whose LAST_WRITE_TIME timestamp is in the past (according to the ingestion time clock) by more than maxWatermarkLag.
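The three criteria above can be sketched as a filter over the active segments. This is an illustrative sketch only: SegmentSnapshot and IdleSegmentFilter are hypothetical types, not the container's actual metadata classes, and timestamps are assumed to be milliseconds.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, minimal view of a segment's metadata for this example.
final class SegmentSnapshot {
    final String name;
    final boolean transaction;
    final boolean sealed;
    final long lastWriteTime; // value of LAST_WRITE_TIME, in ms

    SegmentSnapshot(String name, boolean transaction, boolean sealed, long lastWriteTime) {
        this.name = name;
        this.transaction = transaction;
        this.sealed = sealed;
        this.lastWriteTime = lastWriteTime;
    }
}

final class IdleSegmentFilter {
    private IdleSegmentFilter() {}

    /** Returns the names of segments whose watermark should be advanced. */
    static List<String> findIdle(List<SegmentSnapshot> active, long now, long maxWatermarkLagMillis) {
        return active.stream()
                .filter(s -> !s.transaction)                                 // non-transaction segments
                .filter(s -> !s.sealed)                                      // that are not sealed
                .filter(s -> now - s.lastWriteTime > maxWatermarkLagMillis)  // idle for longer than the lag
                .map(s -> s.name)
                .collect(Collectors.toList());
    }
}
```

The advancer would run such a check once per polling period, using the ingestion time clock for the current time.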
Updating LAST_WRITE_TIME
To advance the watermark on a given idle segment, the container shall emit a trivial attribute update to the operation log. Since attribute updates are considered temporal operations, the LAST_WRITE_TIME shall be updated automatically.
Eviction of Inactive Segments
Segments that haven't been used (read or written) in a while are evicted from the container's in-memory state, to minimize resource usage. The advancement of watermarks on idle segments mustn't interfere with this.
Read Index
The read index integrates reads across cache and storage and tracks future reads. Future reads are typically tail reads to be completed when new data is appended, but are also used when the cache is backfilled from storage.
Watermark Index
The read index shall maintain a list of known watermarks, indexed by offset. The index entry at a given offset conveys the (exclusive) minimum ingestion time of any data existing after that offset. For example, an entry of (42L, 12:00) indicates that data at offset 43L or beyond has an ingestion time after 12:00. A special-case entry at offset -1 records the initial watermark of the segment.
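A sorted map gives a compact way to picture such an index: the watermark applying to data after a given offset is the entry with the greatest key at or below that offset. The sketch below is an assumption about one possible structure, not the read index's actual implementation; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of an offset-indexed watermark list.
final class WatermarkIndex {
    private final TreeMap<Long, Long> entries = new TreeMap<>();

    WatermarkIndex(long initialWatermark) {
        entries.put(-1L, initialWatermark); // special-case entry for the start of the segment
    }

    /** Records that data after lastOffset has an ingestion time after the watermark. */
    void record(long lastOffset, long watermark) {
        entries.put(lastOffset, watermark);
    }

    /** Returns the exclusive minimum ingestion time of any data after the given offset. */
    long watermarkAfter(long offset) {
        Map.Entry<Long, Long> entry = entries.floorEntry(offset);
        if (entry == null) {
            throw new IllegalArgumentException("offset precedes the segment start: " + offset);
        }
        return entry.getValue();
    }
}
```

With an initial watermark W0 and an entry at offset 42, a lookup for offset 10 returns W0, while lookups for offsets 42 and 50 return the watermark stored at 42.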
Appends
The read index is updated as data is committed to the segment. Use the LAST_WRITE_TIME attribute (as updated by the operation log) to establish a new watermark at (lastOffset, LAST_WRITE_TIME - 1).
Transaction Merges
The merge operation produces a new value for the LAST_WRITE_TIME attribute; update the watermark index accordingly.
Attribute Updates
Enhance the read index to be informed of attribute updates, so as to read the LAST_WRITE_TIME attribute and update the watermark index.
This is necessary only if AttributeUpdateOperation is used to advance the watermark on idle streams. Adapt accordingly.
Future Reads
Future reads should complete upon an update to the relevant watermark, to inform the reader which is waiting for more data. Introduce a watermark property on FutureReadResultEntry to track the watermark value that existed when the future read was initiated; compare this value to the updated watermark to determine whether the read should complete.
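The comparison can be sketched with a pending read that remembers the watermark seen at creation and completes once a newer one arrives. PendingFutureRead is a hypothetical stand-in for illustration and does not reflect the actual FutureReadResultEntry class.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a tail read that completes on watermark advancement.
final class PendingFutureRead {
    private final long initialWatermark; // watermark when the read was initiated
    private final CompletableFuture<Long> result = new CompletableFuture<>();

    PendingFutureRead(long initialWatermark) {
        this.initialWatermark = initialWatermark;
    }

    /** Completes the read if the watermark has advanced; returns true if this call completed it. */
    boolean onWatermarkUpdate(long updatedWatermark) {
        if (updatedWatermark > initialWatermark) {
            return result.complete(updatedWatermark);
        }
        return false;
    }

    CompletableFuture<Long> result() {
        return result;
    }
}
```

A real implementation would also complete the read when data is appended; the sketch covers only the watermark-driven path.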
Keep in mind that not all future reads are tail reads.
Redirected Reads
Redirected reads obtain their content from a child (e.g. transaction) segment. The watermark to be associated with the content must be obtained from the parent segment, since the merge transaction updates the LAST_WRITE_TIME after the child segment is sealed.
Request Processor
readSegment
The processor produces SegmentRead responses from the ReadResult and its collection of entries. The processor must return the watermark associated with the last entry's content (i.e. the cache, storage, or future result). The ReadIndex determines the watermark value. When aggregating cache entries, use the last entry's watermark.
getStreamSegmentInfo
Enhance to return the creation time of the segment (in the ingestion time domain, as recorded with the CREATION_TIME metadata attribute).
createTransaction
Consolidate the logic related to establishing the CREATION_TIME attribute into the segment container.
Flink Streaming Data Source
The connector shall be enhanced to use the ingestion watermarks as the basis for event time watermarks.
Maximum Event Time Lag
The data source shall expose a configurable property called maxEventTimeLag, with which to specify the expected delta between the ingestion time (as provided by Pravega) and the event time of a given event (as obtained via a Flink TimestampAssigner).
The data source shall subtract the configured value from any received watermark, and then emit the adjusted watermark to Flink.
Timestamp Assigner
The data source shall accept a TimestampAssigner with which to assign timestamps to incoming events following deserialization. This is a convenience to eliminate the need for a transformation operator solely dedicated to assigning timestamps.
Prototype
An end-to-end prototype was developed to demonstrate the viability of the proposal, using Apache Flink as the stream processor.
Source Code
The prototype is split into two pull requests:
- Pravega: pravega/pravega/branches/pdp-26-ingestion-watermark
- Flink Connector: pravega/flink-connectors/branches/issue-41-ingestion-watermark
Incomplete Features and Potential Improvements
Segment Eviction
The mechanism by which the watermark is advanced on idle segments is probably undermining segment eviction.
System Streams
The watermark is advanced on all streams including the system streams (e.g. _system/_commitStream). Should the system streams be avoided?
Reader Checkpoint State (External)
- Include watermarks in reader checkpoint state. Current solution is to fetch the watermarks at restore time.
Sealed Segments
The AsyncSegmentInputStream synthesizes a SegmentRead response when a SegmentSealed error is received, with a watermark value of Long.MAX_VALUE. Improve the behavior to use the actual watermark.
Recovery
Issue: Validate the design for potential recovery issues.
Prototype Output
For reference, the following is output from the EventTimeITCase class (source code) within the Flink connector test suite. The selected output highlights the handling of three specific events, from an on-time (A), early (B), and late (C) sensor. Observe that the events have been effectively reordered into event-time order.
DEBUG i.p.c.flink.FlinkPravegaReader - Advancing the event time watermark: Watermark @ 1510626708681
...
DEBUG i.p.c.f.u.AbstractStreamBasedWriter - Writing: (1510626750230,B)
DEBUG i.p.c.f.u.AbstractStreamBasedWriter - Writing: (1510626719197,A)
DEBUG i.p.c.f.u.AbstractStreamBasedWriter - Writing: (1510626691235,C)
DEBUG i.p.c.f.EventTimeOrderingOperator - Discarded a late record: (1510626691235,C)
...
DEBUG i.p.c.flink.FlinkPravegaReader - Advancing the event time watermark: Watermark @ 1510626726273
(1510626719197,A)
...
DEBUG i.p.c.flink.FlinkPravegaReader - Advancing the event time watermark: Watermark @ 1510626754349
(1510626750230,B)