Create Dataproc on GCP using Cloud Function and Python - atodkar/blog GitHub Wiki
January 13, 2019
This week we will see how a batch-mode data analytics application can be triggered on a Dataproc cluster to process PySpark jobs. To develop truly cloud-native data analytics applications on Google Cloud Platform, we need orchestration so that once data is available in a Cloud Storage bucket, a trigger executes a Cloud Function that spawns a Dataproc cluster to process the data. The trigger can also be based on a cron job that creates the data processing cluster periodically.
This scenario may not be applicable to continuous stream processing, as that needs a long-running cluster, which can be created using Cloud Deployment Manager.
The following steps are involved in this process:
- Create a bucket in Cloud Storage for incoming data.
- Create a Cloud Function for Python 3.7 (this is the only Python version supported by GCP Cloud Functions at this time).
- Use the Google Cloud Python client SDK to trigger Dataproc.
I see there are various solutions to this problem using the Cloud Shell, the SDK, etc., but I wanted to explore the GCP alternative to the boto3 library we use on AWS to access cloud infrastructure programmatically. I searched the Google documentation for a similar solution but couldn't find any apart from the API documentation.
So let's see how the GCP Python client SDK can be used to access Dataproc.
Create Bucket
Unlike AWS, we cannot see the trigger configuration on the bucket; the trigger is configured in the Cloud Function console.
Create Cloud Function
The trigger needs to be configured here, when creating the Cloud Function.
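With a Cloud Storage trigger, the function is called with an event dictionary and a context object. Below is a minimal sketch of reading the location of the object that fired the trigger. It assumes the standard `bucket` and `name` fields of the storage event payload, and both function names are hypothetical, not part of the original code.

```python
def describe_storage_event(event):
    """Return the gs:// URI of the object that fired the trigger."""
    bucket = event["bucket"]  # bucket that received the object
    name = event["name"]      # object path inside that bucket
    return f"gs://{bucket}/{name}"


def log_incoming_file(event, context):
    """Hypothetical entry point that only logs what arrived."""
    print(f"New object: {describe_storage_event(event)}")
```

The returned URI could later be passed to the processing job as its input path.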
Use Python Client Library
This is where the core logic is. First we need to add the google-cloud-dataproc and google-api-python-client libraries to the requirements.txt file.
Requirements
# Function dependencies, for example:
# package>=version
google-cloud-dataproc
google-api-python-client
Entry point
These libraries wrap the underlying REST calls and provide simple API calls. The entry point to the Cloud Function is createdataproc; we need to specify it in 'Function to execute'.
My code for createdataproc looks like this:
import time
from google.cloud import dataproc


def createdataproc(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    client = dataproc.ClusterControllerClient()
    zone = "us-central1-a"
    cluster_name = "test-analytics"
    project_id = 'playground-333820'
    region = 'global'
    # create_cluster fails if the name is taken, so remove any old cluster first.
    if cluster_exists(client, project_id, region, cluster_name) is not None:
        print("Deleting cluster...")
        client.delete_cluster(project_id, region, cluster_name)
        # Poll until the old cluster is completely gone.
        while cluster_exists(client, project_id, region, cluster_name) is not None:
            print("Waiting...")
            time.sleep(5)
    create_cluster(client, project_id, zone, region, cluster_name)
    print(f"Cluster created: {cluster_name}.")
The ClusterControllerClient and its supported methods are described in the Cluster Client documentation.
In our logic we check whether a cluster with the given name exists, delete it if it does, and then recreate it. I know this logic may not be correct in every case, but it can serve as a base. The create_cluster method will fail if a cluster with the same name already exists, so the cluster_exists method simply checks whether a given cluster exists.
Check if a Cluster exists
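from google.api_core.exceptions import NotFound


def cluster_exists(client, project_id, region, cluster_name):
    try:
        return client.get_cluster(project_id, region, cluster_name)
    except NotFound:
        # Only "cluster not found" means absent; other errors should surface.
        return None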
client.get_cluster throws an exception if the given cluster_name does not exist in the specified project. If the cluster exists, we can choose to skip further processing or delete it and create a new one. I am doing the latter. delete_cluster takes cluster_name as a parameter, and we need to wait until the cluster is completely deleted before we call create_cluster, otherwise create_cluster will throw an exception. The wait logic is implemented in the first method, createdataproc.
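Since a Cloud Function has a limited execution time, the wait can be bounded so that it does not poll forever. The following is a minimal sketch of such a wait, not the original implementation. The helper name `wait_until_deleted` and its parameters are hypothetical, and `exists` stands for any zero-argument callable that returns None once the cluster is gone.

```python
import time


def wait_until_deleted(exists, timeout_s=300, poll_s=5,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll exists() until it returns None or the timeout elapses.

    Returns True if the cluster disappeared, False on timeout.
    """
    deadline = clock() + timeout_s
    while exists() is not None:
        if clock() >= deadline:
            return False  # give up instead of looping forever
        sleep(poll_s)
    return True
```

Assuming the cluster_exists method from this post, it could be called as `wait_until_deleted(lambda: cluster_exists(client, project_id, region, cluster_name))`, with the caller deciding what to do when it returns False.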
Create Cluster
In this method we define the parameters and configuration for cluster creation. The datatype documentation has further details and the specific parameters.
def create_cluster(client, project_id, zone, region, cluster_name):
    print('Creating cluster...')
    zone_uri = \
        'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
            project_id, zone)
    cluster_data = {
        'project_id': project_id,
        'cluster_name': cluster_name,
        'config': {
            'gce_cluster_config': {
                'zone_uri': zone_uri,
                'metadata': {
                    'hive-metastore-instance': 'playground-221820:us-central1:hive-metastore'
                },
                'service_account_scopes': [
                    'https://www.googleapis.com/auth/sqlservice.admin',
                    'https://www.googleapis.com/auth/datastore',
                    'https://www.googleapis.com/auth/devstorage.full_control'
                ]
            },
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            },
            'initialization_actions': [
                {
                    'executable_file': 'gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh'
                }
            ]
        }
    }
    response = client.create_cluster(project_id, region, cluster_data)
    result = response.result()
    print("After cluster create")
    return result
Apart from master_config and worker_config, an important configuration is initialization_actions, which executes startup steps. In the current example, we initialize the Cloud SQL proxy so that the cluster can connect to Cloud SQL. This is needed if our metastore is in a SQL database, so that every newly spawned cluster gets access to the metadata table schemas.
We can also run custom scripts at initialization. In gce_cluster_config, some parameters need to be specified, such as zone_uri, metadata (for the Hive metastore location) and service_account_scopes, which give the cluster permission to access other cloud resources. Some default permissions are granted implicitly, but any other permission, such as 'https://www.googleapis.com/auth/sqlservice.admin', must be listed explicitly here.
For more details on the cluster creation parameters, we can refer to the documentation.
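If the same function has to create clusters of different sizes, the dictionary above could be built by a small helper. This is only a sketch under assumptions: the helper name `build_cluster_data` and its parameters are hypothetical, and it covers just the fields used in this post.

```python
def build_cluster_data(project_id, zone, cluster_name, num_workers=2,
                       machine_type="n1-standard-1", scopes=(), init_scripts=()):
    """Build the cluster dictionary passed to create_cluster."""
    zone_uri = (
        f"https://www.googleapis.com/compute/v1/projects/{project_id}/zones/{zone}"
    )
    config = {
        "gce_cluster_config": {
            "zone_uri": zone_uri,
            # Extra OAuth scopes beyond the defaults, e.g. sqlservice.admin.
            "service_account_scopes": list(scopes),
        },
        "master_config": {"num_instances": 1, "machine_type_uri": machine_type},
        "worker_config": {"num_instances": num_workers,
                          "machine_type_uri": machine_type},
    }
    if init_scripts:
        # Each startup script is a gs:// URI run on every node at creation.
        config["initialization_actions"] = [
            {"executable_file": uri} for uri in init_scripts
        ]
    return {"project_id": project_id, "cluster_name": cluster_name,
            "config": config}
```

The result has the same shape as cluster_data in create_cluster, so it could be passed to client.create_cluster in its place.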
Create Jobs
We need to use a separate client to create jobs on the cluster. I am not going into the details of how to do it, but I think it should be fairly straightforward, similar to what we did for create_cluster.
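As a rough sketch of what that could look like, the snippet below builds a PySpark job description and submits it with JobControllerClient. The helper names and the script URI in the usage note are hypothetical, and the keyword arguments to submit_job are an assumption that should be checked against the installed version of google-cloud-dataproc.

```python
def build_pyspark_job(cluster_name, main_python_file_uri, args=()):
    """Describe a PySpark job that runs on an existing cluster."""
    return {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": main_python_file_uri,
            "args": list(args),
        },
    }


def submit_pyspark_job(project_id, region, cluster_name, main_python_file_uri):
    """Submit the job; requires google-cloud-dataproc and credentials."""
    # Imported here so the builder above works without the library installed.
    from google.cloud import dataproc

    job_client = dataproc.JobControllerClient()
    job = build_pyspark_job(cluster_name, main_python_file_uri)
    return job_client.submit_job(project_id=project_id, region=region, job=job)
```

For example, `submit_pyspark_job(project_id, region, cluster_name, "gs://my-bucket/jobs/etl.py")` could be called after create_cluster returns, where the bucket and script path are placeholders.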
Thank you for reading and hope you find it useful.