Home - achalsoni81/Jarvis- GitHub Wiki

Jarvis

An intelligent layer on top of Hadoop

Hadoop is an industry-leading, open source, distributed computing framework based on the simple yet elegant processing paradigm of MapReduce and built on top of the Hadoop Distributed File System (HDFS). Hadoop has been making waves in the field of data processing, number crunching, data mining, and a whole lot of other purposes being employed in various different organizations and companies. Hadoop has also fostered a strong industry-wide community that has also led to the creation of several related projects that are built on top of and around Hadoop. Jarvis seeks to join these related projects by building on top of Hadoop and adding functionality to make it a more complete solution, and is built on an older instance of Hadoop (version 1.0.3).

Jarvis - Goals

Jarvis started out as a way to provide low latency random access functionality on data living in HDFS. At many Web companies, who have large datasets that they process every day for various purposes, data was being duplicated and transferred into a secondary serving system - such as Cassandra (NoSQL) or MySQL databases.

Over time, Jarvis has evolved in it's vision. While a lot of work is left and only some goals have been met in the current prototype (project progress), there is nevertheless a set of features/functionality that Jarvis hopes to accomplish. They are the following:

  • Provide low latency, random access reads
  • Use indexing and other methods for smarter, more intelligent queries
  • Allow for streaming writes into a Hadoop cluster
  • Allow for instantaneous processing of data (no need to bulk ingest data)
  • Provide a Storm-like real-time processing framework that allows for data analytics to be tied between incremental data and data on the cluster
  • Provide for a data manager so that files/indexes are managed and abstracted out so that an end user just has to provide a flow for the data and everything else is handled
  • Investigate file formats/storage paradigms to allow for fast processing (column store) yet maintains efficient record serving
  • Look at ways to provide a more automated and user friendly experience to data processing (providing a functional web interface, integrating with other technologies such as Hive and Pig, and automated workflows and such)

Project Background

Here is some background on how the project came to be and those who made it happen.

Hadoop Architecture

There are several key components in the version of Hadoop that Jarvis is built on. The logical relations amongst these components are outlined below: Hadoop Architecture

Hadoop has two fundamental components - the MapReduce processing aspect and the distributed file system aspect (HDFS). A client may interact with and control a Hadoop cluster through two means - one for each respective component.

All interaction relating to the creation and maintanence of data, files, and filesystem is achieved by communicating with the NameNode, which holds all the information about the data in a given Hadoop cluster. This communication is encompassed in an Distributed File Object, which is wrapper for an underlying DFSClient object, that does RPC calls with a corresponding server on the NameNode in a cluster.

The data itself is stored on the slave nodes of the cluster. Each of the nodes has software running that handles the data management aspect of the cluster. This facet of the slaves nodes is referred to as the DataNode component of a cluster. In Hadoop, data is split up into data chunks of a certain block size (traditionally 64 mb). The NameNode decides where to place the different data chunks (on which DataNodes) and has the information about where each block is located (so that when that data is requested, it can point the client to the correct DataNode).

Any interaction for actually submitting and running MapReduce jobs can be done in a few different ways - via a JobConf object in Java code for instance, or via a command line operation. Regardless of the submission method, the necessary information for a job (the mapper and reducer code, the input and output files, etc.) are all routed to the JobTracker, which is responsible for the scheduling, maintaining, and running of jobs.

In most conventional uses of Hadoop, work is broken up and parallelized by respecting the fundamental idea of data chunks and scheduling a Map task per block. These Map tasks, and subsequently Reduce tasks, are fired off by the JobTracker by creating a TaskTracker that does the actual processing work on that respective block of data. The JobTracker seeks to maintain data locality to the best of its capability and place the TaskTrackers on the DataNodes where the data lives (sometimes the input split size covers data that lives in multiple locations, in which case it will try to schedule it on the DataNode that contributes the most).

Jarvis Architecture

Hadoop was created with a mindset of providing a platform for end users to easily write code to process massive amounts of data that was being generated without having to worry about the different, vast problems of parallelizing the work and processing the data in a distributed manner. Unfortunately, this translates into a system that has certain limitations and lack of certain functionality.

One of the most glaring missing aspects of the Hadoop framework is the inability to randomly access a certain {key, value} pair (which is how all data is stored and processed in Hadoop), let alone with the ultimate goal of low latency. In order to actually retrieve a single {key, value} pair or a group of {key, value} pairs, you would have to generate a MapReduce job that would crunch through each tuple and filter out the ones we are interested in. However, in many use cases, most significantly those found at web companies, we don't want to only process the data, but we also want to be able to access the data easily and randomly as well.

