Regular Variables with Manual Override

Commands

The catalog variable defaults to citibike_dev (see databricks.yml below); passing --var="catalog=..." at deploy time overrides that default for the selected target.

Overriding the catalog variable for test deployment

databricks bundle deploy -t test --var="catalog=citibike_test"

Overriding the catalog variable for prod deployment

databricks bundle deploy -t prod --var="catalog=citibike_prod"
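
Previewing the resolved configuration before deploying (bundle validate accepts the same target and variable flags)

databricks bundle validate -t test --var="catalog=citibike_test"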

Project Code

databricks.yml

# This is a Databricks asset bundle definition for dab_project.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: dab_project

include:
  - resources/*.yml

variables:
  catalog:
    default: "citibike_dev"

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://adb-3167041784358437.17.azuredatabricks.net

  test:
    mode: production
    presets:
      name_prefix: '[testing] '
    workspace:
      host: https://adb-348512119942792.12.azuredatabricks.net/
      # We explicitly specify a shared root_path so that all deployments to this target use a single copy.
      root_path: /Workspace/Shared/.bundle/${bundle.name}/${bundle.target}
    permissions:
      - user_name: [email protected]
        level: CAN_MANAGE
    run_as:
      user_name: [email protected]

  prod:
    mode: production
    workspace:
      host: https://adb-3274436598051014.14.azuredatabricks.net/
      # We explicitly specify a shared root_path so that all deployments to this target use a single copy.
      root_path: /Workspace/Shared/.bundle/${bundle.name}/${bundle.target}
    permissions:
      - user_name: [email protected]
        level: CAN_MANAGE
    run_as:
      user_name: [email protected]

citibike_etl/notebooks/01_bronze/01_bronze_citibike.ipynb

from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
from pyspark.sql.functions import create_map, lit

# Job context and the target catalog arrive as notebook widgets (task parameters).
pipeline_id = dbutils.widgets.get("pipeline_id")
run_id = dbutils.widgets.get("run_id")
task_id = dbutils.widgets.get("task_id")
processed_timestamp = dbutils.widgets.get("processed_timestamp")
catalog = dbutils.widgets.get("catalog")

# Explicit schema for the raw CSV. A precision and scale are given for the
# coordinate columns because DecimalType() defaults to DecimalType(10, 0),
# which would truncate latitudes and longitudes to whole numbers.
schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", DecimalType(10, 6), True),
    StructField("start_lng", DecimalType(10, 6), True),
    StructField("end_lat", DecimalType(10, 6), True),
    StructField("end_lng", DecimalType(10, 6), True),
    StructField("member_casual", StringType(), True),
])

# Read the raw trip data from the landing volume in the target catalog.
df = spark.read.csv(f"/Volumes/{catalog}/00_landing/source_citibike_data/JC-202503-citibike-tripdata.csv", schema=schema, header=True)

# Attach run metadata for lineage.
df = df.withColumn("metadata",
              create_map(
                  lit("pipeline_id"), lit(pipeline_id),
                  lit("run_id"), lit(run_id),
                  lit("task_id"), lit(task_id),
                  lit("processed_timestamp"), lit(processed_timestamp)
                  ))

# Overwrite the bronze table in the target catalog.
df.write.\
    mode("overwrite").\
    option("overwriteSchema", "true").\
    saveAsTable(f"{catalog}.01_bronze.jc_citibike")

citibike_etl/notebooks/02_silver/02_silver_citibike.ipynb

from citibike.citibike_utils import get_trip_duration_mins
from utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit

# Job context and the target catalog arrive as notebook widgets (task parameters).
pipeline_id = dbutils.widgets.get("pipeline_id")
run_id = dbutils.widgets.get("run_id")
task_id = dbutils.widgets.get("task_id")
processed_timestamp = dbutils.widgets.get("processed_timestamp")
catalog = dbutils.widgets.get("catalog")

# Read the bronze table and derive the trip duration and trip start date.
df = spark.read.table(f"{catalog}.01_bronze.jc_citibike")
df = get_trip_duration_mins(spark, df, "started_at", "ended_at", "trip_duration_mins")
df = timestamp_to_date_col(spark, df, "started_at", "trip_start_date")

# Attach run metadata for lineage.
df = df.withColumn("metadata",
              create_map(
                  lit("pipeline_id"), lit(pipeline_id),
                  lit("run_id"), lit(run_id),
                  lit("task_id"), lit(task_id),
                  lit("processed_timestamp"), lit(processed_timestamp)
                  ))

# Keep only the columns needed downstream.
df = df.select(
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata"
    )

# Overwrite the silver table in the target catalog.
df.write.\
    mode("overwrite").\
    option("overwriteSchema", "true").\
    saveAsTable(f"{catalog}.02_silver.jc_citibike")

citibike_etl/notebooks/03_gold/03_gold_citibike_daily_ride_summary.ipynb

from pyspark.sql.functions import max, min, avg, count, round

catalog = dbutils.widgets.get("catalog")
df = spark.read.table(f"{catalog}.02_silver.jc_citibike")

# Aggregate silver trips into one row per day with duration statistics and a trip count.
df = df.groupBy("trip_start_date").agg(
    round(max("trip_duration_mins"),2).alias("max_trip_duration_mins"),
    round(min("trip_duration_mins"),2).alias("min_trip_duration_mins"),
    round(avg("trip_duration_mins"),2).alias("avg_trip_duration_mins"),
    count("ride_id").alias("total_trips")
)
df.write.\
    mode("overwrite").\
    option("overwriteSchema", "true").\
    saveAsTable(f"{catalog}.03_gold.daily_ride_summary")

citibike_etl/notebooks/03_gold/03_gold_citibike_daily_station_performane.ipynb

from pyspark.sql.functions import avg, count, round

catalog = dbutils.widgets.get("catalog")
df = spark.read.table(f"{catalog}.02_silver.jc_citibike")

# Aggregate silver trips per day and start station.
df = df.\
    groupBy("trip_start_date", "start_station_name").\
    agg(
    round(avg("trip_duration_mins"),2).alias("avg_trip_duration_mins"),
    count("ride_id").alias("total_trips")
    )
df.write.\
    mode("overwrite").\
    option("overwriteSchema", "true").\
    saveAsTable(f"{catalog}.03_gold.daily_station_performance")