Relays - grantr/shortbus GitHub Wiki

Relays receive transactions from producers and transmit them to consumers. To keep latency low, the path from producer to consumer never touches disk. High availability is achieved by quorum writes, and timeline consistency is maintained by an anti-entropy mechanism.

Communication

Relays communicate over a fully connected topology of ZeroMQ pubsub sockets. Currently the list of relays is configured statically, but dynamic configuration à la Cassandra and Riak may be added in the future.
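
As a rough sketch of this topology, each relay could bind one PUB socket and connect a SUB socket to every peer in the static relay list. The endpoints and wiring below are assumptions for illustration, not the actual shortbus implementation.

```python
# Minimal sketch of a fully connected pubsub topology over a static relay list.
# Relay names and endpoints are hypothetical.
import zmq

RELAYS = {
    "relay-a": "tcp://10.0.0.1:5555",
    "relay-b": "tcp://10.0.0.2:5555",
    "relay-c": "tcp://10.0.0.3:5555",
}

def connect_relay(my_name):
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)            # this relay's outbound channel
    pub.bind(RELAYS[my_name])
    sub = ctx.socket(zmq.SUB)            # inbound messages from every peer
    for name, endpoint in RELAYS.items():
        if name != my_name:
            sub.connect(endpoint)
    # topic subscriptions are added per stream (see Channels below)
    return pub, sub
```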

Subscriptions

Relays use subscriptions to partition the stream namespace. These subscriptions are also configured statically, but dynamic balancing and partitioning could be added.

For example, in a 3-node relay network, each relay would subscribe to all streams. This allows any single node to go down without disrupting the system; make that a 5-node network and 2 nodes can go down without disruption. Partitioning can be used to increase throughput by adding nodes that only subscribe to a portion of the stream namespace. To ensure fault tolerance, each stream should be subscribed to by at least 3 nodes.
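
A static subscription map along these lines might look like the following sketch; the stream and relay names are hypothetical, with every stream covered by at least 3 nodes.

```python
# Illustrative static subscription map (hypothetical stream and relay names).
# Each stream is subscribed to by at least 3 relays for fault tolerance.
SUBSCRIPTIONS = {
    "relay-a": ["orders", "clicks"],
    "relay-b": ["orders", "clicks"],
    "relay-c": ["orders", "payments"],
    "relay-d": ["clicks", "payments"],
    "relay-e": ["orders", "payments"],
}
```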

Channels

Nodes subscribing to a stream listen to 3 channels related to that stream (a naming sketch follows this list):

  • The incoming transaction channel. New transactions are published on this channel.
  • The write channel. Successful writes (to an in-memory buffer, not to disk) are reported on this channel.
  • The anti-entropy channel. This channel is used to resolve inconsistencies in streams that might result from network issues or a lost message.
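
One way the three channels above could map onto ZeroMQ pubsub is as per-stream topic prefixes; the naming scheme below is an assumption for illustration only.

```python
# Sketch of per-stream channel subscriptions, assuming each channel is encoded
# as a topic prefix on the SUB socket. The prefix names are hypothetical.
import zmq

def subscribe_stream(sub_socket, stream):
    for channel in ("txn", "write", "antientropy"):
        # e.g. b"txn:orders", b"write:orders", b"antientropy:orders"
        sub_socket.setsockopt(zmq.SUBSCRIBE, f"{channel}:{stream}".encode())
```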

Quorums

Relays use quorums to ensure that stream timelines remain consistent in the face of failures. Quorums are used in two places: when a transaction is received from a producer, and when transactions are sent to consumers.

When a transaction is received

The receiving node (the coordinator) assigns the transaction a sequence number. This sequence number is time-based with a suffix, like the ids generated by flake. Sequence numbers will be k-sorted as long as clocks are synchronized (preferably by NTP).
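
A minimal sketch of such a flake-style generator, assuming a millisecond timestamp in the high bits, a node id, and a rollover counter; the exact field widths are illustrative, not part of this design.

```python
# Rough sketch of a flake-style, k-sorted sequence number: time-ordered high
# bits plus a per-node suffix. Field widths here are assumptions.
import itertools, time

class SequenceGenerator:
    def __init__(self, node_id):
        self.node_id = node_id & 0x3FF          # 10-bit node id (assumed)
        self.counter = itertools.count()

    def next_id(self):
        millis = int(time.time() * 1000)        # time-ordered high bits
        seq = next(self.counter) & 0xFFF        # 12-bit rollover counter
        return (millis << 22) | (self.node_id << 12) | seq
```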

After assigning the sequence number, the coordinator publishes a message to the stream's incoming transaction channel. The message includes the transaction and the sequence number assigned by the coordinator.

Nodes subscribing to that stream's messages (possibly including the coordinator) receive the message and add it to an in-memory buffer. They then send a message to the stream's write channel indicating that the message was stored.

The coordinator waits for a quorum of write responses before returning success to the producer. If there aren't enough write responses within the time limit, an error is returned. This doesn't mean the message was not received or will not be delivered to consumers; it only means the coordinator cannot guarantee that it will be delivered. The producer can retry if desired.
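
The coordinator's wait could look roughly like this sketch, which assumes a recv_ack() helper that polls the stream's write channel and a fixed timeout; both are hypothetical details.

```python
# Sketch of the coordinator's quorum wait. recv_ack() is assumed to poll the
# write channel and return (node_id, seqno, digest), or None on a poll timeout.
import time

def await_write_quorum(recv_ack, n_subscribers, timeout=1.0):
    quorum = n_subscribers // 2 + 1
    acks = {}                                  # node_id -> reported digest
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        ack = recv_ack()
        if ack is None:
            continue
        node_id, seqno, digest = ack
        acks[node_id] = digest
        if len(acks) >= quorum:
            return True, acks                  # report success to the producer
    return False, acks                         # quorum not reached: report error
```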

When a transaction is delivered

Nodes receiving messages from coordinators store them in a circular buffer, but do not deliver them to clients until receiving a quorum of responses (including their own) on the write channel. These responses are the same ones the coordinator uses to report success or failure to the producer, so assuming there are no failures, each message is ready to be delivered to clients at about the same time the producer receives its response.
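
A sketch of that delivery rule on a subscribing node, with assumed data structures: a bounded deque as the circular buffer and, per sequence number, the set of nodes whose write acks have been seen.

```python
# Sketch of delivery gating on a subscribing node. A transaction is held in a
# bounded buffer and released to consumers only once a quorum of write acks
# (including this node's own) has been observed. Data structures are assumed.
from collections import deque

class StreamBuffer:
    def __init__(self, n_subscribers, capacity=4096):
        self.quorum = n_subscribers // 2 + 1
        self.buffer = deque(maxlen=capacity)   # circular buffer of (seqno, txn)
        self.acks = {}                         # seqno -> set of acking node ids

    def on_transaction(self, seqno, txn):
        self.buffer.append((seqno, txn))

    def on_write_ack(self, seqno, node_id):
        self.acks.setdefault(seqno, set()).add(node_id)

    def deliverable(self, seqno):
        return len(self.acks.get(seqno, ())) >= self.quorum
```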

Anti-entropy

Relays cannot guarantee that every message will be received. Network issues and dropped messages at either the sender or the receiver could cause timeline inconsistency in streams.

To detect this problem, relays generate a rolling digest of each stream such that the digest of a particular transaction incorporates the digests of all previous transactions. This is similar to the way git ensures repository integrity.

The write responses that relays send to coordinators include the sequence number of the transaction written and the digest calculated for that transaction. The digest, being a hash of the content of the current transaction and the previous digest, is effectively a digest of the entire stream. (TODO: how do relays without previous transactions calculate the digest? They need to do read repair or get the most recent transaction from persistent delta storage)
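A minimal sketch of the rolling digest, assuming a SHA-1 hash over the previous digest concatenated with the transaction body; the actual hash function is not specified here.

```python
# Sketch of the rolling stream digest: each transaction's digest hashes the
# previous digest together with the transaction body, so the latest digest
# effectively covers the whole stream history. Hash choice is an assumption.
import hashlib

def next_digest(prev_digest: bytes, txn_body: bytes) -> bytes:
    return hashlib.sha1(prev_digest + txn_body).digest()
```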

(TODO: break out the anti-entropy process by phase.)

If a coordinator receives multiple values for the digest, it must wait until a quorum has agreed on a single value. Nodes with incorrect values will perform read repair using the anti-entropy process. If there is no quorum agreement, the coordinator must respond with an error to the producer.

If a node receives a digest value different from the digest it calculated, it initiates the anti-entropy process. The node first determines the most recent transaction to have been correctly verified; normally this is the immediately previous transaction. The node then sends a message to the anti-entropy channel requesting all messages in the stream from the last verified transaction up to the most recently received transaction.
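
Initiating the repair could look like the sketch below, which assumes a JSON request body and the topic-prefix naming from the channels example; both are illustrative.

```python
# Sketch of starting anti-entropy: on a digest mismatch, ask peers over the
# stream's anti-entropy channel for every transaction from the last verified
# sequence number up to the most recently received one. Framing is assumed.
import json

def request_repair(pub_socket, stream, last_verified_seq, latest_seq):
    body = json.dumps({"from": last_verified_seq, "to": latest_seq}).encode()
    pub_socket.send_multipart([f"antientropy:{stream}".encode(), body])
```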

While anti-entropy is in progress, further transactions received are queued awaiting verification, effectively pausing all clients reading from that stream. Received transactions are still written and digested during read repair to avoid pausing producers as well, although performance will be impacted as the digests from the repairing node will not count towards a quorum.

Other nodes subscribed to the stream publish all transactions they have between the two sequence numbers (including digests) to the anti-entropy channel. The receiving node rebuilds the stream from the transactions received. When it has caught up to the most recently received transaction and its digests agree, the anti-entropy process is complete. The node releases the verified transactions to clients and returns to its normal state.
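
A sketch of the repair loop on the disagreeing node: replay the received transactions in sequence order, recompute the rolling digest, and compare against the quorum-agreed value. The hash choice matches the digest sketch above and is an assumption.

```python
# Sketch of rebuilding a stream from anti-entropy responses.
from hashlib import sha1

def rebuild_stream(repair_txns, last_verified_digest, expected_digest):
    """repair_txns: iterable of (seqno, txn_body) pairs received from peers."""
    digest = last_verified_digest
    for seqno, body in sorted(repair_txns):
        digest = sha1(digest + body).digest()   # same rolling digest as above
    return digest == expected_digest            # True -> repair is complete
```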

(TODO anti-entropy needs more explanation. What if there is no quorum for the digest? What if the stream needs to be composed from multiple nodes? Do all nodes start the anti-entropy process or only the one disagreeing with the quorum?)

Fencing

Nodes send heartbeats to other nodes on a pubsub channel. If a node determines that it has lost its connection to enough nodes that consistency cannot be guaranteed, it fences itself until heartbeats have recovered. Consumers are disconnected and producers receive errors when attempting to publish; both should attempt to connect to other nodes.

This ensures that when a network partition occurs, only one side of the partition remains online.
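
A sketch of the fencing decision, assuming each relay records the time of the last heartbeat seen from every peer and fences itself once it can no longer reach a majority of the cluster; the threshold and timing values are assumptions.

```python
# Sketch of a heartbeat-based fencing check.
import time

def should_fence(last_heartbeat, cluster_size, max_silence=3.0):
    """last_heartbeat: peer id -> time.monotonic() of that peer's last heartbeat."""
    now = time.monotonic()
    reachable = 1 + sum(1 for t in last_heartbeat.values()
                        if now - t < max_silence)    # count this node itself
    return reachable <= cluster_size // 2            # in the minority -> fence
```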

Consumers

Consumers can attempt to consume from any node, but only nodes subscribing to the requested stream can serve the request. If a consumer connects to a node that is not currently subscribed to the requested stream, that node will determine which node is subscribed and redirect the client to that node.
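
The redirect could be implemented roughly as below, using a subscription map like the hypothetical SUBSCRIPTIONS example earlier; the response shape is an assumption.

```python
# Sketch of handling a consume request for a stream this relay may not serve.
def handle_consume_request(my_name, stream, subscriptions):
    """subscriptions: map of relay name -> list of streams it subscribes to."""
    if stream in subscriptions.get(my_name, ()):
        return {"status": "ok"}                      # serve the stream locally
    owners = [node for node, streams in subscriptions.items() if stream in streams]
    return {"status": "redirect", "nodes": owners}   # point the consumer elsewhere
```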

Producers

Producers can publish to any node.

TODO k-sorted sequence number generation