Project Code
resources/citibike_etl_pipeline_nb.job.yml
resources:
  jobs:
    citibike_etl_pipeline_nb:
      name: citibike_etl_pipeline_nb
      tasks:
        - task_key: 01_bronze_citibike
          notebook_task:
            notebook_path: ../citibike_etl/notebooks/01_bronze/01_bronze_citibike.ipynb
            source: WORKSPACE
          job_cluster_key: citibike_etl_pipeline_nb_cluster
        - task_key: 02_silver_citibike
          depends_on:
            - task_key: 01_bronze_citibike
          notebook_task:
            notebook_path: ../citibike_etl/notebooks/02_silver/02_silver_citibike.ipynb
            source: WORKSPACE
          job_cluster_key: citibike_etl_pipeline_nb_cluster
        - task_key: 03_gold_citibike_daily_ride_summary
          depends_on:
            - task_key: 02_silver_citibike
          notebook_task:
            notebook_path: ../citibike_etl/notebooks/03_gold/03_gold_citibike_daily_ride_summary.ipynb
            source: WORKSPACE
          job_cluster_key: citibike_etl_pipeline_nb_cluster
          libraries:
            - whl: ../dist/*.whl
        - task_key: 03_gold_citibike_daily_station_performance
          depends_on:
            - task_key: 02_silver_citibike
          notebook_task:
            notebook_path: ../citibike_etl/notebooks/03_gold/03_gold_citibike_daily_station_performance.ipynb
            source: WORKSPACE
          job_cluster_key: citibike_etl_pipeline_nb_cluster
      job_clusters:
        - job_cluster_key: citibike_etl_pipeline_nb_cluster
          new_cluster:
            cluster_name: ""
            spark_version: 15.4.x-scala2.12
            spark_conf:
              spark.master: local[*, 4]
              spark.databricks.cluster.profile: singleNode
            azure_attributes:
              first_on_demand: 1
              availability: SPOT_WITH_FALLBACK_AZURE
              spot_bid_max_price: -1
            node_type_id: Standard_DS3_v2
            driver_node_type_id: Standard_DS3_v2
            custom_tags:
              ResourceClass: SingleNode
            spark_env_vars:
              PYSPARK_PYTHON: /databricks/python3/bin/python3
            enable_elastic_disk: true
            data_security_mode: SINGLE_USER
            runtime_engine: STANDARD
            num_workers: 0
      queue:
        enabled: true
resources/citibike_etl_pipeline_py.job.yml
resources:
  jobs:
    citibike_etl_pipeline_py:
      name: citibike_etl_pipeline_py
      tasks:
        - task_key: 01_bronze_citibike
          spark_python_task:
            python_file: ../citibike_etl/scripts/01_bronze/01_bronze_citibike.py
            parameters:
              - "{{job.id}}"
              - "{{job.run_id}}"
              - "{{task.run_id}}"
              - "{{job.start_time.iso_datetime}}"
              - "${var.catalog}"
          job_cluster_key: ds3_v2_sn
        - task_key: 02_silver_citibike
          depends_on:
            - task_key: 01_bronze_citibike
          spark_python_task:
            python_file: ../citibike_etl/scripts/02_silver/02_silver_citibike.py
            parameters:
              - "{{job.id}}"
              - "{{job.run_id}}"
              - "{{task.run_id}}"
              - "{{job.start_time.iso_datetime}}"
              - "${var.catalog}"
          job_cluster_key: ds3_v2_sn
          libraries:
            - whl: ../dist/*.whl
        - task_key: 03_gold_citibike_daily_ride_summary
          depends_on:
            - task_key: 02_silver_citibike
          spark_python_task:
            python_file: ../citibike_etl/scripts/03_gold/03_gold_citibike_daily_ride_summary.py
            parameters:
              - "${var.catalog}"
          job_cluster_key: ds3_v2_sn
        - task_key: 03_gold_citibike_daily_station_performance
          depends_on:
            - task_key: 02_silver_citibike
          spark_python_task:
            python_file: ../citibike_etl/scripts/03_gold/03_gold_citibike_daily_station_performance.py
            parameters:
              - "${var.catalog}"
          job_cluster_key: ds3_v2_sn
      job_clusters:
        - job_cluster_key: ds3_v2_sn
          new_cluster: "${var.ds3_v2_sn}"
      queue:
        enabled: true
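The "${var.catalog}" and "${var.ds3_v2_sn}" references above resolve against variables declared at the bundle level. The bundle's databricks.yml is not reproduced in this listing; the fragment below is a minimal sketch of what that declaration could look like, and the defaults shown (including the citibike_dev catalog name and the abbreviated cluster spec) are assumptions, not taken from the project files.

# Hypothetical databricks.yml fragment (not part of the listed project files)
variables:
  catalog:
    description: Target Unity Catalog for the pipeline tables
    default: citibike_dev  # assumed default, mirrors the catalog hardcoded in the notebook version
  ds3_v2_sn:
    description: Single-node DS3_v2 cluster definition shared by all tasks
    type: complex
    default:
      spark_version: 15.4.x-scala2.12
      node_type_id: Standard_DS3_v2
      num_workers: 0
      spark_conf:
        spark.master: local[*, 4]
        spark.databricks.cluster.profile: singleNode
      custom_tags:
        ResourceClass: SingleNode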
citibike_etl/notebooks/02_silver/02_silver_citibike.ipynb
from citibike.citibike_utils import get_trip_duration_mins
from utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit

# Read the raw trips from the bronze layer
df = spark.read.table("citibike_dev.01_bronze.jc_citibike")

# Derive the trip duration in minutes and the trip start date
df = get_trip_duration_mins(spark, df, "started_at", "ended_at", "trip_duration_mins")
df = timestamp_to_date_col(spark, df, "started_at", "trip_start_date")

# Attach lineage metadata as a map column (placeholders in the notebook version)
df = df.withColumn(
    "metadata",
    create_map(
        lit("pipeline_id"), lit("placeholder"),
        lit("run_id"), lit("placeholder"),
        lit("task_id"), lit("placeholder"),
        lit("processed_timestamp"), lit("placeholder"),
    ),
)

df = df.select(
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
)

# Overwrite the silver table, allowing the schema to be replaced on overwrite
df.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("citibike_dev.02_silver.jc_citibike")
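Both the notebook above and the script below import get_trip_duration_mins and timestamp_to_date_col from the project's wheel, whose source is not reproduced in this listing. The sketch below shows one way those helpers might be implemented, assuming they are thin wrappers over withColumn; the function bodies are illustrative, not the project's actual code.

# Hypothetical sketch of the helpers imported above; the real implementations
# ship inside the wheel referenced as ../dist/*.whl.
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F


def get_trip_duration_mins(spark: SparkSession, df: DataFrame,
                           start_col: str, end_col: str, output_col: str) -> DataFrame:
    # Difference between the two timestamps in seconds, converted to minutes.
    # `spark` is accepted only to match the call sites above.
    return df.withColumn(
        output_col,
        (F.unix_timestamp(F.col(end_col)) - F.unix_timestamp(F.col(start_col))) / 60,
    )


def timestamp_to_date_col(spark: SparkSession, df: DataFrame,
                          timestamp_col: str, output_col: str) -> DataFrame:
    # Truncate a timestamp column to its calendar date.
    return df.withColumn(output_col, F.to_date(F.col(timestamp_col)))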
citibike_etl/scripts/02_silver/02_silver_citibike.py
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit

from citibike.citibike_utils import get_trip_duration_mins
from utils.datetime_utils import timestamp_to_date_col

# Positional arguments supplied by the job definition (see the job YAML above)
pipeline_id = sys.argv[1]
run_id = sys.argv[2]
task_id = sys.argv[3]
processed_timestamp = sys.argv[4]
catalog = sys.argv[5]

# Unlike a notebook, a spark_python_task script is not given a pre-built
# `spark` object, so obtain the active session explicitly
spark = SparkSession.builder.getOrCreate()

# Read the raw trips from the bronze layer
df = spark.read.table(f"{catalog}.01_bronze.jc_citibike")

# Derive the trip duration in minutes and the trip start date
df = get_trip_duration_mins(spark, df, "started_at", "ended_at", "trip_duration_mins")
df = timestamp_to_date_col(spark, df, "started_at", "trip_start_date")

# Attach lineage metadata resolved from the job's runtime context
df = df.withColumn(
    "metadata",
    create_map(
        lit("pipeline_id"), lit(pipeline_id),
        lit("run_id"), lit(run_id),
        lit("task_id"), lit(task_id),
        lit("processed_timestamp"), lit(processed_timestamp),
    ),
)

df = df.select(
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
)

# Overwrite the silver table, allowing the schema to be replaced on overwrite
df.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalog}.02_silver.jc_citibike")
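The libraries entries in both job definitions point at ../dist/*.whl, so the citibike and utils packages must be built into a wheel before the bundle is deployed. The project's actual build configuration is not part of this listing; a minimal setup.py that would produce such a wheel might look like the sketch below, where the distribution name and version are assumptions.

# Hypothetical setup.py for the wheel referenced as ../dist/*.whl;
# the real project may use pyproject.toml or another build tool instead.
from setuptools import find_packages, setup

setup(
    name="citibike_etl",
    version="0.1.0",
    description="Shared transformation helpers for the Citibike ETL pipeline",
    packages=find_packages(include=["citibike", "citibike.*", "utils", "utils.*"]),
    install_requires=[],  # pyspark is provided by the Databricks runtime
)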