Parquet partition calculation
How does Spark calculate the partition size of your job (e.g., for Parquet input)?
Here is the calculation: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L470
It boils down to:

Min(defaultMaxSplitBytes (128MB, `spark.sql.files.maxPartitionBytes`),
    Max(openCostInBytes (4MB, `spark.sql.files.openCostInBytes`),
        totalBytes / defaultParallelism))
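
Both knobs can be inspected or overridden on a running `SparkSession`. A minimal sketch, assuming a session named `spark` (as in `spark-shell`) and a hypothetical Parquet path:

```scala
// Check the defaults of the two configs (returned as strings).
println(spark.conf.get("spark.sql.files.maxPartitionBytes")) // "134217728" -> 128 MB
println(spark.conf.get("spark.sql.files.openCostInBytes"))   // "4194304"   -> 4 MB

// Example override: cap partition splits at 64 MB before reading Parquet data.
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
val df = spark.read.parquet("/path/to/parquet-dir") // hypothetical path
println(df.rdd.getNumPartitions)                     // number of read tasks/partitions
```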
The total size is calculated as:
selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
which essentially calculates the total size of the input source as: for each file in your source, the size of the file itself plus the cost of opening a file (think of it this way: if it takes 1 second to open a file, how much data could we have read in that second?). Suppose there are 10 files in your Parquet directory, each 10 MB in size, and the cost of opening a file is 4 MB (the default in Spark). The total size would then be:
val actualData = 10 files * 10 MB = 100 MB
val overheads  = 10 files *  4 MB =  40 MB
val totalBytes = actualData + overheads = 140 MB
If there are 10 nodes with 8 cores each in your Spark cluster, then each core will handle approximately 140 MB / 80 = 1.75 MB of data. So your partition size becomes:
Min(128 MB, Max(4 MB, 1.75 MB)) = 4 MB
Hence, when you import this data, Spark will create 140 MB / 4 MB = 35 tasks.
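
The same arithmetic can be reproduced in plain Scala. A minimal sketch of the calculation above (just arithmetic, not a Spark API; the 10 files of 10 MB, 80 cores, and the default config values are taken from this example):

```scala
object PartitionSizeExample {
  def main(args: Array[String]): Unit = {
    val mb = 1L << 20
    val fileSizes          = Seq.fill(10)(10 * mb)  // 10 Parquet files of 10 MB each
    val openCostInBytes    = 4 * mb                 // spark.sql.files.openCostInBytes (default)
    val maxPartitionBytes  = 128 * mb               // spark.sql.files.maxPartitionBytes (default)
    val defaultParallelism = 10 * 8                 // 10 nodes * 8 cores

    val totalBytes   = fileSizes.map(_ + openCostInBytes).sum            // 140 MB
    val bytesPerCore = totalBytes / defaultParallelism                   // 1.75 MB
    val splitBytes   = math.min(maxPartitionBytes,
                                math.max(openCostInBytes, bytesPerCore)) // 4 MB
    val approxTasks  = math.ceil(totalBytes.toDouble / splitBytes).toInt // 35

    println(s"totalBytes=$totalBytes splitBytes=$splitBytes tasks=$approxTasks")
  }
}
```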
The reasoning behind the formula can be explained as follows. On the one hand, we want to keep the storage/IO overhead low, so a partition should be at least as large as the bigger of the file-open cost and the per-core share of the data. On the other hand, we want to maximize parallelism in the cluster: a smaller partition size leads to more tasks and hence better core utilization (but only up to a limit, defined by the 128 MB maximum partition split size). Hence, the final partition size is the minimum of the maximum split size and the maximum of the open cost and bytes-per-core.
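
To see which term of the formula "wins" at different input sizes, here is a small sketch (pasteable into the Scala REPL; 80 cores and the default config values are assumed):

```scala
// min(maxPartitionBytes, max(openCost, totalBytes / cores))
def splitSize(totalBytes: Long, cores: Int,
              maxPartitionBytes: Long = 128L << 20,
              openCost: Long = 4L << 20): Long =
  math.min(maxPartitionBytes, math.max(openCost, totalBytes / cores))

val mb = 1L << 20
splitSize(140L    * mb, 80)  //   4 MB -> open-cost floor wins (tiny input)
splitSize(4000L   * mb, 80)  //  50 MB -> bytes-per-core wins (medium input)
splitSize(100000L * mb, 80)  // 128 MB -> maxPartitionBytes cap wins (large input)
```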
TODO
Explain how a task gets a data range to process. For example, in the case of a Parquet read with the ParquetExample
schema and 100M rows, one has 12 GB of data. With 1 GB partitioning, there are 10 tasks that each read a full 1 GB single file, and then the last 2 tasks each read 200 MB from 5 files.
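
Until the above is written up, here is a rough sketch of the split-and-pack step that hands each task its file ranges. It is a simplification of the logic around the linked `DataSourceScanExec` code (names and the exact packing rules are illustrative, not version-exact):

```scala
// Rough sketch of how files become per-task ranges:
// 1) split each file into chunks of at most maxSplitBytes,
// 2) greedily pack chunks into partitions, charging openCostInBytes per chunk.
// Simplified illustration, not the exact Spark implementation.
case class FileRange(path: String, start: Long, length: Long)

def planPartitions(files: Seq[(String, Long)],   // (path, file size in bytes)
                   maxSplitBytes: Long,
                   openCostInBytes: Long): Seq[Seq[FileRange]] = {
  // Step 1: split files into ranges (assumes a splittable format like Parquet).
  val ranges = files.flatMap { case (path, size) =>
    (0L until size by maxSplitBytes).map { start =>
      FileRange(path, start, math.min(maxSplitBytes, size - start))
    }
  }.sortBy(-_.length)  // largest ranges first

  // Step 2: greedy packing into partitions of roughly maxSplitBytes each.
  val partitions = scala.collection.mutable.ArrayBuffer[List[FileRange]]()
  val current = scala.collection.mutable.ArrayBuffer[FileRange]()
  var currentSize = 0L
  for (r <- ranges) {
    if (current.nonEmpty && currentSize + r.length > maxSplitBytes) {
      partitions += current.toList; current.clear(); currentSize = 0L
    }
    current += r
    currentSize += r.length + openCostInBytes
  }
  if (current.nonEmpty) partitions += current.toList
  partitions.toList
}
```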