PySpark script for Delta Tables
Delta Table Feature Demonstration Script in PySpark (Databricks Ready)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import os
spark = (
    SparkSession.builder
    .appName("DeltaTableFeaturesDemo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
Set the Delta storage path (DBFS or a local path)
base_path = "/tmp/delta/employees"
spark.sql(f"DROP TABLE IF EXISTS delta.{base_path}
")
spark.sql(f"DROP VIEW IF EXISTS employee_view")
1. Create Initial DataFrame
init_data = [(1, "Alice", 5000), (2, "Bob", 6000), (3, "Charlie", 5500)]
df = spark.createDataFrame(init_data, ["id", "name", "salary"])
2. Write to Delta Table
(df.write.format("delta").mode("overwrite").save(base_path))
3. Read Delta Table
spark.read.format("delta").load(base_path).show()
4. Create Delta Table from Path
spark.sql(f"CREATE TABLE delta_employees USING DELTA LOCATION '{base_path}'")
5. Update Record
DeltaTable.forPath(spark, base_path).update("id = 2", {"salary": "7000"})
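The same change can also be expressed in SQL against the path-based table; a minimal sketch equivalent to the Python API call above:
# SQL equivalent of the update above (sets the same value, so it has no additional effect here)
spark.sql(f"UPDATE delta.`{base_path}` SET salary = 7000 WHERE id = 2")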
6. Delete Record
DeltaTable.forPath(spark, base_path).delete("id = 3")
7. Merge (Upsert)
upsert_data = [(2, "Bob", 7200), (4, "David", 5800)]
upsert_df = spark.createDataFrame(upsert_data, ["id", "name", "salary"])
(DeltaTable.forPath(spark, base_path).alias("target")
    .merge(upsert_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdate(set={"name": "source.name", "salary": "source.salary"})
    .whenNotMatchedInsert(values={"id": "source.id", "name": "source.name", "salary": "source.salary"})
    .execute())
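After the merge, id 2 (Bob) should show the updated salary of 7200 and id 4 (David) should be newly inserted; a quick check:
spark.read.format("delta").load(base_path).orderBy("id").show()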
8. Schema Evolution
new_data = [(5, "Eve", 6400, "HR")]
new_df = spark.createDataFrame(new_data, ["id", "name", "salary", "department"])
(new_df.write.format("delta").mode("append").option("mergeSchema", "true").save(base_path))
9. Time Travel (Describe History)
spark.sql(f"DESCRIBE HISTORY delta.{base_path}
").show(truncate=False)
10. Read Previous Version
spark.read.format("delta").option("versionAsOf", 0).load(base_path).show()
11. Vacuum (remove old files)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql(f"VACUUM delta.{base_path}
RETAIN 0 HOURS")
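RETAIN 0 HOURS with the retention check disabled permanently removes the files of earlier versions, which also breaks time travel to them. A DRY RUN (issued before the real VACUUM) previews the files that would be deleted, and the safety check can be re-enabled afterwards; a hedged sketch:
# DRY RUN only lists candidate files; run it before the real VACUUM to preview
spark.sql(f"VACUUM delta.`{base_path}` RETAIN 0 HOURS DRY RUN").show(truncate=False)
# Re-enable the retention safety check once the aggressive vacuum is done
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")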
12. Optimize
spark.sql(f"OPTIMIZE delta.{base_path}
")
13. ZORDER
spark.sql(f"OPTIMIZE delta.{base_path}
ZORDER BY (id)")
14. Convert Parquet to Delta
parquet_path = "/tmp/parquet_data"
df.write.mode("overwrite").parquet(parquet_path)
spark.sql(f"CONVERT TO DELTA parquet.{parquet_path}
")
15. Create Table from Delta
spark.sql("CREATE OR REPLACE TABLE managed_employees USING DELTA AS SELECT * FROM delta.{}
".format(base_path))
16. Alias for Query
DeltaTable.forPath(spark, base_path).alias("emp").toDF().filter("salary > 6000").show()
17. SQL Query on Delta Table
spark.sql("SELECT * FROM delta_employees WHERE salary > 6000").show()
18. View on Delta Table
spark.sql("CREATE OR REPLACE VIEW employee_view AS SELECT * FROM delta_employees") spark.sql("SELECT * FROM employee_view").show()
19. Column Rename
DeltaTable.forPath(spark, base_path).toDF().withColumnRenamed("name", "emp_name").show()
20. Add Column + Write
updated_df = DeltaTable.forPath(spark, base_path).toDF().withColumn("bonus", col("salary") * 0.10)
(updated_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(base_path))
View Final Table
spark.read.format("delta").load(base_path).show()