Some thoughts on the design

Draft proposal for fault tolerance and API design

The following document is an exposition of the key issues that I find most significant for the design of the function-passing model. It is just a draft specification whose goal is to shed some light on the topic, help me fully understand the system requirements, and define the basic properties that the system should comply with. Consequently, some parts of the document may be incomplete or wrong.

My purpose is to make this document relevant and helpful. In particular, one issue to address is the backend of our system. The most well-known and plausible options are Netty and Akka. As the choice is hard and some requirements or preferences of the system are not fully specified (to the best of my understanding), this document tries to give more details about the pros and cons of each option.

While Netty may be the go-to option because it is the underlying backend of Akka and hence theoretically more efficient, the latter provides most of the fault-tolerance features defined in this document out of the box. There is a clear tradeoff between speed and time/effort. In my view, an implementation based on Netty will require more time and effort, but the outcome could have better performance, which is a good thing for our target users (e.g. Spark). However, it would imply reinventing the wheel. By using Akka, a basic implementation with all these features could be obtained quickly and with a high probability of success (not introducing implementation bugs, as Akka is reliable and extensively used). A choice has to be made between these two options or any other framework that fits our use cases. I leave this issue open to the decision of the other members of the team.

API and fault-tolerance notes on the F-P model

One key issue of the function-passing model is fault tolerance. Since the f-p model builds a framework to abstract the interaction between several nodes, multiple failures may happen. Hence, handling failures is a must. This should be as smooth and flexible as possible, since the user will specify the recovery process for most of the failures that the framework cannot solve on its own.

The function-passing model should be error-friendly, namely the framework has to provide easy-to-use tools to handle failures. Following the fault-avoidance strategies available in Akka [1], the function-passing model should have the largest possible subset of these properties:

  • Fault isolation - a fault should be contained to a part of the system and not escalate to a total crash
  • Structure - a clear structure should help to isolate the active and faulty parts of a system
  • Redundancy - a backup component should be able to take over when a component fails
  • Replacement - a faulty component should be replaced by an active one
  • Reboot - a component should provide the ability to reset the faulty state
  • Component lifecycle - if a component cannot be recovered, it should be terminated and removed from the system

Of course, some of these properties are very Akka-oriented, especially the last two. However, they are desirable since they are a good way to deal with faulty nodes. The others are quite generic and lay the foundations for good fault-tolerance support in the framework.

In the following document, I refer to clients and nodes. A client is the machine orchestrating and sending commands to the nodes. The nodes only store silos. Even though this terminology may not always hold true, it is simpler and clearer for the purpose of this draft.

The symbol [*] means that I may be omitting/missing something.

A first draft of the Fault-Tolerance API

Let's declare a Silo in our system (the Silo abstraction itself is already defined).

/* Note that the List is implicitly converted to the underlying Silo representation */
val siloRef1 = nodes.fromFun(_ => List(1,2,3,4,5,6,7,8))(backupNode1, backupNode2)

This declaration has three parts. The first one is nodes, the main nodes of the Silo, where the data is stored. The second one is the anonymous function that will create the collection in those nodes. The third one is the list of backup nodes that will keep a copy of the resulting DAG.

The main node will keep track of the transformations over the data and will update the distributed DAGs once they have been persisted (recorded safely, not actually applied to the data). Hence, it becomes the single source of truth [2] for that given silo. It is important that the main node always shares the lineages, because the backup nodes should be able to replay its role.

Now, let's modify siloRef1.

siloRef1.map(spore { (l: List[Int]) => l.grouped(2).toList })(
  spore { (error, context) =>
    hostInfo.warn(s"Following error $error has occurred in this context $context")
  }
)

In the last snippet, the content of the SiloRef is grouped in pairs. The first spore corresponds to the success case, namely when the node successfully executes this spore. The second spore is meant to handle failure: it is a callback that will be run when an error/exception is caught. It accepts two parameters, error and context (both properly modeled in the framework via case classes). Note: an error encapsulates an exception; it is meant to be richer and provide more information. However, this may not be necessary at all.

The context stores meta-information. Some ideas are:

  • Host IP and name (from the machine that sent the command)
  • Timestamp
  • Set of SiloRefs
  • Set of backup nodes
  • Network stats (e.g. speed of connection, IP location)
  • Failure stats (e.g. number of failures*)
  • etc.

* These stats need a weight parameter to properly update the current probability of failure. For instance, if a node has had a high probability of failure in the past but there have not been failures for a week, these stats should reflect an optimistic probability of success.

This information is useful for making decisions when handling failures. For instance, the set of backup nodes could allow modeling more complex use cases. Hence, exposing this kind of information to the failure callback looks like a sensible choice. This is just a rough idea of the potential information that the context could store. Thanks to it, a lot of functionality could be provided by default handlers in the framework (via already defined functions/classes/traits), as they depend only on data that is always passed as a parameter. See, for instance, a retry mechanism or sending failures to a distributed logger. Both cases could be implemented and exposed to the user via traits that need some kind of user configuration.
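As an illustration, a minimal sketch of how the error and the context could be modeled as case classes (all names and fields here are hypothetical and simply mirror the list above):

// Hypothetical modeling of the error and context passed to the failure callback.
case class NodeError(exception: Throwable, message: String)

case class FailureContext(
  hostIp: String,            // IP of the machine that sent the command
  hostName: String,
  timestamp: Long,
  siloRefs: Set[String],     // identifiers of the involved SiloRefs
  backupNodes: Set[String],  // identifiers of the configured backup nodes
  failureCount: Int          // failure stats; a weighted/time-decayed version could extend this
)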

Passing failure handlers modeled by traits/classes is another potential idea (a sketch follows the list below). The benefits of this approach are:

  1. Reusability and consequent reduction of boilerplate code (by reusing functionality in other traits/classes).
  2. Easier extension of default fault-tolerance mechanisms. E.g. users can roll out a more fine-grained mechanism by extending/overriding an already given one.
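A minimal sketch of what such a trait-based handler could look like, reusing the hypothetical NodeError and FailureContext types sketched above and a user-configurable retry count:

// Hypothetical reusable failure handler; the framework could ship defaults like this.
trait FailureHandler {
  def onFailure(error: NodeError, context: FailureContext): Unit
}

// A default retry handler that users could extend or override.
trait RetryHandler extends FailureHandler {
  def maxRetries: Int = 3                   // user configuration point
  def retry(context: FailureContext): Unit  // resends the failed command (left abstract here)

  def onFailure(error: NodeError, context: FailureContext): Unit =
    if (context.failureCount < maxRetries) retry(context)
    else println(s"Giving up after ${context.failureCount} failures: ${error.message}")
}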

Implicits

One aspect to consider is the use of implicits in the API. They could be beneficial, since they would allow omitting the second parameter and provide an in-context mechanism to pick up a locally defined fault-tolerance handler. Nevertheless, I have my doubts here, as it could lead to bugs or unexpected behavior of the framework (getting the wrong error handler, for example). Furthermore, it is paramount that the logic behaves predictably. It may be more important and appropriate to make the failure logic explicit.
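For illustration only, a sketch of how an implicit handler could be threaded through the API (the map signature below is hypothetical, and this is not a recommendation given the concerns above):

// Hypothetical: the failure callback is resolved from the implicit scope instead of
// being passed explicitly, e.g.
//   def map[B](f: Spore[A, B])(implicit handler: FailureHandler): SiloRef[B]

implicit val defaultHandler: FailureHandler = new RetryHandler {
  def retry(context: FailureContext): Unit = ()  // resend logic omitted in this sketch
}

// siloRef1.map(spore { (l: List[Int]) => l.grouped(2).toList })  // handler picked up implicitly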

Failures

The function-passing model has to cope with a broad range of failures, from power failures to network latency. The former is the most serious one as it causes data loss. To overcome it, we need persistence.

Power failures

Suppose that the system crashes in the midst of an execution. This can happen at any time, so the function-passing model has to store and persist the state as soon as a message is received, i.e. when a user applies a transformation over a silo. The state is defined by:

  • The subset of all the nodes known to the machine
  • The silos that they store
  • The up-to-date DAGs of each Silo
  • The state of sent commands (is a message waiting for an ack?)
  • [*]

The bottom line is that any failure affecting the state of the system has to be resolved through persistence. When the system recovers, the last state is read and all the parts are updated with that information. This means that any action that changes the state has to be persisted in a safe environment, be it a hard disk or a web service (providing the expected reliability).
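A minimal sketch of persisting such a state snapshot to disk, assuming plain Java serialization (the real storage format and backend are open questions, and all names are illustrative):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.file.{Files, Paths}

// Hypothetical snapshot of the client's state; fields mirror the list above.
case class SystemState(
  knownNodes: Set[String],
  silosByNode: Map[String, Set[String]],
  dags: Map[String, String],   // up-to-date DAG per silo (placeholder type)
  pendingAcks: Set[Long]       // ids of sent commands still waiting for an ack
) extends Serializable

def persist(state: SystemState, path: String): Unit = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(state)       // persisted before the command is acknowledged
  out.close()
  Files.write(Paths.get(path), buffer.toByteArray)
}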

For instance, let's say that a node which is the parent of several silos crashes. The framework needs to react to this event. First, it has to choose the best backup node (based on stats and metadata *). Second, the client will redirect the commands to the substitute. When the failed machine recovers, it will receive the last DAG for that Silo. There are two ways of doing this: by negotiation, discovering the backup node and asking it for the DAG, or by letting the client pass the updated DAG to the crashed node. Then, the crashed node will be in charge again. The drawback of this approach is that it relies completely on the availability of the client. Consequently, a more complex solution is needed. An idea: if there is no response from the client, get the most up-to-date DAG from all the backup nodes. Therefore any DAG should carry a timestamp.

* There are some situations in which we can improve the behavior of this simple mechanism. I will discuss them later on, in the Heuristics section.
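Assuming every DAG carries a timestamp, the fallback step (no response from the client) could look roughly like this sketch (names are illustrative):

// Hypothetical: each backup node answers with its last known DAG and its timestamp.
case class TimestampedDag(dag: String, timestamp: Long)  // placeholder DAG type

def mostRecentDag(answers: Seq[TimestampedDag]): Option[TimestampedDag] =
  if (answers.isEmpty) None
  else Some(answers.maxBy(_.timestamp))  // the recovered node adopts the newest lineage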

Network failures

These failures can happen in two situations: when sending commands to machines or when receiving confirmations (ack messages) from them.

This poses the problem that messages could be lost in both directions. To deal with this, we need to define a strategy. I propose to follow an exactly-once delivery strategy, which means that a message is only processed once but sent or received as many times as necessary.

Let's say that the client sends a command to a server and that command is lost. The client will wait for confirmation for a given period of time (to be defined) and, if no acknowledgment has been received, it will resend the command. However, this poses another problem: what if the server receives a duplicated message? The strategy to deal with this is to only accept messages whose id is equal to the expected id. The expected id is a counter that keeps track of the next expected id; it is incremented whenever a received message matches it. Any duplicated message will be rejected because its id is lower than the expected one. This is known as the ACK-RETRY protocol. If a message with a greater id is received, it will be stored in a map. When the system processes the expected one, it will check whether the message with the next expected id is stored locally. If it is, it will be processed. The procedure continues until the next expected id is not found, and then the system waits for it. While this is not the most efficient way of processing messages in a mailbox, it is the only way that I have found to guarantee FIFO, in-order delivery.
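A minimal sketch of the receiver side of this ACK-RETRY scheme, buffering out-of-order messages until the expected id arrives (names and payload type are illustrative):

import scala.collection.mutable

// Hypothetical receiver that enforces in-order, exactly-once processing.
class OrderedReceiver(process: String => Unit) {
  private var expectedId: Long = 0L
  private val pending = mutable.Map.empty[Long, String]  // messages that arrived early

  def receive(id: Long, payload: String): Unit = {
    if (id < expectedId) ()                              // duplicate: already processed
    else if (id > expectedId) pending(id) = payload      // out of order: buffer it
    else {
      process(payload)
      expectedId += 1
      // drain any buffered messages that are now in order
      while (pending.contains(expectedId)) {
        process(pending.remove(expectedId).get)
        expectedId += 1
      }
    }
    // in every case an ack for `id` would be sent back to the sender here
  }
}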

Other possible failures

There is a subset of failures that, while very unlikely to happen, can affect the correctness of the system. Such failures can appear depending on transient bugs or the platform on which the system is running. While they could be ignored, I will propose a way to handle them for the sake of completeness.

The paper 'End-to-end arguments in System Design' [3] proposes that a high-level reliability layer has to be implemented just to prevent the casual and unlikely appearance of failures in the low-level layers, e.g. buffer-copy errors. Furthermore, the function-passing model relies heavily on the right behavior of the IO layer, collecting data from multiple places in order to process it. Therefore, in order to prevent misbehaviors of the low-level API, it is recommended to compute a CRC of all the messages sent over the wire. That way, we can detect anomalies in the transport of the messages and be confident that such errors won't go unnoticed.
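Computing and checking such a CRC is cheap with the JDK's CRC32; a minimal sketch:

import java.util.zip.CRC32

// Checksum appended to every message before it is sent over the wire.
def checksum(payload: Array[Byte]): Long = {
  val crc = new CRC32()
  crc.update(payload)
  crc.getValue
}

// On the receiving end, a mismatch means the message was corrupted in transit
// and should be discarded and re-requested.
def verify(payload: Array[Byte], expected: Long): Boolean =
  checksum(payload) == expected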

Modeling the delivery

There are three different delivery states: Failure (error when sending), Sent (the command has been sent but is unconfirmed), and Ack (the command has been confirmed and persisted by the node). As this delivery is asynchronous, the scala.concurrent.Future library needs to be used. We could model the timeout by forcing a future to throw a TimeoutException when the waiting period has finished. Implementation note: if using Akka, this would be trivial with the after method and the ask pattern [4]. Otherwise, we could port this function or use a ScheduledExecutorService. There are other possibilities, but most of them are blocking.
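A sketch of the delivery states as an ADT, plus a non-blocking timeout built on a ScheduledExecutorService (assuming no Akka; all names are illustrative):

import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

sealed trait DeliveryState
case class DeliveryFailure(cause: Throwable) extends DeliveryState  // error when sending
case object Sent extends DeliveryState                              // sent, unconfirmed
case object Acked extends DeliveryState                             // confirmed and persisted

object Timeouts {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Completes with the original future, or fails with a TimeoutException after `millis`.
  def within[T](f: Future[T], millis: Long): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = p.tryFailure(new TimeoutException(s"no ack after ${millis}ms"))
    }, millis, TimeUnit.MILLISECONDS)
    f.onComplete(p.tryComplete)
    p.future
  }
}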

There are errors or exceptions that the system does not know how to handle. Instead, the user will specify such behavior through the callbacks, according to her business model. I propose to encapsulate all the possible results of a spore using an ADT (Algebraic Data Type) to have better control over the errors. Any error is passed to the failure callback, which allows detecting when a callback is not reacting to all the possible errors* and therefore emitting a warning at compile time.

* This is done via the compiler, which is capable of detecting that a certain pattern has not been covered in a pattern-matching block.
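A sketch of such an ADT; because the trait is sealed, the compiler warns when a pattern match over the result does not cover every case (all error kinds below are illustrative):

// Hypothetical ADT for the outcome of executing a spore on a node.
sealed trait SporeResult[+T]
case class SporeSuccess[T](value: T) extends SporeResult[T]
case class NodeUnreachable(node: String) extends SporeResult[Nothing]
case class ExecutionError(cause: Throwable) extends SporeResult[Nothing]

def handle[T](result: SporeResult[T]): Unit = result match {
  case SporeSuccess(v)    => println(s"done: $v")
  case NodeUnreachable(n) => println(s"node $n unreachable, falling back to a backup")
  // omitting ExecutionError here would trigger a non-exhaustive match warning at compile time
  case ExecutionError(e)  => println(s"spore failed: ${e.getMessage}")
}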

Fault-tolerance key properties on Netty

As previously mentioned, there is a set of features that helps bring fault tolerance to a system. Akka is built atop them, so I will only discuss how to achieve them in the scope of a Netty-based implementation.

A reminder of the discussed properties

  • Fault isolation - a fault should be contained to a part of the system and not escalate to a total crash
  • Structure - a clear structure should help to isolate the active and faulty parts of a system
  • Redundancy - a backup component should be able to take over when a component fails
  • Replacement - a faulty component should be replaced by an active one
  • Reboot - a component should provide the ability to reset the faulty state
  • Component lifecycle - if a component cannot be recovered, it should be terminated and removed from the system

The first three are rather simple to add to the framework. In fact, they follow naturally from the logical structure of the system: nodes spread out in a network and communicating with each other. This is most noticeable in the structure property. Besides, fault isolation and structure are made explicit through the definition of a rich API that introduces fault tolerance as a first-class citizen. With those primitives, the system will take care of failure containment and redundancy. Using Netty, these would be implemented by detecting errors early, disabling connections to faulty nodes and redirecting them to the backup nodes, which implies the fourth property, replacement.
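Under Netty, error detection and redirection could hinge on the channel pipeline's exceptionCaught hook; a rough sketch of the idea (the actual reconnection logic is omitted and the handler name is hypothetical):

import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

// Hypothetical handler: on any error, drop the connection to the faulty node and let a
// higher-level component redirect subsequent traffic to a backup node.
class FaultIsolationHandler(redirectToBackup: () => Unit) extends ChannelInboundHandlerAdapter {
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    ctx.close()         // contain the fault: disable the connection to this node
    redirectToBackup()  // replacement: route commands to a backup instead
  }
}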

The last two are closely related and are achieved by defining a particular messaging protocol between the nodes. This protocol has already been defined and therefore only needs to be extended. Assuming no network errors, a client (acting as supervisor) should remove a node from the system by sending it a message. In case of a communication error, the client could prevent its reincorporation (discussed in the next section). Then, we end up with a system that complies with all the fault-tolerance requirements.

Heuristics and further improvements

Heuristics and improvements may be used to adapt the system to real-world scenarios. The client must have total control over the nodes and, in order to avoid complex settings, sensible defaults should be provided.

Decision of the next backup node

This is a complex decision that greatly depends on the use cases. Suppose that we collect a great amount of data and statistics about the running system. Since we want easy user configuration, users should determine which variables of the collected data are more important. Let's say, for instance, that we have an equation that computes the probability of a node being the next backup node. Concrete variables are present in the equation: failure probability* (which could be split into different variables representing each kind of failure), average network throughput, average minimum response time, etc. If every variable has a weight parameter in that equation, a user could change that weight at their own convenience and affect the resulting probability. These settings could be obtained via implicits and be specified via case classes.

Besides, as it is handy to provide defaults for the weights, heuristics could be used to find a combination of weights that generally works. It will be difficult to find such a combination, but in my view it is worth trying.

* Remember that the collection of data and statistics may be relative to a time variable.
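A sketch of such a weighted scoring function, with defaults that can be overridden through an implicit configuration (the variables, weights and their combination are purely illustrative; real inputs would need normalization to a common scale):

// Hypothetical per-node statistics and user-tunable weights.
case class NodeStats(failureProbability: Double, avgThroughput: Double, avgResponseMs: Double)
case class Weights(failure: Double = 0.5, throughput: Double = 0.3, response: Double = 0.2)

// Higher score = better candidate to become the next backup node.
def score(stats: NodeStats)(implicit w: Weights): Double =
  w.failure * (1.0 - stats.failureProbability) +
  w.throughput * stats.avgThroughput +
  w.response * (1.0 / (1.0 + stats.avgResponseMs))

def nextBackup(candidates: Map[String, NodeStats])(implicit w: Weights): Option[String] =
  if (candidates.isEmpty) None
  else Some(candidates.maxBy { case (_, stats) => score(stats) }._1)

// Users tweak the defaults by bringing their own weights into scope:
// implicit val myWeights: Weights = Weights(failure = 0.8, throughput = 0.1, response = 0.1)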

Application of transformations

For large data collections, it is advisable to reduce the number of times we go through all the elements to apply transformations. Say we have to apply two map operations in a row. If some node is requested to apply them, it would be useful to compose them into a single transformation, hence avoiding the need to go through all the elements twice, once per transformation.
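In Scala this is plain function composition; a sketch of fusing two consecutive maps into a single pass:

val double: Int => Int = _ * 2
val addOne: Int => Int = _ + 1

// Two passes over the data:
val twoPasses = List(1, 2, 3).map(double).map(addOne)

// Fused into a single pass by composing the functions first:
val onePass = List(1, 2, 3).map(double andThen addOne)  // same result, one traversal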

Decision of the reincorporation of a given node

Imagine that a node is failing constantly, which could damage the efficiency and expected behavior of the system. A mechanism to control the reincorporation of nodes once they have failed is desirable. Should a node have failed more than, say, 10 times (let's call this the failure threshold), a client could reject its reincorporation and rely completely on the backup node. This is a handy feature, since a node that has already failed several times has a high probability of failing again.
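As a sketch, the client-side check could be as simple as the following (the threshold value is illustrative):

val failureThreshold = 10  // illustrative default; should be configurable

// Hypothetical: decide whether a recovered node is allowed back into the system.
def allowReincorporation(failures: Int): Boolean = failures <= failureThreshold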

To be continued...

Questions lacking answers

  • What should the function-passing framework do if the client crashes? Imagine that some node crashes and the negotiation for redirecting all the traffic is started. Now, suddenly, the client which is responsible for that negotiation crashes and is not able to carry it out. The answer to this question should give exact information about what the state of the system is at that given moment and what action should be performed when the client reboots or starts working again.
  • Which framework will be used for the final version of the system?

Bibliography

[1] Raymond Roestenburg, Rob Bakker, and Rob Williams. Akka in Action. Manning.

[2] Wikipedia - Single source of truth.

[3] J.H. Saltzer, D.P. Reed, and D.D. Clark. End-to-end Arguments in System Design.

[4] The Ask pattern - Akka documentation.