Optimizing and Tuning Spark Applications
Static vs. Dynamic resource allocation
You can configure Spark to use dynamic resource allocation with the spark.dynamicAllocation.enabled
property. This feature scales the number of executors up and down based on the workload. It's beneficial for use cases like streaming data processing or on-demand analytics that require more resources during peak hours. However, keep in mind that in a multi-tenant environment, Spark might consume resources from other applications.
Here's how to configure dynamic allocation in Spark:
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
- `spark.dynamicAllocation.enabled` enables dynamic allocation.
- `spark.dynamicAllocation.minExecutors` specifies the minimum number of executors to create initially.
- `spark.dynamicAllocation.schedulerBacklogTimeout` defines how long tasks may remain pending before new executors are requested.
- `spark.dynamicAllocation.maxExecutors` sets the maximum number of executors.
- `spark.dynamicAllocation.executorIdleTimeout` determines how long an executor may sit idle before it is terminated.
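The same settings can also be supplied when the session is created, for example through `spark-submit --conf` flags or the `SparkSession` builder. Dynamic allocation also needs shuffle data to outlive executors, typically via the external shuffle service (`spark.shuffle.service.enabled`) or shuffle tracking (`spark.dynamicAllocation.shuffleTracking.enabled`). A minimal Java sketch; the application name and values are illustrative:

```java
import org.apache.spark.sql.SparkSession;

public class DynamicAllocationExample {
    public static void main(String[] args) {
        // Illustrative values only; tune min/max executors to your cluster size.
        SparkSession spark = SparkSession.builder()
                .appName("dynamic-allocation-demo")
                .config("spark.dynamicAllocation.enabled", "true")
                .config("spark.dynamicAllocation.minExecutors", "2")
                .config("spark.dynamicAllocation.maxExecutors", "20")
                .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1m")
                .config("spark.dynamicAllocation.executorIdleTimeout", "2min")
                // Dynamic allocation needs shuffle data to outlive executors.
                .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
                .getOrCreate();

        spark.range(1000).count(); // trivial job just to exercise the session
        spark.stop();
    }
}
```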
Configuring Spark Executors' Memory and Shuffle Service
The memory allocated to each executor can be controlled using the `spark.executor.memory` property. This memory is divided into three regions: execution memory, storage memory, and reserved memory. By default, Spark sets aside 300 MB as reserved memory to guard against Out-Of-Memory (OOM) errors and treats 60% of the remaining heap (`spark.memory.fraction`) as a unified region shared by execution and storage; within that region the boundary is set by `spark.memory.storageFraction` (50% by default), and either side can borrow unused space from the other.
- Execution memory is used for shuffles, joins, sorts, and aggregations.
- Storage memory primarily caches user data structures and DataFrame partitions.
- Spark performs significant I/O activity during map and shuffle operations, so optimizing I/O can help.
To configure Spark for heavy workloads and reduce I/O bottlenecks, you can tune properties such as `spark.driver.memory`, `spark.shuffle.file.buffer`, `spark.file.transferTo`, `spark.shuffle.unsafe.file.output.buffer`, `spark.io.compression.lz4.blockSize`, `spark.shuffle.service.index.cache.size`, and `spark.shuffle.registration.timeout`.
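A hedged sketch of how these knobs might be set from a Java application follows. The values are illustrative starting points, not recommendations; the right numbers depend on workload and hardware, and the defaults are often fine for small jobs.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ShuffleTuningExample {
    public static void main(String[] args) {
        // spark.driver.memory normally has to be passed at launch time,
        // because the driver JVM is already running when this code executes, e.g.:
        //   spark-submit --conf spark.driver.memory=4g ...
        SparkConf conf = new SparkConf()
                .setAppName("shuffle-tuning-demo")
                .set("spark.shuffle.file.buffer", "1m")               // per-writer shuffle buffer (default 32k)
                .set("spark.file.transferTo", "false")                // force buffered copies instead of NIO transferTo
                .set("spark.shuffle.unsafe.file.output.buffer", "1m") // default 32k
                .set("spark.io.compression.lz4.blockSize", "512k")    // larger blocks can shrink shuffle output
                .set("spark.shuffle.service.index.cache.size", "128m")
                .set("spark.shuffle.registration.timeout", "120000"); // milliseconds

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        // ... run the shuffle-heavy job here ...
        spark.stop();
    }
}
```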
Spark Parallelism
To maximize resource utilization and parallelism in Spark, aim to have at least as many partitions as there are cores across all executors. Partitions are created based on the layout of data on disk, with a default partition size of 128 MB for sources like HDFS and S3 (`spark.sql.files.maxPartitionBytes`). Shrinking partition files too much leads to the "small file problem" and increased disk I/O.
For smaller workloads, the number of shuffle partitions can be reduced from the default of 200 (`spark.sql.shuffle.partitions`) to match the number of cores or executors. During a shuffle, Spark writes and reads shuffle files on the executors' local disks, so fast local storage such as SSDs significantly improves performance.
When writing data, you can control the number of records per partition file with the `maxRecordsPerFile` option to mitigate small-file or very-large-file issues.
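A minimal Java sketch putting these knobs together; the paths, partition count, and record limit are hypothetical:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionTuningExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("partition-tuning-demo")
                // Smaller job: shrink shuffle partitions from the default 200.
                .config("spark.sql.shuffle.partitions", "16")
                .getOrCreate();

        Dataset<Row> df = spark.read().parquet("/data/events"); // hypothetical input path

        df.repartition(16)                        // roughly one partition per core, illustrative
          .write()
          .option("maxRecordsPerFile", 1_000_000) // cap records per output file
          .mode("overwrite")
          .parquet("/data/events_out");           // hypothetical output path

        spark.stop();
    }
}
```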
Caching and Persistence of Data
Caching and persisting DataFrames can significantly improve performance in Spark:
- `DataFrame.cache()`: Stores as many partitions in memory as space allows; the cache can be fractional (only some partitions cached). A DataFrame is fully cached once an action such as `count()` has processed all of its partitions.
- `DataFrame.persist()`: Provides control over how the data is stored through `StorageLevel`. Common levels include `MEMORY_ONLY`, `MEMORY_ONLY_SER`, `MEMORY_AND_DISK`, `DISK_ONLY`, and `OFF_HEAP`.
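Both APIs look like this in Java; a minimal sketch, with hypothetical input paths:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

public class CachePersistExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("cache-demo").getOrCreate();

        Dataset<Row> sales = spark.read().parquet("/data/sales");   // hypothetical path
        sales.cache();   // MEMORY_AND_DISK is the default level for DataFrames
        sales.count();   // action that materializes all partitions into the cache

        Dataset<Row> clicks = spark.read().parquet("/data/clicks"); // hypothetical path
        clicks.persist(StorageLevel.MEMORY_AND_DISK_SER());         // explicit storage level
        clicks.count();

        // Release cached data when it is no longer needed.
        sales.unpersist();
        clicks.unpersist();
        spark.stop();
    }
}
```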
Cache or persist large datasets when you need to access them repeatedly for queries and transformations. Avoid caching when DataFrames are too big for memory or when performing infrequent, inexpensive transformations.
Statistics Collection
Collect and maintain statistics for tables or columns that Spark's cost-based query optimizer will use for optimization decisions. Here are ways to collect statistics:
Table Level:
ANALYZE TABLE table_name COMPUTE STATISTICS
Column Level:
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS column_name1, column_name2, ...
Column-level statistics can be slower to collect but provide more optimization information for the cost-based optimizer, especially for joins, aggregations, and filters.
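These statements can also be issued from a Java application through `spark.sql()`; the table and column names below are hypothetical, and a metastore-backed table is assumed:

```java
import org.apache.spark.sql.SparkSession;

public class AnalyzeTableExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("analyze-demo")
                .enableHiveSupport() // assumes a metastore-backed table
                .getOrCreate();

        // Table-level statistics (row count, size in bytes).
        spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS");

        // Column-level statistics for columns used in joins, aggregations, and filters.
        spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, order_date");

        // Inspect what the cost-based optimizer will see.
        spark.sql("DESCRIBE EXTENDED sales").show(100, false);

        spark.stop();
    }
}
```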
Spark Joins
Broadcast Hash Join
- Also known as map-side only join.
- Spark uses broadcast join by default when the smaller dataset is less than 10MB.
- Use broadcast join when:
  - Each key within the smaller and larger datasets is hashed to the same partition.
  - One dataset is significantly smaller than the other.
  - Network bandwidth usage and OOM errors are manageable, since the smaller dataset is broadcast to all executors.
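When the automatic threshold (`spark.sql.autoBroadcastJoinThreshold`, 10 MB by default) does not apply, the smaller side can be broadcast explicitly. A minimal Java sketch; the DataFrames and join key are hypothetical:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.broadcast;

public class BroadcastJoinExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("broadcast-join-demo").getOrCreate();

        Dataset<Row> orders = spark.read().parquet("/data/orders");       // large side (hypothetical)
        Dataset<Row> countries = spark.read().parquet("/data/countries"); // small side (hypothetical)

        // Hint Spark to ship the small table to every executor,
        // turning the join into a map-side broadcast hash join.
        Dataset<Row> joined = orders.join(broadcast(countries), "country_code");

        joined.explain(); // the plan should show BroadcastHashJoin
        spark.stop();
    }
}
```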
Shuffle Sort Merge Join
- Suitable for joining over a common key that is sortable, unique, and can be stored in the same partition.
- It has a sort phase, which sorts each dataset by the join key, followed by a merge phase that iterates over the sorted rows and joins rows whose keys match.
- To optimize shuffle sort merge joins, create partitioned buckets for common sorted keys, or use bucketing for high-cardinality columns (see the sketch after this section).
- Use shuffle sort merge join when:
  - Each key within two large datasets can be sorted and hashed to the same partition.
  - You only need to perform equi-joins based on matching sorted keys.
  - You want to avoid Exchange and Sort operations for efficient network and CPU usage.
Remember that it's often beneficial to keep the largest DataFrame on the left side of the join, follow good partitioning strategies, filter data as early as possible, and use the same partitioner between DataFrames for joins.
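A hedged sketch of the bucketing approach mentioned above: both sides are written as tables bucketed and sorted on the join key so that a later sort merge join can skip the Exchange step. The table names, paths, and bucket count are hypothetical.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BucketedJoinExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("bucketed-join-demo")
                .enableHiveSupport() // bucketed tables need a metastore
                .getOrCreate();

        Dataset<Row> users = spark.read().parquet("/data/users");   // hypothetical
        Dataset<Row> orders = spark.read().parquet("/data/orders"); // hypothetical

        // Write both sides bucketed and sorted on the join key.
        users.write().format("parquet")
             .bucketBy(16, "user_id").sortBy("user_id")
             .mode("overwrite").saveAsTable("users_bucketed");
        orders.write().format("parquet")
              .bucketBy(16, "user_id").sortBy("user_id")
              .mode("overwrite").saveAsTable("orders_bucketed");

        // Equi-join on the bucketed key; the plan should show SortMergeJoin
        // without a shuffle (Exchange) on either side.
        Dataset<Row> joined = spark.table("users_bucketed")
                .join(spark.table("orders_bucketed"), "user_id");
        joined.explain();

        spark.stop();
    }
}
```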
Additional Notes
- Task scheduling options in Spark include FIFO and Fair Scheduling.
- Serialization choice (Java or Kryo) affects performance and memory usage.
- Be mindful of garbage collection time and its impact on Spark jobs.
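A minimal sketch of setting the scheduler mode and Kryo serialization; the registered class is a hypothetical placeholder:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SchedulingSerializationExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("tuning-demo")
                // FAIR shares resources between concurrent jobs; FIFO is the default.
                .set("spark.scheduler.mode", "FAIR")
                // Kryo is usually faster and more compact than Java serialization.
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                // Registering frequently shuffled classes avoids writing full class names.
                .registerKryoClasses(new Class<?>[]{MyEvent.class}); // MyEvent is hypothetical

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        // ... run jobs here ...
        spark.stop();
    }

    // Hypothetical domain class used only to illustrate Kryo registration.
    public static class MyEvent implements java.io.Serializable {
        public long id;
        public String payload;
    }
}
```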
Databricks Delta
Databricks Delta introduces optimizations to improve query performance, including:
Compaction (Bin Packing)
- This optimization coalesces smaller files into larger files to speed up queries.
- It can be triggered using the `OPTIMIZE` command.
- Bin packing is idempotent, evenly balances file sizes, and helps eliminate small files.
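Compaction can be triggered from SQL or, as sketched below, through `spark.sql()` in a Java application. This assumes a Delta table named `events` (hypothetical) and a runtime where `OPTIMIZE` is available (Databricks, or a Delta Lake version with the Delta SQL extensions configured):

```java
import org.apache.spark.sql.SparkSession;

public class DeltaOptimizeExample {
    public static void main(String[] args) {
        // Assumes the Delta Lake SQL extensions are configured for this session.
        SparkSession spark = SparkSession.builder()
                .appName("delta-optimize-demo")
                .getOrCreate();

        // Bin-packing compaction: coalesce small files into larger ones.
        spark.sql("OPTIMIZE events");

        spark.stop();
    }
}
```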
Data Skipping
- Data skipping improves query performance for comparisons of the form `column op literal`, where `op` can be `>`, `<`, `=`, `like`, `and`, `or`, etc.
- Stats are generated for the first 32 columns by default.
- The number of indexed columns is controlled by the table property `delta.dataSkippingNumIndexedCols`; long string columns can be excluded from stats collection by lowering this value or moving them after the indexed columns.
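A hedged sketch of adjusting that table property from Java; the table name and value are hypothetical:

```java
import org.apache.spark.sql.SparkSession;

public class DataSkippingConfigExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("data-skipping-demo")
                .getOrCreate();

        // Collect stats only on the first 8 columns of this Delta table, so that
        // long string columns placed after them are not indexed for data skipping.
        spark.sql("ALTER TABLE events SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '8')");

        spark.stop();
    }
}
```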
Z-Ordering (Multi-dimensional Clustering)
- Z-ordering colocates related information in the same files.
- Effective for high-cardinality columns.
- It's not idempotent and requires statistics like min, max, count for data columns.
- Z-ordering helps eliminate skew in joins and queries based on sorted keys.
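Z-ordering is applied as part of `OPTIMIZE`. A hedged sketch; the table and column names are hypothetical, and the same Delta runtime assumptions as above apply:

```java
import org.apache.spark.sql.SparkSession;

public class DeltaZOrderExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("delta-zorder-demo")
                .getOrCreate();

        // Cluster the data files by a high-cardinality column that is
        // frequently used in filters and joins.
        spark.sql("OPTIMIZE events ZORDER BY (user_id)");

        spark.stop();
    }
}
```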
Remember that capable cluster hardware, such as compute-optimized instances with local SSDs (for example, AWS c5d instances), is recommended for operations like `OPTIMIZE` because of the significant Parquet decoding and encoding involved.