# 2. Silver Layer: Transformation, Enrichment, and Validation
🧠 Objective: Convert raw SAP structures into normalized, high-quality, query-efficient business datasets.
## 📦 Step-by-Step with Delta Live Tables (DLT)

### Step 2.1: Define Pipeline Configuration

Create the DLT pipeline in **Triggered** mode for batch processing (unless real-time latency is required).

Configure:

- Target schema: `retail_silver`
- Storage location: `/mnt/silver/retail`
- Notebooks or Python modules as the source (a settings sketch follows this list).
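For reference, here is a minimal sketch of the pipeline settings, written as a Python dict mirroring the JSON that the Databricks Pipelines API accepts. The pipeline name and notebook path are illustrative assumptions, not values from this project.

```python
import json

# Sketch of DLT pipeline settings; values below are illustrative.
pipeline_settings = {
    "name": "retail_silver_dlt",        # hypothetical pipeline name
    "continuous": False,                # Triggered mode (batch); True = continuous
    "target": "retail_silver",          # target schema for published tables
    "storage": "/mnt/silver/retail",    # storage location for data and event log
    "libraries": [
        # hypothetical notebook containing the @dlt definitions below
        {"notebook": {"path": "/Repos/retail/silver_transformations"}}
    ],
}

print(json.dumps(pipeline_settings, indent=2))
```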
### Step 2.2: Clean Product Master Data

```python
import dlt
from pyspark.sql.functions import trim, col

@dlt.table(name="product_silver")
def clean_product_master():
    return (
        dlt.read("sap_product_master_bronze")
        .filter(col("material_id").isNotNull())
        .withColumn("material_id", trim(col("material_id")))
        .withColumn("category", trim(col("category")))
    )
```

Purpose:
- Removes noise: nulls, whitespace, and malformed entries (the code above handles nulls and whitespace; see the sketch below for a malformed-entry filter).
- Standardizes dimension keys (e.g., `material_id`).
- Maintains dimension conformance across downstream tables.
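A minimal sketch of a malformed-entry filter, assuming material IDs should be strictly alphanumeric. The table name and regex are illustrative; adjust the pattern to your SAP material-numbering conventions.

```python
import dlt
from pyspark.sql.functions import trim, col

@dlt.table(name="product_silver_strict")  # hypothetical variant of product_silver
def clean_product_master_strict():
    return (
        dlt.read("sap_product_master_bronze")
        .filter(col("material_id").isNotNull())
        .withColumn("material_id", trim(col("material_id")))
        # Assumed format rule: keep only alphanumeric material IDs
        .filter(col("material_id").rlike("^[A-Za-z0-9]+$"))
    )
```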
### Step 2.3: Enrich POS Transactions

```python
@dlt.table(name="pos_transactions_silver")
def enrich_pos_transactions():
    pos_df = dlt.read("sap_pos_transactions_bronze")
    product_df = dlt.read("product_silver")

    return (
        pos_df
        .filter(col("transaction_id").isNotNull())
        .join(product_df, on="material_id", how="left")
        .withColumn("total_amount", col("quantity") * col("unit_price"))
    )
```
Key Points:

- Semantic join between transactional and dimensional data.
- Revenue enrichment via explicit multiplication (`quantity * unit_price`).
- Attaching business semantics (product category, revenue, etc.) to each transaction also supports GenAI-friendly embedding models (see the sketch below).
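To make the embedding point concrete, here is a hedged sketch that flattens the joined columns into a single text field an embedding model can consume. The table name and description format are assumptions; column names follow the tables above.

```python
import dlt
from pyspark.sql.functions import format_string, col

@dlt.table(name="pos_transactions_semantic")  # hypothetical table name
def semantic_pos_transactions():
    return (
        dlt.read("pos_transactions_silver")
        # One descriptive string per transaction, ready for an embedding model
        .withColumn(
            "semantic_text",
            format_string(
                "Sold %s units of material %s (category: %s) for a total of %s",
                col("quantity"), col("material_id"), col("category"), col("total_amount"),
            ),
        )
    )
```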
### Step 2.4: Apply Data Quality Expectations

```python
@dlt.table(name="validated_pos_transactions")
@dlt.expect("valid_price", "unit_price > 0")
@dlt.expect_or_drop("non_null_material", "material_id IS NOT NULL")
def validate_transactions():
    return dlt.read("pos_transactions_silver")
```

- `@dlt.expect` logs failures but lets the records through.
- `@dlt.expect_or_drop` actively drops invalid records.
- Expectations enable observability through metrics such as failure ratios and drop rates (see the event-log sketch below).
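A sketch of pulling those metrics from the DLT event log, assuming the storage location from Step 2.1; on pipelines configured with an explicit storage location, DLT writes its event log under `<storage>/system/events`.

```python
from pyspark.sql.functions import col, expr

# Read the pipeline event log (path derived from the Step 2.1 storage location)
events = spark.read.format("delta").load("/mnt/silver/retail/system/events")

expectation_metrics = (
    events
    .filter(col("event_type") == "flow_progress")
    # `details` is a JSON string; Databricks ':' path syntax extracts nested fields
    .select(expr("details:flow_progress.data_quality.expectations").alias("expectations"))
    .filter(col("expectations").isNotNull())
)
expectation_metrics.show(truncate=False)
```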
### Step 2.5: Dependency Management & Lineage

DLT automatically resolves the pipeline DAG:

`product_silver → pos_transactions_silver → validated_pos_transactions`

All stages have built-in lineage visualization in the Databricks UI.
You can complement the UI lineage graph by inspecting the table's Delta history:

```sql
DESCRIBE HISTORY validated_pos_transactions;
```

Note that `DESCRIBE HISTORY` returns the table's Delta operation history (versions, operations, timestamps), not cross-table lineage; the lineage graph itself lives in the DLT pipeline UI.
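The same history is available programmatically; a minimal sketch, qualifying the table with the target schema from Step 2.1:

```python
# Inspect recent Delta operations on the validated table
history = spark.sql("DESCRIBE HISTORY retail_silver.validated_pos_transactions")
history.select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)
```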