bigger affiliation - veeraravi/Spark-notes GitHub Wiki
//From 67
import org.apache.spark.sql.functions.udf
/** Lower-cases `id`; a null reference or the literal string "null" yields null. */
def convertLower(id: String): String = id match {
  case null | "null" => null
  case value         => value.toLowerCase
}
// Spark UDF wrapper so convertLower can be applied to a DataFrame column.
val convertLower_udf = udf((id: String) => convertLower(id))
/** Returns `id` unchanged when it has at least one character; null or "" yields null. */
def nullCheck(id: String): String = Option(id).filter(_.nonEmpty).orNull
// Spark UDF wrapper around nullCheck: empty strings in a column become null.
val makeEmptyStringsNull = udf((id: String) => nullCheck(id))
// Load the raw provider rows.
val provider_df = sqlContext.sql("select * from qnxt_db.provider_txn")

// Columns whose empty strings are replaced by null, in the order they are rewritten.
val blankToNullColumns = Seq("fullname", "npi", "sex", "ssn", "specialtycode", "provtype", "entityid")

// `status` is lower-cased first; each listed column is then overwritten with its cleaned value.
var finalDf = blankToNullColumns.foldLeft(
  provider_df.withColumn("status", convertLower_udf(provider_df("status")))
) { (df, name) =>
  df.withColumn(name, makeEmptyStringsNull(provider_df(name)))
}

// Persist the cleaned rows, replacing any previous table contents.
finalDf.write.mode("overwrite").saveAsTable("qnxt_db.clean_provider")
===============//from 44
// Number of distinct specialty codes per (state, provid).
val prov_spec_df = sqlContext.sql("select * from qnxt_db.provspecialty_txn")
val distinctSpecialtyCodes = countDistinct("specialtycode").alias("specialty_codes")
var prov_spec_result_df = prov_spec_df.groupBy("state", "provid").agg(distinctSpecialtyCodes)
prov_spec_result_df.write.mode("overwrite").saveAsTable("qnxt_db.provider_specialties")
// Source tables for the provider enrichment.
var err_prov_df = sqlContext.sql("select * from qnxt_db.provider_txn")
val entity_df = sqlContext.sql("select * from qnxt_db.entity_txn")
val provider_type_df = sqlContext.sql("select * from qnxt_db.providertype_txn")
val prov_spec_df2 = sqlContext.sql("select * from qnxt_db.provider_specialties")

// Join predicates; every lookup is additionally keyed by state.
val matchesEntity =
  err_prov_df("state") === entity_df("state") && err_prov_df("entityid") === entity_df("entid")
val matchesProviderType =
  err_prov_df("state") === provider_type_df("state") && err_prov_df("provtype") === provider_type_df("provtype")
val matchesSpecialties =
  err_prov_df("state") === prov_spec_df2("state") && err_prov_df("provid") === prov_spec_df2("provid")

// Output columns: entity address fields, then provider-type details, then the specialty count.
val entityColumns = Seq(
  "addr1", "addr2", "city", "state", "zip", "phone",
  "phycounty", "phyaddr1", "phyaddr2", "phycity", "phystate", "phyzip"
).map(name => entity_df(name))
val lookupColumns = Seq(
  provider_type_df("description").as("prov_type_description"),
  provider_type_df("provclass").as("prov_class"),
  prov_spec_df2("specialty_codes")
)

// Left outer joins keep every provider row even when a lookup has no match.
val providerWithLookups = err_prov_df.join(entity_df, matchesEntity, "left_outer").join(
  provider_type_df, matchesProviderType, "left_outer"
).join(prov_spec_df2, matchesSpecialties, "left_outer")
val joined_df = providerWithLookups.select(entityColumns ++ lookupColumns: _*)

joined_df.write.mode("overwrite").saveAsTable("qnxt_db.err_provider")
import org.apache.spark.sql.functions._
// SQL UDF: upper-cases an identifier. The placeholder "0" maps to null, and a null
// id stays null instead of throwing a NullPointerException inside the UDF.
sqlContext.udf.register("capitalise_id_udf", (id: String) => if (id == null || id == "0") null else id.toUpperCase)

// Expose the raw affiliation rows to SQL under a temporary name.
val affiliation_df = sqlContext.sql("select * from qnxt_db.affiliation_txn")
affiliation_df.registerTempTable("affiliation_tmp")

