how to sql join - animeshtrivedi/notes GitHub Wiki

How to run a simple and balanced SQL join on Spark

In this post I am going to explain how to run a simple SQL JOIN in Spark in a balanced mode.

The general recommendations are:

  1. Set the number of executors equal to the number of machines.
  2. Set the number of cores per executor to roughly half the actual cores; for a 16-core machine, 8 is good.
  3. Try to give each core its own file to read, to avoid contention.
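As a rough illustration, the three recommendations can be turned into concrete numbers with a small helper. This is only a sketch: the function name `balanced_layout` and its parameters are made up for this example, and the 8 machines with 16 cores each are an assumed cluster shape.

```python
def balanced_layout(machines: int, cores_per_machine: int) -> dict:
    """Derive sizing from the rules of thumb above (hypothetical helper)."""
    if machines < 1 or cores_per_machine < 2:
        raise ValueError("need at least 1 machine and 2 cores per machine")
    num_executors = machines                  # rule 1: one executor per machine
    executor_cores = cores_per_machine // 2   # rule 2: about half the actual cores
    total_cores = num_executors * executor_cores
    return {
        "num_executors": num_executors,
        "executor_cores": executor_cores,
        "input_files": total_cores,           # rule 3: one file per core
    }


# Assumed cluster: 8 machines with 16 cores each.
layout = balanced_layout(machines=8, cores_per_machine=16)
print(layout)  # {'num_executors': 8, 'executor_cores': 8, 'input_files': 64}
```

The first two values are what you would pass to --num-executors and --executor-cores, and the third is the number of input files to generate.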

There are some Spark SQL related settings you should pay attention to, such as:

i) Broadcast settings

spark.broadcast.compress	false
spark.broadcast.checksum	false

ii) SQL I/O related settings

spark.sql.shuffle.partitions		200
spark.sql.files.openCostInBytes		0
spark.sql.files.maxPartitionBytes  	1280255769

A sample spark-defaults.conf is here: https://gist.github.com/animeshtrivedi/03917931208bad19ab6c2e62ecb33ce5
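If you would rather not edit spark-defaults.conf, the same settings can be passed on the command line through spark-submit's --conf key=value option. The sketch below renders the values listed above in both forms; the helper names `to_conf_args` and `to_defaults_conf` are hypothetical and exist only for this example.

```python
# The settings listed above, kept as strings so they are emitted verbatim.
SETTINGS = {
    "spark.broadcast.compress": "false",
    "spark.broadcast.checksum": "false",
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.files.openCostInBytes": "0",
    "spark.sql.files.maxPartitionBytes": "1280255769",
}


def to_conf_args(settings: dict) -> list:
    """Render settings as a flat list of spark-submit '--conf key=value' arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


def to_defaults_conf(settings: dict) -> str:
    """Render settings as whitespace-separated lines for spark-defaults.conf."""
    return "\n".join(f"{key}\t{value}" for key, value in settings.items())


print(" ".join(to_conf_args(SETTINGS)))
print(to_defaults_conf(SETTINGS))
```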

How to generate data

For the summit, we first have to generate the data in a balanced state, as follows:

./bin/spark-submit -v --master yarn-client --num-executors 8 --executor-cores 8 --executor-memory 48G --driver-memory 48G --class com.ibm.crail.spark.tools.ParquetGenerator ./apps/jars-atr/parquet-generator-1.0.jar -c IntWithPayload -s 4096 -o crail://zhcc004-ib0:9060/sql/data1.pq -p 64 -r 16000000 -t 64 -R 16000000 -a

Let's break it down. -s sets the binary payload size to 4096 bytes. -o is the output file location. -p says there should be 64 files, which will eventually be handed out one-to-one to the 8*8 cores. -r sets the total number of rows to 16 million. -t says the data is generated by 64 tasks. -R sets the integer range for the data to [0, 16000000]. -a enables fast generation of the binary payload (instead of purely random bytes). The key things to remember are: have one file for each core in your cluster, and keep the file size sensible (large enough). In our case, each core's file size is the total amount of data (16 million rows * 4096 bytes) divided by the number of cores (64) = 1,024,000,000 bytes (around the 1 GB mark).
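The per-file size arithmetic can be checked with a few lines of Python. This is a back-of-the-envelope sketch: it counts only the binary payload and ignores the integer column and any Parquet encoding or metadata overhead, so real files on disk will differ somewhat. The function name `payload_bytes_per_file` is made up for this example.

```python
def payload_bytes_per_file(rows: int, payload_bytes: int, files: int) -> int:
    """Approximate payload size of each file when rows are spread evenly."""
    return rows * payload_bytes // files


# Values taken from the generator command: -r 16000000, -s 4096, -p 64.
size = payload_bytes_per_file(rows=16_000_000, payload_bytes=4096, files=64)
print(size)  # 1024000000

# The estimate stays below the spark.sql.files.maxPartitionBytes value listed earlier.
print(size < 1280255769)  # True
```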

You can generate two files (say data1.pq and data2.pq) and two warm-up files (say w1.pq and w2.pq). The warm-up files do not have to be full size; you can generate just 1 million rows. The warm-up files will be used to exercise the code path and amortize the first-execution costs.

How to execute the query

./bin/spark-submit -v --master yarn-client --num-executors 8 --executor-cores 8 --executor-memory 48G --driver-memory 48G --class com.ibm.crail.benchmarks.Main ./apps/jars-atr/sql-benchmarks-1.0.jar -t equiJoin -i crail://zhcc004-ib0:9060/sql/data1.pq,crail://zhcc004-ib0:9060/sql/data2.pq -a save,crail://zhcc004-ib0:9060/sql/output.pq -w crail://zhcc004-ib0:9060/sql/w1.pq,crail://zhcc004-ib0:9060/sql/w2.pq   

Here, -t says to run the equiJoin experiment. -i gives the two input files. -a says to save the output to the given location. -w gives the warm-up files. The benchmark executes exactly the same sequence of commands on the warm-up files before it runs on the -i files. Separate warm-up files are necessary because Spark does heavy metadata caching, so we cannot reuse the -i files as the -w files.
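The warm-up-then-measure pattern can be illustrated outside Spark with a toy example. The sketch below is not the benchmark's implementation: `equi_join` is a small in-memory hash join standing in for the Spark job, `run_with_warmup` is a hypothetical helper, and the tiny lists stand in for the w1/w2 and data1/data2 files.

```python
import time
from collections import defaultdict


def equi_join(left, right):
    """Toy hash join on the first tuple field (stand-in for the Spark job)."""
    index = defaultdict(list)
    for key, payload in left:
        index[key].append(payload)
    return [
        (key, left_payload, right_payload)
        for key, right_payload in right
        for left_payload in index.get(key, [])
    ]


def run_with_warmup(job, warmup_inputs, measured_inputs):
    """Run the job once on separate warm-up inputs, then time it on the real inputs."""
    job(*warmup_inputs)  # same code path, result discarded, not timed
    start = time.perf_counter()
    result = job(*measured_inputs)
    return result, time.perf_counter() - start


# Separate warm-up data (like w1.pq, w2.pq) and measured data (like data1.pq, data2.pq).
w1, w2 = [(1, "a")], [(1, "b")]
d1 = [(1, "x"), (2, "y")]
d2 = [(2, "p"), (3, "q"), (2, "r")]

rows, seconds = run_with_warmup(equi_join, (w1, w2), (d1, d2))
print(rows)  # [(2, 'y', 'p'), (2, 'y', 'r')]
```

Only the second run is timed, so one-time costs paid during the first run do not count toward the measured join.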