Designing Data Intensive Applications: Distributed Data - gouthamv03/notes GitHub Wiki

Topics

Scaling to Higher Load

Replication

Partitioning

Scaling to Higher Load

Reasons to have data distributed on multiple machines:

  1. Scalability: Increase in load can be handled by spinning more machines up
  2. Fault tolerance: Failure of one or more machines doesn't bring down the system
  3. Latency: Geographically distributed data centers provide faster access in different regions

How to scale?

Vertical scaling

Add more CPU, memory, disk to build a single large machine. Single OS manages all these.

Shared memory architecture:

All components treated as a single machine. Drawbacks:

  1. Cost doesn't scale linearly
  2. Bottlenecks make it difficult for a machine with twice the specs to handle twice the load
  3. Fault tolerance is limited: some components can be hot-swapped without shutting down the system, but the machine still sits in a single geographic location

Shared disk architecture: Several machines with independent CPUs and memory store data on an array of disks that is shared between them and connected over a fast network. Contention and locking overhead limit the scalability.

Horizontal scaling

Shared nothing architecture:

Each node has its own CPU, memory and disk, and the nodes are connected over a conventional network. Coordination between the nodes happens at the software level. Nodes can be added independently to provide scale and can be geo-distributed. Drawbacks:

  1. Need to be careful with application design. DB can't hide some constraints of such systems

Replication Versus Partitioning

  • Replication: Make a copy of the data in different nodes
  • Partitioning: Split the data into several partitions (also called shards) and assign them to different nodes

Eg. The two are often combined: a system splits its data into partitions and replicates each partition on one or more other nodes.

Replication

Replication means copying over the entire dataset to each node. The main challenge is in handling changes to the replicated data or keeping nodes in sync. Three popular algorithms for replicating changes between nodes:

  • Single-leader
  • Multi-leader
  • Leaderless replication

Leaders and followers

Each node that stores a copy of the database is called a replica. With multiple replicas, we need to update each replica on a change. Most common solution is Leader-based replication (AKA leader/follower or active/passive or master/slave).

  1. Leader is chosen and handles all write requests. During a write, leader updates its local storage and then streams the change log to the Follower nodes.
  2. Follower nodes process the writes in the same order they are received.
  3. Clients can send read requests to the Leader or to Followers, but writes are accepted only by the Leader.

Used by PostgreSQL, MySQL, MongoDB and also the message brokers Kafka and RabbitMQ.
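
The flow above can be sketched as follows. This is a minimal illustration, assuming an in-memory key-value store and followers that are updated synchronously. The class and method names are hypothetical and not taken from any real database.

```python
class Follower:
    """Applies log entries strictly in the order the leader produced them."""

    def __init__(self):
        self.data = {}
        self.applied = 0  # number of log entries applied so far

    def apply(self, offset, key, value):
        # Reject gaps or reordering so the follower never diverges from the leader.
        if offset != self.applied:
            raise ValueError(f"expected offset {self.applied}, got {offset}")
        self.data[key] = value
        self.applied += 1


class Leader:
    """Accepts all writes, updates local storage, then streams the change."""

    def __init__(self, followers):
        self.data = {}
        self.log = []  # replication log: list of (key, value) in write order
        self.followers = followers

    def write(self, key, value):
        self.data[key] = value
        offset = len(self.log)
        self.log.append((key, value))
        for follower in self.followers:
            follower.apply(offset, key, value)


followers = [Follower(), Follower()]
leader = Leader(followers)
leader.write("user:1", "alice")
leader.write("user:1", "alicia")
```

After the two writes, every follower holds the same data as the leader because it applied the same log in the same order.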

Synchronous vs Asynchronous replication

  • Synchronous: Leader waits for Follower to process the change before returning OK to the client. Advantage is that all nodes have the newest copy. Disadvantage is the delay and possible halt if followers don't respond.
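  • Asynchronous: Leader does not wait for Followers to process the change before returning OK to the client. Advantage is the fast response time, and the Leader can keep accepting writes even if all Followers fall behind. Disadvantage is that a write is not guaranteed to be durable: if the Leader fails and is not recoverable, any writes not yet replicated to Followers are lost, even though they were confirmed to the client. This is still widely used.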
  • Semi-Synchronous: Leader waits for one follower to respond. It is impractical for all followers to be synchronous: any one node outage would cause the whole system to grind to a halt. In practice, if you enable synchronous replication on a database, it usually means that one of the followers is synchronous, and the others are asynchronous. If the synchronous follower becomes unavailable or slow, one of the asynchronous followers is made synchronous. This guarantees that you have an up-to-date copy of the data on at least two nodes: the leader and one synchronous follower.
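
A rough sketch of the semi-synchronous case, assuming in-memory replicas where asynchronous lag is modelled as a queue of pending changes. `Replica` and `semi_sync_write` are hypothetical names used only for illustration.

```python
from collections import deque


class Replica:
    def __init__(self):
        self.data = {}
        self.pending = deque()  # changes received but not yet applied (async lag)

    def apply_now(self, key, value):
        self.data[key] = value

    def enqueue(self, key, value):
        self.pending.append((key, value))

    def drain(self):
        # Models the follower catching up at some later time.
        while self.pending:
            key, value = self.pending.popleft()
            self.data[key] = value


def semi_sync_write(leader_data, sync_follower, async_followers, key, value):
    leader_data[key] = value
    sync_follower.apply_now(key, value)  # wait for exactly one follower
    for follower in async_followers:
        follower.enqueue(key, value)     # do not wait for the rest
    return "OK"


leader_data = {}
sync_follower, lagging = Replica(), Replica()
status = semi_sync_write(leader_data, sync_follower, [lagging], "k", "v1")
```

When OK is returned, the leader and the synchronous follower both hold the write, while the asynchronous follower only sees it after it drains its queue.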

Setting up new followers

When a follower node is added to handle additional load, it needs a copy of the Leader's data. We can't lock the database against further updates while this copy is happening. So this is done by taking a snapshot of the leader's database at a certain point in time, where the snapshot is associated with an exact position in the leader's replication log. The follower copies the snapshot and then requests from the Leader all changes since that position. Once the follower has caught up, it can process updates normally.
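
The snapshot-then-catch-up procedure can be sketched like this. It assumes the replication log is a simple list whose index serves as the log position. `bootstrap_follower` is a hypothetical helper, not a real database API.

```python
def bootstrap_follower(leader_snapshot, snapshot_position, leader_log):
    """Return (data, position) for a new follower.

    leader_snapshot: dict copied from the leader at snapshot_position
    leader_log: list of (key, value) changes; list index == log position
    """
    data = dict(leader_snapshot)
    # Replay only the changes made after the snapshot was taken.
    for key, value in leader_log[snapshot_position:]:
        data[key] = value
    return data, len(leader_log)


log = [("a", 1), ("b", 2)]
snapshot, position = {"a": 1, "b": 2}, len(log)  # snapshot taken here
log += [("a", 3), ("c", 4)]                      # writes continue meanwhile
data, caught_up_to = bootstrap_follower(snapshot, position, log)
```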

Handling Node Outages

  • Follower node goes down: Catch-up recovery: Request leader for any updates since the last time data was received. Catch-up to the current writes and process normally
  • Leader node goes down: Failover: Promote a Follower to be a Leader. Client writes should be pointed to the new leader. Followers start processing data from the new Leader.

An automatic failover process usually consists of the following steps:

  1. Determining that the leader has failed: Several things can go wrong. Monitor for those and assume no response in some time interval implies that the Leader is dead
  2. Choosing a new leader: Can be done by election between Follower nodes or by a separate Controller node. Best candidate is the one with latest writes.
  3. Reconfigure system to use new leader: Clients should send data to the new leader. Old leader should not come back up and behave like a leader. It should recognize the new leader and become a Follower node.
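
The first two steps can be sketched as below. This assumes heartbeat timestamps in seconds and that each follower reports the last log position it applied. Function names and the tie-breaking rule are illustrative assumptions; real systems use a consensus protocol for the election.

```python
def detect_failure(last_heartbeat, now, timeout_seconds):
    """Assume the leader is dead if no heartbeat arrived within the timeout."""
    return now - last_heartbeat > timeout_seconds


def choose_new_leader(follower_positions):
    """follower_positions: dict of node name -> last applied log position.

    The follower with the most up-to-date log wins. Ties go to the
    alphabetically first name so that every node picks the same winner.
    """
    return max(sorted(follower_positions), key=lambda name: follower_positions[name])


leader_dead = detect_failure(last_heartbeat=100.0, now=131.0, timeout_seconds=30.0)
new_leader = choose_new_leader({"f1": 41, "f2": 57, "f3": 57})
```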

Possible problems include:

  1. With asynchronous replication, the new leader may not have received all writes from the old leader before it failed. If the old leader rejoins, the new leader may meanwhile have accepted conflicting writes. In this case, the old leader's unreplicated writes are usually just discarded, which may violate clients' durability expectations.
  2. Discarding writes is especially dangerous when other systems are coordinated with the database contents. Eg. the old leader assigns some primary keys and another datastore records them. The old leader dies and its unreplicated rows are discarded. The new leader then reuses the same primary keys, which now point to inconsistent data in the other datastore.
  3. Two nodes may both believe they are the leader and both accept writes, causing data loss or corruption. This is called split-brain. One leader needs to be shut down or demoted to follower safely.
  4. The timeout before declaring a leader dead needs to be balanced. A long timeout means a longer time to recovery. A short timeout can cause unnecessary failovers during heavy system or network load.

Implementation of Replication Logs

Statement-based replication : Send the write statement to the followers. For a relational database, this means that every INSERT, UPDATE, or DELETE statement is forwarded to followers, which parse and execute it as if it came from a client. Statements that call a nondeterministic function like RAND() or NOW() may produce a different value on each Follower. Statements that use an autoincrementing column or depend on existing data (eg. UPDATE ... WHERE some condition) must be executed in exactly the same order on every replica, which is limiting when there are multiple concurrently executing transactions. Because of these edge cases, other replication techniques are preferred.

Write-Ahead Log (WAL) shipping : Many storage engines already append every write to a log file. This log can be streamed to followers, which process it to build the same data structures as the leader. The disadvantage is that the WAL is low-level and describes which bytes changed in which disk blocks. When the database storage format changes between DB versions, followers and leaders on different versions can't interpret the WAL correctly, so upgrades require downtime.

Logical (row-based) log replication : An alternative is to use different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals. This logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row. Since it is decoupled, leaders and followers can run different versions, so upgrades can happen without downtime, and the log can even be shipped to external systems.
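
To illustrate the difference between statement-based and logical replication, here is a small sketch. It assumes a replica is a dict of column values and uses a seeded random generator to stand in for a nondeterministic function like RAND(). The record layout is a made-up example, not any database's actual log format.

```python
import random


def statement_based(replica_row, rng):
    # Each replica evaluates the nondeterministic function itself,
    # so replicas can end up with different values.
    replica_row["token"] = rng.random()


def to_logical_record(table, primary_key, new_values):
    # The leader evaluates the statement once and ships the resulting row values.
    return {"op": "update", "table": table, "pk": primary_key, "values": new_values}


def apply_logical(replica_row, record):
    replica_row.update(record["values"])


follower_1, follower_2 = {}, {}
statement_based(follower_1, random.Random(1))
statement_based(follower_2, random.Random(2))
diverged = follower_1 != follower_2

leader_row = {}
statement_based(leader_row, random.Random(3))
record = to_logical_record("sessions", 42, dict(leader_row))
apply_logical(follower_1, record)
apply_logical(follower_2, record)
```

With the logical record, both followers converge on the value the leader computed.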

Trigger-based replication : A trigger lets you register custom application code that is automatically executed when a data change happens. The trigger can log the change into a separate table, from which an external process reads it, applies any necessary application logic and replicates the change to another system. It has greater overheads and is more bug-prone than built-in replication, so it is used only for specific use-cases that need the flexibility.

Replication Lag

When leader handles writes and replicates data on followers, we can redirect read requests to the followers. In case of asynchronous replication, there is a lag between a leader receiving a write and a follower getting the update. So older data may be returned by the follower. This inconsistency is just a temporary state, the followers will eventually catch up and become consistent with the leader. Hence, the term eventual consistency.

Reading Your Own Writes

Sometimes a user updates a piece of info and expects to see the update when reloading their page. This guarantee is called Read-after-write consistency. With asynchronous replication, a write to the leader may not have propagated to a follower yet, so a read from that follower may not return the latest change. To avoid this:

  1. Application can read from the leader for the user who is writing data
  2. Application can track when the last update happens and always read from leader for the next minute
  3. Application can remember the timestamp of the user's last write. Reads are then served only by followers that are updated at least up to that timestamp. The timestamp could be a logical timestamp (something that indicates ordering of writes, such as the log sequence number) or the actual system clock.
  4. Complications: if replicas are spread across datacenters, any request that must be served by the leader has to be routed to the datacenter that contains the leader. If Read-after-write needs to hold across a user's devices, a last-write timestamp remembered on one device is not known to the others, so this metadata has to be centralized.
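
Approach 3 can be sketched as a small routing function. It assumes each follower reports the log position it has replicated up to and that the client remembers the position of its last write. All names are hypothetical.

```python
def pick_replica_for_read(last_write_position, leader, followers):
    """followers: dict of follower name -> replicated log position.

    Returns a follower that already contains the user's last write,
    falling back to the leader if every follower is still behind.
    """
    for name, position in sorted(followers.items()):
        if position >= last_write_position:
            return name
    return leader


fresh_enough = pick_replica_for_read(120, "leader", {"f1": 118, "f2": 121})
all_behind = pick_replica_for_read(120, "leader", {"f1": 100, "f2": 119})
```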

Monotonic Reads

If a user's successive reads go to replicas with different lags, the user may see information moving back in time: comments disappear, pages change etc. To avoid this, a guarantee called Monotonic reads is used. It is weaker than strong consistency, but at least guarantees that a user who reads newer data will not later read older data. One way of achieving monotonic reads is to make sure that each user always makes their reads from the same replica (different users can read from different replicas).
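
One way to pin each user to a replica is to hash the user id, as in this sketch. The replica names are placeholders, and a real system would also need to reroute users when their replica fails.

```python
import hashlib


def replica_for_user(user_id, replicas):
    """Map a user id to one replica, the same one on every call."""
    digest = hashlib.md5(user_id.encode("utf-8")).digest()
    return replicas[int.from_bytes(digest[:8], "big") % len(replicas)]


replicas = ["replica-0", "replica-1", "replica-2"]
first = replica_for_user("user-1234", replicas)
second = replica_for_user("user-1234", replicas)
```

A stable hash such as MD5 is used here instead of Python's built-in `hash()`, which is randomized per process for strings.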

Consistent Prefix Reads

Another anomaly is a violation of causality caused by lag. Eg. an Answer showing up before its Question. Preventing this requires a guarantee called consistent prefix reads: if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order. This is a particular problem in partitioned (sharded) databases. Different partitions operate independently, so there is no global ordering of writes. One solution is to make sure that writes that are causally related to each other are written to the same partition.

Solutions for Replication Lag

Transactions exist for this reason: they are a way for a database to provide stronger guarantees so that the application can be simpler. Single-node transactions have existed for a long time, but in the move to distributed databases many systems have abandoned them, claiming that they are too expensive in terms of performance and availability.

Multi-Leader Replication

When a single leader manages all writes, it becomes a performance bottleneck and a single point of failure. Alternatively, multi-leader systems can be used, in which each leader also acts as a follower to the other leaders. This is generally considered risky. The main use case is a geo-distributed DB with nodes in multiple datacenters: we can configure one leader in each datacenter, so writes are processed locally and avoid inter-datacenter network latency. An extension of this is a client device that has its own offline database and syncs data to the cloud whenever it comes online. Eg. CouchDB. Applications for parallel editing like Google Docs face a similar problem. If such an application takes a lock on the document before a user can edit it, the collaboration model is equivalent to single-leader replication with transactions on the leader.

Handling Write Conflicts

Managing conflicts is the biggest problem with multi-leader replication systems.

Synchronous vs Asynchronous

In a single-leader database, the second writer will either block and wait for the first write to complete, or abort the second write transaction, forcing the user to retry the write. On the other hand, in a multi-leader setup, both writes are successful, and the conflict is only detected asynchronously at some later point in time. At that time, it may be too late to ask the user to resolve the conflict. In principle, conflict detection can be made synchronous by waiting for a write to be replicated to all replicas before reporting success. But that gives up the main advantage of multi-leader replication, which is that each replica accepts writes independently. If you want synchronous conflict detection, you might as well just use single-leader replication.

Conflict avoidance

The simplest strategy for dealing with conflicts is to avoid them: if the application can ensure that all writes for a particular record go through the same leader, then conflicts cannot occur. Since many implementations of multi-leader replication handle conflicts quite poorly, avoiding conflicts is a frequently recommended approach.

Converging toward a consistent state

There are various ways of achieving convergent conflict resolution:

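  1. Give each write a unique id (a timestamp, a UUID, etc.). The write with the latest timestamp or greatest id wins and the others are discarded, so this approach is prone to data loss.
  2. Merge the values somehow, eg. order them alphabetically and concatenate them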
  3. Store both values and let app/human resolve it (Can be done during write to db or read from db. In both cases, some handler is invoked to resolve)
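
The first two strategies can be sketched as below. This assumes each write carries a `(timestamp, writer_id, value)` tuple and that merged values are lists of items. Both function names are illustrative.

```python
def last_write_wins(a, b):
    """Each write is (timestamp, writer_id, value); the greater tuple wins.

    The writer id breaks ties between equal timestamps so that every
    replica picks the same winner. The losing write is silently dropped.
    """
    return max(a, b)


def merge_union(a, b):
    """Keep every item from both conflicting values, in a stable order."""
    return sorted(set(a) | set(b))


winner = last_write_wins((10, "dc1", "B"), (12, "dc2", "C"))
merged = merge_union(["milk", "eggs"], ["eggs", "flour"])
```

Both functions give the same result regardless of argument order, which is what lets all replicas converge on the same final value.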

Automatic Conflict Resolution

Conflict-free replicated datatypes (CRDT): family of data structures for sets, maps, ordered lists, counters, etc. that can be concurrently edited by multiple users, and which automatically resolve conflicts in sensible ways. CRDTs use 2-way merges

Mergeable persistent data structures: track history explicitly, similarly to the Git version control system, and use a three-way merge function

Operational transformation: the conflict resolution algorithm behind collaborative editing applications like Google Docs. Designed particularly for concurrent editing of an ordered list of items, such as the list of characters that constitute a text document

Multi-Leader Replication Topologies

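Circular, Star or All-to-All topology. In Circular topology, nodes are arranged in a ring: each node receives writes from one node and forwards them, plus its own writes, to one other node. In Star topology, one designated root node forwards writes to all of the other nodes; this can be generalized to a tree. In All-to-All topology, every leader sends its writes to every other leader. In Circular and Star topologies a write passes through several nodes, so each node has a unique id and every write is tagged with the ids of the nodes it has passed through. If a node sees its own id on a write, it ignores the write since it was already processed, which prevents infinite replication loops. The drawback of these two topologies is that if one node fails, writes don't get through to the nodes that depend on it to pass them along. There is no such single point of failure in All-to-All topology.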

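In All-to-All topology, some network links may be faster than others, so writes can arrive at a replica in the wrong order. Eg. an update may arrive before the insert it depends on, which is a causality problem similar to the one seen with replication lag. Attaching a timestamp to every write is not foolproof, since clocks on different Leaders may not be sufficiently in sync. To order these events correctly, a technique called version vectors can be used.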

Leaderless Replication

In Single and Multi leader approaches, we send writes to one or more leaders, which then propagate the data to the other replicas. In Leaderless replication, writes are sent to several replicas directly by the client, or by a coordinator node on the client's behalf. Eg. Amazon's Dynamo, Cassandra.

When a DB node is down, other nodes can accept writes. If a Quorum is reached, the write is considered successful. But if a client requests data from a node that was down, it will return old data. To avoid this, client read requests are also sent to several nodes in parallel.

When the node comes up again, it needs to catch-up with data from the other nodes. It can do this by:

  1. Read repair: When client sees mismatching data, it can write back the latest data to the node with old data
  2. Anti-entropy process: Separate thread that constantly looks for differences in the data between replicas and copies any missing data from one replica to another
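
Read repair can be sketched as follows. It assumes every stored value carries a version number and that a replica is a plain dict mapping key to `(version, value)`. The function name and data layout are assumptions for illustration.

```python
def read_with_repair(replicas, key, r):
    """Read `key` from the first r replicas and repair any stale copies.

    replicas: list of dicts mapping key -> (version, value)
    """
    contacted = replicas[:r]
    responses = [replica.get(key, (0, None)) for replica in contacted]
    newest = max(responses, key=lambda versioned: versioned[0])
    for replica, response in zip(contacted, responses):
        if response[0] < newest[0]:
            replica[key] = newest  # write the newer value back to the stale replica
    return newest[1]


replicas = [{"k": (1, "old")}, {"k": (2, "new")}, {"k": (2, "new")}]
value = read_with_repair(replicas, "k", r=2)
```

Only replicas that were contacted get repaired, which is why values that are rarely read may stay stale unless an anti-entropy process also runs.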

Quorum

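A quorum defines how many nodes must respond OK for a write to be considered successful, and how many nodes must be queried on a read to be sure that an up-to-date entry is seen. Here n is the total number of replicas, w is the number of nodes that must confirm each write, and r is the number of nodes that must respond to each read. As long as the quorum condition w + r > n holds, the set of nodes written and the set of nodes read overlap in at least one node, so at least one response contains the latest value. This allows the system to tolerate some unavailable nodes. If fewer than the required w or r nodes are available, writes or reads return an error.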
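
The arithmetic behind the quorum condition is small enough to check directly. The helper names below are made up for this sketch.

```python
def is_strict_quorum(n, w, r):
    """True if every read set must overlap every write set in at least one node."""
    return w + r > n


def tolerated_failures(n, w, r):
    """Number of nodes that can be down while both reads and writes still succeed."""
    return n - max(w, r)


common_setup = is_strict_quorum(n=3, w=2, r=2)
risky_setup = is_strict_quorum(n=3, w=1, r=1)
```

With n = 3, w = 2, r = 2 one node can be unavailable. With n = 5, w = 3, r = 3 two nodes can be unavailable.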

With a smaller w and r you are more likely to read stale values, because it’s more likely that the read didn’t include the node with the latest value. On the upside, this configuration allows lower latency and higher availability.

If a write succeeded on some replicas but failed on others (for example because the disks on some nodes are full), and overall succeeded on fewer than w replicas, it is not rolled back on the replicas where it succeeded. This means that if a write was reported as failed, subsequent reads may or may not return the value from that write.

Monitoring staleness

Something like replication lag can be measured for Single or Multi-leader environments. However, measuring staleness in Leaderless replication is harder. Can be measured in terms of stale entries seen during Read repair.

Sloppy Quorums and Hinted Handoff

Sometimes the nodes you need for a quorum may not be reachable or available. The whole write need not fail in this case. We can accept the write on whichever nodes are reachable to reach the quorum count, even if they are not among the nodes on which the value usually resides. This is called a sloppy quorum. Once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate “home” nodes. This is called hinted handoff. It improves write availability, but a read of r nodes is no longer guaranteed to see the latest value until the handoff has completed.
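
A toy model of a sloppy quorum write followed by hinted handoff. It assumes each node's storage is a dict and that the set of currently reachable nodes is known. Node names, function names and the hint format are all hypothetical.

```python
def sloppy_write(stores, hints, key, value, home, fallbacks, reachable, w):
    """Write to reachable home nodes, using stand-ins for unreachable ones.

    Returns True if at least w nodes accepted the write.
    """
    accepted = [node for node in home if node in reachable]
    missing = [node for node in home if node not in reachable]
    for node in accepted:
        stores[node][key] = value
    spare = [node for node in fallbacks if node in reachable]
    for intended, stand_in in zip(missing, spare):
        stores[stand_in][key] = value
        hints.append((stand_in, intended, key))  # remember who the write is for
        accepted.append(stand_in)
    return len(accepted) >= w


def hinted_handoff(stores, hints, reachable):
    """Forward temporarily held writes to home nodes that are reachable again."""
    remaining = []
    for stand_in, intended, key in hints:
        if intended in reachable:
            stores[intended][key] = stores[stand_in].pop(key)
        else:
            remaining.append((stand_in, intended, key))
    hints[:] = remaining


stores = {name: {} for name in "ABCD"}
hints = []
ok = sloppy_write(stores, hints, "k", "v", home=["A", "B", "C"],
                  fallbacks=["D"], reachable={"A", "B", "D"}, w=3)
held_by_stand_in = "k" in stores["D"]
hinted_handoff(stores, hints, reachable={"A", "B", "C", "D"})
```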

Leaderless replication is also suitable for multi-datacenter operation. In one form, data can be written to nodes across data centers and checked for quorum. In another form, quorum can be checked only for a local datacenter and separate sync with a remote datacenter managed like in multi-leader replication.

Detecting Concurrent Writes

Whenever you have two operations A and B, there are three possibilities: either A happened before B, or B happened before A, or A and B are concurrent. If multiple leaders accept writes, the same record can be updated in parallel. For defining concurrency, exact time doesn’t matter: we simply call two operations concurrent if they are both unaware of each other.

Some techniques for conflict resolution:

Last write wins (discarding concurrent writes)

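We can attach a timestamp to each write, pick the biggest timestamp as the most recent, and discard any writes with an earlier timestamp. This conflict resolution algorithm is called last write wins (LWW). It achieves eventual convergence at the cost of durability: of several concurrent writes to the same key, only one survives, even if all were reported as successful. If losing data is not acceptable, LWW is a poor choice for conflict resolution.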

The “happens-before” relationship and concurrency

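If B is an increment of a value that A wrote, then B causally depends on A, so A must happen before B. Algo to determine if operations are concurrent:

  1. As the DB, keep a version number for every key, increment it on every write, and store the new version along with the value written.
  2. As the DB, on a read return all the values that have not been overwritten, along with the latest version number. A client must read a key before writing it.
  3. As a client, mention the version number you last received when writing some data, and merge all the values returned by that read into the value you write.
  4. As the DB, on a write with a given version number, overwrite all values with that version or below (they were merged into the new value), but keep all values with a higher version, because they are concurrent with the incoming write.
  5. Merge concurrent values (siblings) by Last version or Last timestamp (which loses data), or by taking the Union of all values.
  6. Process deletes with tombstones: a union merge would otherwise bring back a removed item, so a deletion leaves a marker with a version number instead of removing the item.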
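
The server side of the algorithm above can be sketched for a single key. This is a simplified model in which the write response returns the same information as a read. `VersionedKey` is a hypothetical name, and the shopping-cart values are just sample data.

```python
class VersionedKey:
    """Tracks concurrent values (siblings) of one key on a single replica."""

    def __init__(self):
        self.version = 0
        self.siblings = {}  # version -> value, for values not yet overwritten

    def read(self):
        return self.version, list(self.siblings.values())

    def write(self, value, based_on_version):
        # Values at or below the client's version were seen (and merged) by the
        # client, so they are overwritten. Higher versions are concurrent: keep them.
        self.siblings = {v: val for v, val in self.siblings.items()
                         if v > based_on_version}
        self.version += 1
        self.siblings[self.version] = value
        return self.read()


cart = VersionedKey()
step_1 = cart.write(["milk"], based_on_version=0)           # client 1
step_2 = cart.write(["eggs"], based_on_version=0)           # client 2, concurrent
step_3 = cart.write(["milk", "flour"], based_on_version=1)  # client 1 again
step_4 = cart.write(["eggs", "milk", "ham"], based_on_version=2)  # client 2 merged
```

After the second write the server holds two siblings, because neither client had seen the other's write.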

Version Vectors

Similar to the above flow, which uses a single replica, but extended to multiple replicas that accept writes concurrently.

Need to use a version number per replica as well as per key. Each replica increments its own version number when processing a write, and also keeps track of the version numbers it has seen from each of the other replicas. The collection of version numbers from all the replicas is called a version vector.
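
Comparing two version vectors shows whether one write happened before the other or whether they are concurrent. The sketch below assumes a vector is a dict from replica name to counter, with missing entries treated as 0. The function names are illustrative.

```python
def compare(a, b):
    """Return 'equal', 'before', 'after' or 'concurrent' for vectors a and b."""
    replicas = set(a) | set(b)
    a_le_b = all(a.get(r, 0) <= b.get(r, 0) for r in replicas)
    b_le_a = all(b.get(r, 0) <= a.get(r, 0) for r in replicas)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"   # a happened before b, so b may overwrite a
    if b_le_a:
        return "after"
    return "concurrent"   # neither saw the other: keep both as siblings


def merge(a, b):
    """Element-wise maximum: the vector of a value that merges both siblings."""
    return {r: max(a.get(r, 0), b.get(r, 0)) for r in set(a) | set(b)}


ordered = compare({"r1": 2, "r2": 1}, {"r1": 3, "r2": 1})
conflict = compare({"r1": 2, "r2": 0}, {"r1": 1, "r2": 1})
merged_vector = merge({"r1": 2, "r2": 0}, {"r1": 1, "r2": 1})
```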

Partitioning

Partitioning is a way of intentionally breaking a large database down into smaller ones. The main reason for wanting to partition data is scalability. Different partitions can be placed on different nodes in a shared-nothing (horizontally-scaled) cluster.

Partitioning and Replication

Partitioning is usually combined with replication so that copies of each partition are stored on multiple nodes. This means that, even though each record belongs to exactly one partition, it may still be stored on several different nodes for fault tolerance.

A node may store more than one partition. If a leader–follower replication model is used, each partition’s leader is assigned to one node, and its followers are assigned to other nodes. Each node may be the leader for some partitions and a follower for other partitions. So a write to node1 will propagate to node2 and node3 which accept this data as followers. In turn data written to node2 will propagate to node1 and node3 which accept it as followers.

Partitioning of Key-Value Data

The goal of partitioning is to spread the data and the query load evenly across nodes. If the partitioning is uneven, so that some partitions have more data or queries than others, we call it skewed. A partition with disproportionately high load is called a hot spot. Some ways of partitioning:

Random Assignment

This may lead to an even load, but we don't know which partition to read data from.

Partitioning by Key Range

Assign a continuous range of keys to each partition. The ranges of keys are not necessarily evenly spaced, because your data may not be evenly distributed. The partition boundaries might be chosen manually by an administrator, or the database can choose them automatically.

Within each partition, we can keep keys in sorted order, making range scans and bulk reads easier. However, the downside of key range partitioning is that certain access patterns can lead to hot spots. Eg. If the key is a timestamp, all of today's writes go to today's partition, keeping it extra busy while the others are idle. A simple fix is to not use the timestamp as the first element of the key: prefix it with another field such as a name, so that data is partitioned first by name and then by time.
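
Looking up the partition for a key under key range partitioning is a binary search over the sorted boundaries. The boundary values below are made up for this sketch.

```python
import bisect

# Partition 0 holds keys < "f", partition 1 holds "f" <= key < "m", and so on.
BOUNDARIES = ["f", "m", "t"]


def range_partition(key):
    """Index of the partition whose key range contains `key`."""
    return bisect.bisect_right(BOUNDARIES, key)


def partitions_for_range(start_key, end_key):
    """A range scan only touches the contiguous partitions covering the range."""
    return list(range(range_partition(start_key), range_partition(end_key) + 1))


apple_partition = range_partition("apple")
scan = partitions_for_range("g", "n")
```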

Partitioning by Hash of Key

Because of this risk of skew and hot spots, many distributed datastores use a hash function to determine the partition for a given key. A good hash function takes skewed data and makes it uniformly distributed. Eg. Cassandra and MongoDB use MD5. Once you have a suitable hash function for keys, you can assign each partition a range of hashes. The partition boundaries can be evenly spaced, or they can be chosen pseudorandomly. The pseudorandom variant is sometimes called consistent hashing; it avoids the need for central control or distributed consensus. Because that term is used inconsistently, hash partitioning is the better name.
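
A sketch of hash partitioning with evenly spaced boundaries, assuming a 32-bit hash space taken from the first four bytes of an MD5 digest. The function names and the choice of 8 partitions are illustrative only.

```python
import hashlib
from collections import Counter

HASH_SPACE = 2 ** 32


def key_hash(key):
    """Stable 32-bit hash of a string key."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:4], "big")


def hash_partition(key, num_partitions):
    """Each partition owns an equal, contiguous range of the hash space."""
    return key_hash(key) * num_partitions // HASH_SPACE


# Sequential keys, which would all land in one key-range partition,
# are spread roughly evenly by the hash.
counts = Counter(hash_partition(f"2024-01-01T00:00:{i:05d}", 8) for i in range(8000))
```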

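The hashing technique has the drawback that it loses the ability to do efficient range queries: keys that were adjacent are now scattered, so a range query has to be sent to all partitions. Cassandra achieves a compromise between the two partitioning strategies. A table can have a compound primary key made of several columns: only the first part of the key is hashed to determine the partition, while the other columns are used to sort the data within the partition. A query that fixes the first column can then do an efficient range scan over the others.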

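Hashing still cannot remove skew entirely: if all reads and writes are for the same key (eg. a celebrity's user id), all requests are routed to the same partition. The application then needs to step in, for eg. by adding a small random number to the beginning or end of the hot key so that its writes are spread over several keys and partitions. Reads then have to fetch all of those keys and combine the results.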

Partitioning and Secondary Indexes

Secondary indexes are used to search for a certain property in entries. They don’t map neatly to partitions. There are two main approaches to partitioning a database with secondary indexes:

  • Document-based partitioning
  • Term-based partitioning

Partitioning Secondary Indexes by Document

In this approach, each partition is completely separate: each partition maintains its own secondary indexes, covering only the documents in that partition. So Partition1 has secondary indexes for documents in Partition1, Partition2 has secondary indexes for documents in Partition2. When we query, we need to read all partitions and combine the results. This approach to querying a partitioned database is sometimes known as scatter/gather. This is more expensive in reads and prone to latency amplification.

Partitioning Secondary Indexes by Term

Rather than each partition keeping secondary indexes for its own documents, a global index can be created that covers the data in all partitions. This global index is itself split across partitions, by term. Eg. Ids of Red cars from all partitions will be saved on a single index partition, while Black cars might be on another. The index is partitioned by the term, hence the name. This makes reads faster, because a query only goes to the partition holding the term, but writes become more expensive, since one document write may update index entries on several other partitions.
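
The two approaches can be contrasted with a small in-memory model. The car data, the partition layout and the rule for assigning a term to an index partition (first letter modulo the partition count) are all made-up assumptions for this sketch.

```python
from collections import defaultdict

cars = {  # data partition -> {document id: color}
    0: {191: "red", 214: "black"},
    1: {515: "silver", 768: "red"},
}


def build_local_indexes(partitions):
    """Document-partitioned: each partition indexes only its own documents."""
    local = {}
    for partition, docs in partitions.items():
        index = defaultdict(list)
        for doc_id, color in docs.items():
            index[color].append(doc_id)
        local[partition] = index
    return local


def scatter_gather(local_indexes, color):
    """A read must query every partition and combine the results."""
    result = []
    for index in local_indexes.values():
        result.extend(index.get(color, []))
    return sorted(result)


def term_partition(color, num_index_partitions):
    return ord(color[0]) % num_index_partitions


def build_term_index(partitions, num_index_partitions):
    """Term-partitioned: one global index, split by term across partitions."""
    index_partitions = [defaultdict(list) for _ in range(num_index_partitions)]
    for docs in partitions.values():
        for doc_id, color in docs.items():
            index_partitions[term_partition(color, num_index_partitions)][color].append(doc_id)
    return index_partitions


def term_lookup(index_partitions, color):
    """A read goes to exactly one index partition."""
    target = index_partitions[term_partition(color, len(index_partitions))]
    return sorted(target.get(color, []))


red_local = scatter_gather(build_local_indexes(cars), "red")
red_global = term_lookup(build_term_index(cars, 2), "red")
```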

Rebalancing Partitions

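The process of moving load from one node in the cluster to another is called rebalancing. It is needed when things change in the cluster. Eg. Query throughput or dataset size grows, so more CPU, RAM or disk has to be added, or a node fails and other nodes have to take over its responsibilities.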

Requirements for rebalancing:

  1. After rebalancing, load (data and requests) is shared fairly between the nodes
  2. Only the minimum required amount of data moves between nodes, to keep rebalancing fast and limit network and disk I/O
  3. DB continues accepting reads and writes while rebalancing happens

Note that rebalancing can be done manually or automatically. Automatic rebalancing needs less operational work, but it can be unpredictable and might trigger rebalances when they are not really necessary, like when a node is temporarily overloaded and slow to respond.

Strategies for Rebalancing

How not to do it: hash mod N

If we assign a key to node number hash(key) mod N, with N being the number of nodes, every key starts on a particular node. But when the number of nodes N changes, most keys map to a different node and have to be moved. Moving that much data on every change makes rebalancing excessively expensive.
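
The cost of hash mod N can be measured directly by counting how many keys change node when the cluster grows from 10 to 11 nodes. The key names and helper functions are placeholders for this sketch.

```python
import hashlib


def stable_hash(key):
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


def moved_fraction(keys, old_n, new_n):
    """Fraction of keys whose node changes when N goes from old_n to new_n."""
    moved = sum(1 for k in keys if stable_hash(k) % old_n != stable_hash(k) % new_n)
    return moved / len(keys)


keys = [f"user{i}" for i in range(10000)]
fraction = moved_fraction(keys, old_n=10, new_n=11)
```

A key stays put only if its hash gives the same remainder for both 10 and 11, which holds for about 1 in 11 hashes, so roughly 90% of the keys move even though only one node was added.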

Fixed number of partitions

Create many more partitions than there are nodes, and assign several partitions to each node. For example, a database running on a cluster of 10 nodes may be split into 1,000 partitions from the outset so that approximately 100 partitions are assigned to each node. If a new node is added, it takes over a few partitions from every existing node until partitions are fairly distributed once again.

Only entire partitions are moved between nodes. The number of partitions does not change, nor does the assignment of keys to partitions. The only thing that changes is the assignment of partitions to nodes. Used by Elasticsearch. Choosing the right number of partitions is important: it has to be high enough to accommodate future growth, but each partition also has management overhead, so it shouldn't be too high either.
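
Adding a node under this scheme can be sketched as reassigning whole partitions until the new node holds its fair share. The donor-selection rule (take from the most loaded node) and all names are assumptions made for this sketch.

```python
from collections import Counter


def add_node(assignment, new_node):
    """Return a new partition -> node mapping after new_node joins.

    assignment: dict mapping partition number -> node name
    Only the assignment of partitions to nodes changes, never the keys
    inside a partition.
    """
    nodes = sorted(set(assignment.values()) | {new_node})
    target = len(assignment) // len(nodes)  # fair share for the new node
    result = dict(assignment)
    for _ in range(target):
        counts = Counter(result.values())
        donor = max((n for n in nodes if n != new_node),
                    key=lambda n: (counts[n], n))
        partition = min(p for p, n in result.items() if n == donor)
        result[partition] = new_node
    return result


before = {p: f"node{p % 3}" for p in range(12)}  # 12 partitions on 3 nodes
after = add_node(before, "node3")
moved = [p for p in before if before[p] != after[p]]
```

Only 3 of the 12 partitions move, and all of them move to the new node.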

Dynamic partitioning

Dynamically adjust size of partitions. When a partition exceeds a configured size, it is split into two partitions so that approximately half of the data ends up on each side of the split. Each partition is assigned to one node, and each node can handle multiple partitions. An advantage of dynamic partitioning is that the number of partitions adapts to the total data volume.
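
A minimal sketch of the split step, assuming a partition is a sorted list of keys and the size threshold is counted in keys rather than bytes. `split_if_needed` is a hypothetical helper.

```python
def split_if_needed(partition_keys, max_keys):
    """Split a sorted key-range partition in half once it grows too large."""
    if len(partition_keys) <= max_keys:
        return [partition_keys]
    mid = len(partition_keys) // 2
    return [partition_keys[:mid], partition_keys[mid:]]


small = split_if_needed(["a", "b", "c"], max_keys=4)
large = split_if_needed(["a", "b", "c", "d", "e", "f"], max_keys=4)
```

After a split, one of the two halves can be transferred to another node to balance the load.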

Partitioning proportionally to nodes

With both fixed and dynamic partitioning, the number of partitions is proportional to the size of the dataset, not to the number of nodes. The other option is to make the number of partitions proportional to the number of nodes.

Maintain a fixed number of partitions per node. While the number of nodes is constant, partitions grow in size with the dataset. If a new node is added, a fixed number of new partitions is created for that node by splitting existing partitions and taking over part of their data, so partition sizes go down again. If a node is removed, partitions on the other nodes take on more load. Eg. Cassandra

Request Routing

This is a problem of Service Discovery. Where do we find the data needed? Which Partition in which node? As both data and partitions can move over time, this needs to be tracked. These approaches are possible:

  1. Clients make requests to any node. The node returns data if it owns the partition, or forwards the request to the appropriate node and passes the reply back to the client.
  2. Maintain a routing layer that handles only routing (no data saved) and acts as a partition-aware load balancer
  3. Require clients to know where data is located

Many distributed data systems rely on a separate coordination service such as ZooKeeper to keep track of this cluster metadata. Each node registers itself in ZooKeeper, and ZooKeeper maintains the authoritative mapping of partitions to nodes. Other actors, such as the routing tier or the partitioning-aware client, can subscribe to this information in ZooKeeper. When nodes are added or removed, or a partition changes ownership, ZooKeeper notifies the subscribers so that their routing information stays up to date. Used in Kafka, HBase. Eg. ZooKeeper can maintain a table of Partition key range : Partition number : Node number : IP address.
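
A routing tier holding such a table can resolve a key with a binary search. The table contents (key ranges, node names and IP addresses) are made-up sample data, and this sketch does not use any ZooKeeper API.

```python
import bisect

# (first key of range, partition number, node, IP address), sorted by first key.
ROUTING_TABLE = [
    ("", 0, "node-a", "10.0.0.1"),
    ("h", 1, "node-b", "10.0.0.2"),
    ("q", 2, "node-a", "10.0.0.1"),
]
RANGE_STARTS = [row[0] for row in ROUTING_TABLE]


def route(key):
    """Return the routing table row whose key range contains `key`."""
    return ROUTING_TABLE[bisect.bisect_right(RANGE_STARTS, key) - 1]


_, partition, node, address = route("kafka")
```

When a partition moves, only its row in the table changes, and clients or the routing tier pick up the new row the next time they are notified.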

Cassandra uses Gossip protocol so that nodes communicate among each other (approach1). This model puts more complexity in the database nodes but avoids the dependency on an external coordination service such as ZooKeeper.

Couchbase doesn't rebalance automatically, which simplifies the design. It is normally configured with a routing tier that learns about routing changes directly from the cluster nodes.

Parallel Query Execution

Massively parallel processing (MPP) relational database products, often used for analytics, are much more sophisticated in the types of queries they support. A typical data warehouse query contains several join, filtering, grouping, and aggregation operations. The MPP query optimizer breaks this complex query into a number of execution stages and partitions, many of which can be executed in parallel on different nodes of the database cluster. Queries that involve scanning over large parts of the dataset particularly benefit from such parallel execution.