Submit Spark Applications Remotely
- Perform spark-submit remotely using local machine - submit_spark_from_local_machine.txt
- Perform spark-submit using Livy - spark_submit_using_livy.txt
To submit Spark jobs to an EMR cluster from a remote machine, the following must be true:
- Network traffic is allowed from the remote machine to all cluster nodes.
- All Spark and Hadoop binaries are installed on the remote machine.
- The configuration files on the remote machine point to the EMR cluster.
Confirm that network traffic is allowed from the remote machine to all cluster nodes
- If you are using an EC2 instance as a remote machine or edge node: allow inbound traffic from that instance's security group to the security groups for each cluster node.
- If you are using your own machine: allow inbound traffic from your machine's IP address to the security groups for each cluster node.
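For example, if the remote machine is an EC2 instance, a rule like the following can be added with the AWS CLI. This is only a sketch: both security group IDs and the port range are placeholders that you would replace with your own values, and the rule would be repeated for the master, core, and task node security groups.

```
# Both security group IDs below are hypothetical placeholders:
# the --group-id is an EMR cluster node's security group, and
# the --source-group is the remote/edge instance's security group.
aws ec2 authorize-security-group-ingress \
  --group-id sg-0aaaabbbbccccdddd \
  --protocol tcp \
  --port 0-65535 \
  --source-group sg-0eeeeffff11112222
```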
Install the Spark and other dependent binaries on the remote machine

To install the binaries, copy the files from the EMR cluster's master node, as explained in the following steps. This is the easiest way to be sure that the same version is installed on both the EMR cluster and the remote machine.
1. Run the following commands to create the folder structure on the remote machine:

sudo mkdir -p /var/aws/emr/
sudo mkdir -p /etc/hadoop/conf
sudo mkdir -p /etc/spark/conf
sudo mkdir -p /var/log/spark/user/
sudo chmod 777 -R /var/log/spark/

2. Copy the following files from the EMR cluster's master node to the remote machine. Don't change the folder structure or file names.

/etc/yum.repos.d/emr-apps.repo
/var/aws/emr/repoPublicKey.txt
3. Run the following commands to install the Spark and Hadoop binaries:

sudo yum install -y hadoop-client
sudo yum install -y hadoop-hdfs
sudo yum install -y spark-core
sudo yum install -y java-1.8.0-openjdk

If you want to use the AWS Glue Data Catalog with Spark, run the following command on the remote machine to install the AWS Glue libraries:
sudo yum install -y libgssglue

Create the configuration files and point them to the EMR cluster
Note: You can also use tools such as rsync to copy the configuration files from the EMR master node to the remote instance.
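As a rough sketch of the rsync approach, assuming SSH access to the master node with the cluster key pair, something like the following could copy the configuration directories directly. The key path and master node DNS name are placeholders.

```
# Copy the Hadoop and Spark configuration directories from the EMR master node.
# Replace the key path and the master node public DNS with your own values.
rsync -avz -e "ssh -i /path/to/mykey.pem" \
  hadoop@ec2-xx-xx-xx-xx.compute-1.amazonaws.com:/etc/hadoop/conf/ /etc/hadoop/conf/
rsync -avz -e "ssh -i /path/to/mykey.pem" \
  hadoop@ec2-xx-xx-xx-xx.compute-1.amazonaws.com:/etc/spark/conf/ /etc/spark/conf/
```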
1. Run the following commands on the EMR cluster's master node to copy the configuration files to Amazon Simple Storage Service (Amazon S3). Replace yours3bucket with the name of the bucket that you want to use.
aws s3 cp /etc/spark/conf s3://yours3bucket/emrhadoop-conf/sparkconf/ --recursive
aws s3 cp /etc/hadoop/conf s3://yours3bucket/emrhadoop-conf/hadoopconf/ --recursive

2. Download the configuration files from the S3 bucket by running the following commands on the remote machine. Replace yours3bucket with the name of the bucket that you used in the previous step.
sudo aws s3 cp s3://yours3bucket/emrhadoop-conf/hadoopconf/ /etc/hadoop/conf/ --recursive
sudo aws s3 cp s3://yours3bucket/emrhadoop-conf/sparkconf/ /etc/spark/conf/ --recursive

3. Create the HDFS home directory for the user who will submit the Spark job to the EMR cluster. In the following commands, replace sparkuser with the name of your user.
hdfs dfs -mkdir /user/sparkuser
hdfs dfs -chown sparkuser:sparkuser /user/sparkuser

The remote machine is now ready for a Spark job.
Run the following command to submit a Spark job to the EMR cluster. Replace these values:
- org.apache.spark.examples.SparkPi: the class that serves as the entry point for the job
- /usr/lib/spark/examples/jars/spark-examples.jar: the path to the Java .jar file
spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /usr/lib/spark/examples/jars/spark-examples.jar

You can also access HDFS data from the remote machine using hdfs commands.
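For example, once the configuration files point at the cluster, standard hdfs commands issued on the remote machine operate against the EMR cluster's HDFS. The file names and paths below are only illustrative.

```
# List the Spark user's home directory on the cluster's HDFS
hdfs dfs -ls /user/sparkuser

# Copy a local file into HDFS and read it back (file name is illustrative)
hdfs dfs -put ./data.csv /user/sparkuser/data.csv
hdfs dfs -cat /user/sparkuser/data.csv
```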
Common errors
Standalone mode
Amazon EMR doesn't support standalone mode for Spark. It's not possible to submit a Spark application to a remote Amazon EMR cluster with a command like this:
SparkConf conf = new SparkConf().setMaster("spark://master_url:7077").setAppName("Word Count");

Instead, set up your local machine as explained earlier in this article. Then, submit the application using the spark-submit command.
java.lang.UnsupportedClassVersionError
The following error occurs when the remote EC2 instance is running Java version 1.7 and the EMR cluster is running Java 1.8:
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/spark/launcher/Main : Unsupported major.minor version 52.0

To resolve this error, run the following commands to upgrade the Java version on the EC2 instance:
sudo yum install java-1.8.0
sudo yum remove java-1.7.0-openjdk
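To confirm which Java version the instance now uses, and to switch between versions if several JDKs are installed, something like the following can be run on Amazon Linux.

```
# Confirm the active Java version (should now report 1.8.x)
java -version

# If several JDKs are installed, pick the default interactively
sudo alternatives --config java
```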
What is Livy?
Livy is an open source REST interface for interacting with Apache Spark from anywhere. It supports executing snippets of code against a Spark context that runs locally or in Apache Hadoop YARN. Because Livy on EMR runs alongside Hadoop, it uses YARN by default. It can be used to submit programs written in Scala, Python, R, or Java. The main reason to use Livy, however, is to support multiple users.
When multiple users share the Livy server, user impersonation can be enabled in livy.conf:

livy.impersonation.enabled = true
The Spark and Hadoop locations that Livy uses are set in livy-env.sh under /etc/livy/conf:
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
As noted, Livy uses the Spark configuration by default. However, you can override the Spark configuration by setting the SPARK_CONF_DIR environment variable before starting Livy, as in the sketch below, and then restarting the Livy server:
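A minimal sketch of that override, assuming a custom configuration directory; the /etc/spark/conf.custom path is only an illustrative placeholder added to livy-env.sh.

```
# /etc/livy/conf/livy-env.sh
# Point Livy at an alternate Spark configuration directory.
# /etc/spark/conf.custom is only a placeholder path.
export SPARK_CONF_DIR=/etc/spark/conf.custom
```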
sudo stop livy-server
sudo start livy-server
Spark jobs can be submitted in two ways: cluster mode and client mode.
However, it is strongly recommended to configure Spark to submit applications in YARN cluster mode. That makes sure that user sessions have their resources properly accounted for in the YARN cluster, and that the host running the Livy server doesn't become overloaded when multiple user sessions are running.
If Spark jobs run in YARN mode, set the livy.spark.master and livy.spark.deployMode properties (client or cluster). For example:
cat /etc/livy/conf/livy.conf
livy.spark.master = yarn
livy.spark.deployMode = client
...
cat /etc/livy/conf/livy.conf
livy.spark.master = yarn
livy.spark.deployMode = cluster
If you want to be able to access Hive through Spark for Livy, you should configure Spark with Hive, and set livy.repl.enableHiveContext to true in livy.conf. For example:
cat /etc/livy/conf/livy.conf
livy.repl.enableHiveContext = true
livy.server.access-control.allowed-users *
livy.server.port 8998
livy.spark.master yarn
==> The important configuration files to be aware of when using Livy live under /etc/livy/conf:

[root@ip-10-0-0-9 conf]# ls
livy-client.conf.template  livy.conf  livy.conf.template  livy-env.sh  livy-env.sh.template  log4j.properties  log4j.properties.template  spark-blacklist.conf.template
[root@ip-10-0-0-9 conf]#
=============================================================================================================================
- livy.conf: contains the server configuration.
- spark-blacklist.conf: lists Spark configuration options that users are not allowed to override. These options will be restricted to either their default values or the values set in the Spark configuration used by Livy.
- log4j.properties: configuration for Livy logging. Defines log levels and where log messages are written. The default configuration template prints log messages to stderr.
============================================================================================================================
==> The following configurations cannot be overridden because they are present in spark-blacklist.conf:
spark.master
spark.submit.deployMode
spark.yarn.jar
spark.yarn.jars
spark.yarn.archive
livy.rsc.server.idle-timeout
==> Now let's come back to the main question: the Livy use case for Spark and how to change configuration through Livy.

How to use Livy and run Spark jobs
Once the Livy server is running, you can connect to it on port 8998 (this can be changed with the livy.server.port config option). Some examples to get started are provided here, or you can check out the API documentation:
- REST API
- Programmatic API
- curl -H "Content-Type: application/json" -X POST -d '<data>' <livy-server>:<port>/batches
Example:

curl -X POST --data '{"file": "/user/spark/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi"}' -H "Content-Type: application/json" localhost:8998/batches

The above example uses the default Spark configuration defined in spark-defaults.conf.
- curl -X POST --data '{"file": "/user/spark/spark-examples.jar", "executorMemory": "20g", "className": "org.apache.spark.examples.SparkPi"}' -H "Content-Type: application/json" localhost:8998/batches

The above example overrides the spark-defaults configuration: the REST API call requests 20 GB of executor memory. If your nodes do not have sufficient resources, the job fails immediately, which is exactly what happened in this case:
18/12/26 22:51:46 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: Required executor memory (20480+2048 MB) is above the max threshold (11520 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.
	at org.apache.spark.deploy.yarn.Client.verifyClusterResources(Client.scala:318)
	at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:166)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
Hence, if the requested executor memory (plus overhead) is above yarn.scheduler.maximum-allocation-mb or yarn.nodemanager.resource.memory-mb, the job simply fails.
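After submitting a batch, its state and driver log can be inspected through the same REST API. The batch id 0 below is just illustrative; use whatever id the POST /batches call returned.

```
# Check the state of the batch (id 0 is whatever POST /batches returned)
curl localhost:8998/batches/0

# Fetch the driver log lines for the same batch
curl localhost:8998/batches/0/log
```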
Now let's run a spark-submit job and work through the executor sizing.
On this instance, Spark by default sets executor memory to 5 GB and executor cores to 4, as shown in the output below:
[hadoop@ip-10-0-0-7 ~]$ cat /etc/spark/conf/spark-defaults.conf |grep executor spark.executor.extraClassPath :/usr/lib/hadoop-lzo/lib/:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/:/usr/share/aws/emr/emrfs/auxlib/:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.memory 5120M
spark.executor.cores 4
[hadoop@ip-10-0-0-7 ~]$
- Let's assume you have 3 core nodes with 24 GB of memory each. This can be found in /etc/hadoop/conf/yarn-site.xml as yarn.nodemanager.resource.memory-mb. With 3 core nodes, the total available memory is 24 * 3 = 72 GB.
- Let's assume each machine has 16 vcores, so the total vcores available are 3 * 16 = 48. This can be found in yarn-site.xml as yarn.nodemanager.resource.cpu-vcores=16.
- The executor memory also includes the executor memory overhead, so we have to consider both. With spark.executor.memory = 5120M, you can run 3 executors per machine (3 * 5120 = 15360 M) plus overhead (10% of executor memory).

Total memory per executor = executor memory + overhead = 5120M + 512M = 5632M. With 3 executors, the executor memory used per node is 3 * 5632 = 16896M, which fits within the 24 GB available on each node. Hence, a total of 9 executors can be launched across the cluster.
Now for vcores: each machine has 16 vcores and runs 3 executors, so each executor can use 5 cores (the ApplicationMaster uses a core on one of the hosts).
So the resources can be defined as --executor-cores 5 --num-executors 9, as shown in the example below.
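Putting the calculation together, a submission with these values might look like the following. The application class and jar are the same SparkPi example used earlier; adjust them for your own job.

```
# Submit the SparkPi example with the executor sizing derived above:
# 9 executors, 5 cores each, 5 GB of memory per executor
spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --num-executors 9 \
  --executor-cores 5 \
  --executor-memory 5g \
  /usr/lib/spark/examples/jars/spark-examples.jar
```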
Please feel free to refer to the links [1] [2] for more information.