Maintaining Persistance in DHT - hydra-hoard/hydra GitHub Wiki
How many times have you implemented a tree or a graph in C++ to solve a tricky algorithmic problem ?
Or how many time have you implemented a new data structure like an exciting Segment Tree or a Trie to brush up on those computer science skills ? I’m guessing quite a bit.
Learning these data structures and algorithms is very important. However, one thing that tutorial sites don’t teach is how to persist that data structure.
Persisting a data structure means keeping the data structure synced with your main memory. So ,if your application crashes, you don’t lose your data.
Almost any software has persistance built in. Some cool examples are:
- __Redis __— An in memory persistant key value store
- MongoDB — Keeps a B-Tree like structure saved on disk
- Literally Every Database in the world
So yeah, if you’re building a database. You kinda need to save that data for the user. Go figure.
So as we have established that persistance is required, let’s get into how we would go on about actually implementing something like this.
A persistant data structure has two features generally:
- Transaction Log
- Periodic Snapshotting Let’s call our data structure A.
Transaction Log
A transaction log for a data structure is a record of all the write queries performed on it. The idea is that if one were to replay all these queries on an empty A, we would get the current state of A. A transaction log keeps a track of the history of A.
Hence, the idea behind a transaction log is two fold.
- Keeps track of the history of A.
- Instead of saving the data structure to disk at each write query, we simply append the write query to our log. This method is substantially faster and more memory efficient.
So how do we implement this? Lets’s look into some sample code.
Please note that this code can change in the Hydra codebase.
package persistance
import (
"encoding/binary"
"fmt"
"hydra-dht/constants"
pb "hydra-dht/protobuf/node"
"hydra-dht/structures"
"os"
"github.com/golang/protobuf/proto"
)
var (
logFile *os.File
filePosition int64
)
/*
convertNumberToBytes is a helper function to AppendToLog. It converts a uint64 number
to a byte array . The size of byte array is 10. This is required to keep track of size of
logObject in the log as that object can be of any size.
Hence , we first store the size of log Object in bytes, and then store the byte representation
of the Log Object.
Arguments:
1. i = The uint64 number , the size of logObject
Returns
1. byte[] = The byte array representation of i
2. int = The actual number of bytes , the number i took. The max number of bytes it can take is 10
*/
func convertNumberToBytes(i uint64) ([]byte, int) {
buf := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(buf, i)
return buf, constants.LOG_OBJECT_BYTE_SIZE
}
/*
AppendToLog appends a DHT entry into the persistent transaction log.
The function first adds the number of bytes the LogObject is of, then inserts
the LogObject into the log file.
This method helps when we're reading the contents of the log back into memory.
As the Log Object does not have a fixed size.
Arguments:
1. node : Node object inside the DHT
2. dhtIndex: Bucket Index of DHT table into which to put the node into
3. listIndex: The index in the Bucket List of DHT in which the Node is added to.
Returns:
1. error: Returns error , if no error then error is nil
*/
func AppendToLog(node structures.Node, dhtIndex int32, listIndex int32) error {
// the data structure that contains information on our write query
// we want to store this data structure into our log
logObject := &pb.LogNode{
Node: &pb.Node{
NodeId: node.Key[:],
Domain: node.Domain,
Port: int32(node.Port),
},
DhtIndex: dhtIndex,
ListIndex: listIndex,
}
// converts out log Object into bytes
// as data in a file is stored in bytes.
// we are serialising/ marshalling our object
out, err := proto.Marshal(logObject)
if err != nil {
return err
}
// the number of bytes consisting of logObject
i := uint64(len(out))
// As the size of byte object can be indeterminate, we store the size of log Object
// in bytes before appending our logObject. hence while reading,
// we find out how many bytes to read to get our entire log Object.
// the size of log Object is aof fixed size. That size is 10 bytes.
// 10 bytes is the size of an uint64 integer.
buf, _ := convertNumberToBytes(i)
// Appending the size and logObject and writing to file
ob := append(buf, out...)
n2, err := logFile.Write(ob)
if err != nil {
return err
}
fmt.Printf("Wrote %d bytes for LogObject\n", n2)
// Flushing that write to disk.
err = logFile.Sync()
return err
}
Appending to log
In this example, we are creating a write query object called logObject. This logObject can be of invarient size, hence to properly read this object from file. The size of logObject is also stored. This size is fixed, 10 bytes. This is the size of the uint64 data type in bytes.
Hence , while reading the transaction log back into memory, we perform three iteratively.
- Read size of logObject i.e, read first 10 bytes in file. Retrieve size. Let’s call this size n
- Read the n bytes from the file to retrieve the logObject.
- Repeat above two steps , until pointer has reached end of file.
The code for this is given below:
package persistance
import (
"encoding/binary"
"fmt"
"hydra-dht/constants"
pb "hydra-dht/protobuf/node"
"hydra-dht/structures"
"os"
"github.com/golang/protobuf/proto"
)
var (
logFile *os.File
filePosition int64
)
/*
ReadLatestObjectFromLog reads and sends the latest / last log Object inside
the log file.
Arguments:
None
Returns:
1. logNode = The Log node object retrieved from the log file. It will be empty
if there is an error
2. error = Error object. Will be nil if no error.
*/
func ReadLatestObjectFromLog() (*pb.LogNode, error) {
logFile.Seek(filePosition, 0)
sizeOfLogObjectBuffer := make([]byte, constants.LOG_OBJECT_BYTE_SIZE)
logFile.Read(sizeOfLogObjectBuffer)
filePosition += constants.LOG_OBJECT_BYTE_SIZE
sizeOfLogObject, _ := binary.Uvarint(sizeOfLogObjectBuffer)
filePosition += int64(sizeOfLogObject)
logObjectBuffer := make([]byte, sizeOfLogObject)
logFile.Read(logObjectBuffer)
logObject := &pb.LogNode{}
err := proto.Unmarshal(logObjectBuffer, logObject)
return logObject, err
}
/*
addToDHT is a helper function to the LoadDHT function. It adds the value to the
DHT. If required it appends the value to list, else it inserts the value at the
said position.
Arguments:
1. dht - The DHT in which value is added
2. logObject - The object which contains value to be added, along with dhtIndex
(the bucket in which the value is to be inserted) and bucket list index in
which the value is to inserted.
Please check proto Log Object defination to know more about the LogNode variable.
*/
func addToDHT(dht *structures.DHT, logObject *pb.LogNode) {
row := logObject.DhtIndex
col := logObject.ListIndex
var nodeID structures.NodeID
copy(nodeID[:], logObject.Node.NodeId)
n := &structures.Node{
Key: nodeID,
Port: int(logObject.Node.Port),
Domain: logObject.Node.Domain,
}
if len(dht.Lists[row]) < int(col+1) {
dht.Lists[row] = append(dht.Lists[row], *n)
fmt.Println(dht.Lists[row])
} else {
dht.Lists[row][col] = *n
}
}
/*
LoadDHT loads the DHT by replaying the transaction Log entirely.
It performs all the operations on the DHT that have occurred in the past, hence
getting DHT to the same state as before the crash/shut down
Arguments: None
Returns:
1. DHT - Returns the DHDT loaded from log, empty if error occurs.
2. error - Non nil value if some error occured in between the program.
*/
func LoadDHT() (*structures.DHT, error) {
var dht structures.DHT
fi, err := logFile.Stat()
filePosition = 0
for {
if filePosition >= fi.Size() {
break
}
logObject, err := ReadLatestObjectFromLog()
if err != nil {
return &dht, err
}
addToDHT(&dht, logObject)
}
return &dht, err
}
A Word on Concurrency
What happens when multiple threads want to append some data to the log. Luckily for us, the file systems in Linux are thread safe. that means that writing to a file is atomic. Only one write at a time takes place.
However, code in your language is not thread safe. Hence, one must take proper care in writing to a log to prevent unexpected behaviour. A way to maintain predictability in your logging application is to use a mutex lock.
Go provides concurrency primitives called goroutines which are great for communicating data.
In the Hydra application, the Distributed Hash Table appends to a log in such a way that write records among different lists are independent. Hence, no locking mechanism is required. That makes it quite fast.
We shall talk about the second part of implementing a persistant data stucture i.e periodic snapshotting in the future.
Note: Watch this space for updates to the Hydra Persistance module.
Periodic Syncing Of DHT
After some time has passed, we want to flush the DHT to disk. This is useful as , we don't want to grow the log indefinitely. The log would have a lot of redundant information, and the recover process will take a longer time as the log size turn out to be very big.
Hence , to implement this periodic syncing of DHT to disk we do the following.
- After a fixed interval of time, say after one hour.
- A snapshot of the DHT is taken and passed to a function.
- This function first changes the log file pointer to a new empty file, let's call it log-2. The previous log file was called log-1.
- It flushes the DHT to disk, in a file called dht-1 , and after successful flushing, deletes the log-1 file. And any previous dht files are deleted too.
- The next time this function is called a log-3 file and dht-2 file will be created and log-2 and dht-1 will be deleted.
This process is carried on indefinitely after a fixed time interval. The naming scheme is such that the
- If the index of the dht file and the log file are equal , the dht file represents the DHT after processing that log file.
Recovery From A Crash
If the Hydra program crashes/shuts down. When it boots up again, the recovery subroutine is called. This recovery routine looks at the DHT and LOG files available and seeks to recreate the DHT at the instant just before the crash.
It orders the log files and dht files in a sorted order as such.
log-1, log-2, log3, ...
and
dht-1, dht2, dht3, ...
Then according to our naming scheme , it finds out the DHT file with the highest index and loads that in memory. After loading it , it processes the log files ahead of it (with a higher index number).
So for example if we have lists of log files and dhts as such:
- LOG: log-1,log-2,log-3,log-4
- DHT. dht-1,dht-2,dht-3
Action: It will load dht-3 in memory and play log-4 operations over it.
If for some reason the file is corrupted, hydra shall look one step behind , i.e dht-2 . This process shall go backwards on each failure, and if all files are corrupted, an empty DHT is returned.