Implementation - accandme/openplum GitHub Wiki
This section describes our implementation of the algorithms presented in the Design section, detailing how we communicate with our nodes and walking through the whole query execution pipeline in order.
We implemented our system as a Java application using PostgreSQL worker nodes. We decided to use Java because we already had some code that we had written for our Advanced Databases course project. We chose PostgreSQL because it is an open-source DBMS which we eventually plan to integrate our system into.
Our code can be found in the subpackages of `ch.epfl.data.distribdb`.

- `ch.epfl.data.distribdb.lowlevel` contains all the low-level classes that we took from our Advanced Databases course project. This package contains the `DatabaseManager` class, which is responsible for sending queries to the various nodes and receiving data back from them.
- `ch.epfl.data.distribdb.parsing` contains all the logic for parsing SQL queries.
- `ch.epfl.data.distribdb.tackling` consists of classes used to build the query graph.
- `ch.epfl.data.distribdb.execution` contains the logic for building and executing a distributed query plan for a query graph.
- `ch.epfl.data.distribdb.app` contains classes responsible for the user interface of our front-end application(s).
The Database Manager is a unified proxy for all JDBC accesses from our application. All our interaction with the master and the workers goes through its API instead of making direct JDBC calls. Therefore, it is a clean abstraction of all our nodes such that they appear as one simple entity to higher-level modules. With this abstraction, we could focus on the main logic of our system without worrying much about the details of JDBC and parallelization across multiple nodes.
The design also allows different implementations when multiple nodes are involved: sequential versus parallel execution. This was extremely helpful during development, as our program could be tested with the sequential implementation first (presumably simpler to realize) before moving to the parallel one, without changes to the application code.
The versatility of this module comes from the plethora of API “flavors” for the `execute`, `fetch`, and `copyTable` functions. The basic differences among these flavors lie in four aspects:
- Number of queries: one/multiple.
- Number of source nodes (where the queries are executed): one/multiple.
- Name and schema of the result table: one/multiple.
- Number of destination nodes (where results are stored): one/multiple.
Note that there is always some intrinsic logic involved in every flavor of the API. This is necessary so that code repetitions can be avoided in higher-level modules and most work can be pushed to database server nodes.
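As an illustration, the one/multiple combinations can be sketched as overloaded method signatures. The names, parameters, and logging below are hypothetical, not the project's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the one/multiple "flavors"; the real DatabaseManager
// method names, parameters, and return types may differ.
public class FlavorSketch {
    final List<String> log = new ArrayList<>(); // records issued (query, node) pairs

    // One query, one source node.
    void execute(String query, String nodeId) {
        log.add(query + " -> " + nodeId);
    }

    // One query, multiple source nodes: the loop lives here, not in the callers.
    void execute(String query, List<String> nodeIds) {
        for (String n : nodeIds) execute(query, n);
    }

    // Multiple queries, multiple source nodes: query i runs on node i.
    void execute(List<String> queries, List<String> nodeIds) {
        for (int i = 0; i < queries.size(); i++) execute(queries.get(i), nodeIds.get(i));
    }
}
```

Keeping the loops inside the manager is what lets higher-level modules avoid repeating this dispatch logic.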
As seen from the UML class diagram above, the `AbstractDatabaseManager` class provides a basic implementation of the `DatabaseManager` interface, which defines the API. All generic JDBC operations, common configurations, and API methods agnostic to sequential or parallel execution are implemented here.
`SequentialDatabaseManager` extends `AbstractDatabaseManager` and implements the rest of the API in a naïve but straightforward manner: using for-loops when multiple inputs for some parameter are encountered. `ParallelDatabaseManager` extends `AbstractDatabaseManager` and implements the rest of the API in a parallel fashion using Java’s built-in thread pool facility. The transition from `SequentialDatabaseManager` to `ParallelDatabaseManager` is intuitive: all for-loops are converted to constructions, submissions, and invocations of tasks on the thread pools (a process analogous to the map function in monad algebra). The calling thread awaits the completion of all tasks before proceeding to subsequent steps. Java’s built-in `ExecutorService` provides a neat way to realize all these thread pool features.
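The loop-to-task conversion can be sketched as follows; the per-node work here is a stand-in for the actual JDBC call, and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSketch {
    // Stand-in for executing a query on one node (the real code would go through JDBC).
    static String runOnNode(String query, int nodeId) {
        return query + "@node" + nodeId;
    }

    // Sequential flavor: a plain for-loop over the nodes.
    static List<String> executeSequential(String query, List<Integer> nodeIds) {
        List<String> results = new ArrayList<>();
        for (int id : nodeIds) results.add(runOnNode(query, id));
        return results;
    }

    // Parallel flavor: each loop body becomes a Callable; invokeAll blocks
    // until every task completes, so the caller proceeds only afterwards.
    static List<String> executeParallel(String query, List<Integer> nodeIds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int id : nodeIds) tasks.add(() -> runOnNode(query, id));
            List<String> results = new ArrayList<>();
            for (Future<String> f : pool.invokeAll(tasks)) results.add(f.get());
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown(); // otherwise the non-daemon pool threads keep the JVM alive
        }
    }
}
```

Both flavors return results in node-list order, since `invokeAll` preserves the ordering of the submitted tasks.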
We execute a given query by performing the following five steps in order, where the output of each is the input of the next:

- Parsing (output: `QueryRelation`)
- Graph construction (output: `QueryGraph`)
- Graph processing (output: `List<ExecStep>`)
- Step execution (output: `ResultSet`)
- Printing of results and cleanup
We use the General SQL Parser library to parse a query string, and we convert the query tree produced by the parser to our own parse tree. The library appears accurate, but it has two notable drawbacks:
- The parsing error messages produced by the parser are not very informative.
- The parser does not perform sanity checks on the queries (e.g. it allows aggregate fields to appear in the GROUP BY clause).
Because we use a trial version of the library, it is limited to parsing only 10,000 characters. However, this limit does not seem to pose problems for our purposes.
The `Parser` class is responsible for parsing query strings, and produces as output a parse tree represented by the `QueryRelation` class. The `QueryRelation` object is composed of several other objects representing the different aspects of the query:
- Projected fields, represented by a list of `Field` objects (`Field` is an abstract class with several flavors of extensions, such as `NamedField`, `AggregateField`, `LiteralField`, etc.).
- Relations, represented by the `NamedRelation` (physical relation) and `QueryRelation` classes, both derived from the abstract `Relation` class.
- WHERE conditions (connected by conjunction), represented by a list of `Qualifier` objects, each containing a reference to an `Operator` of the condition and a list of `Operand` objects (an interface implemented by the `Field` and `Relation` classes).
- GROUP BY fields, represented by a list of `Field` objects.
- HAVING conditions, represented by another list of `Qualifier` objects.
- ORDER BY fields, represented by a list of `OrderingItem` objects, each referencing a `Field` and an `OrderingType` (an enum that can take the values `ASC` or `DESC`).
The `QueryGraph` class represents a query graph. Its constructor takes a `QueryRelation` object and constructs the corresponding graph. Graph construction takes place in two steps: first, all vertices are recursively extracted from the parse tree, creating a set of `QueryVertex` objects; then, edges are recursively built, adding a set of `QueryEdge` objects to each vertex. Each vertex of the graph has access to its own query string, so that each vertex can be executed to completion, as explained before.
Edges that we build are directed. Edge direction was initially intended to be used to indicate nesting in correlated queries, such that the direction implies which of the two vertices is nested and which is not. Vertices at the same level of the query would have bi-directional edges. Since we do not support correlated queries at this point, all our edges are bi-directional.
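A minimal sketch of this construction, with relation names standing in for `QueryVertex` objects and a plain adjacency map standing in for the `QueryEdge` sets (all names here are illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for QueryGraph: relation names as vertices,
// an adjacency map as the edge sets attached to each vertex.
public class GraphSketch {
    final Map<String, Set<String>> adj = new HashMap<>();

    void addVertex(String rel) {
        adj.putIfAbsent(rel, new HashSet<>());
    }

    // An equijoin condition R.a = S.b yields an edge; since correlated queries
    // are unsupported, every edge is stored in both directions.
    void addEquijoinEdge(String r, String s) {
        adj.get(r).add(s);
        adj.get(s).add(r);
    }
}
```

For the Student-Took-Course schema, the equijoins S-T and C-T produce exactly the two bi-directional edges incident on T.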
Currently we only build edges when we encounter equality conditions in the WHERE clause between two fields belonging to two different relations (i.e. equijoin conditions). Theoretically, fields connected by the IN operator can also be considered as equijoin conditions, but currently we do not do that for two reasons:
- not building an edge does not produce incorrect results, and
- any non-correlated query using the IN operator can be rewritten such that the IN operator’s subquery is moved to the outer query’s FROM clause as an additional nested relation.
The `GraphProcessor` class contains the logic that produces a distributed query plan for a query. It takes as input the query graph generated for the query and processes the graph using heuristics for optimization. The output is a sequence of execution steps that are fed to the step executor.
The execution steps are represented by the `ExecStep` abstract class, from which the following three classes derive, each corresponding to one of the three execution steps introduced previously:
- `StepSuperDuper` represents the elementary step that ships tuples between worker nodes, according to the semantics described previously. For this, we keep track of the `fromRelation` and `fromColumn`, the `toRelation` and `toColumn`, and the `outRelation`. We also store a boolean `distributeOnly` that distinguishes the full SuperDuper step (where we ship from all worker nodes to all worker nodes) from the distribute-only version of SuperDuper, where the `fromRelation` is gathered on the master node.
- `StepGather` represents the elementary step that gathers a distributed table on the master node. For this purpose, we keep a handle to the `fromRelation` (the name of the distributed table to be gathered) and the `outRelation` (the name of the new gathered table on the master).
- `StepRunSubquery` represents the elementary step responsible for executing a (sub)query, either on all worker nodes or on the master. Besides storing the place where the query executes in `stepPlace`, we also keep the string representation of the actual subquery in `query`, and the name of the relation that holds the results in `outRelation`. We also keep a flag indicating whether the query is an aggregate query; this is currently only useful for debugging purposes.
The implementation of `GraphProcessor` follows the design described previously. The key method is `process`; it contains the main logic. It takes a supervertex and returns the physical vertex that should replace it after having recursively processed it. While processing each vertex, it also stores the corresponding steps in the `execSteps` member variable.
This class also contains helper functions. `pickConnectedPhysical` and `pickConnectedND` pick a connected (D)istributed or (N)on-(D)istributed vertex as the destination of a SuperDuper step, while `eatAllEdgesPhysical` and `eatAllEdgesND` eat all edges (and generate the corresponding SuperDuper steps) between two such vertices. Initially we thought of always picking the largest relation as the destination of a SuperDuper step, in order to minimize the number of shipped tuples. Currently this is not implemented (we pick the first relation), but it should be easy to add given the current state: the two picker functions would simply be modified to check the sizes of the tables on the fly.
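For instance, a size-aware picker could look like this. The helper is hypothetical; the real picker functions operate on graph vertices rather than a name-to-size map:

```java
import java.util.Collections;
import java.util.Map;

public class PickerSketch {
    // Hypothetical size-aware picker: choose the connected vertex whose table
    // is largest, so that fewer tuples need to be shipped to it.
    static String pickLargest(Map<String, Long> candidateSizes) {
        return Collections.max(candidateSizes.entrySet(),
                               Map.Entry.comparingByValue()).getKey();
    }
}
```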
The `StepExecutor` class executes the distributed query plan generated by the `GraphProcessor` class. It takes as input (via the `executeSteps` method) the list of execution steps constituting the plan. This method performs the execution according to the logic described in the Design section of this document, computes the final result, and returns it as a `ResultSet`. The implementation is as simple as a for-loop that traverses the list of execution steps; for each one, according to its type, the class extracts the relevant fields and either issues requests directly to the Database Manager or instantiates lower-level constructs such as `SuperDuper`.
The `SuperDuper` class performs a SuperDuper operation on two relations. It sends tuples from the table `fromRelation`, which is distributed on `fromNodeIds`, to `toNodeIds`, such that a tuple is sent to node *i* if it can join with a tuple in the chunk of table `toRelation` on node *i*.

This is achieved by computing Bloom filters on each `toRelation` chunk on all nodes in `toNodeIds` and sending them to all nodes in `fromNodeIds`, then filtering the `fromRelation` chunks with every received Bloom filter and sending each result to the corresponding node. The join condition is an equijoin on the `fromColumn` of `fromRelation` and the `toColumn` of `toRelation`. The shipped tuples are stored in a temporary table called `outRelation` on all nodes of the `toNodeIds` list.
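The filtering idea can be sketched with a single-hash Bloom filter over join-column values. The filter size and hash function below are assumptions, not the project's actual parameters; note that false positives only cause a few extra tuples to be shipped, never incorrect results:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class BloomSketch {
    static final int M = 1024; // filter size in bits (assumed)

    // Build a filter over the toColumn values of one toRelation chunk.
    static BitSet build(List<String> joinValues) {
        BitSet filter = new BitSet(M);
        for (String v : joinValues) filter.set(hash(v));
        return filter;
    }

    // Keep only the fromRelation tuples that might join on the destination node;
    // a false positive ships an extra tuple, but no matching tuple is ever lost.
    static List<String> filter(List<String> fromValues, BitSet filter) {
        List<String> out = new ArrayList<>();
        for (String v : fromValues) if (filter.get(hash(v))) out.add(v);
        return out;
    }

    static int hash(String v) {
        return Math.floorMod(v.hashCode(), M);
    }
}
```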
The `SuperDuper` class uses a pool of threads to parallelize the job. The `shutDown` method should be called after the operation finishes, in order to shut the pool down. Failing to call this method may result in the main program hanging (not exiting).
We have two dedicated classes to manage the printing and cleanup of temporary tables.
The `TablePrinter` class contains a simple static function, `printTableData`, that prints a `ResultSet` to the console in a nicely formatted way. It automatically paginates the output and smartly adjusts the column widths according to the data displayed on the current page.
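The width adjustment amounts to making each column as wide as its widest cell or header on the current page; a hypothetical helper (not the actual TablePrinter code) might look like:

```java
import java.util.List;

public class WidthSketch {
    // Hypothetical width computation for one page of rows: a column is as wide
    // as its widest cell or its header, whichever is longer.
    static int[] columnWidths(String[] header, List<String[]> rows) {
        int[] widths = new int[header.length];
        for (int c = 0; c < header.length; c++) widths[c] = header[c].length();
        for (String[] row : rows)
            for (int c = 0; c < row.length; c++)
                widths[c] = Math.max(widths[c], row[c].length());
        return widths;
    }
}
```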
`TableManager` is the class responsible for allocating names for temporary tables, keeping track of who is using which temporary table, and cleaning up after execution terminates. Higher-level classes such as `GraphProcessor` and `SuperDuper` use this class to request new table names via the `generateTmpTblName` method. The class makes sure that there is no duplication in the names and keeps track of the requested names in the `tempTblNames` list, so that after execution finishes, the application (in our case the `CommandLine` class) calls `cleanTempTables` to delete all the temporary tables used as intermediate holders while running the specified queries.
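A minimal sketch of this bookkeeping, with an assumed counter-based naming scheme (the real `generateTmpTblName` may format names differently, and the real cleanup issues its DROPs through the Database Manager):

```java
import java.util.ArrayList;
import java.util.List;

public class TmpTableSketch {
    private final List<String> tempTblNames = new ArrayList<>();
    private int counter = 0;

    // Assumed naming scheme: a counter guarantees there is no duplication.
    String generateTmpTblName() {
        String name = "tmp_" + counter++;
        tempTblNames.add(name); // remembered so cleanTempTables can drop it later
        return name;
    }

    // Returns the DROP statements a cleanup pass would send to the nodes.
    List<String> cleanTempTables() {
        List<String> drops = new ArrayList<>();
        for (String n : tempTblNames) drops.add("DROP TABLE IF EXISTS " + n);
        tempTblNames.clear();
        return drops;
    }
}
```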
Our current system only supports SELECT queries. Data has to be loaded onto the worker nodes and partitioned separately before our system can be used. Ideally, our system would support all features offered by a standard PostgreSQL node, which is the end goal of this project. At this stage of its development, however, we focused only on the basic functionality, which is no doubt a building block towards the ideal system.
Because of limited time, several SELECT query constructs were also dropped from our implementation, as outlined below. Support for these is expected to be added in the future.
Our system was initially aimed at supporting correlated subqueries and was designed with this support in mind. In fact, we were sure we supported them up until the last few days of implementation, while we were applying finishing touches and trying the system out. Our initial idea was that correlated subqueries do not pose a significant problem; we were sure that we could ignore correlation altogether when building a query graph. That turned out not to be the case.
The query that proved our QueryTackler algorithm incompatible with correlated subqueries was this query using the Student-Took-Course schema introduced previously, which returns the students who took all courses:
```sql
SELECT S.name
FROM S
WHERE NOT EXISTS (
  SELECT C.id /* any field */
  FROM C
  WHERE NOT EXISTS (
    SELECT T.sid /* any field */
    FROM T
    WHERE T.sid = S.id AND
          T.cid = C.id
  )
)
```
For this query, our current algorithm produces the same graph as in Figure 3 in the Design section. However, SuperDupering (S) to (T) and then (C) to (T) is not enough to properly execute this query. What is needed instead is SuperDupering (T) to (S) and then gathering and distributing (C) to all nodes.
Because we discovered this bug late, we did not have time to come up with a generalization of how to execute such queries, and had to drop support for correlated queries altogether. However, we conjecture that we could execute some correlated queries if we introduced a priority field on our graph vertices (indicating the level of nesting), forcing the `GraphProcessor` algorithm to pick lower-priority vertices first. Because we could not find any counter-examples, we believe that this extension would enable us to execute correlated queries that use the EXISTS and IN operators, but not their negations.
Our current implementation lacks support for the following elements of query syntax:

- `*` fields: because our parse tree structure needs to know all fields and we cannot expand `*` fields, we currently do not support them (except for the special case of COUNT(*)).
- JOIN clauses: our system currently supports only inner joins, and they must be expressed via the WHERE clause.
- OR and NOT operators in qualifiers: our qualifiers (in both WHERE and HAVING clauses) are currently connected by conjunctions; in theory it should be possible to implement OR and NOT as qualifier `Operator`s, in which case the `Qualifier` class would have to implement the `Operand` interface, but that would introduce recursion.
- IN lists.
- IF and CASE clauses.
- Compound queries (UNION, MINUS, etc.).
- Other esoteric syntax and vendor-specific SQL extensions.