Azure databricks running guzzle jobs from Azure Data Factory pipeline - ja-guzzle/guzzle_docs GitHub Wiki

Using Databricks Jar Activity

  • Add databricks jar activity in pipeline

  • Select the Databricks linked service (create a new one if it does not exist, and select an existing cluster)

  • Provide the main class of the Guzzle job (e.g. for ingestion: 'com.justanalytics.guzzle.ingestion.Main')

  • Provide the job parameters and the DBFS paths of the Guzzle libraries

  • The following is a sample pipeline template:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Ingestion Job",
                "type": "DatabricksSparkJar",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "mainClassName": "com.justanalytics.guzzle.ingestion.Main",
                    "parameters": [
                        "environment=test",
                        "job_instance_id=504",
                        "batch_id=-1",
                        "stage_id=-1",
                        "business_date=2018-05-01 03:03:00.000",
                        "job_config_name=csv_lfs_demo",
                        "system=default",
                        "location=IN"
                    ],
                    "libraries": [
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/commons-beanutils-1.9.2.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/commons-codec-1.10.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/commons-collections-3.2.2.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/commons-digester-1.8.1.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/commons-logging-1.2.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/commons-validator-1.5.1.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/guava-19.0.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/hive-metastore.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/httpclient-4.5.3.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/httpcore-4.4.6.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-annotations-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-core-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-databind-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-dataformat-csv-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-dataformat-yaml-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-module-paranamer-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/jackson-module-scala_2.11-2.6.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/json-20160810.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/log4j-1.2.17.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/mssql-jdbc-6.1.0.jre8.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/mysql-connector-java-5.1.46.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/org.everit.json.schema-1.5.1.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/paranamer-2.6.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/postgresql-42.2.5.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/scala-library-2.11.8.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/scala-reflect-2.11.7.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/slf4j-api-1.7.16.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/slf4j-log4j12-1.7.16.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/snakeyaml-1.15.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/spark-streaming-kafka-0-10_2.11-2.2.0.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/libs/spark-xml_2.11-0.4.1.jar"
                        },
                        {
                            "jar": "dbfs:/mnt/guzzle_home/bin/ingestion.jar"
                        }
                    ]
                },
                "linkedServiceName": {
                    "referenceName": "guzzle_dbws",
                    "type": "LinkedServiceReference"
                }
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Using Guzzle APIs

  • Add web activity in pipeline

  • In the Settings tab, provide the Guzzle API URL http://<host>:<port>/api/execute/job and select the PUT method

  • Add an Authorization header with the value Bearer <guzzle authorization token> (store the token in a secure store such as Azure Key Vault rather than hard-coding it in the pipeline definition)

  • Provide a request body like the following:

{
    "name":"csv_lfs_demo",
    "jobParameters": {
        "system":"default",
        "location":"IN",
        "business_date":"2018-12-05 17:33:55",
        "environment":"test"
    }
}
  • This web activity will submit the Guzzle job

  • Create an Until activity to wait for job completion (by querying the job_info table in the Guzzle database)

  • The following is a sample pipeline template:

{
    "name": "pipeline3",
    "properties": {
        "activities": [
            {
                "name": "Submit job",
                "type": "WebActivity",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "url": "http://guzzle.eastus2.cloudapp.azure.com:9090/api/execute/job",
                    "method": "PUT",
                    "headers": {
                        "Authorization": "Bearer <guzzle authorization token>"
                    },
                    "body": {
                        "name": "csv_lfs_demo",
                        "jobParameters": {
                            "system": "default",
                            "location": "IN",
                            "business_date": "2018-12-05 17:33:55",
                            "environment": "test"
                        }
                    }
                }
            },
            {
                "name": "Wait for job completion",
                "type": "Until",
                "dependsOn": [
                    {
                        "activity": "Submit job",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "typeProperties": {
                    "expression": {
                        "value": "@and(not(equals(activity('Lookup Job Status').output.firstRow.status, 'PENDING')), not(equals(activity('Lookup Job Status').output.firstRow.status, 'RUNNING')))",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Lookup Job Status",
                            "type": "Lookup",
                            "dependsOn": [
                                {
                                    "activity": "Wait",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "select status from job_info where job_instance_id = @{activity('Submit job').output.jobInstanceId}",
                                        "type": "Expression"
                                    }
                                },
                                "dataset": {
                                    "referenceName": "guzzle_repo",
                                    "type": "DatasetReference"
                                }
                            }
                        },
                        {
                            "name": "Wait",
                            "type": "Wait",
                            "typeProperties": {
                                "waitTimeInSeconds": 1
                            }
                        }
                    ],
                    "timeout": "7.00:00:00"
                }
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
⚠️ **GitHub.com Fallback** ⚠️