The ability to serve data straight out of a Hadoop cluster is one of the main focuses of Jarvis. In many organizations, data is duplicated into an entirely separate storage solution, in either SQL solutions like a MySQL server or in a NoSQL solution, such as Cassandra. Jarvis however, takes a different approach, and avoids the entire process of duplicating data. Unfortunately, this has the implication that another solution must be found to compensate for the decision to not replicate and store data in a separate system. The solution that Jarvis takes is to store the underlying data in a sorted manner. By keeping data in a sorted manner, Jarvis is able to adopt one of the most widespread data structures in relational databases - B trees.

The following is an architectural model of Jarvis, outlining it's various components and shows how it builds right on top of the Hadoop model. Jarvis Architecture

###Indexing - Model and Creation


Even with the furious growth in the memory industry, RAM remains to be the scarcest resource in any machine. When the question of providing low latency random access to data arose, it was understood that a creative solution that worked around and understood the memory constraints was necessary. This led Jarvis to choose between two solutions:

  1. Store metadata for each {key, value} pair that was necessary to retrieve it in the index and store the index on a front end machine.
  2. Store some of the metadata for each {key, value} pair on the backend with the actual data and store some of the metadata on the frontend machine to create a somewhat two tiered system.

The question then became mainly of where the data caching would be done. If we chose the first solution, then most of the caching would be done on the backend machine, whereas if we went with the second solution, then most of the caching would be done on the frontend machine. Jarvis went with the second solution for a few reasons:

  1. If most of the caching is done on the frontend machines, then there is a much higher likelihood that the data a client requests will be on a front end machine. This means that in more cases than not, we will save two network calls (sending the request to the backend and then receiving that request).
  2. More caching on the front end machines would allow for a more holistic caching policy with regard to caching writes as well as a more overall view of what data is being requested across the entire dataset.
  3. Caching on the backend is a hard problem with many different processes running on those nodes - (Hadoop processes and now processes for serving and retrieving data) - and would most likely just simply be left up to the OS.
  4. Although it is much weaker, one more consideration was that the front end machines ought to be responsible for a lot more than simply a very explicit store for each and every {key, value} pair (which seemed to be overkill to be storing completely in memory).

####Index Model The indexing policy is very simple. The entire system is based off the idea of grouping together the {key, value} metadata in buckets. Imagine a traditional B tree with multiple layers, where the bottom-most last layer is the {key, metadata} values. The second bottom-most layer has {key, pointer} entries, where the pointers point to essentially groups, or buckets, of {key, metadata} values. In Jarvis, instead of holding the full B tree in memory, we essentially store everything but the bottom-most layer, which is in turn then delegated to the backend machines. This is best aided by a visual.

B Tree Index

This means that the last layer must be stored away from the front end machines, but rather on the backend machines. In Jarvis, because it is integrated right onto Hadoop, the backend machines are actually the DataNodes in the cluster! It is time to investigate how the index is created to further explain how the index actually works, is stored, and how read requests are processed.

Index Creation

For a B tree, the data must be sorted. This is the precedent that Jarvis has chosen to store it's data in. What is interesting to note is that the Hadoop framework lends itself to accomplish this naturally. This is so because in between the Map and Reduce phases of a job, there is a shuffle and sort stage where data outputted from the Map stage is sent to some Reducer, and for a given Reducer, all the data it processes is sorted. Therefore, Jarvis decides to use a MapReduce job to do the sorting of data (a small intricacy is that data is sorted per Reducer, so for a globally sorted file you must have only one reducer or use a range partitioner for the shuffle aspect of the shuffle and sort stage instead of a hash partitioner so that the multiple reducers can simply be appended together).

While the problem of sorting has been solved, what about creating the index? What metadata do we need? Where do we store this metadata? How do we even create this metadata? The answer lies in the very MapReduce job that is used to sort the data.

One of the most widespread ways of storing data in a Hadoop cluster is in the SequenceFile format. That is the file format Jarvis chooses to store data in as well. The SequenceFile has three ways of storing data - uncompressed, record compressed, and block compressed. For simplicity, we will examine only the first two, as their actual storage is in the same structure.