// NOTE(review): the two projected expressions carry no aliases, so the saved column
// names are whatever Spark generates for them -- consider adding explicit `AS` names.
val affiliation_finalDF = sqlContext.sql("""SELECT capitalise_id_udf(affiliate_id), capitalise_id_udf(prov_id) from affiliation_tmp""" )

// The former drop("affiliation_tmp") named the temp table, not a column of this
// result, so it had no effect and is omitted.
affiliation_finalDF.write.mode("overwrite").saveAsTable("qnxt_db.affiliation_transform")
====//from 70 (44,77)
import java.util.Date;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;
import org.apache.spark.sql.functions._
/** Implicit view letting a java.util.Date be used where a java.sql.Timestamp is expected. */
implicit def date2timestamp(date: java.util.Date): java.sql.Timestamp =
  new java.sql.Timestamp(date.getTime)

// "Now" is captured once here; `timestamp` is the same instant as `date`.
val date = new java.util.Date()
val timestamp: java.sql.Timestamp = date2timestamp(date)
import org.apache.spark.sql.functions._;
// SQL UDF: 0 when the id is missing (a real null or the literal string "null"), 1 otherwise.
// The comparison is null-safe; calling id.equals(...) on a null id would throw.
sqlContext.udf.register("nullCheck", (id: String) => if (id == null || id == "null") 0 else 1)
val affiliationDF = sqlContext.sql("select * from qnxt_db.affiliation_txn")

// Flag is 1 for rows that carry an affiliationid and 0 for rows where it is null.
val result1 = affiliationDF.withColumn("active_affiliation_flag", when($"affiliationid".isNull, 0).otherwise(1))

// Keep affiliations in force at `timestamp`: already effective and not yet terminated.
// The comparisons were previously reversed (effdate in the future AND termdate in
// the past); they now match the rule dateDiff applies.
val filtered = result1.filter(result1("effdate") < timestamp && result1("termdate") > timestamp)
// Shared aggregate: total of the active flag within each group.
val activeFlagTotal = round(sum("active_affiliation_flag"), 2)

// Date-filtered affiliations grouped by (err_state_id, prov_id).
var result2 = filtered.groupBy("err_state_id", "prov_id").agg(
  activeFlagTotal.alias("affils_as_provider_active_count"),
  countDistinct("affil_type").alias("affils_as_provider_active"),
  countDistinct("payflag").alias("affils_payflag_active"))
result2.write.mode("overwrite").saveAsTable("qnxt_db.affil_type_as_provider_active")

// Date-filtered affiliations grouped by (err_state_id, affiliate_id).
var result3 = filtered.groupBy("err_state_id", "affiliate_id").agg(
  activeFlagTotal.alias("affils_as_affiliate_active_count"),
  countDistinct("affil_type").alias("affils_as_affiliate_active"))
result3.write.mode("overwrite").saveAsTable("qnxt_db.affil_type_as_affiliate_active")
// Distinct-value counts over the unfiltered affiliation rows.
val distinctAffilTypes = countDistinct("affil_type")

// Grouped by (err_state_id, prov_id).
var result4 = affiliationDF.groupBy("err_state_id", "prov_id").agg(
  distinctAffilTypes.alias("affils_as_provider_all"),
  countDistinct("payflag").alias("affils_payflag_all"))
result4.write.mode("overwrite").saveAsTable("qnxt_db.affil_type_as_provider_all")

// Grouped by (err_state_id, affiliate_id).
var result5 = affiliationDF.groupBy("err_state_id", "affiliate_id").agg(
  distinctAffilTypes.alias("affils_as_affiliate_all"))
result5.write.mode("overwrite").saveAsTable("qnxt_db.affil_type_as_affiliate_all")
import org.apache.spark.sql.functions.udf

/** Implicit view letting a java.util.Date be used where a java.sql.Timestamp is expected. */
implicit def date2timestamp(date: java.util.Date): java.sql.Timestamp = new java.sql.Timestamp(date.getTime)

