New spark config
New spark parameters:
Shuffle
spark.io.netty.maxDefaultThreads 8
This is set in SparkTransportConf
spark.crail.shuffle.discard false
If set to true, shuffle output streams are opened with the discard flag; the CrailBufferedStream then discards whatever is written to it.
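A minimal sketch of setting these two shuffle parameters programmatically (assuming a standard SparkConf; the property names are the ones above):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.io.netty.maxDefaultThreads", "8")  // cap on default Netty threads
  .set("spark.crail.shuffle.discard", "false")   // true discards all shuffle output (benchmarking only)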
Serializer
spark.sql.serializer.exhaustItr true|false
spark.sql.serializer.exhaustCrail true|false
These parameters are used to consume data from Crail, or from the passed iterator, as fast as possible. They are for benchmarking only; the default for both is false.
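To illustrate what exhausting means here, a hypothetical benchmark loop (not the actual implementation): drain the iterator without touching the records, so that only the read path is measured.

def exhaust[T](it: Iterator[T]): Long = {
  var n = 0L
  while (it.hasNext) { // pull records as fast as possible
    it.next()          // and drop them immediately
    n += 1
  }
  n                    // number of records drained
}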
spark.sql.serializer UnsafeCrailSerializer
Whether to use the Crail serializer or not. For now it does not do much apart from casting to the right file type.
spark.sql.serializer.buffer true
spark.sql.deserializer.buffer true
spark.sql.serializer.buffer.size 1048576
These parameters control whether the serializer, while reading, buffers the input data from the CrailMultiStream or goes to the stream on every read call. If buffering is enabled (i.e., true), spark.sql.serializer.buffer.size sets the buffer size; this buffer is maintained internally in a cache. The same logic applies to output writing during the shuffle.
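A minimal sketch of the buffered-read idea (the class and field names are hypothetical; only the buffer-size parameter comes from this page): instead of one call to the stream per read(), refill an internal buffer of the configured size and serve reads from it.

import java.io.InputStream

class BufferedRead(in: InputStream, bufferSize: Int) {
  private val buf = new Array[Byte](bufferSize)
  private var pos = 0
  private var limit = 0

  def read(): Int = {
    if (pos == limit) {          // buffer drained: one big read from the stream
      limit = in.read(buf, 0, bufferSize)
      pos = 0
      if (limit <= 0) return -1  // end of stream
    }
    val b = buf(pos) & 0xff
    pos += 1
    b
  }
}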
spark.crail.shuffle.useBuffering false
This is a spark-io-specific parameter that buffers the values from the passed iterator and then classifies them separately. It is intended to decouple the classification logic from reading. Naturally, the end-to-end runtime suffers when buffering.
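A hypothetical sketch of that decoupling (the method names are illustrative, not from the spark-io code): first drain the iterator into a buffer, then classify the buffered records in a separate pass.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Phase 1: pure reading, buffer everything coming off the iterator.
def bufferAll[K, V](it: Iterator[(K, V)]): ArrayBuffer[(K, V)] = {
  val buffered = new ArrayBuffer[(K, V)]()
  while (it.hasNext) buffered += it.next()
  buffered
}

// Phase 2: classification, completely decoupled from the read path.
def classify[K, V](records: ArrayBuffer[(K, V)]): Map[K, ArrayBuffer[V]] = {
  val byKey = mutable.Map[K, ArrayBuffer[V]]()
  for ((k, v) <- records)
    byKey.getOrElseUpdate(k, new ArrayBuffer[V]()) += v
  byKey.toMap
}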
Input/Output
When using the AtrFileFormat for input, you can specify several options: (a) how many rows it should give per iterator; (b) the schema; (c) whether files are splitable (if files are not splitable, there will be one task per file); (d) the payload size; (e) the key int range.
Splitable is controlled by spark.sql.input.AtrFileFormat.splitable, which is a Spark parameter. The others are DataFrameReader options and can be used as:
val reader = spark.read.format("org.apache.spark.sql.execution.datasources.atr.AtrFileFormat")
val df = reader.option("schema", "ParquetExample")
  .option("inputRows", "1024")
  .option("payloadSize", "128")
  .option("intRange", Int.MaxValue.toString) // reader options are strings; Int.MAX is not valid Scala
  .load("dummyfile")
The same options in configuration form (here with the PhiFileFormat prefix):
spark.sql.input.PhiFileFormat.inputRows 200000000
spark.sql.input.PhiFileFormat.schema ParquetExample
spark.sql.input.PhiFileFormat.schema.payloadSize 1024
spark.sql.input.PhiFileFormat.splitable false
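Because splitable is a Spark parameter rather than a reader option, it can be set on the session configuration; a minimal sketch (SparkSession usage assumed):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // Spark-level parameter (not a reader option): disable splitting,
  // so there is one task per input file.
  .config("spark.sql.input.AtrFileFormat.splitable", "false")
  .getOrCreate()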
Dump of the config
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
#for workers
#spark.eventLog.enabled true
#spark.eventLog.dir hdfs://flex11-40g0:9000/shared-spark-logs
#for webserver
#spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
#spark.history.fs.logDirectory hdfs://flex11-40g0:9000/shared-spark-logs
# JVM options
spark.executor.extraJavaOptions "-Dsun.nio.PageAlignDirectMemory=true -Xmn16G -XX:MaxDirectMemorySize=64G"
spark.driver.extraJavaOptions "-Dsun.nio.PageAlignDirectMemory=true -Xmn16G -XX:MaxDirectMemorySize=64G"
spark.rdd.compress false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer 1mb
spark.shuffle.sort.bypassMergeThreshold 10
spark.broadcast.factory org.apache.spark.broadcast.CrailBroadcastFactory
spark.broadcast.compress false
spark.shuffle.manager org.apache.spark.shuffle.crail.CrailShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.shuffle.spill false
spark.io.netty.maxDefaultThreads 32
spark.shuffle.io.serverThreads 16
spark.shuffle.io.clientThreads 16
spark.shuffle.io.threads 16
spark.rpc.io.serverThreads 16
spark.rpc.io.clientThreads 16
spark.rpc.io.threads 16
spark.files.io.serverThreads 16
spark.files.io.clientThreads 16
spark.files.io.threads 16
spark.reducer.maxSizeInFlight 1024p
spark.driver.maxResultSize 64g
#spark.crail.shuffle.sorter org.apache.spark.shuffle.crail.CrailSparkShuffleSorter
#Terasort specific
spark.crail.shuffle.sorter com.ibm.crail.terasort.sorter.CrailShuffleNativeRadixSorter
spark.crail.shuffle.serializer com.ibm.crail.terasort.serializer.F22Serializer
#spark.crail.shuffle.serializer org.apache.spark.shuffle.crail.CrailSparkShuffleSerializer
#GroupBy specific
#spark.crail.shuffle.serializer com.ibm.atr.F22Serializer
#pagerank specific
#spark.crail.shuffle.serializer com.ibm.crail.pagerank.serializer.CrailKryoSerializer
spark.locality.wait 0
spark.locality.wait.node 0
spark.locality.wait.process 0
spark.locality.wait.rack 0
spark.crail.shuffle.affinity true
spark.crail.shuffle.outstanding 10
#spark.crail.deleteonclose false
spark.crail.deleteonstart true
#spark.crail.preallocate 32768 - does not work because driver also needs this much resources!
#spark.crail.preallocate 16384
#spark.crail.preallocate 8192
#spark.crail.shuffleCycle 6
spark.crail.writeAhead 2147483640
spark.crail.mapCloseOptimization true
#spark.crail.debug true
#-------------- SQL stuff -------------------
#spark.default.parallelism 10
#SQL specific settings (defaults)
# spark.sql.files.maxPartitionBytes 134217728
# spark.sql.files.openCostInBytes 4194304
# spark.sql.broadcastTimeout 300
# spark.sql.autoBroadcastJoinThreshold 10485760
# spark.sql.shuffle.partitions 200
#spark.sql.parquet.splitable false
spark.sql.crossJoin.enabled true
spark.sql.shuffle.partitions 128
spark.sql.files.openCostInBytes 0
#spark.sql.files.maxPartitionBytes 8388608
#spark.sql.files.maxPartitionBytes 1073741824
#spark.sql.files.maxPartitionBytes 1280255769
spark.sql.parquet.compression.codec uncompressed
spark.sql.codegen.wholeStage true
spark.sql.input.AtrFileFormat.inputRows 200000000
spark.sql.input.AtrFileFormat.schema ParquetExample
spark.sql.input.AtrFileFormat.schema.payloadSize 1024
spark.sql.input.AtrFileFormat.splitable false
spark.sql.serializer UnsafeCrailSerializer
spark.sql.serializer.buffer true
spark.sql.deserializer.buffer true
spark.sql.serializer.buffer.size 1048576
#spark.sql.serializer.buffer.size 33554432
#spark.sql.serializer.buffer.size 2147480000
#spark.crail.shuffle.discard true
#spark.crail.shuffle.useBuffering false
#spark.shuffle.sort.initialBufferSize 1048576
spark.shuffle.sort.initialBufferSize 4194304