The SequenceFile format packs at the beginning of the file a variety of information in the form of a header. It then begins to write {key, value} pairs by writing an int for record length (key length + value length), then an int for key length and then the key and value themselves using the Hadoop Writable serialization framework. seq file Jarvis, however, needs to have it's own file format for the MapReduce job that not only stores the data in a SequenceFile but also creates the index meta-info as well. For this task, Jarvis employs a custom file format called HashIndexFileFormat. The generation of the meta-info is relatively simple. In the Reduce stage of the MapReduce job, the TaskTracker creates a RecordWriter that takes {key, value} pairs and writes it out in the file format specified. In the HashIndexFileFormat, the following are the steps taken when writing out a {key, value} pair:

  • The {key, value} pair is written to a SequenceFile. When the MapReduce job is run to create the globally sorted file and for the index generation, the output is not a file, but rather a directory. In this directory, there is a SequenceFile that holds all the data. This is the SequenceFile that the {key, value} pair is written to.
  • The file offset inside the SequenceFile is determined when the {key, value} pair is written. This file offset however is the position of the record inside the entire file, but we are interested rather in the offset in the current block. Therefore we keep a counter of what is block byte boundary so far, and simply subtract it from the file offset (the block byte boundary is initially 0, then will increment by 64 mb, etc.)
  • There is always a hash table that the {key, block offset} pair is written to. When a heuristic determines that the hash table is full (usually when it reaches the byte size of one disk IO - traditionally 4096 bytes) the hash table is written out to a separate file that also lives in the Hadoop cluster, called HashIndexi, where i is the block we are on. Whenever we serialize a hash table, we also write the first key that goes into it along with it's offset in the HashIndexi file into a separate file called HashMetaInfo.
  • If the block offset is greater than or equal to 64mb, that means that the record we just wrote is in a new block of the file. Therefore, we serialize out the hash table we've been writing to, close that file, increment i by 1, and create a new file in the cluster called HashIndexi. We then increment the block byte boundary by 64mb. We then write this key along with the value -1 into HashMetaInfo to indicate that this key marks the beginning of a new block.

After the end of the MapReduce job we are left with the following:

  • A SequenceFile that has all of our data stored sorted in a standard, industry wide format for easy transportation of the data outside of Jarvis.
  • One file per block in the SequenceFile (HashIndexi) that stores hash tables packed next to each other. These hash tables store the metadata that will be necessary for serving data. It has the {key, block offset} for each key in the SequenceFile and the block offset of course corresponds to the block the file is associated with.
  • One file that has the first key that went into each hash table and the offset into the HashIndexi file that it belongs to. This file also indicates which keys corresponding to the beginning of a new block. This file is used for creating the B tree; in a sense, the B tree really just indexes the hash tables, which are the buckets.

hash index

The index must now be created on the front end machine. This is done by having the front end machine copy the HashMetaInfo file from Hadoop to it's local file system. It then reads the {key, offset} pairs for each hash table bucket into the index (the index is an array of arrays, with a specified fanout so {key, offset} pairs are stored and every fanout pairs a new entry is created in the parent array, and this is done iteratively until there is only one parent). If the offset is -1, then that means that the key is the beginning of a new block. This key is then added to an array that is responsible for keeping information about the blocks as a whole.

Now that we have an index created that tells us that for a particular key which hashtable bucket to look into, we need to gain some information about how to actually look into the bucket.

The NameNode code has actually been modified such that the HashIndexi files are co-located with the i th blocks they correspond to. When data is being written into Hadoop, NameNode is splitting the data up into blocks. When it creates a new block, it chooses which DataNodes to place the data at. When using the HashIndexFileFormat, data is being written into a SequenceFile. When we write a {key, value} pair that falls into a brand new block, HashIndexFileFormat creates a new HashIndexi file by incrementing i and telling the FileSystem to create a file with that name. The NameNode code in that instance, instead of using it's default target policy for choosing DataNodes where the HashIndexi should be located, places them exactly where the block it corresponds to lives.

This modification to NameNode is very important because while Jarvis does indeed live on top of Hadoop, it attempts to eliminate as much interaction with Hadoop as it possibly can. Once we decide to put data into Hadoop, we wish to create and collect as much metadata as possible and then not bother Hadoop at all. All read requests can then be accomplished through Jarvis' own metadata and serving mechanisms. We need these HashIndexi files to live on the same DataNodes as the block that they index because when we get a read request, we will figure out which DataNode to go to to retrieve that data, and our hashtables that give us the index meta-info must be on that DataNode as well.

After building up the index that will tell us for a given key what bucket to look into, we have to figure out that for that given key, which DataNode to route our request to. If you examine how the B tree is created, you will see that we have an entirely separate data structure (an array) for keeping track of keys that start new blocks. So after the B tree has been created, we will have an array the size of how many blocks we have in the data file, that hold the first key that belong in each block (this is perfect for figuring out the ranges of each block that is a function of the sorted nature of the data).

