
Python API

Hadoop_g5k provides a set of Python interfaces and classes to manage Hadoop and Spark clusters from Python code. Detailed information about the API is available on Read the Docs.

Hadoop Basic Example

The same actions executed in the [hg5k#general-overview-and-basic-usage] section of hg5k are implemented here through the Python interfaces; the full script is available at hadoop_basic_usage.

Two classes are provided for a Hadoop cluster: HadoopCluster for a 0.x or 1.x installation and HadoopV2Cluster for a 2.x installation. The constructor takes as a parameter the set of hosts where the cluster is to be installed.
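For illustration, here is a minimal sketch of how a cluster object might be created, assuming the classes are importable from the top-level hadoop_g5k package and that the hosts are execo Host objects (the host names below are hypothetical):

from execo import Host
from hadoop_g5k import HadoopCluster, HadoopV2Cluster  # assumed import path

# Hosts would normally come from a Grid'5000 reservation; here they are
# hard-coded hypothetical node names.
hosts = [Host("parapluie-1.rennes.grid5000.fr"),
         Host("parapluie-2.rennes.grid5000.fr")]

hc_v1 = HadoopCluster(hosts)    # for a 0.x or 1.x Hadoop distribution
hc_v2 = HadoopV2Cluster(hosts)  # for a 2.x Hadoop distribution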

Then Hadoop needs to be deployed, that is, the binary files transferred to the nodes of the cluster and the specified installation directories created. The bootstrap() method takes the corresponding tar.gz distribution file and installs it on the hosts. Afterwards, initialize() performs the necessary configuration for the cluster to run (definition of master and slaves, hardware-dependent parameters, etc.). Finally, we start the services of the cluster.

hc = HadoopV2Cluster(hosts)
hc.bootstrap(hadoop_tar_file)
hc.initialize()
hc.start_and_wait()

To execute a job, first we need to create a HadoopJarJob object. There is only one mandatory parameter in the constructor, the path of the Jar file with the code. In the example, we also specify the job parameters as a list of strings. Finally, the job is executed on the cluster with the execute_job method.

job_output = "/output"
job = HadoopJarJob(jar_path, ["randomtextwriter", job_output])
(out, err) = hc.execute_job(job, verbose=False)

if job.success:
    print "The job finished successfully (with id %s)" % job.job_id
else:
    print "There was a problem during job execution"

Standard output and error can be retrieved from the execution method. Moreover, the job object stores some additional information, such as the success flag and the job id.
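For instance, the captured output can be saved locally for later inspection (a minimal sketch, assuming out and err are returned as strings; the file names are arbitrary):

# Save the captured standard output and error of the previous execution
# (job, out and err come from the block above).
with open("job_%s.stdout" % job.job_id, "w") as f:
    f.write(out)
with open("job_%s.stderr" % job.job_id, "w") as f:
    f.write(err)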

Spark Basic Example

The same actions executed in the [spark_g5k#general-overview-and-basic-usage] section of spark_g5k are implemented here through the Python interfaces; the full script is available at spark_basic_usage.

Similarly to Hadoop, a cluster class is provided for Spark clusters, SparkCluster. The constructor must be given the mode (either STANDALONE_MODE or YARN_MODE) and either a list of hosts where Spark should be deployed or an instance of HadoopCluster from which to obtain the hosts. Note that both elements may be provided; in this case, Spark is deployed on the specified hosts but is still able to access the Hadoop cluster.
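For illustration, the alternative constructions might look as follows (a minimal sketch; the import path and the hosts keyword name are assumptions, only the hadoop_cluster keyword appears in the example further below):

# Assumed import path and keyword names, shown for illustration only.
from hadoop_g5k.spark import SparkCluster, STANDALONE_MODE, YARN_MODE

# Standalone deployment on an explicit list of hosts.
sc_alone = SparkCluster(STANDALONE_MODE, hosts=hosts)

# Deployment on specific hosts while still linked to an existing Hadoop cluster.
sc_mixed = SparkCluster(YARN_MODE, hosts=hosts, hadoop_cluster=hc)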

As for Hadoop, the cluster should be installed and initialized before the services can run.

sc = SparkCluster(YARN_MODE, hadoop_cluster=hc)
sc.bootstrap(spark_tar_file)
sc.initialize()
sc.start()

Once the cluster is running, the user may create a SparkJob and send it to the cluster object for execution. Two types of jobs are available: PythonSparkJob and JavaOrScalaSparkJob. To follow the example mentioned above, we use the second class (a sketch with PythonSparkJob is given at the end of this section):

main_class = "org.apache.spark.examples.SparkPi"
params = []

job = JavaOrScalaSparkJob(job_path=jar_path,
                          app_params=params,
                          main_class=main_class)

(out, err) = sc.execute_job(job, verbose=False)

if job.success:
    print "The job finished successfully"
else:
    print "There was a problem during job execution"

The main_class parameter may be omitted if the Jar file already defines a default class for execution.
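For a Python application, PythonSparkJob would be used instead. The following is a minimal sketch that assumes the constructor accepts the same job_path and app_params parameters as JavaOrScalaSparkJob; the script path is hypothetical:

# Hypothetical Python application; constructor parameters assumed to mirror
# those of JavaOrScalaSparkJob.
py_job = PythonSparkJob(job_path="/home/user/apps/pi.py",
                        app_params=["100"])

(out, err) = sc.execute_job(py_job, verbose=False)

if py_job.success:
    print "The Python job finished successfully"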