PDP 39 (Key Value Tables Beta 1)

Status: COMPLETE & MERGED. Discussion and comments in Issue 4562.

Important: This refers to Beta 1 of Key-Value Tables. For Beta 2, please visit PDP-48.

Table of Contents:

  1. Motivation
  2. Requirements
  3. Design
  4. Server-Side Changes
  5. Client API

Motivation

Streams are currently the only primitive that Pravega exposes. The unit of data in a Stream is an Event, which is ingested via a Writer and consumed by means of a Reader. Due to the append-only nature of the data being ingested, Pravega provides excellent write performance. It also provides good read performance when Events are accessed (read) in sequence; however, locating a single Event is not easily done with the current API (we must either save its Event Position in an external database or scan the entire Stream until we find it).

Real-world analytical pipelines are seldom made up of just Stream Processing. Many applications require storing and retrieving state, and some also need to store results in some sort of database (whether structured or not). Such applications require complex deployments involving multiple types of storage systems, each with its own deployment and runtime requirements. This can easily lead to a significant operational burden.

We therefore propose the implementation of Key-Value Tables (KVT): a new primitive that organizes data as indexed Key-Value Pairs (KVPs) using only features already implemented in Pravega. A KVT is a distributed Key-Value Store that associates arbitrary Keys with Values and enables common operations on them, such as insertion, retrieval and removal. By providing Key-Value Tables as another primitive alongside Streams, we aim to reduce the operational burden of analytical pipelines by removing the need to maintain disparate storage systems - both KVTs and Streams share the same underlying infrastructure powered by Pravega Segments.

Requirements

KVTs should be able to support the following:

  • Define a number of partitions.
    • The KVT should then uniformly distribute the KVPs among these partitions.
  • Optionally group Keys in Key Families.
    • All KVPs within the same Key Family will be assigned to the same Partition.
    • KVPs are not required to be assigned to a Key Family.
  • Operations for all KVPs (that may or may not belong to a Key Family):
    • Insert, update or remove one KVP at a time.
    • Conditionally insert, update or remove one KVP at a time.
    • Retrieve one or more KVPs at a time.
      • The lookup will be by Key.
  • Operations only for KVPs that belong to the same Key Family:
    • Atomically insert, update or remove one or more KVPs at a time.
    • Atomically, conditionally insert, update or remove one or more KVPs at a time.
      • If more than one item is included, all conditions must be satisfied for the batch to be accepted.
    • Iterate through all Keys or KVPs within a Key Family.
  • Conditional Operations
    • The Condition can be based on Versions. Each Key will have a version that changes every time its associated value is changed.
    • The Version used in a Condition can be obtained either from the result of an insert/update operation or when a Key is retrieved.

Out of scope:

  • Auto-scaling.
    • For this PDP, we do not aim to implement any sort of scale-up/scale-down of KVTs.
    • Support for scaling may be implemented in the future and may include input such as update frequency or partition load (number of KVPs per partition).
  • Efficient KVP Iterators.
    • Iterators will make use of Table Segment iterators, which do not return KVPs in any particular order nor do they support range iterators at this time.
    • The only way to implement iterators now is to iterate through all KVPs in a Table Segment and filter out Keys that do not belong to the requested Key Family.
      • A subsequent implementation of a "Sorted Table Segment" (out of scope for this PDP) would enable range iterators and make this operation significantly more efficient.

Design

Key-Value Tables will build on the existing Table Segments (TS). As an analogy, KVTs are to Table Segments what Streams are to Stream Segments.

A quick recap of what Table Segments are:

  • Non-distributed Key-Value Store backed by a single Pravega Segment
  • Makes use of low-level Segment APIs (not exposed externally) to provide ACID characteristics.
  • APIs
    • put: inserts or updates one or more KVPs. An update will either be accepted or rejected in its entirety (no partial updates). Conditional updates are also supported.
    • remove: removes one or more KVPs, based on Key. A removal will either be accepted or rejected in its entirety (no partial removals). Conditional removals are also supported.
    • get: retrieves one or more KVPs, based on Key.
    • iterator: iterates over all Keys or all KVPs.

Key-Value Tables can take advantage of the same techniques employed today by Pravega to shard KVPs across a KVT's Table Segments.

Here is the proposal for how they may be implemented:

Control Path (Create, Delete)

  • We create a KeyValueTableManager in the Client (similar to the StreamManager) that can be employed by a user to manage KVTs.
  • The KeyValueTableManager will invoke Controller APIs (via GRPC), and the Controller will update its internal metadata to record the Creation or Deletion of a KVT.
  • The Controller will also instruct the Segment Store(s) to create/delete the Table Segment(s) that are part of the KVT.
  • We extend the Controller interface in the Client with the following APIs, which mirror some of those already used for Streams (a client-side sketch follows this list):
    • CompletableFuture<Boolean> createKeyValueTable(final String scope, final String kvtName, final KeyValueTableConfiguration kvtConfig)
    • AsyncIterator<KeyValueTableInfo> listKeyValueTables(final String scopeName)
    • CompletableFuture<Boolean> updateKeyValueTable(final String scope, final String kvtName, final KeyValueTableConfiguration kvtConfig)
    • CompletableFuture<Boolean> deleteKeyValueTable(final String scope, final String kvtName)
    • CompletableFuture<KeyValueTableSegments> getCurrentSegmentsForKeyValueTable(final String scope, final String kvtName)
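
To make the control path concrete, here is a minimal client-side sketch of creating a KVT. The KeyValueTableManager factory method and the KeyValueTableConfiguration builder shown are assumptions modeled on the Stream siblings (StreamManager, StreamConfiguration); this PDP does not prescribe these exact signatures.

```java
import io.pravega.client.admin.KeyValueTableManager;
import io.pravega.client.tables.KeyValueTableConfiguration;
import java.net.URI;

public class CreateKvtExample {
    public static void main(String[] args) {
        // Connect to the Controller; the manager invokes the Controller APIs above via gRPC.
        // KeyValueTableManager.create(URI) is an assumed factory, mirroring StreamManager.create(URI).
        try (KeyValueTableManager manager = KeyValueTableManager.create(URI.create("tcp://localhost:9090"))) {
            // The partition count is fixed at creation time (scaling is out of scope for this PDP).
            KeyValueTableConfiguration config = KeyValueTableConfiguration.builder()
                    .partitionCount(4)
                    .build();
            boolean created = manager.createKeyValueTable("myScope", "myKvt", config);
            System.out.println("KVT created: " + created);
        }
    }
}
```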

Data Path

User-visible API:

  • We create a KeyValueTable in the Client. This exposes the data-path APIs available to the user. See Data Plane API below.
  • The KeyValueTable will keep instances of the TableSegment interface (one per owned/active TableSegment) to which it will delegate the requests.
  • The TableSegment interface is currently under implementation (Issue 4333). It will be built based on the existing code in the SegmentHelper.java class in the Controller. This PDP will not change SegmentHelper.java to use this new API.

KVT Partitions

  • A KVT Partition is made up of exactly one Table Segment.
    • That is, if a KVT is made of n partitions, then the KVT will have n Table Segments, with each such Table Segment being mapped to a single Partition.
  • A KVT Partition represents a contiguous range of the key hash space. No two KVT Partitions may overlap in their hash ranges.
  • A KVP is hashed (based on its Key or Key Family, as described below) into a continuous, uniform key hash space and mapped to a KVT Partition.
    • We will use a uniform hashing function, similar to how we hash Stream Event Routing Keys to Stream Segments.

Mapping KVPs to Partitions/Table Segments

  • Addressing a KVP can be done in one of two ways: by KeyFamily & Key or by Key alone.
    • As defined above, all KVPs belonging to the same Key Family must be in the same Partition, but this constraint does not apply to KVPs that do not belong to Key Families.
  • We use the following addressing scheme
    • If a KeyFamily is provided, we use it as the hashing key to locate the KVT Partition/Table Segment, and we use both the KeyFamily and the Key as the Table Key into that Partition. Otherwise, we use the Key itself as the hashing key, which also ensures a uniform distribution. Some care has to be taken with keys that are prefixes of each other, which is why the actual scheme is slightly more complex (a code sketch follows this list). Let's see it explicitly, below:
    • For KeyFamily & Key
      1. We convert the KeyFamily (String) to its UTF-8 byte representation and prefix it with its 2-byte length. Call this KF.
      2. We hash KF and use it to map the KVP to a KVT Partition P.
      3. We concatenate KF and the serialization of Key into TK and use that as a Table Key into P for whatever action we are executing.
    • For Key alone
      1. We serialize Key and prefix it with its 2-byte length (which is negated). Call this KF.
      2. We hash KF and use it to map the KVP to a KVT Partition P.
      3. We use KF as a Table Key into P for whatever action we are executing.
  • Constraints
    • The total length of KeyFamily and Key must not exceed 8190 bytes. This is because a Table Segment Key can be at most 8KB (8192 bytes), and we reserve 2 bytes for the KeyFamily length.
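
To illustrate the scheme, here is a minimal sketch. The uniformHash function is a placeholder for the uniform hashing function mentioned above (the one used for Stream Routing Keys); everything else follows the three steps just described.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class KvpAddressing {

    // Step 1 (KeyFamily & Key): KF = 2-byte length prefix + UTF-8 bytes of the KeyFamily.
    static byte[] kfWithFamily(String keyFamily) {
        byte[] bytes = keyFamily.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(2 + bytes.length).putShort((short) bytes.length).put(bytes).array();
    }

    // Step 1 (Key alone): KF = negated 2-byte length prefix + serialized Key.
    static byte[] kfKeyOnly(byte[] serializedKey) {
        return ByteBuffer.allocate(2 + serializedKey.length)
                .putShort((short) -serializedKey.length).put(serializedKey).array();
    }

    // Step 2: hash KF to select the KVT Partition P.
    static int partitionFor(byte[] kf, int partitionCount) {
        return Math.floorMod(uniformHash(kf), partitionCount);
    }

    // Step 3 (KeyFamily & Key): Table Key TK = KF + serialized Key. For Key alone, TK = KF.
    static byte[] tableKey(byte[] kf, byte[] serializedKey) {
        return ByteBuffer.allocate(kf.length + serializedKey.length).put(kf).put(serializedKey).array();
    }

    private static int uniformHash(byte[] data) {
        int h = 0;
        for (byte b : data) {
            h = 31 * h + b; // placeholder only; not the actual Pravega hashing function
        }
        return h;
    }
}
```

Note how the length prefix (positive for a Key Family, negated for a bare Key) guarantees that a bare Key can never produce the same KF bytes as a Key Family, even when one serialization is a prefix of the other.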

API Implementation

  • Single-key/Single-entry APIs (put, get, remove)
    • These are delegated directly to the Table Segment that is mapped to the Key's/Key Family's partition (see addressing scheme above)
  • Multi-key/Multi-entry update APIs (batch put, batch remove)
    • These are only supported for keys within the same Key Family; the Client API below enforces this constraint.
    • These are delegated directly to the Table Segment that is mapped to the Key Family partition. The Table Segment supports these operations natively.
  • Multi-key retrieval APIs
    • The Keys will be grouped by Partition.
    • For each group, a single get request will be issued to the Table Segment mapped to their partition. The Table Segment natively supports multi-get.
    • Results are aggregated client-side and returned to the user (see the sketch after this list).
  • Key/Entry Iterators
    • These are only supported within a single Key Family.
    • The Iterator call is delegated to the Table Segment associated with the Partition to which the Key Family hashes.
    • The Table Segment supports key/entry iterators natively, but only over all the keys/entries in the Table Segment.
    • We will need to implement a filter for this operation. It is TBD whether to push this filter down into the Table Segment (which would require an API addition and a Wire Protocol change) or to filter in the client.
      • It would be preferable to do this in the Table Segment itself, since any further server-side optimizations could then be implemented there without much work in the client.
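
As an illustration of the multi-key retrieval fan-out, here is a hypothetical sketch. The TableSegment interface below is a stand-in for the internal client tracked in Issue 4333, and KvpAddressing.partitionFor comes from the earlier addressing sketch; for simplicity, it assumes Keys without a Key Family, where the Table Key equals KF and is also the hashing input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MultiGetSketch {
    // Stand-in for the internal Table Segment client (Issue 4333); get() is its native multi-get.
    interface TableSegment {
        CompletableFuture<List<byte[]>> get(List<byte[]> tableKeys);
    }

    private final Map<Integer, TableSegment> segmentByPartition;
    private final int partitionCount;

    public MultiGetSketch(Map<Integer, TableSegment> segmentByPartition, int partitionCount) {
        this.segmentByPartition = segmentByPartition;
        this.partitionCount = partitionCount;
    }

    public CompletableFuture<List<byte[]>> getAll(List<byte[]> tableKeys) {
        // 1. Group the Table Keys by the Partition they hash to.
        Map<Integer, List<byte[]>> byPartition = new HashMap<>();
        for (byte[] tk : tableKeys) {
            int p = KvpAddressing.partitionFor(tk, partitionCount);
            byPartition.computeIfAbsent(p, x -> new ArrayList<>()).add(tk);
        }
        // 2. Issue a single multi-get per group to the Table Segment that owns it.
        List<CompletableFuture<List<byte[]>>> futures = new ArrayList<>();
        for (Map.Entry<Integer, List<byte[]>> group : byPartition.entrySet()) {
            futures.add(segmentByPartition.get(group.getKey()).get(group.getValue()));
        }
        // 3. Aggregate the results client-side. (A real implementation would also restore
        //    the caller's original key order; this sketch does not.)
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<byte[]> results = new ArrayList<>();
                    futures.forEach(f -> results.addAll(f.join()));
                    return results;
                });
    }
}
```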

Server-side Changes

Segment Store

No changes are anticipated as part of this PDP. All existing Table Segment APIs should be sufficient to support this PDP's requirements.

Controller

  • Support for Key-Value Tables (as a sibling to Streams).
    • KVTs are made of Table Segments
    • Be able to Create, Delete and Seal a KVT.
    • Be able to locate a Table Segment (which Segment Store hosts it) that is part of a KVT.
  • Steps:
    1. API for KVT - create/delete with configuration type.
    2. API to get location of Table Segment.
    3. Define metadata for KVTs and abstract its access and storage behind the MetadataStore interface.
    4. Create-Table workflow that will orchestrate creating different metadata and Table Segments.
    5. Delete-Table workflow that will orchestrate deleting different metadata and Table Segments.
  • The distribution and mapping of Segments to Segment Containers require no change.
  • Support the Control Path APIs listed above
    • The client side can only perform basic validation (e.g., checking that the Scope or KVT name contains no illegal characters).
    • The Controller will need to perform server-side validation, rejecting, for example:
      • Attempts to create KVTs that already exist.
      • Attempts to delete or update nonexistent KVTs.
      • Attempts to update a KVT config with a different partition count. We keep the update API because we may allow other configuration items in the future, but since only a fixed partition count is allowed, changing it after the KVT is created must be disallowed (a sketch of this validation follows this list).
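
As a rough illustration of the partition-count check, here is a minimal sketch; the MetadataStore shape and the KeyValueTableConfiguration accessor are assumptions for illustration, not actual Controller code.

```java
import java.util.concurrent.CompletableFuture;

public class KvtUpdateValidationSketch {
    // Assumed metadata access; the real MetadataStore interface may differ.
    interface MetadataStore {
        CompletableFuture<KeyValueTableConfiguration> getConfiguration(String scope, String kvtName);
    }

    // Assumed configuration type with a partition count accessor.
    static class KeyValueTableConfiguration {
        private final int partitionCount;
        KeyValueTableConfiguration(int partitionCount) { this.partitionCount = partitionCount; }
        int getPartitionCount() { return partitionCount; }
    }

    private final MetadataStore metadataStore;

    public KvtUpdateValidationSketch(MetadataStore metadataStore) { this.metadataStore = metadataStore; }

    CompletableFuture<Boolean> updateKeyValueTable(String scope, String kvtName,
                                                   KeyValueTableConfiguration newConfig) {
        return metadataStore.getConfiguration(scope, kvtName).thenApply(existing -> {
            if (existing == null) {
                // Reject updates to nonexistent KVTs.
                throw new IllegalArgumentException("KVT does not exist: " + scope + "/" + kvtName);
            }
            if (existing.getPartitionCount() != newConfig.getPartitionCount()) {
                // The partition count is fixed at creation time (scaling is out of scope).
                throw new IllegalArgumentException("Partition count cannot be changed.");
            }
            return true; // other configuration items may become updatable in the future
        });
    }
}
```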

Client API

Control plane

i.p.client.admin.KeyValueTableManager

  • Similar to its sibling StreamManager, it allows creating, deleting and managing KeyValueTables

Data Plane

i.p.client.tables.KeyVersion

  • Encodes versioning information about a TableKey. This can be used for conditional updates.

i.p.client.tables.TableKey<KeyType>

  • Defines a Key in a KVT.
  • Has an optional KeyVersion:
    • For retrievals, this is populated with the latest version of the Key.
    • For updates/removals: if set, this will trigger a conditional update (otherwise it will be unconditional).
  • KeyType is a user-defined type; user must supply a Serializer<KeyType> to KeyValueTableFactory to convert this to bytes.

i.p.client.tables.TableEntry<KeyType, ValueType>

  • Defines a Table Entry in a KVT.
  • Contains a TableKey and a Value.
  • KeyType and ValueType are user-defined types; user must supply Serializer<> for each to KeyValueTableFactory to convert these to bytes.

i.p.client.tables.KeyValueTable

  • API for a single KeyValueTable
  • Has generic types for Key and Value (we'll require a Serializer<> for each)
  • APIs take either an optional or a mandatory keyFamily argument. If optional and not provided (null), this indicates that the sought key does not belong to a Key Family. (A usage sketch follows this list.)
  • Inserts/Updates:
    • CompletableFuture<KeyVersion> put(@Nullable String keyFamily, @NonNull KeyT key, @NonNull ValueT value)
      • Unconditional, single-key insert/update.
    • CompletableFuture<KeyVersion> putIfAbsent(@Nullable String keyFamily, @NonNull KeyT key, @NonNull ValueT value)
      • Conditional, single-key insert.
    • CompletableFuture<List<KeyVersion>> putAll(@NonNull String keyFamily, @NonNull Iterable<Map.Entry<KeyT, ValueT>> entries)
      • Unconditional, multi-key insert/update.
    • CompletableFuture<KeyVersion> replace(@Nullable String keyFamily, @NonNull KeyT key, @NonNull ValueT value, @NonNull KeyVersion version)
      • Conditional, single-key update.
    • CompletableFuture<List<KeyVersion>> replaceAll(@NonNull String keyFamily, @NonNull Iterable<TableEntry<KeyT, ValueT>> entries)
      • Batch update. This allows multiple types of updates with mixed conditions; each is encoded in the TableEntry instances passed in.
  • Removals:
    • CompletableFuture<Void> remove(@Nullable String keyFamily, @NonNull KeyT key)
      • Unconditional, single-key removal.
    • CompletableFuture<Void> remove(@Nullable String keyFamily, @NonNull KeyT key, @NonNull KeyVersion version)
      • Conditional, single-key removal.
    • CompletableFuture<Void> removeAll(@Nullable String keyFamily, @NonNull Iterable<TableKey<KeyT>> keys)
      • Batch removal. This allows multiple types of removals with mixed conditions; each is encoded in the TableKey instances passed in.
  • Retrieval:
    • CompletableFuture<TableEntry<KeyT, ValueT>> get(@Nullable String keyFamily, @NonNull KeyT key)
      • Single-key retrieval.
    • CompletableFuture<List<TableEntry<KeyT, ValueT>>> getAll(@Nullable String keyFamily, @NonNull Iterable<KeyT> keys)
      • Multi-key retrieval.
    • AsyncIterator<IteratorItem<TableKey<KeyT>>> keyIterator(@NonNull String keyFamily, int maxKeysAtOnce, @Nullable IteratorState state)
      • Key Iterator within a Key Family.
    • AsyncIterator<IteratorItem<TableEntry<KeyT, ValueT>>> entryIterator(@NonNull String keyFamily, int maxEntriesAtOnce, @Nullable IteratorState state)
      • Entry Iterator within a Key Family.
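
Putting the data-plane API together, here is a hypothetical usage sketch based on the signatures above. It assumes kvt is a KeyValueTable<String, String> obtained from KeyValueTableFactory; the behavior of a failed conditional operation (e.g., the exception type) is not prescribed by this PDP.

```java
import io.pravega.client.tables.IteratorItem;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyVersion;
import io.pravega.client.tables.TableEntry;

public class KvtUsageSketch {
    static void example(KeyValueTable<String, String> kvt) {
        // Unconditional insert/update within the "accounts" Key Family.
        KeyVersion v1 = kvt.put("accounts", "alice", "100").join();
        // Conditional update: succeeds only if "alice" is still at version v1.
        KeyVersion v2 = kvt.replace("accounts", "alice", "150", v1).join();
        // Single-key retrieval; the returned entry's TableKey carries the latest version.
        TableEntry<String, String> entry = kvt.get("accounts", "alice").join();
        System.out.println("alice = " + entry.getValue());
        // Conditional removal against the version we last observed.
        kvt.remove("accounts", "alice", v2).join();
        // A Key with no Key Family (keyFamily == null).
        kvt.put(null, "standalone", "value").join();
        // Iterate entries within a Key Family, up to 100 at a time (a null state starts from the beginning).
        IteratorItem<TableEntry<String, String>> page =
                kvt.entryIterator("accounts", 100, null).getNext().join();
        System.out.println("First page size: " + page.getItems().size());
    }
}
```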

i.p.client.tables.impl.*

  • Internal implementation for KeyValueTable; not public.
  • TableSegmentImpl: API to manage a single Table Segment.
    • This somewhat mirrors what's in the Controller's SegmentHelper, but it's tailored for a more generic purpose.

i.p.client.KeyValueTableFactory

  • This creates instances of KeyValueTable for use by applications (similar to EventStreamClientFactory). A usage sketch follows.
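
For instance, obtaining a KeyValueTable instance might look like the following sketch; the factory entry point and configuration type are assumptions mirroring EventStreamClientFactory, and UTF8StringSerializer stands in for whatever Serializer<> the user supplies.

```java
import io.pravega.client.ClientConfig;
import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import java.net.URI;

public class KvtFactorySketch {
    public static void main(String[] args) {
        ClientConfig clientConfig = ClientConfig.builder()
                .controllerURI(URI.create("tcp://localhost:9090"))
                .build();
        // Assumed factory entry point, analogous to EventStreamClientFactory.withScope().
        try (KeyValueTableFactory factory = KeyValueTableFactory.withScope("myScope", clientConfig)) {
            // A Serializer<> must be supplied for both the Key and the Value types.
            KeyValueTable<String, String> kvt = factory.forKeyValueTable(
                    "myKvt",
                    new UTF8StringSerializer(),   // Key serializer
                    new UTF8StringSerializer(),   // Value serializer
                    KeyValueTableClientConfiguration.builder().build());
            // kvt is now ready for the data-plane operations sketched earlier.
            kvt.close();
        }
    }
}
```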