The Hadoop code has been modified that exposes a function that is absolutely necessary for Jarvis' functioning - a method that will tell us the local file name of a data block on a DataNode. Hadoop stores the data in HDFS in the form of blocks, but these blocks are just abstractions. In reality, at a byte level, these blocks are stored as local files on the DataNodes that they have been allocated to. When you read a HDFS file, the NameNode will go to appropriate DataNodes who will in turn return the data from these local files. In Jarvis, our objective is to figure out where the blocks live (which DataNodes) and what are the respective local file names, so that we can open them up and read from them at a specified offset (from the hashtables in the DataFinder) so that we can do all our random access serving entirely separate from HDFS. Once the B tree has been created, a simple for loop that iterates through the array responsible for holding information about the DataNodes and such is executed. In this for loop, for each block, we ask NameNode for the hostnames of the DataNodes that have the blocks and also ask for the local file names.

One last thing that must be taken care of is that the HashIndexi files are not meant to live on HDFS. The sole reason that Hadoop knows about those files is because that is a necessary byproduct of how they were generated (a MapReduce job). Our job is to then delete that meta-info from Hadoop (save some memory on the NameNode side) but ensure that those files are still there. The two ways to do that is to hack NameNode to only delete meta-info but not actually delete the file, or to figure out what the local file names are for the HashIndexi files themselves and tell the DataFinder on the appropriate DataNodes to copy and rename those files to the same local file name as the block they correspond to but also append "-index" to the name. Jarvis chooses to do the second as it is far simpler and makes requests easier as we will see. We are now set for the index creation.

Read Requests


Now that all the necessary constructs and meta-info for the index has been created, the final component of handling read requests must be addressed. To handle external requests coming from clients, the front end machines that have the IndexManagers also have running Jetty servers. When a HTTP GET request is received by the server, it asks the IndexManager to retrieve all relavent information for rerouting the request. The relevant information is the following:

  • The DataNode to route the request to
  • The local file name for the block on that DataNode
  • The offset of the hashtable in the HashIndexi file that contains the offset into the block file for the key we are interested in
  • The length of the aforementioned serialized hashtable

The first two pieces of information is obtained by a binary search on the separate array for DataNode-related information we created in the IndexManager. The second two pieces of information is obtained by traversing the B tree in the IndexManager. Once these pieces of information has been created, an HTTP connection is established with the appropriate DataFinder (it also has a Jetty server) by creating an URL that contains all of the relevant information (including the key we are interested in).

The DataFinder receives a HTTP GET request that contains all of the different parameters necessary for retrieving the value for a given a key and takes the following steps:

  • The DataFinder first looks at the local file name, and instead opens the file whose name is the given local file name + "-index".
  • It seeks to the specified offset, and from there loads the specified length (in bytes) into a byte array.
  • It uses an ObjectInputStream wrapped around these byte buffer to deserialize the hashtable.
  • It takes the given key and retrieves the offset that it has recorded for that key inside the block file.
  • It now opens up the local file name specified and seeks to the offset it just obtained from the hashtable.
  • It uses the SequenceFile way of reading in a {key, value} pair by reading the record length and the key length, and subsequently reads the key and value.
  • After a quick check to ensure that the key that was read is indeed the one we are interested in, it returns the value back to the front end machine that then completes the handling of the read request.

Range Queries


With the approach Jarvis has taken - sorted data and B tree indices - it is natural to use the IndexManagers to eliminate data that we may not want in terms of range queries. When it comes to Big Data, often at times the most time-costly aspect of running a job is just the sheer massive amount of data that has to be processed. Cutting down on the amount of data would result in significantly faster job. This task is actually much easier once an index has been created on a given dataset.

In Hadoop, the data is stored physically in blocks (usually 64MB in size). However, this is not necessarily the boundaries or splits that the Mappers operate on. For example, a MapReduce task can process the entire file in one Map task, or one Map task might only account for 10MB. The generation of the input splits that the Map tasks work on is in the InputFormat class that is specified when a job is run. The answer for choosing specific data to process lies in this.

Jarvis uses a custom file input format for generating the input splits into a MapReduce job. It requires a few non-default properties to be added to the job configuration object. This includes properties such as what is the file offset of the starting key and the ending offset. This information is something that can be easily obtained by adding some small functionality to the IndexManager and DataFinder aspects of Jarvis. After that, the file input split will return splits that will only cover data from the beginning and end offset (future functionality can be added for having multiple ranges instead of just one).