// "Now" is captured once; dateDiff compares every row against this instant.
val date = new java.util.Date
val timestamp: java.sql.Timestamp = date
/**
 * Parses the text form of `date` as `yyyy-MM-dd HH:mm:ss`.
 *
 * @param date value whose `toString` is parsed; may be null
 * @return the parsed timestamp, or null when `date` is null or renders as ""
 * @throws java.text.ParseException when the text does not start with a date in that pattern
 */
def getTimestamp(date: Any): java.sql.Timestamp = {
  // A null input is treated like the empty string rather than failing on toString.
  val text = if (date == null) "" else date.toString
  if (text.isEmpty) null
  else {
    // A fresh formatter per call: SimpleDateFormat instances must not be shared across threads.
    val format = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")
    new java.sql.Timestamp(format.parse(text).getTime)
  }
}
/**
 * Tells whether the captured `timestamp` lies strictly between two date strings.
 *
 * @param start lower bound, parsed by getTimestamp
 * @param end   upper bound, parsed by getTimestamp
 * @return 1 when start < timestamp < end, otherwise 0
 */
def dateDiff(start: String, end: String): Int = {
  val d1 = getTimestamp(start)
  val d2 = getTimestamp(end)
  // getTimestamp yields null for blank input, and Timestamp.after/before throw on null.
  // NOTE(review): a missing bound is counted as "not active" here -- confirm whether
  // a blank end date should instead mean an open-ended affiliation.
  if (d1 != null && d2 != null && timestamp.after(d1) && timestamp.before(d2)) 1 else 0
}
// Spark UDF wrapper around dateDiff (placeholder underscores restored).
val dateDiff_udf = udf(dateDiff(_: String, _: String))
// Standardise affiliations: drop rows without a type and upper-case the type.
var affiliationDF = sqlContext.sql("select * from qnxt_db.affiliation")
affiliationDF = affiliationDF.filter("affil_type is not null")
affiliationDF = affiliationDF.withColumn("affil_type", upper(col("affil_type")))

// Flag each row 1/0 via dateDiff on its effective and termination dates; the UDF
// takes the two columns as separate arguments.
var result6 = affiliationDF.withColumn("active_affiliation_flag", dateDiff_udf(affiliationDF("effdate"), affiliationDF("termdate")))
result6.write.mode("overwrite").saveAsTable("qnxt_db.affiliation_standardized")
// Step 1 -- per (state, affiliate, type): total of the active flag and the row count.
// count(lit(1)) counts the rows of each group; the frame being defined cannot be
// referenced inside its own definition.
var aff_stand_df = sqlContext.sql("select * from qnxt_db.affiliation_standardized").groupBy(
  col("err_state_id"), col("affiliate_id"), col("affil_type")
).agg(
  round(sum("active_affiliation_flag"), 2).alias("active_affils_as_affiliate_count_by_type"),
  count(lit(1)).alias("affils_as_affiliate_count_by_type"))
import org.apache.spark.sql.functions.udf

// Value of `valueColumn` on rows of the given affiliation type, 0 on every other row.
// Built-in when/otherwise replaces the former tuple-returning UDF, which produced the
// same pair for all four types and so made the per-type columns identical.
def affiliateCountFor(affilType: String, valueColumn: String) =
  when(col("affil_type") === affilType, col(valueColumn)).otherwise(lit(0L))

// Step 2 -- one (active, all) pair of totals per affiliation type, in the original column order.
val affiliateTotals = Seq(
  "SERVICE" -> "service", "GROUP" -> "group", "NETWORK" -> "network", "DIRECT" -> "direct"
).flatMap { case (affilType, label) =>
  Seq(
    sum(affiliateCountFor(affilType, "active_affils_as_affiliate_count_by_type"))
      .alias(s"active_${label}_affiliations_as_affiliate_count"),
    sum(affiliateCountFor(affilType, "affils_as_affiliate_count_by_type"))
      .alias(s"${label}_affiliations_as_affiliate_count"))
}

// All totals go into a single agg call: agg returns an ungrouped frame, so further
// agg calls chained after it would not aggregate per (state, affiliate).
aff_stand_df = aff_stand_df.groupBy(col("err_state_id"), col("affiliate_id")).agg(
  affiliateTotals.head, affiliateTotals.tail: _*)
