Google Summer of Code 2020 Report - uccross/skyhookdm-ceph GitHub Wiki

Google Summer of Code 2020: Final Work Product Submission

Organisation: CERN-HSF

Project Mentors: Jeff LeFevre, Ivo Jimenez, Aaron Chu, Noah Watkins

Project Title: Extend SkyhookDM programmable object storage with statistics, sort/aggregate or data compaction functions

Project Page: https://summerofcode.withgoogle.com/projects/#6547073815543808

Submitted by: Aditi Gupta

Phase-0 (Community Bonding Period)

Objective

To become familiar with the community, codebase and work style.

Work done

  1. Introduced myself to the CROSS community and received a warm welcome.

  2. Familiarised myself with the high-level concepts and architecture of Skyhook and Ceph.

  3. Read the documentation of Apache Arrow and Google FlatBuffers, which are two of the major disk storage formats used by Skyhook.

  4. Attempted to migrate Skyhook from Ceph Luminous (v12.2.0) to Ceph Nautilus (v14.2.11) (This was successfully completed by Ivo, thanks!)

  5. Replaced deprecated Arrow v0.15 functions when v0.17 was released.

Pull Requests/Merged Code

  1. [Merged] Unused variable warning

  2. [Merged] Fix deprecated Arrow function

  3. [Merged] Fix finish() function in arrow builder

  4. Update skyhook to ceph-nautilus

Challenges faced

Apache Arrow released v0.17.0 on April 20, 2020. Because that release did not promise backward compatibility, some Skyhook functions started to fail with unexpected errors.

Phase-1

Objective

[Issue #23] Extend current aggregation method to include GROUPBY and sort (ORDERBY) for an object’s formatted data partitions.

Work done

Designed --groupby and --orderby flags:

Skyhook supports user queries of the form:

bin/run-query --num-objs 2 --pool tpchdata --oid-prefix "public" --table-name "TABLE_NAME" --select "col1,op1,val1;...;col2,op2,val2;" --project "col1,col2,...,coln" --groupby "col1,col2,...,coln" --orderby "col1,asc/desc;...;col2,asc/desc;"

Here, the --groupby flag takes the names of one or more columns to group by for aggregation, and the --orderby flag takes one or more pairs of the form <column_name, asc/desc> that give the order in which each column should be sorted.
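To make the two flag formats concrete, here is a minimal sketch of how such strings could be split into columns and sort directions. Python is used only for brevity (Skyhook itself is C++), and the function names parse_groupby and parse_orderby are hypothetical, not Skyhook's actual parser.

```python
def parse_groupby(arg):
    """Split a --groupby string such as "col1,col2" into column names."""
    return [c.strip() for c in arg.split(",") if c.strip()]


def parse_orderby(arg):
    """Split a --orderby string such as "col1,asc;col2,desc;" into
    (column, ascending) pairs. Hypothetical helper for illustration."""
    pairs = []
    for item in arg.split(";"):
        item = item.strip()
        if not item:  # tolerate the trailing ';' shown in the query form
            continue
        parts = [p.strip() for p in item.split(",")]
        if len(parts) != 2 or parts[1].lower() not in ("asc", "desc"):
            raise ValueError(f"bad --orderby entry: {item!r}")
        pairs.append((parts[0], parts[1].lower() == "asc"))
    return pairs
```

For instance, "col1,asc;col2,desc;" becomes a list of two pairs, the first sorted ascending and the second descending.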

Explanation of query flow execution

  1. Query predicates are processed and classified as AggPreds (MIN, MAX, SUM, COUNT) and NonAggPreds (others). These are stored differently.

  2. First, all rows are processed through NonAggPreds, and the passing rows are stored for further processing.

  3. If there is a GROUP BY query, the split-apply-combine method of implementing GROUP BY is used.

    • [STEP-1] Split: Each row is mapped to a key based on the values of the GROUP BY columns. This step essentially groups the data based on some criteria.

    • [STEP-2] Apply: AggPreds are applied independently to each group of rows to obtain one consolidated result for each row group.

    • [STEP-3] Combine: The processed rows are hence combined and packed into a data structure (in case of Skyhook, bufferlist) to return to the caller.

    • Before packing into the final data structure, check if ORDER BY is present. If yes, sort the returning rows ASC/DESC as per the query before packing.

  4. If GROUP BY is not present, we skip Step-1 and continue from Step-2 by treating individual rows as groups.
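The steps above can be sketched as follows. This is an illustration under stated assumptions, not Skyhook's implementation: rows are plain dictionaries, predicates are Python callables, the result is a list rather than a bufferlist, and the name run_query is hypothetical. Only the GROUP BY path is covered; the no-GROUP BY case in step 4 is left out.

```python
from collections import defaultdict

# The aggregate operations named in the text (MIN, MAX, SUM, COUNT).
AGG_FUNCS = {"min": min, "max": max, "sum": sum, "count": len}


def run_query(rows, non_agg_preds, groupby_cols, agg_preds, orderby):
    """rows: list of dicts; non_agg_preds: list of row -> bool callables;
    agg_preds: list of (func_name, column); orderby: list of
    (output_column, ascending) pairs. All names are illustrative."""
    # Keep only rows that pass every non-aggregate predicate.
    passing = [r for r in rows if all(p(r) for p in non_agg_preds)]

    # Split: map each row to a key built from the GROUP BY columns.
    groups = defaultdict(list)
    for r in passing:
        groups[tuple(r[c] for c in groupby_cols)].append(r)

    # Apply: compute each aggregate independently per group.
    result = []
    for key, members in groups.items():
        out = dict(zip(groupby_cols, key))
        for func, col in agg_preds:
            out[f"{func}({col})"] = AGG_FUNCS[func]([m[col] for m in members])
        result.append(out)

    # Combine: sort if ORDER BY is present. Sorting by the last key first
    # works because Python's sort is stable.
    for col, ascending in reversed(orderby):
        result.sort(key=lambda r: r[col], reverse=not ascending)
    return result
```

Each output row carries the grouping columns plus one entry per aggregate, so ORDER BY can refer either to a grouping column or to an aggregate such as "sum(qty)".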

TL;DR: The following flowchart summarises the above algorithm:

[Figure: Query Execution Flow]

[Figure: Object data]

[Figure: Example of GROUP BY validation]

[Figure: Example of GROUP BY query]

[Figure: Example of ORDER BY query]

Pull Requests/Merged Code

  1. [Merged] Add query validation for projection and aggregation
  2. [Merged] Implement GROUP BY and ORDER BY

Commits:

Future Work

  1. Current test data does not have NULL values, so a good exercise would be to add support for NULL values. While GROUP BY without any aggregate treats NULL as a separate group, the behaviour with aggregates depends on the aggregate function, since some of them accept NULL values while others do not. Reference: https://www.postgresql.org/docs/9.6/functions-aggregate.html
  2. [Important] Currently, GROUP BY and ORDER BY are applied within individual objects. We aim to extend the functionality to a global level so that aggregates are calculated across objects.

Phase-2

Objective

[Issue #77] Implement a custom method for data statistics collection of an object’s formatted data partitions in the form of histograms.

Work done

  1. Designed the --runstats flag, which takes the 5 arguments required to run statistics on the table:
--runstats "column_name,min_value,max_value,number_of_buckets,sampling"
  2. Appropriate checks are performed to validate the passed arguments:
    • The number of arguments passed to --runstats should be exactly 5.

    • column_name should be a valid column present in the table and should have a valid data type, namely int, uint, float or double.

    • min_value and max_value should have the same data type as column_name.

    • number_of_buckets should be an integer.

    • sampling should be a float or double value in the range (0,1].

  3. After the arguments are validated, the histogram is constructed by iterating through the rows in the table and pushing each row into the appropriate bucket.
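The validation checks listed above could look roughly like this. It is a sketch, not the code in Skyhook: the function name parse_runstats and the schema dictionary (column name to type name) are hypothetical, and the requirement that number_of_buckets be positive is my own assumption.

```python
# Hypothetical mapping from the allowed column types to Python converters.
NUMERIC_TYPES = {"int": int, "uint": int, "float": float, "double": float}


def parse_runstats(arg, schema):
    """Validate a --runstats string against a {column: type_name} schema."""
    parts = [p.strip() for p in arg.split(",")]
    if len(parts) != 5:
        raise ValueError("--runstats expects exactly 5 arguments")
    column, min_s, max_s, buckets_s, sampling_s = parts

    if column not in schema:
        raise ValueError(f"unknown column: {column}")
    type_name = schema[column]
    if type_name not in NUMERIC_TYPES:
        raise ValueError(f"unsupported data type for statistics: {type_name}")

    # min_value and max_value must parse as the column's type;
    # int("1.5") or float("abc") raise ValueError on their own.
    convert = NUMERIC_TYPES[type_name]
    min_value, max_value = convert(min_s), convert(max_s)

    num_buckets = int(buckets_s)  # must be an integer
    if num_buckets <= 0:          # assumption: zero or negative makes no sense
        raise ValueError("number_of_buckets must be positive")

    sampling = float(sampling_s)
    if not (0.0 < sampling <= 1.0):
        raise ValueError("sampling must be in the range (0,1]")

    return column, min_value, max_value, num_buckets, sampling
```

Failing early with a clear message keeps a malformed request from reaching the histogram-building step.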

An example of histogram is as follows:

 bucket |   range   | freq |       bar       
--------+-----------+------+-----------------
      1 | [0,9]     | 2744 | ******
      2 | [10,19]   | 5630 | *************
      3 | [20,29]   | 6383 | ***************
      4 | [30,39]   | 1290 | ***
      5 | [40,49]   |  369 | *
      6 | [50,59]   | 3541 | ********
      7 | [60,69]   |  174 | 
      8 | [70,79]   |  313 | *
      9 | [80,89]   |  171 | 
     10 | [90,99]   |   65 | 
     11 | [100,109] | 2363 | ******
     12 | [110,119] |   51 | 
     13 | [120,129] |  115 | 
     14 | [130,139] |   32 | 
     15 | [140,146] |   11 | 
     16 | [150,159] |  187 | 
     17 | [160,169] |   24 | 
     18 | [170,177] |   33 | 
     19 | [180,189] |   19 | 
     20 | [191,199] |   24 | 
     21 | [200,200] |  795 | **
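A histogram like the one above can be built with a simple pass over the rows. The sketch below assumes equal-width buckets between min_value and max_value, per-row random sampling, and a linear scan over the bucket edges. These are my assumptions for illustration, and the name build_histogram is hypothetical.

```python
import random


def build_histogram(values, min_value, max_value, num_buckets, sampling, seed=0):
    """Return per-bucket counts for the sampled values.
    Assumes max_value > min_value and equal-width buckets."""
    rng = random.Random(seed)
    width = (max_value - min_value) / num_buckets
    # Right edge of every bucket; pin the last edge to max_value
    # to guard against floating-point rounding.
    right_edges = [min_value + width * (i + 1) for i in range(num_buckets)]
    right_edges[-1] = max_value

    counts = [0] * num_buckets
    for v in values:
        # random() is in [0, 1), so sampling == 1.0 keeps every row.
        if rng.random() >= sampling:
            continue
        if not (min_value <= v <= max_value):
            continue  # ignore rows outside the requested range
        # Linear scan: first bucket whose right edge exceeds v;
        # the last bucket also includes max_value itself.
        for i, edge in enumerate(right_edges):
            if v < edge or i == num_buckets - 1:
                counts[i] += 1
                break
    return counts
```

The inner loop visits up to K bucket edges per sampled row, which is the per-row cost that a binary search over the edges would reduce.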

Pull Requests/Merged Code

Pull Request: [Merged] Implement statistics collection

Commits:

  1. [Merged] Modify --runstats flag from bool to string
  2. [Merged] Change stats_op struct to accommodate parameters
  3. [Merged] Validate number of arguments for --runstats flag
  4. [Merged] Validate column in --runstats argument
  5. [Merged] Validate sampling parameter in --runstats argument
  6. [Merged] Check data type of number of buckets in --runstats argument
  7. [Merged] Pack --runstats arguments together
  8. [Merged] Read data from disk and add access class
  9. [Merged] Implement histogram
  10. [Merged] Push histogram into omap

Future Work

  1. Currently we store the histogram in an omap and print it to the user. However, its utility would be enhanced if it were stored in a PostgreSQL-like manner to assist in query optimisation.
  2. There is scope to improve the runtime complexity of building the histogram. The current complexity is O((N * sampling) * K), where N = number of rows, K = number of buckets, and sampling is the fraction of rows sampled. This can be improved to O((N * sampling) * log K) if a binary search such as std::lower_bound is performed on the bucket limits before assigning the rows to them.

For example,

Given, bucket_left_limits =  [1, 5, 9, 13] => bucket_ranges = {(1,5),(5,9),(9,13),(13,17)} 
Given, val = 8 
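lower_bound(bucket_left_limits, 8) => index 2 (the first limit >= 8, i.e. 9); the bucket is the one just before it, at index 1, so we increment the 2nd bucket (5,9)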
  3. Current test data does not have NULL values. A good exercise would be to add support for handling NULL values; options include skipping or counting nulls.
  4. Apache Arrow released v1.0.0 on July 24, 2020, which now guarantees backward compatibility. A good exercise would be to migrate the existing code to comply with this version. (See this issue: https://github.com/uccross/skyhookdm-ceph-cls/issues/58)
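The binary-search bucket lookup from item 2 can be sketched as below. Python's bisect_right stands in for the C++ standard library search here (it behaves like std::upper_bound). Buckets are assumed to be half-open, [left, right), which is my assumption since the report does not say which side is inclusive. The name bucket_index is hypothetical.

```python
from bisect import bisect_right


def bucket_index(bucket_left_limits, val):
    """Return the 0-based index of the bucket containing val, or None if
    val lies below the first left limit. bucket_left_limits must be sorted.
    The upper limit of the last bucket is not checked in this sketch."""
    # bisect_right finds the first limit strictly greater than val;
    # the bucket that contains val is the one just before it.
    idx = bisect_right(bucket_left_limits, val) - 1
    return idx if idx >= 0 else None
```

With bucket_left_limits = [1, 5, 9, 13] and val = 8, the search lands on the limit 9 at index 2, so the function returns index 1, the 2nd bucket (5,9). Each lookup costs O(log K) instead of O(K).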

Phase-3

Objective

[Issue #33] Implement compaction of multiple formatted sub-partitions within an object into a single partition.

Work done

Developed the object class method compact_arrow_tables_op, which merges formatted data partitions within an object. Self-contained partitions are written (appended) to objects, and over time an object may come to contain a sequence of independent formatted data structures, as shown below:

[Figure: Disk compaction]

A compaction request (--compact-table) invokes this method, which iterates over the data structures, combining (or splitting) them into a single larger data structure representing the complete data partition. In essence, this method performs a read-modify-write operation on the object's local data.

compact_arrow_tables_op reads the sequence of tables within the object, converts them into a single table, and then uses cls_cxx_replace() to write the result back to disk. It also handles dead rows by removing them.
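The read-modify-write idea can be illustrated with plain Python data standing in for Arrow tables. Everything here is an assumption made for the sketch: an object is modelled as a list of partitions, a partition as a list of row dictionaries, and a dead row as one carrying a "dead" flag. The real method works on Arrow tables inside the Ceph object class.

```python
def compact_partitions(partitions):
    """Merge a sequence of partitions into a single partition,
    dropping rows marked as dead (hypothetical "dead" flag)."""
    compacted = []
    for part in partitions:                 # read: walk the stored partitions in order
        for row in part:
            if not row.get("dead", False):  # modify: drop dead rows
                compacted.append(row)
    # write: the object's contents are replaced by one clean partition,
    # which is the role cls_cxx_replace() plays in the real method.
    return [compacted]
```

After compaction the object holds exactly one partition, and the live rows keep their original order.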

Pull Requests/Merged Code

Pull Request: [Merged] Implement compaction for Arrow tables

Commits:

  1. [Merged] Implement compaction for Arrow tables
  2. [Merged] Move compaction inside transform_db_op
  3. [Merged] Implement Arrow table compaction

Benefits to the community

  1. [Phase-1] One of the important features of SkyhookDM is allowing common data processing tasks (like SELECT, PROJECT, AGGREGATE) to be performed directly at the object storage layer. Following suit, support for GROUP BY and ORDER BY further extends Skyhook’s ability to push more queries down to the object level.

  2. [Phase-2] The histogram, a data statistics collection tool, helps in query optimisation by letting the object locally perform access path selection: it can decide whether to use indexing or table scans, and in which sequence to perform operations, depending on the cardinality of the tuples involved. Statistics can also be stored locally so that objects can make such decisions on their own. This is one of the advantages of programmable object storage in Skyhook, and it in turn improves the overall process of physical database design and tuning.

  3. [Phase-3] The compaction operation consolidates the non-contiguous table fragments stored in individual memory blocks for effective memory management. The implemented function merges sequences of independent Arrow tables into a single larger Arrow table and writes a clean partition back to disk.

In addition to this, if collected statistics show that most of the rows are selected by a query, the object can decide not to apply the select predicates at the object level and instead pass the work back to the application layer/database client, in the interest of time and cost. This may happen when the object layer is busy or when the operation is more expensive at the object layer than at a higher (application) layer. This methodology is called pushback.
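As a rough illustration of the pushback decision, the sketch below estimates from a histogram what fraction of rows a range predicate would select and compares it with a cutoff. The function name should_push_back, the 0.8 cutoff and the whole-bucket overlap estimate are all hypothetical choices for this example. Skyhook's actual policy may differ.

```python
def should_push_back(bucket_ranges, freqs, low, high, cutoff=0.8):
    """bucket_ranges: list of (lo, hi) per bucket; freqs: row count per bucket.
    Returns True when the predicate low <= value <= high is estimated to
    select at least `cutoff` of the rows."""
    total = sum(freqs)
    if total == 0:
        return False  # no statistics, so keep evaluating at the object level
    # Count every bucket that overlaps the predicate range. This
    # over-estimates slightly when a bucket is only partly covered.
    selected = sum(f for (lo, hi), f in zip(bucket_ranges, freqs)
                   if hi >= low and lo <= high)
    return selected / total >= cutoff
```

A predicate that covers most of the histogram's mass returns True, suggesting that filtering at the object saves little and the rows may as well be returned for the client to process.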

Conclusion

It has been a great journey and a wonderful experience. I learnt to do things independently and explain the rationale behind them. My favourite part was the independence and responsibility which I had, in converting the problem statement to design and ultimately into code.

There were times when figuring things out became more challenging than before. I was often presented with open-ended problems, for which I analysed different possible solutions and discussed the pros and cons with my mentor, Jeff LeFevre. Jeff did an excellent job of guiding me throughout the program. We had many fruitful discussions about the project and its scope. I thank Jeff for his constant guidance and support, which helped me steer the project in the right direction.

I am very happy with the work I did; it is probably the work I am most proud of. :)