assignment 4 - veeraravi/Spark-notes GitHub Wiki

import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Load the providers table and remove the SSN column up front. DataFrames are
// immutable, so `drop` returns a new DataFrame; its result has to be the value
// that is registered and saved, otherwise SSN is still present in both outputs.
val providersDf = sqlContext.sql("select * from retail_db.Providers").drop("SSN")
providersDf.registerTempTable("providers_temp")

// Persist the SSN-free copy, replacing the output of any earlier run.
providersDf.write.mode("overwrite").saveAsTable("retail_db.Providers_trans")

// Entity assignments that are active and whose record type is not "E".
// Column predicates need `===` / `!==`: plain `==` compares the Column object
// itself and yields a Boolean instead of a filter expression. The columns are
// referenced through `col(...)` because `ea_df` cannot be used inside its own
// initializer, and the literal is a String rather than a Char so it is accepted
// as a Spark literal regardless of Char-literal support.
// Trailing dots keep the chained call valid when pasted line by line in spark-shell.
val ea_df = sqlContext.sql("select * from entity_assignment").
  filter(col("active") === true).
  filter(col("record_type") !== "E")
ea_df.registerTempTable("entity_assignment_tmp")

// Crosswalk table, exposed to SQL under a temp-table name.
val xwalk_Df = sqlContext.sql("select * from cactus_qnxt_state_crosswalk")
xwalk_Df.registerTempTable("cactus_qnxt_state_crosswalk_tmp")

// Reference table, exposed to SQL under a temp-table name.
val refDf = sqlContext.sql("select * from reftable")
refDf.registerTempTable("refTable_tmp")
// Values of cactus_state_id that are kept by the filter below.
val cactus_state_id_list = List("CA", "FL", "IL", "MI", "NM", "NY", "OH", "PR", "SC", "TX", "UT", "WA", "WI")

// Attach the crosswalk state_id (as cactus_state_id) to every entity assignment.
// The join is driven from the assignment side, so each output row is an
// assignment; driving it from the crosswalk side would also emit crosswalk rows
// that have no assignment, with every es.* column null.
// `temp_join_df` stays a `var` because later steps reassign it.
var temp_join_df = sqlContext.sql(
  """select es.*, cqsc.state_id as cactus_state_id
    |from entity_assignment_tmp es
    |left join cactus_qnxt_state_crosswalk_tmp cqsc on es.entity_k = cqsc.entity_k""".stripMargin)

// `isin` is a Column method, so it is applied to the column, not to the column
// name String. Rows whose cactus_state_id is null do not satisfy the predicate
// and are dropped here as well.
temp_join_df = temp_join_df.filter(temp_join_df("cactus_state_id").isin(cactus_state_id_list: _*))
temp_join_df.registerTempTable("temp_join")
// Add the reftable description for each assignment as entity_description and
// keep only rows whose description contains "Molina Healthcare". The join is
// driven from the assignment side so reftable rows without an assignment do
// not appear as all-null rows.
temp_join_df = sqlContext.sql(
  """select es.*, ref.description as entity_description
    |from temp_join es
    |left join reftable ref on es.assignment_rtk = ref.reftable_k""".stripMargin)
temp_join_df = temp_join_df.filter(temp_join_df("entity_description").contains("Molina Healthcare"))

// Register the filtered rows under their own name. The next lookup must read
// from this table; reading "temp_join" again would silently discard both the
// filter above and the entity_description column.
temp_join_df.registerTempTable("temp_join_entity")

// Add the reftable description as credentialing_status and exclude
// "Initial Applicant".
// NOTE(review): this lookup uses the same key (assignment_rtk) as the
// entity_description lookup, so both columns receive the same description.
// Presumably a different *_rtk column identifies the credentialing status;
// confirm against the entity_assignment schema.
temp_join_df = sqlContext.sql(
  """select es.*, ref.description as credentialing_status
    |from temp_join_entity es
    |left join reftable ref on es.assignment_rtk = ref.reftable_k""".stripMargin)
temp_join_df = temp_join_df.filter(temp_join_df("credentialing_status") !== "Initial Applicant")

// Exclude rows whose last_credentialing_date is null, then expose the result to SQL.
temp_join_df = temp_join_df.filter("last_credentialing_date is not null")
temp_join_df.registerTempTable("final_result")
// Distinct provider / state / entity / status combinations from the
// "final_result" temp table. "final_result" is only a SQL table name, not a
// Scala value, so the write has to go through the DataFrame returned here.
// The value is not called `result` because that name is defined again further
// down for the provider join.
val cleanedAssignments = sqlContext.sql(
  """select distinct provider_k, cactus_state_id, entity_description, credentialing_status
    |from final_result""".stripMargin)

// Persist the cleaned assignments, replacing the output of any earlier run.
cleanedAssignments.write.mode("overwrite").saveAsTable("retail_db.entity_assignment_cleaned")
// Active providers. The column is referenced through `col(...)` because
// `providerDf` cannot be used inside its own initializer, and `===` builds a
// Column predicate where `==` would only yield a Boolean.
// NOTE(review): this reads retail_db.Provider while the script's first step
// reads retail_db.Providers; confirm that two distinct tables are intended.
val providerDf = sqlContext.sql("select * from retail_db.Provider").filter(col("active") === true)

// Cleaned entity assignments, read with the same retail_db qualifier they are
// saved under so the lookup does not depend on the session's current database.
val eac_df = sqlContext.sql("select * from retail_db.entity_assignment_cleaned")

// Keep every active provider and attach its cleaned assignments where present.
// `Seq` (capitalised) is the collection constructor; `seq` is not defined.
val result = providerDf.join(eac_df, Seq("provider_k"), "left_outer")
⚠️ **GitHub.com Fallback** ⚠️