DLT Pipeline Source Notebooks - pathfinder-analytics-uk/dab_project GitHub Wiki

Project Code

citibike_etl/dlt/01_bronze/01_bronze_citibike.ipynb

import dlt
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
# Target catalog is injected via pipeline configuration so the same code
# can be deployed to dev/prod catalogs without edits.
catalog = spark.conf.get("catalog")

# Explicit ingest schema for the Citi Bike trip CSV.
# NOTE: the original used bare DecimalType(), which defaults to
# precision=10, scale=0 — that truncates latitude/longitude to whole
# degrees (or yields nulls) on read. DecimalType(10, 7) preserves the
# sub-degree precision present in the source data while still fitting
# longitude's 3 integer digits (max ±180).
schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", DecimalType(10, 7), True),
    StructField("start_lng", DecimalType(10, 7), True),
    StructField("end_lat", DecimalType(10, 7), True),
    StructField("end_lng", DecimalType(10, 7), True),
    StructField("member_casual", StringType(), True),
])
@dlt.table(
    comment="Bronze layer: raw Citi Bike data with ingest metadata"
)
def bronze_jc_citibike():
    """Ingest the raw Citi Bike trip CSV from the landing volume.

    Reads with the module-level `schema` applied (header row skipped)
    and returns the untransformed records as the bronze DLT table.
    """
    source_path = f"/Volumes/{catalog}/00_landing/source_citibike_data/JC-202503-citibike-tripdata.csv"
    reader = spark.read.schema(schema)
    return reader.csv(source_path, header=True)

citibike_etl/dlt/02_silver/02_silver_citibike.ipynb

import dlt
from pyspark.sql.functions import col, unix_timestamp, to_date
@dlt.table(
    comment="Silver layer: cleaned and enriched Citi Bike data"
)
def silver_jc_citibike():
    """Enrich bronze trips with derived duration and date columns.

    Adds:
      - trip_duration_mins: (ended_at - started_at) in minutes
      - trip_start_date: calendar date the trip started
    and projects down to the columns the gold layer needs.
    """
    # Duration in seconds, expressed as a column expression.
    elapsed_secs = unix_timestamp(col("ended_at")) - unix_timestamp(col("started_at"))

    return (
        dlt.read("bronze_jc_citibike")
        .withColumn("trip_duration_mins", elapsed_secs / 60)
        .withColumn("trip_start_date", to_date(col("started_at")))
        .select(
            "ride_id",
            "trip_start_date",
            "started_at",
            "ended_at",
            "start_station_name",
            "end_station_name",
            "trip_duration_mins",
        )
    )

citibike_etl/dlt/03_gold/03_gold_citibike_daily_ride_summary.ipynb

import dlt
from pyspark.sql.functions import max, min, avg, count, round
@dlt.table(
    comment="Gold layer: daily aggregates of ride durations and trip counts for Citi Bike data"
)
def gold_daily_ride_summary():
    """Summarize silver trips per day: max/min/avg duration and trip count."""
    silver = dlt.read("silver_jc_citibike")
    return silver.groupBy("trip_start_date").agg(
        round(max("trip_duration_mins"), 2).alias("max_trip_duration_mins"),
        round(min("trip_duration_mins"), 2).alias("min_trip_duration_mins"),
        round(avg("trip_duration_mins"), 2).alias("avg_trip_duration_mins"),
        count("ride_id").alias("total_trips"),
    )

citibike_etl/dlt/03_gold/03_gold_citibike_daily_station_performance.ipynb

import dlt
from pyspark.sql.functions import max, min, avg, count, round
# Fix: the original stacked two @dlt.table decorators on this function
# (the first carried a comment copy-pasted from the daily ride summary
# table). A DLT table definition must have exactly one @dlt.table
# decorator; the duplicate is removed and the accurate comment kept.
@dlt.table(
    comment="Gold layer: daily ride performance metrics per station, including average duration and total trips"
)
def gold_daily_station_performance():
    """Summarize silver trips per day and start station.

    Returns one row per (trip_start_date, start_station_name) with the
    average trip duration (rounded to 2 d.p.) and the total trip count.
    """
    return (
        dlt.read("silver_jc_citibike")
        .groupBy("trip_start_date", "start_station_name")
        .agg(
            round(avg("trip_duration_mins"), 2).alias("avg_trip_duration_mins"),
            count("ride_id").alias("total_trips"),
        )
    )