Local Module Imports in Databricks - pathfinder-analytics-uk/dab_project GitHub Wiki

Project Code

citibike_etl/notebooks/02_silver/02_silver_citibike.ipynb

import os
import sys

# Resolve the repository root: this notebook lives three directory levels
# below the project root (citibike_etl/notebooks/02_silver/), so walk up
# three levels to make the top-level `src` package importable.
current_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(current_dir, "..", "..", ".."))

# Guard against duplicates: notebook cells are frequently re-executed in
# Databricks, and an unconditional append would grow sys.path on every run.
if project_root not in sys.path:
    sys.path.append(project_root)

from src.citibike.citibike_utils import get_trip_duration_mins
from src.utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit
# Read the bronze-layer trips table as the starting point for the silver layer.
df = spark.read.table("citibike_dev.01_bronze.jc_citibike")

# Derive trip duration (minutes) and the trip start date from the raw timestamps.
df = get_trip_duration_mins(spark, df, "started_at", "ended_at", "trip_duration_mins")
df = timestamp_to_date_col(spark, df, "started_at", "trip_start_date")

# Attach a lineage metadata map; values are placeholders until the pipeline
# context (job/run/task identifiers) is wired in.
map_args = []
for meta_key in ("pipeline_id", "run_id", "task_id", "processed_timestamp"):
    map_args.append(lit(meta_key))
    map_args.append(lit("placeholder"))
df = df.withColumn("metadata", create_map(*map_args))

# Project only the columns the silver table exposes, in a fixed order.
silver_columns = [
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
]
df = df.select(*silver_columns)

# Overwrite the silver table, allowing the schema to evolve with the projection.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("citibike_dev.02_silver.jc_citibike")
)