aff_stand_df.write.mode("overwrite").saveAsTable("qnxt_db.affiliation_as_affiliate_count_by_type")
// Step 1 -- per (state, provider, type): total of the active flag and the row count.
// count(lit(1)) counts the rows of each group; the frame being defined cannot be
// referenced inside its own definition.
var aff_prov_df = sqlContext.sql("select * from qnxt_db.affiliation_standardized").groupBy(
  col("err_state_id"), col("prov_id"), col("affil_type")
).agg(
  round(sum("active_affiliation_flag"), 2).alias("active_affils_as_provider_count_by_type"),
  count(lit(1)).alias("affils_as_provider_count_by_type"))
import org.apache.spark.sql.functions.udf

// Value of `valueColumn` on rows of the given affiliation type, 0 on every other row.
// Built-in when/otherwise replaces the former tuple-returning UDF, which produced the
// same pair for all four types and so made the per-type columns identical.
def providerCountFor(affilType: String, valueColumn: String) =
  when(col("affil_type") === affilType, col(valueColumn)).otherwise(lit(0L))

// Step 2 -- one (active, all) pair of totals per affiliation type, in the original column order.
val providerTotals = Seq(
  "SERVICE" -> "service", "GROUP" -> "group", "NETWORK" -> "network", "DIRECT" -> "direct"
).flatMap { case (affilType, label) =>
  Seq(
    sum(providerCountFor(affilType, "active_affils_as_provider_count_by_type"))
      .alias(s"active_${label}_affiliations_as_provider_count"),
    sum(providerCountFor(affilType, "affils_as_provider_count_by_type"))
      .alias(s"${label}_affiliations_as_provider_count"))
}

// All totals go into a single agg call: agg returns an ungrouped frame, so further
// agg calls chained after it would not aggregate per (state, provider).
aff_prov_df = aff_prov_df.groupBy(col("err_state_id"), col("prov_id")).agg(
  providerTotals.head, providerTotals.tail: _*)
aff_prov_df.write.mode("overwrite").saveAsTable("qnxt_db.affiliation_as_provider_count_by_type")
// Reload the provider table and the six affiliation aggregates. Names carry the
// qnxt_db prefix the tables were saved under, so the reads do not depend on the
// session's current database.
var err_prov = sqlContext.sql("select * from qnxt_db.err_provider")
var aff_prov_active_df = sqlContext.sql("select * from qnxt_db.affil_type_as_provider_active")
var affil_aff_active_df = sqlContext.sql("select * from qnxt_db.affil_type_as_affiliate_active")
var affil_prov_all_df = sqlContext.sql("select * from qnxt_db.affil_type_as_provider_all")
var affil_aff_all_df = sqlContext.sql("select * from qnxt_db.affil_type_as_affiliate_all")
var aff_aff_count_df = sqlContext.sql("select * from qnxt_db.affiliation_as_affiliate_count_by_type")
var aff_prov_count_df = sqlContext.sql("select * from qnxt_db.affiliation_as_provider_count_by_type")
// Join predicate shared by all six lookups: same err_state_id and same value of `key`.
def sameStateAnd(key: String, other: org.apache.spark.sql.DataFrame) =
  err_prov("err_state_id") === other("err_state_id") && err_prov(key) === other(key)

// Left-outer join each aggregate onto the provider rows, in the original order.
val providerWithAffiliations = Seq(
  aff_prov_active_df -> "prov_id",
  affil_aff_active_df -> "affiliate_id",
  affil_prov_all_df -> "prov_id",
  affil_aff_all_df -> "affiliate_id",
  aff_aff_count_df -> "affiliate_id",
  aff_prov_count_df -> "prov_id"
).foldLeft(err_prov) { case (acc, (other, key)) =>
  acc.join(other, sameStateAnd(key, other), "left_outer")
}

