Links and Resources
Project Code
citibike_etl/notebooks/01_bronze/01_bronze_citibike.ipynb
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
from pyspark.sql.functions import create_map, lit
# Bronze ingestion: read the raw Citi Bike trip-data CSV from the landing
# volume with an explicit schema, attach a run-metadata map column, and
# overwrite the bronze table with the result.

# Source file in the landing volume and the bronze table it is loaded into.
SOURCE_PATH = "/Volumes/citibike_dev/00_landing/source_citibike_data/JC-202503-citibike-tripdata.csv"
TARGET_TABLE = "citibike_dev.01_bronze.jc_citibike"

# Coordinate columns need an explicit precision/scale: a bare DecimalType()
# is DecimalType(10, 0), which rounds every latitude/longitude to a whole
# degree. 3 integer digits cover the +/-180 longitude range; values with more
# than 15 decimal places are rounded to 15.
COORD_TYPE = DecimalType(precision=18, scale=15)

schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", COORD_TYPE, True),
    StructField("start_lng", COORD_TYPE, True),
    StructField("end_lat", COORD_TYPE, True),
    StructField("end_lng", COORD_TYPE, True),
    StructField("member_casual", StringType(), True),
])

# NOTE: with a user-supplied schema and the default enforceSchema=true, Spark's
# CSV reader applies the schema by column position and ignores the header
# names, so the field order above must match the file's column order.
df = spark.read.csv(SOURCE_PATH, schema=schema, header=True)

# Run metadata stored as a string->string map column.
# TODO: replace the placeholders with real pipeline/run/task identifiers and
# the processing timestamp.
df = df.withColumn(
    "metadata",
    create_map(
        lit("pipeline_id"), lit("placeholder"),
        lit("run_id"), lit("placeholder"),
        lit("task_id"), lit("placeholder"),
        lit("processed_timestamp"), lit("placeholder"),
    ),
)

# Full refresh: replace the table contents, and allow the stored schema to be
# replaced too (overwriteSchema) so column-type changes such as the coordinate
# fix above are accepted.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(TARGET_TABLE)
)