Design - accandme/openplum GitHub Wiki

To describe the design of our system, we first present its architecture. We then give an overview of distributed query execution and describe how it is different from the planning stage of any non-distributed query execution pipeline. We then state our major assumptions about our system, describe the major algorithms we came up with, and finally describe how we go about executing a given query.

System Architecture

The architecture we decided to use for our system is a simple two-tiered architecture where a single master node controls a set of worker nodes that each runs its own (single-node) RDBMS instance. The worker nodes are hidden from the user, who only interacts with the master, and to whom the master appears as a fully-functional RDBMS itself. The worker nodes, on the other hand, operate independently of each other and are not aware of each other, as all interaction goes through the master. This enables us to have an arbitrary number of worker nodes, as long as the master is aware of them.

In other words, our system can be viewed as a layer on top of a set of RDBMS instances. This architecture was inspired by Milestone 2 of our Advanced Databases course project from the Spring 2012 semester. The requirement there was to use a set of eight worker MySQL nodes and build a system that executes two queries on a database partitioned on these nodes. In fact, the implementation of the current system is based on our implementation of Milestone 2.

Major Assumptions

While building our system, we made several major assumptions about the system itself and about the partitioning scheme, which we state next.

First of all, we assume that the more work we push down to our RDBMS worker nodes (rather than do in our code), the better performance we can achieve. We believe this is true mainly because of two reasons: (1) the data and operations on the data are kept in one place, and (2) the optimizer already available in the underlying RDBMS is used and need not be reinvented.

Second, we assume that the data is partitioned on the worker nodes uniformly and without overlap. That is, we assume that no part of any relation is replicated, and that horizontal (and not vertical) partitioning is employed. We made this assumption in order to avoid having to handle duplicate tuples that may come from different worker nodes. However, we understand that replication can provide a significant boost to query execution, and support for it is something that could be incorporated in the future.

Finally, we assume that we achieve the best performance when each node produces its share of the query results given the underlying partitioning (we make sure the node has access to all the data it needs), rather than a small subset of nodes doing a lot more work. We believe this is true because each node would perform more or less the same amount of work, and thus the query execution would be maximally parallelized.

Overview of Distributed Query Execution

Any modern DBMS, when executing a query on a single node, typically performs the following three steps for a given query:

  1. Parsing
  2. Planning
  3. Execution

In the parsing stage, a given query string is parsed and transformed into a formal data structure called a parse tree. This parse tree is then given to the query planner, which, using the system catalog that has information about each relation, decides on an efficient way to execute the query and builds another data structure called a query plan. This query plan is then handed off to the query executor which executes it.

Given our chosen system architecture model, the master node is the one that must ensure that the given query gets executed. To that end, it does produce a parse tree, but instead of producing a typical query plan reminiscent of regular RDBMS query plans, it produces a distributed query plan. In other words, it comes up with a set of steps that must be executed on our given worker nodes in order to produce the correct final result. The master then executes these steps by sending SQL queries to the worker nodes. We would like to stress that distributed query planning is different from non-distributed query planning in the sense that the operators are not the same.

Although our system does not do this at this stage of development, distributed query planning could also choose among several query plans, given how the underlying data is partitioned. While building our system, we tried to make it work for any underlying partitioning, and hence we do not take it into account. Query optimization was not our primary focus, but rather something that could be added in the future.

Aggregation

In our architecture model, if every worker node has all the data necessary to execute the query (given its partitioned share of the database), all we have to do is to execute the query on all nodes and then combine the results. This is true even if a query contains nested queries. However, aggregated queries pose a slight problem. Without nested queries, if our given query is an aggregate query (e.g. it contains an aggregate function such as SUM), we can still pass it to the workers, but combining the results would be more difficult. This is because, with some aggregate functions, the nodes need to return not the final result of the aggregate function, but rather its final state. This is easily illustrated by the AVG function, which maintains as its state a sum of elements and their count. If our query contains this aggregate, we need our nodes to return the final states of this aggregate so that we can combine them and produce the final aggregate value.

Note that the function producing the final aggregate state on each node and the function combining the different aggregate states are themselves aggregate functions, but they differ from the regular, non-distributed variant of the aggregate function. We refer to these two functions as the intermediate and final variants of a given aggregate function.
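As an illustration of the intermediate and final variants, here is a minimal Python sketch for AVG. The function names `avg_intermediate` and `avg_final` are hypothetical and chosen only for this example; the sketch assumes the state of AVG is the pair (sum, count), as described above.

```python
from typing import Iterable, Optional, Tuple

AvgState = Tuple[float, int]  # (sum of elements, number of elements)

def avg_intermediate(values: Iterable[float]) -> AvgState:
    """Intermediate variant: runs on one worker and returns the state of AVG."""
    total, count = 0.0, 0
    for value in values:
        total += value
        count += 1
    return total, count

def avg_final(states: Iterable[AvgState]) -> Optional[float]:
    """Final variant: runs on the master and combines the workers' states."""
    total, count = 0.0, 0
    for partial_sum, partial_count in states:
        total += partial_sum
        count += partial_count
    return total / count if count else None

# Three workers holding non-overlapping partitions (one of them empty).
partitions = [[1.0, 2.0, 3.0], [10.0], []]
states = [avg_intermediate(p) for p in partitions]
overall_avg = avg_final(states)
```

Averaging the per-worker averages (2.0 and 10.0) would give a different value than combining the states, which is why the workers have to return states rather than final values.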

Queries containing nested aggregate queries pose a further problem. Because the result of the outer query depends on the result of the nested query, and because, as shown above, the result of any aggregate query must be computed as a combination of the results from all worker nodes, we have to execute the nested aggregate query first and notify the workers of its result before attempting to execute the outer query on them.
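The following sketch illustrates this two-phase execution. It simulates two workers with in-memory SQLite databases, which is an assumption made only so the example is runnable; the `credits` column and the helper `make_worker` are hypothetical, and passing the nested result as a query parameter is just one possible way of notifying the workers.

```python
import sqlite3
from typing import List, Tuple

def make_worker(rows: List[Tuple[int, str, int]]) -> sqlite3.Connection:
    """Create a simulated worker holding one partition of S."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE S (id INTEGER, name TEXT, credits INTEGER)")
    conn.executemany("INSERT INTO S VALUES (?, ?, ?)", rows)
    return conn

workers = [
    make_worker([(1, "Ann", 10), (2, "Bob", 20)]),
    make_worker([(3, "Cy", 60)]),
]

# Phase 1: run the nested aggregate on every worker (returning its state),
# then combine the states on the master.
states = [w.execute("SELECT SUM(credits), COUNT(credits) FROM S").fetchone()
          for w in workers]
total = sum(s for s, _ in states if s is not None)
count = sum(c for _, c in states)
avg_credits = total / count

# Phase 2: run the outer query on every worker, using the combined result.
names: List[str] = []
for w in workers:
    names.extend(name for (name,) in w.execute(
        "SELECT name FROM S WHERE credits > ?", (avg_credits,)))

# For contrast: letting each worker evaluate the nested query on its own
# partition compares against a local average and yields a different answer.
naive: List[str] = []
for w in workers:
    naive.extend(name for (name,) in w.execute(
        "SELECT name FROM S WHERE credits > (SELECT AVG(credits) FROM S)"))
```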

SuperDuper Algorithm

The fundamental algorithm of our system is called the SuperDuper algorithm. It is based on Bloom Join, a well-known algorithm for performing equijoins in distributed databases. Before we go into the description of our SuperDuper algorithm, we give a short reminder of Bloom Join.

Bloom Join

Bloom Join is an equijoin algorithm for joining two relations stored at different sites based on an equality between a field in each. Assuming relations (R) and (S) where (S) is the smaller relation, and an equijoin condition (R.a = S.b), the algorithm works as follows:

On the site with (R):

  1. Choose a hash function (hash) with range ([0; k-1]), where (k) is an integer in the order of the cardinality of (S).
  2. Construct a vector of (k) bits and set each bit to 0.
  3. For each tuple (r \in R), compute (h = hash(r.a)) and set the (h)th bit in the bit vector to 1.
  4. Ship the final bit vector to the site with (S) and wait for results.
  5. Upon receiving the result set, perform an equijoin locally.

On the site with (S):

  1. Receive the bit vector from the site with (R).
  2. Construct an empty result set (S*).
  3. For each tuple (s \in S), compute (h = hash(s.b)) using the same hash function (hash). If the (h)th bit is set in the bit vector, append the tuple to (S*).
  4. Ship (S*) to the site with (R).

The Bloom Join algorithm proves very efficient because the amount of data shipped is very small: a bit vector is cheap to ship, and (S*) contains little useless data if a good hash function is chosen.
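The steps above can be sketched in Python as follows. This is a single-process illustration, not the system's implementation: both "sites" are plain lists, the function names are hypothetical, and CRC32 is an arbitrary stand-in for the hash function.

```python
import zlib
from typing import Dict, List, Tuple

Row = Dict[str, object]

def bloom_hash(value: object, k: int) -> int:
    """Deterministic hash into [0, k-1]; CRC32 is an arbitrary choice here."""
    return zlib.crc32(repr(value).encode("utf-8")) % k

def build_bit_vector(r_rows: List[Row], a: str, k: int) -> List[bool]:
    """Site with R, steps 1-3: set the bit hash(r.a) for every tuple r in R."""
    bits = [False] * k
    for r in r_rows:
        bits[bloom_hash(r[a], k)] = True
    return bits

def filter_with_bit_vector(s_rows: List[Row], b: str, bits: List[bool]) -> List[Row]:
    """Site with S, steps 2-3: keep the tuples of S whose bit is set (S*)."""
    k = len(bits)
    return [s for s in s_rows if bits[bloom_hash(s[b], k)]]

def local_equijoin(r_rows: List[Row], a: str,
                   s_star: List[Row], b: str) -> List[Tuple[Row, Row]]:
    """Site with R, step 5: exact equijoin, which also drops false positives."""
    by_key: Dict[object, List[Row]] = {}
    for s in s_star:
        by_key.setdefault(s[b], []).append(s)
    return [(r, s) for r in r_rows for s in by_key.get(r[a], [])]

R = [{"a": 1}, {"a": 3}]
S = [{"b": 1, "name": "Ann"}, {"b": 2, "name": "Bob"}, {"b": 3, "name": "Cy"}]

bits = build_bit_vector(R, "a", k=8)           # shipped from R's site to S's site
s_star = filter_with_bit_vector(S, "b", bits)  # shipped back to R's site
joined = local_equijoin(R, "a", s_star, "b")
```

Note that (S*) may contain tuples that do not actually join (hash collisions), but it never misses a matching tuple; the local equijoin removes the extras.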

Node State Independency

To do the work along the lines of our third major assumption, where each node should produce its share of query results, we first have to ensure that each node is capable of doing so, i.e. it has access to all the data it needs. When a node satisfies this criterion for a particular query, we say that its state is independent of the states of the other nodes.

We next claim the following: the state of any node can be made independent of the states of other nodes along any one equijoin condition. To see why this is the case, consider two relations (R) and (S) in a distributed database, and an equijoin condition (R.a = S.b). We can assume that (R) and (S) are arbitrarily partitioned (horizontally and without overlap) on a set of nodes. It is easy to see that a node holding some partition (R' \subseteq R) is able to produce its set of the equijoin results if it has all the tuples from (S) that match the equijoin (or vice versa), thereby making its state independent. By the same logic, the state of each other node can be made independent.

The Algorithm

Node state independency is the goal of the SuperDuper algorithm. Given a set of nodes, two relations (R) and (S), and an equijoin condition (R.a = S.b), it uses the Bloom Join algorithm to make the state of each node independent along the condition. This is achieved by running the Bloom Join algorithm between each pair of nodes, shipping the necessary portions (S* \subseteq S) to the locations of their matching (R' \subseteq R). All steps of the Bloom Join algorithm are executed except the last (local equijoin), which is implicitly part of the query that will be executed on each node.
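A minimal sketch of this idea, under simplifying assumptions: nodes are simulated as dictionaries in one process, tuples are "shipped" directly between them (whereas in the architecture described earlier all interaction goes through the master), and the names `super_duper` and `_bit` are hypothetical.

```python
import zlib
from typing import Dict, List

Row = Dict[str, object]
Node = Dict[str, List[Row]]  # relation name -> this node's partition

def _bit(value: object, k: int) -> int:
    """Deterministic hash into [0, k-1]; CRC32 is an arbitrary choice."""
    return zlib.crc32(repr(value).encode("utf-8")) % k

def super_duper(nodes: List[Node], r: str, a: str,
                s: str, b: str, k: int = 64) -> List[List[Row]]:
    """For each node, collect the tuples of S that may match its partition of R.

    The local equijoin (last Bloom Join step) is deliberately not performed.
    """
    needed: List[List[Row]] = []
    for target in nodes:
        # Bit vector built from the target node's partition of R.
        bits = [False] * k
        for row in target[r]:
            bits[_bit(row[a], k)] = True
        # Every node (including the target itself) filters its partition of S.
        candidates: List[Row] = []
        for source in nodes:
            candidates.extend(row for row in source[s] if bits[_bit(row[b], k)])
        needed.append(candidates)
    return needed

nodes: List[Node] = [
    {"T": [{"sid": 1, "cid": 5}, {"sid": 3, "cid": 5}],
     "S": [{"id": 2, "name": "Bob"}]},
    {"T": [{"sid": 2, "cid": 7}],
     "S": [{"id": 1, "name": "Ann"}, {"id": 3, "name": "Cy"}]},
]

# SuperDuper S to T along T.sid = S.id.
needed = super_duper(nodes, r="T", a="sid", s="S", b="id")

# Each node can now evaluate the equijoin on its own.
local_results = [
    sorted(s_row["name"] for t_row in node["T"] for s_row in s_rows
           if t_row["sid"] == s_row["id"])
    for node, s_rows in zip(nodes, needed)
]
```

Because the partitions of (T) do not overlap, the union of the per-node results equals the result of the equijoin over the whole database, with no duplicates.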

The name of the algorithm is derived from an earlier version of it where the last step of Bloom Join was not yet omitted. After running that version of the algorithm on relations (R) and (S), we would jokingly refer to the resulting relation as “Super (R)” (or “Super (S)”, depending on the direction of the algorithm). A further SuperDuper of Super (R) with a relation (T) would then produce a “Super Duper (R)”, a reference that eventually contributed to the algorithm’s name.

SQL Query As a Graph

Building on the ability to execute equijoins in a distributed manner with the SuperDuper algorithm, we can further express an SQL query as a graph of equijoins. In such a graph, each relation of the query is represented by a vertex, and an edge between two vertices represents an equijoin condition. Physical relations are represented by physical vertices, whereas nested relations (i.e. nested queries in the FROM clause) are represented by supervertices, which can in turn contain vertices and edges. These vertices are named after the relations they represent. In addition, we represent any aggregate (sub)query as an additional, anonymous supervertex. Because the graph may contain supervertices, we often refer to it as a supergraph. Moreover, because no vertex can belong to more than one supervertex, the graph is a simple supergraph.

While processing the query, we execute each supervertex until completion, i.e. we execute the query by converting each supervertex to a physical vertex. As explained before, we need to run aggregate queries until completion at all times. This is the reason aggregate queries are mapped to supervertices, even if they are nested in the WHERE clause where we normally would not create a supervertex.

Our query graph does not contain any other information about the query, such as the projected fields or operators used in the WHERE clause. Knowledge of these is not necessary to make node states independent, and the nodes will know what to do afterwards when we give them the actual query. Such a graph is helpful for running a series of SuperDuper instances, making the state of each node independent of the states of other nodes along every equijoin condition of the query.

Internally, we refer to the algorithm we use to convert an SQL parse tree to a graph as the QueryTackler algorithm because it “tackles” the (bulky) query to produce a more simplified representation of it.

Let’s consider a simple database on which we can show a couple of examples. This database contains three relations: (S)(tudents), (C)(ourses), and (T)(ook), where (S) contains student IDs and names, (C) contains course IDs and names, and (T) expresses the many-to-many relationship between students and courses by containing student IDs and course IDs as foreign keys, indicating that a specific student has taken a specific course. Without loss of generality, let’s assume also that each of the three relations is partitioned arbitrarily (i.e. independently of each other) on a set of nodes.

Consider now a simple query:

SELECT S.name
FROM S

The graph of this query is very simple and looks as follows:

Figure 1. Graph of a simple query.

What this graph tells us is simply that there is nothing that needs to be done to make the state of each node independent of the states of the other nodes: since we assume non-overlapping partitions, the partition (S' \subseteq S) on each node is already independent of the others.

Now consider another query, returning all students who took the course with ID 5:

SELECT S.name
FROM S, T
WHERE S.id = T.sid AND
      T.cid = 5

The graph of this query is again quite simple and looks as follows:

Figure 2. Graph of another simple query.

What this graph tells us is that we need to SuperDuper (S) to (T) (or vice versa) on the (S.id = T.sid) condition to make the state of each node independent. After that, we can run the query on each node and combine the results.

Now consider a slightly more complicated query, returning all students who took the Advanced Databases course:

SELECT S.name
FROM S, T, C
WHERE S.id = T.sid AND
      C.id = T.cid AND
      C.name = 'Advanced Databases'

This query’s graph looks like this:

Figure 3. Graph of a more complicated query.

We now need to perform two SuperDupers: (S) to (T) and (C) to (T). Note that the direction of the first SuperDuper does not matter.
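For illustration, the graph of this query could be represented with a small data structure like the one below. This is only a sketch: the names `QueryGraph` and `build_graph` are hypothetical, supervertices are not modelled, and the equijoin conditions are supplied by hand rather than extracted from a parse tree.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

Column = Tuple[str, str]  # (relation, attribute)

@dataclass
class QueryGraph:
    vertices: Set[str] = field(default_factory=set)
    edges: List[Tuple[Column, Column]] = field(default_factory=list)

def build_graph(relations: List[str],
                equijoins: List[Tuple[Column, Column]]) -> QueryGraph:
    """One vertex per relation, one edge per equijoin condition."""
    graph = QueryGraph(vertices=set(relations))
    for left, right in equijoins:
        if left[0] not in graph.vertices or right[0] not in graph.vertices:
            raise ValueError(f"unknown relation in condition {left} = {right}")
        graph.edges.append((left, right))
    return graph

# Only the equijoin conditions become edges; the filter on C.name does not
# appear in the graph.
graph = build_graph(
    ["S", "T", "C"],
    [(("S", "id"), ("T", "sid")), (("C", "id"), ("T", "cid"))],
)
```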

Finally, let’s consider an even more complicated query, a modified version of Query 7 from the TPC-H benchmark (the query has been modified to remove an OR operator):

SELECT shipping.supp_nation,
       shipping.cust_nation,
       shipping.l_year,
       SUM(shipping.volume) AS revenue
FROM (
      SELECT N1.n_name AS supp_nation,
             N2.n_name AS cust_nation,
             EXTRACT(YEAR FROM L.l_shipdate) AS l_year,
             L.l_extendedprice * (1 - L.l_discount) AS volume
      FROM supplier S,
           lineitem L,
           orders O,
           customer C,
           nation N1,
           nation N2
      WHERE S.s_suppkey = L.l_suppkey AND
            O.o_orderkey = L.l_orderkey AND
            C.c_custkey = O.o_custkey AND
            S.s_nationkey = N1.n_nationkey AND
            C.c_nationkey = N2.n_nationkey AND
            N1.n_name = 'FRANCE' AND
            N2.n_name = 'GERMANY' AND
            L.l_shipdate BETWEEN '1995-01-01' AND '1996-12-31'
      ) shipping
GROUP BY shipping.supp_nation,
         shipping.cust_nation,
         shipping.l_year
ORDER BY shipping.supp_nation,
         shipping.cust_nation,
         shipping.l_year

While the query is clearly more complicated than the previous ones, its graph remains fairly simple:

Figure 4. Graph of TPC-H Query 7.

Legend:

  • nk = nationkey,
  • ck = custkey,
  • ok = orderkey,
  • sk = suppkey.

As can be seen, this graph does not look as complex as the query itself.

Query Graph Processing and Execution of the Query

Our goal is to come up with a distributed query plan for the query at hand. For solving this problem we devised two further algorithms in our query execution pipeline: the GraphProcessor algorithm and the StepExecutor algorithm.

GraphProcessor Algorithm

The GraphProcessor algorithm takes as input the query graph, as produced by the previous step in the pipeline, and produces as output a sequence of elementary execution steps, to be executed later by the StepExecutor algorithm. An execution step can be one of three types:

  • SuperDuper: performs a SuperDuper given an equijoin condition between two relations.
  • Gather: gathers a table partitioned on all nodes into a non-distributed table on the master node.
  • Run-Subquery: executes an actual (sub)query, either on all nodes or only on the master node.

With the above three constructs, the GraphProcessor algorithm produces the distributed query plan. To build the sequence of elementary steps, the GraphProcessor algorithm “consumes” the query graph. Its goal is to transform the graph into a single physical vertex, because if we have a graph consisting of a single physical vertex, then we already have the result of the query stored in the table corresponding to this vertex.

The GraphProcessor algorithm can process any graph. It does so step by step, at each step producing an action that should be performed in order to transform the graph from its old state to a new, simpler state. This action is nothing but one of the execution steps described earlier; it gets enqueued as a step in the sequence of steps constituting the output of the algorithm.

The GraphProcessor algorithm introduces a further distinction to the physical vertices of the graph. They can be of two types: distributed (D), i.e. those distributed (partitioned) among all worker nodes, or non-distributed (ND), i.e. those fully residing on the master node.

The GraphProcessor algorithm acts on the graph recursively. As mentioned in the previous section, our graph is a simple supergraph: a vertex cannot belong to two different supervertices. We can use this property to recursively traverse the graph, at every level performing three main operations:

  1. transform supervertices into physical vertices;
  2. “eat” all edges connecting any two physical vertices;
  3. generate the sequence of execution steps corresponding to the above transformations.

The above operations are achieved by a method called process. The method takes a supervertex as input and returns the physical vertex that should replace it in the graph. For a given supervertex (sv), the method first transforms all of (sv)’s supervertex children into physical vertices by processing them recursively, after which we are left with a normal graph containing only physical vertices and edges. The next step consists of “eating” the edges. Since edges represent equijoin conditions, each edge is “eaten” by a SuperDuper step, which the algorithm adds to the sequence of execution steps. After this, the vertex at one end of the eaten edge gets fused into the vertex at the other end. When there are no more edges, the graph consists of one or more physical vertices.
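The edge-eating part can be sketched as follows for a graph that already contains only physical vertices. The function name `eat_edges`, the textual form of the steps, and the choice to always fuse the left vertex into the right one are assumptions of this sketch; it also emits a step for an edge whose two ends have already been fused, which may differ from how the actual system handles such edges.

```python
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str, str, str]  # (left relation, left attr, right relation, right attr)

def eat_edges(vertices: Set[str], edges: List[Edge]) -> Tuple[List[str], Set[str]]:
    """Return the SuperDuper steps and the vertices left once all edges are eaten."""
    owner: Dict[str, str] = {v: v for v in vertices}         # relation -> fused vertex
    members: Dict[str, Set[str]] = {v: {v} for v in vertices}
    steps: List[str] = []
    for left, left_attr, right, right_attr in edges:
        steps.append(f"SuperDuper {left}.{left_attr} = {right}.{right_attr}")
        src, dst = owner[left], owner[right]
        if src != dst:
            # Fuse the vertex at one end of the edge into the vertex at the other.
            for relation in members[src]:
                owner[relation] = dst
            members[dst] |= members.pop(src)
    return steps, set(members)

# Graph of the three-relation query (S, T, C) shown earlier.
steps, remaining = eat_edges(
    {"S", "T", "C"},
    [("S", "id", "T", "sid"), ("C", "id", "T", "cid")],
)

# A graph without edges between its vertices keeps more than one vertex.
_, disconnected = eat_edges({"S", "C"}, [])
```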

If the current graph has more than one vertex (i.e. the original graph was not connected), then a cross join needs to be performed between the vertices. The algorithm currently does not support performing cross joins in a distributed manner, so it will issue commands (enqueue execution steps) to ship all concerned relations to the master node and run the corresponding subquery there.

On the other hand, if the graph has only one vertex (i.e. the original graph was connected), then the algorithm runs the subquery on the data in place, and only gathers the result on the master node if necessary, e.g. if the subquery is aggregate. Four cases apply here (below, “the vertex” refers to the single remaining vertex, and “the subquery” refers to the subquery attached to the supervertex we are currently processing):

  • the vertex is non-distributed and the subquery is not aggregate;
  • the vertex is non-distributed and the subquery is aggregate;
  • the vertex is distributed and the subquery is not aggregate;
  • the vertex is distributed and the subquery is aggregate.

In the first two cases, where the vertex is non-distributed, we enqueue an execution step to run the subquery (whether aggregate or not) directly on the master node. In the case where the vertex is distributed and the subquery is not aggregate, we just run the subquery on the worker nodes, and gather the result only if the current supervertex is the final one (the outermost in the graph).

In the last case, we need to execute the subquery on the worker nodes first, and then combine the results on the master, where the aggregation needs to be performed again. Hence, the query run on the worker nodes must output the final state of the aggregates rather than the final value, as explained before, and the query run on the master after gathering the workers’ results must be able to combine the different states. We refer to the first query as the intermediate query and to the second as the final query. We enqueue three execution steps: one to run the intermediate query on the workers, one to gather the result on the master node, and one to run the final query on the master.
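The four cases can be summarised in a short sketch. The function name `plan_single_vertex` and the encoding of steps as (step type, description) pairs are hypothetical and serve only to make the case analysis explicit.

```python
from typing import List, Tuple

Step = Tuple[str, str]  # (step type, where it runs / what it runs)

def plan_single_vertex(distributed: bool, aggregate: bool,
                       outermost: bool) -> List[Step]:
    """Steps enqueued when a single physical vertex remains."""
    if not distributed:
        # Cases 1 and 2: the data already resides on the master.
        return [("run-subquery", "master")]
    if not aggregate:
        # Case 3: run in place; gather only for the outermost supervertex.
        steps = [("run-subquery", "workers")]
        if outermost:
            steps.append(("gather", "master"))
        return steps
    # Case 4: intermediate query on the workers, gather, final query on the master.
    return [
        ("run-subquery", "workers: intermediate query"),
        ("gather", "master"),
        ("run-subquery", "master: final query"),
    ]

plan = plan_single_vertex(distributed=True, aggregate=True, outermost=True)
```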

The query plan generated for the given query graph is given to the StepExecutor for execution.

StepExecutor Algorithm

The StepExecutor algorithm executes the steps (which can be of the three types mentioned earlier) in order. After successful execution, the result is found in a table on the master node, which corresponds to the non-distributed physical vertex that remained in the graph after it was completely processed.
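As a rough end-to-end illustration, the sketch below executes a three-step plan (intermediate query, gather, final query) for a distributed COUNT. Everything here is an assumption made for the sake of a runnable example: the master and workers are simulated with in-memory SQLite databases, steps are encoded as (kind, argument) pairs, the table names `tmp` and `result` are made up, and SuperDuper steps are not handled.

```python
import sqlite3
from typing import List, Tuple

Step = Tuple[str, str]  # ("run-workers" | "run-master", SQL) or ("gather", table)

def execute_steps(steps: List[Step], master: sqlite3.Connection,
                  workers: List[sqlite3.Connection]) -> None:
    """Execute the steps strictly in order."""
    for kind, arg in steps:
        if kind == "run-workers":
            for worker in workers:
                worker.execute(arg)
        elif kind == "run-master":
            master.execute(arg)
        elif kind == "gather":
            # Copy the table with this name from every worker to the master.
            cursor = workers[0].execute(f"SELECT * FROM {arg}")
            columns = [description[0] for description in cursor.description]
            master.execute(f"CREATE TABLE {arg} ({', '.join(columns)})")
            placeholders = ", ".join("?" for _ in columns)
            for worker in workers:
                rows = worker.execute(f"SELECT * FROM {arg}").fetchall()
                master.executemany(
                    f"INSERT INTO {arg} VALUES ({placeholders})", rows)
        else:
            raise ValueError(f"unsupported step type: {kind}")

def make_worker(names: List[str]) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE S (name TEXT)")
    conn.executemany("INSERT INTO S VALUES (?)", [(n,) for n in names])
    return conn

master = sqlite3.connect(":memory:")
workers = [make_worker(["Ann", "Bob"]), make_worker(["Cy"])]

plan: List[Step] = [
    ("run-workers", "CREATE TABLE tmp AS SELECT COUNT(*) AS c FROM S"),
    ("gather", "tmp"),
    ("run-master", "CREATE TABLE result AS SELECT SUM(c) AS n FROM tmp"),
]
execute_steps(plan, master, workers)
student_count = master.execute("SELECT n FROM result").fetchone()[0]
```

After the last step, the answer sits in a table on the master, mirroring the single non-distributed vertex that remains once the graph has been processed.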