Spark Optimization on AWS EMR clusters

These configurations help optimize a Spark application running on an AWS EMR cluster.

Instance type = r5.24xlarge, vCPU = 96, Memory = 768 GiB

===============

PARAM 1 => spark.executor.cores = 5

      Based on historical data, we suggest five virtual cores per executor to achieve optimal results in any sized cluster.

      spark.executor.cores = 5

PARAM 2 => spark.executor.memory = 36 (GB)

The Spark executor memory is calculated as follows (a runnable sketch of this arithmetic follows the calculation):

      Number of executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores

      Number of executors per instance = (96 - 1) / 5 = 19 (rounded down; one core is left for the OS and Hadoop daemons)

      Total executor memory = total RAM per instance / number of executors per instance

      Total executor memory = 768 / 19 = 41 GB (rounded up)

      Of that, 90% goes to the executor heap and 10% to the memory overhead:

      spark.executor.memory = total executor memory * 0.90
      spark.executor.memory = 41 * 0.90 = 36 GB (rounded down)

      spark.yarn.executor.memoryOverhead = total executor memory * 0.10
      spark.yarn.executor.memoryOverhead = 41 * 0.10 = 5 GB (rounded up)

      spark.executor.memory = 36 GB
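
This arithmetic can be reproduced with a short script. A minimal sketch, assuming only the r5.24xlarge figures listed at the top of this page:

      import math

      # r5.24xlarge specs (from the instance description above)
      vcpus_per_instance = 96
      ram_per_instance_gb = 768

      executor_cores = 5  # PARAM 1

      # One vCPU is reserved for the OS and Hadoop daemons.
      executors_per_instance = (vcpus_per_instance - 1) // executor_cores              # 19

      # Split each executor's share of RAM 90/10 between heap and overhead.
      total_executor_memory = math.ceil(ram_per_instance_gb / executors_per_instance)  # 41
      executor_memory_gb = math.floor(total_executor_memory * 0.90)                    # 36
      memory_overhead_gb = math.ceil(total_executor_memory * 0.10)                     # 5

      print(executors_per_instance, executor_memory_gb, memory_overhead_gb)            # 19 36 5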

PARAM 3 => spark.driver.memory = 36 (GB)

      We recommend setting this equal to spark.executor.memory.

      spark.driver.memory = spark.executor.memory = 36 GB

PARAM 4 => spark.driver.cores = 5

      We recommend setting this equal to spark.executor.cores.

      spark.driver.cores = spark.executor.cores
      spark.driver.cores = 5

PARAM 5 => spark.executor.instances = 265

      spark.executor.instances = (number of executors per instance * number of core instances) - 1, leaving one executor slot for the driver.

      Assuming a cluster of 14 core instances for this example:

      spark.executor.instances = (19 * 14) - 1 = 265

PARAM 6 => spark.default.parallelism = 2650

      spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2

      spark.default.parallelism = 265 * 5 * 2 = 2650
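
Continuing the sizing sketch from PARAM 2 (the 14-instance core fleet is an assumption for this example, not a property of the instance type):

      executors_per_instance = 19   # from the PARAM 2 calculation
      executor_cores = 5            # PARAM 1
      core_instances = 14           # assumed cluster size for this example

      # Reserve one executor slot for the driver.
      executor_instances = executors_per_instance * core_instances - 1   # 265

      # Two tasks per executor core is the usual starting point.
      default_parallelism = executor_instances * executor_cores * 2      # 2650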

Warning: Although this calculation gives 2,650 partitions, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition. To determine the current number of partitions in an RDD, call rdd.partitions().size() before setting this property.
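
For example, in PySpark (where rdd.getNumPartitions() is the equivalent of the rdd.partitions().size() call above; the input path is a made-up placeholder):

      df = spark.read.parquet("s3://my-bucket/input/")   # hypothetical path

      # How many partitions did the read produce?
      print(df.rdd.getNumPartitions())

      # Too many small partitions: shrink without a full shuffle.
      df = df.coalesce(500)

      # Skewed or too few partitions: redistribute with a full shuffle.
      df = df.repartition(2650)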

PARAM 7 => spark.sql.shuffle.partitions

      In the case of DataFrames, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.

      spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations. The right value for this parameter depends on the previous one, spark.default.parallelism; a common starting point is to set the two to the same value.
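
Unlike spark.default.parallelism, which is fixed when the context starts, spark.sql.shuffle.partitions can be changed on a running session:

      # Tune shuffle parallelism for DataFrame joins/aggregations mid-job.
      spark.conf.set("spark.sql.shuffle.partitions", "2650")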

PARAM 8 => yarn.nodemanager.vmem-check-enabled, yarn.nodemanager.pmem-check-enabled, and spark.dynamicAllocation.enabled

      Even if all the Spark configuration properties are calculated and set correctly, virtual-memory errors can still occur occasionally because the OS increases virtual memory aggressively. To prevent these application failures, disable YARN's memory checks in yarn-site.xml, and disable dynamic allocation in the Spark configuration (since spark.executor.instances is set explicitly above):

      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"

      "spark.dynamicAllocation.enabled": "false"