PySpark Scenario-Based Questions

Advanced PySpark Scenarios for Senior Architect Interviews

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("AdvancedPySparkScenarios").getOrCreate()
```

=== Sample Data ===

data = [ (1, "Alice", "HR", 5000, "2023-01-10"), (2, "Bob", "IT", 6000, "2023-01-12"), (3, "Charlie", "IT", 6500, "2023-01-13"), (4, "David", "Finance", 7000, "2023-01-11"), (5, "Eve", "HR", 4500, "2023-01-09"), (6, "Frank", "Finance", 8000, "2023-01-15"), (7, "Grace", "IT", 5500, "2023-01-14"), (8, "Heidi", "HR", 5800, "2023-01-08"), (9, "Ivan", "Finance", 7300, "2023-01-12"), (10, "Judy", "IT", 5000, "2023-01-10") ] cols = ["emp_id", "name", "dept", "salary", "hire_date"] df = spark.createDataFrame(data, cols)

=== 1. Cumulative Aggregation with Condition ===

```python
# Running total of salary within each department, restricted to salaries above 5000.
windowSpec = (
    Window.partitionBy("dept")
    .orderBy("salary")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.filter(col("salary") > 5000) \
  .withColumn("cumulative_salary", sum("salary").over(windowSpec)) \
  .show()
```
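Note that `rowsBetween` counts physical rows, so ties on salary each get their own running total. If tied rows should share the same cumulative value, `rangeBetween` groups them into one frame. A minimal sketch of that variant (same data, only the frame definition changes):

```python
# Variant sketch: with rangeBetween, employees that tie on salary share a frame
# boundary and therefore receive the same cumulative total.
windowRange = (
    Window.partitionBy("dept")
    .orderBy("salary")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)
df.filter(col("salary") > 5000) \
  .withColumn("cumulative_salary", sum("salary").over(windowRange)) \
  .show()
```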

=== 2. Deduplicate Based on Latest Date ===

data2 = [(1, "Alice", "2023-01-10"), (1, "Alice", "2023-01-12"), (2, "Bob", "2023-01-08"), (2, "Bob", "2023-01-15"), (3, "Charlie", "2023-01-11")] df2 = spark.createDataFrame(data2, ["emp_id", "name", "date"]).withColumn("date", to_date("date")) df2.withColumn("rn", row_number().over(Window.partitionBy("emp_id").orderBy(desc("date")))).filter("rn = 1").show()

=== 3. Join with Null-Safe Logic ===

```python
# Outer join keeps employees whose dept is NULL or unmatched; fillna labels them.
emp_df = spark.createDataFrame(
    [(1, "Alice", "HR"), (2, "Bob", None), (3, "Charlie", "IT")],
    ["emp_id", "name", "dept"],
)
dept_df = spark.createDataFrame(
    [("HR", "Human Resources"), ("IT", "Information Tech")],
    ["dept", "dept_name"],
)
emp_df.join(dept_df, "dept", "outer").fillna({"dept_name": "Unknown Department"}).show()
```
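The outer join plus `fillna` handles unmatched departments, but it is not a null-safe equality as such. If the requirement is that NULL keys on both sides should match each other, `Column.eqNullSafe` (the SQL `<=>` operator) can be used instead; a minimal sketch:

```python
# Null-safe join condition: a NULL dept on both sides compares as equal,
# unlike the ordinary "=" used by a plain key join.
emp_df.join(dept_df, emp_df["dept"].eqNullSafe(dept_df["dept"]), "left") \
      .select(emp_df["emp_id"], emp_df["name"], dept_df["dept_name"]) \
      .fillna({"dept_name": "Unknown Department"}) \
      .show()
```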

=== 4. Pivot Table ===

pivot_data = [("HR", "2023-01", 5000), ("HR", "2023-02", 5200), ("IT", "2023-01", 6000), ("IT", "2023-02", 6300), ("Finance", "2023-01", 7000)] df_pivot = spark.createDataFrame(pivot_data, ["dept", "month", "salary"]) df_pivot.groupBy("dept").pivot("month").sum("salary").show()

=== 5. Explode Skills ===

skill_data = [(1, "Alice", "Python,SQL,Databricks"), (2, "Bob", "Java,Spark")] df_skill = spark.createDataFrame(skill_data, ["id", "name", "skills"]) df_skill.withColumn("skill", explode(split("skills", ","))).drop("skills").show()

=== 6. UDF for Salary Category ===

```python
# Python UDF that buckets salaries into Low / Mid / High.
def categorize(salary):
    if salary < 5000:
        return "Low"
    elif salary < 7000:
        return "Mid"
    return "High"

categorize_udf = udf(categorize, StringType())
df.withColumn("salary_category", categorize_udf("salary")) \
  .select("name", "salary", "salary_category") \
  .show()
```
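The same categorization can be expressed with built-in `when()`/`otherwise()` expressions, which stay inside the JVM and avoid the serialization overhead of a Python UDF; a sketch of the equivalent logic:

```python
# Built-in expression version of the salary buckets; no Python UDF round trip.
df.withColumn(
    "salary_category",
    when(col("salary") < 5000, "Low")
    .when(col("salary") < 7000, "Mid")
    .otherwise("High"),
).select("name", "salary", "salary_category").show()
```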

=== 7. Compare Rows (Lag) ===

df.withColumn("prev_salary", lag("salary").over(Window.partitionBy("dept").orderBy("salary")))
.filter(col("salary") > col("prev_salary")).show()

=== 8. Conditional Logic with when() ===

df.withColumn("bonus", when(col("salary") > 7000, col("salary") * 0.10) .when((col("salary") > 5000), col("salary") * 0.05) .otherwise(0)).select("name", "salary", "bonus").show()

=== 9. Detect Missing Dates ===

dates = [("2023-01-01",), ("2023-01-02",), ("2023-01-04",)] df_dates = spark.createDataFrame(dates, ["date"]).withColumn("date", to_date("date")) df_range = spark.sql("SELECT explode(sequence(to_date('2023-01-01'), to_date('2023-01-05'), interval 1 day)) AS date") df_range.join(df_dates, "date", "left_anti").show()

=== 10. Broadcast Join Optimization ===

```python
# Explicitly broadcast the small dimension table so the join avoids a shuffle.
small_dept_df = spark.createDataFrame(
    [("HR", "Human Resources"), ("IT", "Information Tech"), ("Finance", "Finance Dept")],
    ["dept", "dept_name"],
)
df.join(broadcast(small_dept_df), "dept").explain(True)
```
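The explicit `broadcast()` hint forces the plan regardless of table size; Spark also broadcasts automatically when a relation's estimated size is under `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). A small sketch for inspecting and adjusting that setting:

```python
# Inspect the current automatic-broadcast size limit, then raise it (example value).
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50 MB
```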

End of Script