PDP 19 (Pravega Retention)

Proposal for retention in Pravega

Status: Under discussion

Related issues: Issue 284, Issue 1215

Motivation

Retention is an integral part of any data system. It is important to enable applications to remove their data, for at least three reasons:

  1. To reclaim storage space
  2. To comply with legislation
  3. To comply with user requests

We envision two broad ways to implement retention in Pravega:

  1. Manual: The client API provides a call to truncate the data of a stream at a given point. This point is given by an abstraction that we call a stream cut. A stream cut instance is an opaque object that internally contains a mapping of segments to offsets.
  2. Automated: A stream can be configured to be truncated automatically based on either time or size.

This document proposes a set of changes to the components of Pravega to implement retention, both manual and automated.

Client API changes

There are two ways to implement retention:

  • By manually requesting Pravega to trim a stream using a stream cut.
  • By configuring the stream to be automatically truncated according to either size or age.

Manually trimming a stream

We need two new API calls:

  • CompletableFuture<StreamCut> getStreamCut()
  • CompletableFuture<Void> trimStream(String stream, StreamCut cut)

The call to get a stream cut obtains a stream cut object that the application can later use to trim the stream. An application can invoke this method periodically to obtain cuts. Using these cuts, the application can, for example, maintain a mapping of its own notion of time to cuts and trim accordingly.
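
As an illustration, the following sketch shows how an application could use these two calls. It assumes the proposed calls are exposed through a hypothetical RetentionClient interface and that StreamCut lives in the client stream package; all names here are illustrative, not part of the actual API.

    import io.pravega.client.stream.StreamCut;

    import java.time.Instant;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class ApplicationManagedRetention {

        // Hypothetical stand-in for the component that will expose the proposed calls.
        interface RetentionClient {
            CompletableFuture<StreamCut> getStreamCut();
            CompletableFuture<Void> trimStream(String stream, StreamCut cut);
        }

        // Application-maintained mapping of its own notion of time to stream cuts.
        private final ConcurrentSkipListMap<Instant, StreamCut> cuts = new ConcurrentSkipListMap<>();

        // Invoked periodically (e.g., from a scheduled task) to record a new cut.
        void recordCut(RetentionClient client) {
            cuts.put(Instant.now(), client.getStreamCut().join());
        }

        // Trims everything recorded at or before the given instant, using the most
        // recent cut that is not newer than that instant.
        void trimOlderThan(RetentionClient client, String stream, Instant boundary) {
            Map.Entry<Instant, StreamCut> entry = cuts.floorEntry(boundary);
            if (entry != null) {
                client.trimStream(stream, entry.getValue()).join();
                cuts.headMap(entry.getKey(), true).clear();
            }
        }
    }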

We can make such a call available to both readers and writers, but it belongs naturally on the reader side, as a cut can represent a point up to which everything has been read. A reader group can use a state synchronizer to coordinate the construction of a stream cut that is consistent with the progress of all readers. If we decide to provide such a call on the writer side as well, then we will need to either:

  • Coordinate writers and have some notion of writer group
  • Have the controller put such a stream cut object together

The trimStream call truncates the stream data using the cut as a reference for the offsets in the stream to cut from. Note that it is the responsibility of the application to obtain and store stream cuts.

For the automated implementation of retention, there is already an interface that enables a stream to be configured with retention by size (total number of bytes) or by age (time):

io.pravega.client.stream.RetentionPolicy;

We do not propose to change that interface here.
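
For illustration, a minimal sketch of configuring automated retention through that interface, assuming factory methods along the lines of RetentionPolicy.byTime and RetentionPolicy.bySizeBytes and the usual StreamManager/StreamConfiguration client API; exact method names and signatures may differ.

    import io.pravega.client.admin.StreamManager;
    import io.pravega.client.stream.RetentionPolicy;
    import io.pravega.client.stream.StreamConfiguration;

    import java.net.URI;
    import java.time.Duration;

    public class ConfigureAutomatedRetention {
        public static void main(String[] args) {
            try (StreamManager manager = StreamManager.create(URI.create("tcp://localhost:9090"))) {
                // Age-based retention: keep roughly two days of data.
                StreamConfiguration byAge = StreamConfiguration.builder()
                        .retentionPolicy(RetentionPolicy.byTime(Duration.ofDays(2)))
                        .build();
                manager.createStream("myScope", "ageRetainedStream", byAge);

                // Size-based retention: keep roughly 10 GB of data.
                StreamConfiguration bySize = StreamConfiguration.builder()
                        .retentionPolicy(RetentionPolicy.bySizeBytes(10L * 1024 * 1024 * 1024))
                        .build();
                manager.createStream("myScope", "sizeRetainedStream", bySize);
            }
        }
    }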

Controller changes

The controller is the main component implementing the truncation of streams, either manual or automated. There are two mechanisms that the controller needs to implement:

  1. Given a stream cut, coordinate the truncation of the stream;
  2. For automated retention, the controller is responsible for generating and persisting stream cuts to be used for truncation.

Truncation of a stream given a cut consists of selecting the appropriate segments and either deleting them entirely (when they lie completely before the cut) or truncating them at an offset (when they are part of the cut). Once started, the operation needs to complete eventually, and calls to truncate must be idempotent.
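
A minimal sketch of this selection step follows. It assumes a stream cut is represented as a map from segment name to offset and that the controller can list the segments that lie entirely before the cut; deleteSegment and truncateSegment are illustrative stand-ins for the segment store requests discussed in the Segment store changes section.

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    public class TruncateAtCut {

        // Illustrative stand-in for the segment store requests the controller issues.
        interface SegmentStoreClient {
            CompletableFuture<Void> deleteSegment(String segment);
            CompletableFuture<Void> truncateSegment(String segment, long offset);
        }

        // Segments strictly before the cut are deleted; segments on the cut are
        // truncated at the offset recorded in the cut. Re-issuing the same requests
        // is a no-op at the segment store, which keeps the operation idempotent.
        static CompletableFuture<Void> truncate(SegmentStoreClient store,
                                                Collection<String> segmentsBeforeCut,
                                                Map<String, Long> cut) {
            CompletableFuture<?>[] ops = new CompletableFuture<?>[segmentsBeforeCut.size() + cut.size()];
            int i = 0;
            for (String segment : segmentsBeforeCut) {
                ops[i++] = store.deleteSegment(segment);
            }
            for (Map.Entry<String, Long> entry : cut.entrySet()) {
                ops[i++] = store.truncateSegment(entry.getKey(), entry.getValue());
            }
            return CompletableFuture.allOf(ops);
        }
    }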

For age-based truncation, we propose that the controller implement a timer wheel or similar data structure and set a timer for each stream according to the stream configuration. For example, if a stream s sets its retention period to 2 days, then every two days the controller truncates s at the previously generated cut and generates a new cut using the offsets of the current segments.
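
A simple sketch of the age-based trigger, using a plain scheduled executor as a stand-in for the timer wheel; all names are illustrative.

    import java.time.Duration;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class AgeBasedRetentionTimer {

        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Every retention period: truncate the stream at the cut generated one period
        // ago, then generate and persist a new cut from the current segment offsets.
        void schedule(String stream, Duration retentionPeriod, Runnable truncateAtPreviousCutAndRecordNewCut) {
            scheduler.scheduleAtFixedRate(truncateAtPreviousCutAndRecordNewCut,
                    retentionPeriod.toMillis(), retentionPeriod.toMillis(), TimeUnit.MILLISECONDS);
        }
    }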

For size-based truncation, the controller periodically produces a cut and obtains the total size of the stream. Once the size exceeds the configured limit, the controller performs a truncation using the recorded cut that removes just enough data to bring the stream back under the limit (the lowest upper bound). Note that this is an approximation rather than an exact implementation of size-based truncation.
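
The cut selection could look roughly like the sketch below, assuming the controller keeps, for each recorded cut, the number of bytes between the head of the stream and that cut; types and names are illustrative.

    import java.util.Map;
    import java.util.TreeMap;

    public class SizeBasedCutSelection {

        // cutsByBytesBeforeCut: recorded cuts keyed by the number of bytes between
        // the current head of the stream and each cut.
        static <C> C chooseCut(TreeMap<Long, C> cutsByBytesBeforeCut, long totalSize, long sizeLimit) {
            long mustRemoveAtLeast = totalSize - sizeLimit;
            if (mustRemoveAtLeast <= 0) {
                return null; // still under the limit; nothing to truncate
            }
            // Lowest upper bound: the earliest recorded cut that removes at least
            // the required number of bytes.
            Map.Entry<Long, C> entry = cutsByBytesBeforeCut.ceilingEntry(mustRemoveAtLeast);
            return entry == null ? null : entry.getValue();
        }
    }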

REST API changes

We need a REST API call for trimStream.

Segment store changes

To be able to truncate streams as described in the previous section, the controller must be able to request the segment store to delete segments and to truncate segments. The controller must also be able to obtain the length in bytes of segments to compute the overall size of a stream.

The segment store already provides a call to delete entire segments. In case we also opt to truncate open segments rather than only deleting entire, sealed segments, we need an additional call to truncate a segment at a given offset (handled in Issue 1215).

The SegmentProperties interface (returned by StreamSegmentStore.getStreamSegmentInfo) contains the length of the segment and whether it is sealed or not. A call to get the segment info seems sufficient for the needs of this proposal.
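
For instance, the overall size of a stream could be computed by summing segment lengths, as in the sketch below. It uses stand-in interfaces for SegmentProperties and the getStreamSegmentInfo call, since only the length is needed here; the bulk variant proposed below would replace the per-segment calls with a single round trip.

    import java.time.Duration;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    public class StreamSizeCalculator {

        // Stand-ins for the segment store types referenced above; only the pieces
        // needed for the size computation are sketched.
        interface SegmentProperties {
            long getLength();
            boolean isSealed();
        }

        interface SegmentStore {
            CompletableFuture<SegmentProperties> getStreamSegmentInfo(String segmentName, Duration timeout);
        }

        // Sums the lengths of all segments that currently make up the stream.
        static CompletableFuture<Long> streamSize(SegmentStore store, Collection<String> segments, Duration timeout) {
            List<CompletableFuture<SegmentProperties>> futures = segments.stream()
                    .map(name -> store.getStreamSegmentInfo(name, timeout))
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(v -> futures.stream().mapToLong(f -> f.join().getLength()).sum());
        }
    }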

Some additional APIs may be needed on the Segment Store to make these operations smoother:

  • Bulk GetStreamSegmentInfo: able to get info for multiple segments with one call, which should reduce the need for extra over-the-network round trips. This will aid in the implementation of getStreamCut
    • CompletableFuture<Map<String, SegmentProperties>> getStreamSegmentInfo(Collection<String> streamSegmentNames, Duration timeout);
  • Bulk DeleteStreamSegment: in order to delete multiple segments without having to do extra network round trips.
    • CompletableFuture<Void> deleteStreamSegment(Collection<String> streamSegmentNames, Duration timeout);

Tier2 data truncation

No Tier2 Storage implementation currently supports truncation. The main reason is that no backing store natively supports truncating from the beginning of a file. FileSystem and HDFS support truncation, but only to reduce the length of a file, and the reduction happens at the end. The only way to truncate a file from the beginning in Tier2 would be to create a new file and copy all remaining data to it via the Segment Store client; this is not a scalable solution, especially as segments grow large.

There are two major approaches:

Controller Managed

The Controller rolls over segments: it seals existing active segments and begins new ones (1:1 split) when the segments reach a certain size or a certain amount of time has elapsed.

  • Pros: already a mechanism for this in the Controller, with most steps already implemented
  • Cons:
    • How do we handle open transactions? How do we roll them over? We can't simply rename them, since that would mean they'd be owned by a different Container, and it's possible there's unflushed data in Tier2, which could lead to data loss.
    • Some special (internal) Streams need exactly one segment. They cannot function with more than one.

Segment Store Managed

The easiest solution to conceptualize would be to have each Tier2 implementation do it in its own way. Should their backing stores actually support this natively, it could be done very quickly. However, there is no guarantee of that, or that it would be done according to our specs, or that there wouldn't be some other system-imposed limit (such as a maximum number of times a file can be truncated). In addition, it would have to be implemented in each existing Tier2 storage and would greatly increase the amount of effort required to implement a new Tier2 Storage.

We therefore propose the following approach:

  • (Context) All Tier2 implementations meet a certain, generic contract (they implement the Storage interface). Their goal is to translate Pravega SegmentStore verbs to commands that can be issued on their respective backing stores, and vice versa: they should translate incoming messages and errors into something Pravega can digest.
  • We create a new Storage implementation, named RollingStorage.
    • Wraps around an existing Tier2 implementation (Storage instance),
    • Takes in a rolling policy (roll each segment based on size). Alternatively, we could use a per-segment policy, since some system-level segments may have other types of retention needs, with different granularity levels.
    • It is responsible for breaking a Pravega Segment into multiple SubSegments based on the policy and for abstracting out all of these operations. To the upper layers it exposes Segments; toward the lower layers it operates on SubSegments.
  • File layout. A Segment is made up of the following (see the header-handling sketch after this list):
    • Header file: an append-only, human-readable, delimited text file containing sorted pairs of Offsets and Files. Each offset points to a Data File which contains data that begins at that offset in the Segment.
    • Data Files: contain actual data.
    • Example: Segment "abc", with a rolling policy of 100 and a size of 249, would likely have:
      • abc$header (contents: 0:abc$offset_0;100:abc$offset_100;200:abc$offset_200;)
      • abc$offset_0 (contains offsets 0-99, and is currently sealed)
      • abc$offset_100 (contains offsets 100-199, and is currently sealed)
      • abc$offset_200 (active file contains offsets 200-249, and is non-sealed)
  • Operation Implementation
    • Create: attempt to create the header file. If that fails, re-throw; otherwise delete any existing data files. We rely on the atomicity of header file creation; if we didn't have a header file, this would be a multi-step process and it would be hard to guarantee correctness during concurrent calls.
    • Delete: delete existing data files, then delete header file
    • GetInfo: read header file; compute total length based on last entry; compute sealed status based on header file sealed status.
    • OpenWrite/OpenRead: fetch the contents of the header file and cache them. For OpenWrite, also open-fence the active file; the underlying Tier2 implementation will do the fencing work for us.
    • Write: write to active file. Once we exceed max file size, seal it, create a new one, and update header file.
    • Concat: append the contents of the source header file to the target header file, while adjusting offsets, then delete the source header file. No need to rename source files. No need for expensive copying of data. This is a universal concat.
      • We may choose to actually concat the source file into our active file if it's small enough, to prevent having too many files
    • Read: use the cached info in the ReadHandle to locate data files. If the sought offset is greater than the one in the ReadHandle, refresh once, in case the segment changed from underneath us.
    • Seal: seal header file and active file
    • Truncate: delete data files that lie entirely before the truncation offset
      • Optionally, we may decide to rewrite the entire header file, to clean it up (it's possible that for certain segments it will get pretty long)
  • This solution will work with any Tier2 implementation, will not add any sort of overhead on the hot path, and may even make some back-end (StorageWriter) operations faster (such as concat).
  • The downside is that we add yet another layer of complexity to our already complex code.
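
For illustration, here is a sketch of how the header file described above could be handled: parsing its offset-to-file entries, locating the data file that serves a read at a given offset, and selecting the data files that a truncation may delete. The header syntax follows the "abc$header" example; all names are assumptions rather than the final RollingStorage design.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class RollingHeaderSketch {

        // Maps the starting offset of each SubSegment to its data file name.
        private final TreeMap<Long, String> dataFilesByStartOffset = new TreeMap<>();

        // Parses header contents such as "0:abc$offset_0;100:abc$offset_100;200:abc$offset_200;".
        static RollingHeaderSketch parse(String headerContents) {
            RollingHeaderSketch header = new RollingHeaderSketch();
            for (String entry : headerContents.split(";")) {
                if (entry.isEmpty()) {
                    continue;
                }
                int separator = entry.indexOf(':');
                long startOffset = Long.parseLong(entry.substring(0, separator));
                header.dataFilesByStartOffset.put(startOffset, entry.substring(separator + 1));
            }
            return header;
        }

        // Read: the data file holding a given segment offset is the one with the
        // greatest start offset that is less than or equal to the sought offset.
        String dataFileFor(long segmentOffset) {
            Map.Entry<Long, String> entry = dataFilesByStartOffset.floorEntry(segmentOffset);
            return entry == null ? null : entry.getValue();
        }

        // Truncate: a data file covers [its start offset, next file's start offset),
        // so it can be deleted only if the next file starts at or before the
        // truncation offset; the active (last) file is never deleted here.
        List<String> dataFilesDeletableAt(long truncationOffset) {
            List<String> deletable = new ArrayList<>();
            Map.Entry<Long, String> current = dataFilesByStartOffset.firstEntry();
            while (current != null) {
                Map.Entry<Long, String> next = dataFilesByStartOffset.higherEntry(current.getKey());
                if (next != null && next.getKey() <= truncationOffset) {
                    deletable.add(current.getValue());
                }
                current = next;
            }
            return deletable;
        }
    }

With the example header above, dataFileFor(150) returns abc$offset_100, and dataFilesDeletableAt(200) returns the first two data files while keeping the active one.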

Open questions

  1. Are we only deleting sealed segments or do we plan to truncate open segments as well?
    • Due to implementation details, there is no longer a constraint that we can only truncate sealed segments. From a design POV, the Segment Store can truncate non-sealed segments as well.
  2. At the Segment Store level, what guarantees can we provide with respect to the data being completely removed from the system? For example, if a user requests that their information be removed from the system, then the application must be able to comply and remove such data permanently.
    • The Segment Store, once it processes the Truncate request for a segment, will disallow access to any part of the segment before that offset. However, the data may still linger around in Tier 1 for a while until the BookKeeper Log is properly truncated. As for Tier2, please refer to the Tier2 Truncation section above.

Discarded approaches

Index in the segment store

We have considered having an index that maps time to segment offsets. We have discarded this option to avoid introducing the notion of time into the segment store: guaranteeing even a minimum degree of time accuracy is difficult, so it is preferable to avoid this complexity. It is also preferable to let the application introduce its own notion of time, which it can do with the concept of stream cuts.