// Projected columns, grouped by the aggregate they come from.
val affiliationColumns =
  Seq("affils_as_provider_active", "affils_payflag_active", "affils_as_provider_active_count").map(n => aff_prov_active_df(n)) ++
  Seq("affils_as_affiliate_active", "affils_as_affiliate_active_count").map(n => affil_aff_active_df(n)) ++
  Seq("affils_as_provider_all", "affils_payflag_all").map(n => affil_prov_all_df(n)) ++
  Seq("affils_as_affiliate_all").map(n => affil_aff_all_df(n)) ++
  Seq(
    "active_service_affiliations_as_affiliate_count", "service_affiliations_as_affiliate_count",
    "active_group_affiliations_as_affiliate_count", "group_affiliations_as_affiliate_count",
    "active_network_affiliations_as_affiliate_count", "network_affiliations_as_affiliate_count",
    "active_direct_affiliations_as_affiliate_count", "direct_affiliations_as_affiliate_count"
  ).map(n => aff_aff_count_df(n)) ++
  Seq(
    "active_service_affiliations_as_provider_count", "service_affiliations_as_provider_count",
    "active_group_affiliations_as_provider_count", "group_affiliations_as_provider_count",
    "active_network_affiliations_as_provider_count", "network_affiliations_as_provider_count",
    "active_direct_affiliations_as_provider_count", "direct_affiliations_as_provider_count"
  ).map(n => aff_prov_count_df(n))

var joined_df = providerWithAffiliations.select(affiliationColumns: _*)
// Adds two counts supplied as strings; a null, empty or "null" operand counts as 0.
// The left outer joins leave nulls wherever an aggregate had no matching row, so the
// null check must run before any method is called on the value.
val newColumn_sum_udf = udf((a: String, b: String) => {
  def toCount(s: String): Long = if (s == null || s.isEmpty || "null".equals(s)) 0L else s.toLong
  toCount(a) + toCount(b)
})

// (combined column, as-affiliate count, as-provider count)
val combinedCounts = Seq(
  ("active_service_affiliations_count", "active_service_affiliations_as_affiliate_count", "active_service_affiliations_as_provider_count"),
  ("service_affiliations_count", "service_affiliations_as_affiliate_count", "service_affiliations_as_provider_count"),
  ("active_group_affiliations_count", "active_group_affiliations_as_affiliate_count", "active_group_affiliations_as_provider_count"),
  ("group_affiliations_count", "group_affiliations_as_affiliate_count", "group_affiliations_as_provider_count"),
  ("active_network_affiliations_count", "active_network_affiliations_as_affiliate_count", "active_network_affiliations_as_provider_count"),
  // NOTE(review): "network_affiliation_count" is singular, unlike its siblings; kept as-is.
  ("network_affiliation_count", "network_affiliations_as_affiliate_count", "network_affiliations_as_provider_count"),
  ("active_direct_affiliations_count", "active_direct_affiliations_as_affiliate_count", "active_direct_affiliations_as_provider_count"),
  ("direct_affiliations_count", "direct_affiliations_as_affiliate_count", "direct_affiliations_as_provider_count")
)

// The per-role count columns live on joined_df, not on the freshly loaded err_prov,
// so the combined columns are built from joined_df. The two-argument sum UDF is the
// one applied here (the three-argument newColumn_udf does not fit these calls).
// NOTE(review): joined_df's select keeps only affiliation columns; if provider
// attributes are wanted in the final table they must be added to that select.
err_prov = combinedCounts.foldLeft(joined_df) { case (df, (combined, asAffiliate, asProvider)) =>
  // Cast explicitly so the string-typed UDF receives the numeric counts as text.
  df.withColumn(combined, newColumn_sum_udf(df(asAffiliate).cast("string"), df(asProvider).cast("string")))
}
// Per-role intermediate counts that are not written to the final table.
val colsToRemove = Seq("active_service_affiliations_as_affiliate_count", "active_service_affiliations_as_provider_count", "service_affiliations_as_affiliate_count", "service_affiliations_as_provider_count", "active_group_affiliations_as_affiliate_count", "active_group_affiliations_as_provider_count","group_affiliations_as_affiliate_count", "group_affiliations_as_provider_count", "active_network_affiliations_as_affiliate_count", "active_network_affiliations_as_provider_count", "network_affiliations_as_affiliate_count", "network_affiliations_as_provider_count", "active_direct_affiliations_as_affiliate_count", "active_direct_affiliations_as_provider_count", "direct_affiliations_as_affiliate_count", "direct_affiliations_as_provider_count")

// Keep every remaining column. functions.col builds the Column, so no import of the
// Column class is needed.
val filteredDF = err_prov.select(
  err_prov.columns.filterNot(name => colsToRemove.contains(name)).map(name => col(name)): _*)
filteredDF.write.mode("overwrite").saveAsTable("qnxt_db.provider_affiliation")