Simple Apache Beam Template deploy on GCP Dataflow - atodkar/blog GitHub Wiki

January 20, 2019

Google Cloud Platform is gaining popularity day by day for various reasons. One of them is its offering in the big data sector. This week I got the opportunity to work on Cloud Dataflow, an ETL pipeline service based on the Apache Beam framework. Today we will look at how to deploy a built-in Dataflow template on Google Cloud Platform that performs various operations involving Cloud Storage and BigQuery. We will also see how a quick and simple Apache Beam pipeline can be created and deployed on Cloud Dataflow.

Deploying Builtin templates to Dataflow

GCP has several built-in templates for various jobs. They fall into four main categories -

  1. Batch based processing
  2. Stream based processing
  3. Utilities
  4. Custom Templates

Batch processing covers one-time jobs that process data in batch, such as counting words in files stored in Cloud Storage, copying data from Cloud Storage to Datastore (a serverless document store) or Bigtable, working with BigQuery, etc. Figure 1 shows the various built-in templates we can use for our ETL.

Figure 1

The second category is stream processing, where we can process data from the Pub/Sub service and send it to a Cloud Storage bucket or any other service on the Google platform. Figure 2 shows the list of templates offered there -

Figure 2

In addition to these main categories, there are some utility templates, and then custom templates, where we create our own templates using Apache Beam and deploy them to Dataflow. In most use cases we would need to create our own template so that we have better control over the processing.

Apache Beam supports Java and Python at the moment, and there are various runtimes (runners), including Spark, Flink and Dataflow, on which Beam pipelines can execute. While Google still needs to do a lot of work on its documentation for custom Beam templates and on the ease of deploying them to GCP, I am hopeful that, as an open-source Apache framework, Beam will gain popularity soon. Compared with Spark, Beam still needs to mature a lot at the moment, but with an enhanced data extraction engine it should become popular.

To get a custom template deployed, we first need to write the code for our use case. I am using the simple wordcount example; we can enhance it as needed and then deploy it to GCP Dataflow. Before deploying, we can also test the pipeline locally by installing the Beam SDK and running it with the local DirectRunner.

So let's start -

Deploying Custom Dataflow Templates

Understand Apache Beam

In today's example I am using Python. We can follow the instructions at https://beam.apache.org/get-started/quickstart-py/ to get started quickly. Note that it is good to do this locally in a virtual environment, as at the moment Beam supports only Python 2.7. Quickstart examples are available at https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples

Create custom template

We can start by deploying wordcount.py and later modify it to do custom processing. We need to follow certain guidelines: runtime parameters are declared on a PipelineOptions subclass using add_value_provider_argument. Some Google documentation is available at https://cloud.google.com/dataflow/docs/guides/templates/creating-templates, but it is not sufficient. We need to modify the example so that the options are declared in a separate class, and then read them through a view of that class when building the pipeline.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class WordcountOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    # Use add_value_provider_argument for arguments to be templatable
    # Use add_argument as usual for non-templatable arguments
    parser.add_value_provider_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Path of the file to read from')
    # No required=True here: argparse would otherwise demand --output when
    # the template is created, before any runtime value exists.
    parser.add_value_provider_argument(
        '--output',
        help='Output file to write results to.')


def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""
  pipeline_options = PipelineOptions(argv)
  word_count_options = pipeline_options.view_as(WordcountOptions)
  # If a DoFn relies on global context (e.g., a module imported at module
  # level), import SetupOptions from apache_beam.options.pipeline_options
  # and enable save_main_session on that view:
  # pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    # ReadFromText accepts the ValueProvider directly; its value is resolved
    # only when the templated job runs.
    lines = p | beam.io.ReadFromText(word_count_options.input)
    # The remaining wordcount transforms (split, count, write) go here.


if __name__ == '__main__':
  run()

Once this is modified, we can execute the code using the command below -

python -m wordcount \
  --runner DataflowRunner \
  --project playground \
  --staging_location gs://[YOUR_BUCKET_NAME]/staging \
  --temp_location gs://[YOUR_BUCKET_NAME]/temp \
  --template_location gs://[YOUR_BUCKET_NAME]/templates/mytemplate

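The above command builds the pipeline locally with the Dataflow runner and, instead of running a job, writes the template to template_location and copies the code to staging_location in the Cloud Storage bucket. As part of execution, the code needs GCP credentials with access to the bucket, which may require logging in to GCP through the browser.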
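The same flags can also be assembled in Python and handed to a run(argv) entry point like the one above, which helps when the bucket name comes from a script or configuration. This is a minimal sketch; build_template_args is a hypothetical helper, and the project, bucket and template names are placeholders.

```python
def build_template_args(project, bucket, template_name):
    """Build the argv list equivalent to the template-creation command."""
    base = 'gs://%s' % bucket
    return [
        '--runner', 'DataflowRunner',
        '--project', project,
        # Staging, temp and template paths all live under the same bucket.
        '--staging_location', base + '/staging',
        '--temp_location', base + '/temp',
        '--template_location', '%s/templates/%s' % (base, template_name),
    ]
```

With the wordcount module importable, something like run(build_template_args('playground', 'my-bucket', 'mytemplate')) would then stand in for the command line.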

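There is an optional metadata file we can create to tell Dataflow which parameters are required and should be shown on the UI. For Dataflow to pick it up, Google's documentation asks for it to be stored next to the template in Cloud Storage under the name <template-name>_metadata. A sample metadata.json is shown below -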

{
  "name": "AnandWordCount",
  "description": "An example pipeline that counts words in the input file.",
  "parameters": [{
    "name": "input",
    "label": "Input Cloud Storage File(s)",
    "help_text": "Path of the file pattern glob to read from.",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": true
  },
  {
    "name": "output",
    "label": "Output Cloud Storage File Prefix",
    "help_text": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
    "regexes": ["^gs:\/\/[^\n\r]+$"]
  }]
}
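To make the role of regexes and is_optional concrete, here is a small local sketch that checks a set of parameter values against a metadata document of the same shape. It is not Dataflow's own validation code: validate_parameters is a hypothetical helper, the metadata is trimmed down, the parameter names follow the --input and --output flags of the pipeline, and parameters without is_optional are assumed to be required.

```python
import json
import re

# Trimmed-down metadata with the same structure as the sample above.
METADATA_JSON = r'''
{
  "name": "AnandWordCount",
  "parameters": [
    {"name": "input", "regexes": ["^gs:\/\/[^\n\r]+$"], "is_optional": true},
    {"name": "output", "regexes": ["^gs:\/\/[^\n\r]+$"]}
  ]
}
'''
METADATA = json.loads(METADATA_JSON)


def validate_parameters(metadata, values):
    """Return a list of problems; an empty list means the values look fine."""
    problems = []
    known = set()
    for spec in metadata.get('parameters', []):
        name = spec['name']
        known.add(name)
        if name not in values:
            # Assumption: a parameter is required unless is_optional is true.
            if not spec.get('is_optional', False):
                problems.append('missing required parameter: %s' % name)
            continue
        for pattern in spec.get('regexes', []):
            if re.match(pattern, values[name]) is None:
                problems.append('%s does not match %r' % (name, pattern))
    # Anything not declared in the metadata is reported as unknown.
    for name in sorted(set(values) - known):
        problems.append('unknown parameter: %s' % name)
    return problems
```

For example, validate_parameters(METADATA, {'output': '/tmp/counts'}) reports one problem, because the value does not start with gs://.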

Create Template in GCP

We can then select the custom template option on the New Dataflow Template screen on GCP, as shown in the diagram below -

At the highlighted location, we need to select the Cloud Storage path where the template was uploaded (the template_location used above).
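The form collects essentially the inputs of the Dataflow REST method projects.templates.launch: the template path, a job name, the runtime parameters and a temp location. As a hedged sketch, the helper below only builds that request (URL and JSON body) without sending it; build_launch_request is a hypothetical name, the values are placeholders, and actually sending it would need an authenticated HTTP client.

```python
import json

try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2.7


def build_launch_request(project, template_path, job_name, parameters,
                         temp_location):
    """Return (url, json_body) for a templates.launch call; nothing is sent."""
    # The template's Cloud Storage path travels as the gcsPath query parameter.
    query = urlencode({'gcsPath': template_path})
    url = ('https://dataflow.googleapis.com/v1b3/projects/%s/templates:launch?%s'
           % (project, query))
    body = {
        'jobName': job_name,
        # Runtime values for the options declared as value providers.
        'parameters': parameters,
        'environment': {'tempLocation': temp_location},
    }
    return url, json.dumps(body, sort_keys=True)
```

A call such as build_launch_request('playground', 'gs://my-bucket/templates/mytemplate', 'wordcount-job', {'output': 'gs://my-bucket/counts'}, 'gs://my-bucket/temp') mirrors what would be typed into the form.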

Summary

The platform is still maturing, but I see it as a newer way of doing data analytics. I am still learning and will post more as I get more details.