Ontology assignment — veeraravi/Spark-notes GitHub Wiki
import org.apache.spark.sql.functions.udf
/**
 * Row filter: keep a provider row unless BOTH prov_all and affil_all are null
 * AND the full name starts with "SERVICE" (i.e. drop service-entity rows that
 * carry no provider/affiliation data).
 *
 * Fixed: the original dereferenced full_name unconditionally once the two null
 * checks passed, so a row with all three fields null threw a NullPointerException.
 * A null full_name is now treated as "does not start with SERVICE".
 */
def fillter_column(prov_all: String, affil_all: String, full_name: String): Boolean = {
  val startsWithService = full_name != null && full_name.startsWith("SERVICE")
  !(prov_all == null && affil_all == null && startsWithService)
}
// UDF wrapper around fillter_column for use in DataFrame.filter.
// Fixed: the eta-expansion placeholders had lost their underscores
// ("(:String,:String,_:String)" does not parse).
val columnFilter_udf = udf(fillter_column(_: String, _: String, _: String))
import org.apache.commons.lang3.StringUtils
/**
 * Null-safe string accessor: yields a single space when the input is null,
 * otherwise the input unchanged. (A space — not an empty string — is returned,
 * matching how downstream similarity comparisons expect blank names.)
 */
def nullToEmpty(name: String) = Option(name).getOrElse(" ")
/**
 * Scores how well the provider-table full_name matches the NPPES registry
 * name, using the best Jaro-Winkler distance across three candidate name
 * forms: "first last", "other-first other-last", and "first middle last".
 *
 * Only scored for pay-to providers of NPPES entity type "1" (individuals);
 * all other rows yield None (a null column value in Spark).
 *
 * Fixed: the original declared a Double return type but executed
 * `return null` — Double is an AnyVal, so that does not compile. The return
 * type is now Option[Double] (Spark UDFs map None to SQL NULL). Also fixed:
 * null NPPES name fields previously concatenated as the literal "null";
 * they are now passed through nullToEmpty first.
 */
def newColumn(nppes_provider_first_name: String, nppes_provider_last_name: String, nppes_provider_other_first_name: String, nppes_provider_other_last_name: String, nppes_provider_middle_name: String, full_name: String, is_pay_to: Boolean, nppes_entity_type_code: String): Option[Double] = {
  if (!is_pay_to || !"1".equals(nppes_entity_type_code)) {
    None
  } else {
    val first  = nullToEmpty(nppes_provider_first_name)
    val last   = nullToEmpty(nppes_provider_last_name)
    val oFirst = nullToEmpty(nppes_provider_other_first_name)
    val oLast  = nullToEmpty(nppes_provider_other_last_name)
    val middle = nullToEmpty(nppes_provider_middle_name)
    val target = nullToEmpty(full_name)

    val candidates = Seq(
      first + " " + last,
      oFirst + " " + oLast,
      first + " " + middle + " " + last
    )
    // Best score across the three candidate name forms.
    Some(candidates.map(c => StringUtils.getJaroWinklerDistance(target, c).doubleValue).max)
  }
}
// UDF wrapper around newColumn. Fixed: two placeholders had lost their
// underscores, which does not parse.
val newColumn_udf = udf(newColumn(_: String, _: String, _: String, _: String, _: String, _: String, _: Boolean, _: String))
// Source tables. Fixed: the two val definitions were fused onto one line
// (no separator), which does not parse — split onto separate statements.
val err_prov_df = sqlContext.sql("select * from qnxt.err_provider")
val npi_reg_prov_df = sqlContext.sql("select * from qnxt.npi_registry_provider")
// Drop service-entity rows with no provider/affiliation data.
// NOTE(review): the original line was truncated after "columnFilter_udf(".
// The arguments are reconstructed from the UDF's parameter names
// (prov_all, affil_all, full_name) — TODO confirm against err_provider schema.
val filtered_df = err_prov_df.filter(columnFilter_udf(err_prov_df("prov_all"), err_prov_df("affil_all"), err_prov_df("full_name")))
// Enrich filtered providers with their NPPES registry record by NPI.
// NOTE(review): the original line ended with a dangling ".select(" — the
// projected column list was lost in the copy. The join is kept without the
// projection; restore the intended select list from the source notebook.
var joined_df = filtered_df.join(npi_reg_prov_df, filtered_df("npi") === npi_reg_prov_df("npi"), "left_outer")
// Append the NPPES name-similarity score as a new column.
val nameComparisonScore = newColumn_udf(
  joined_df("nppes_provider_first_name"),
  joined_df("nppes_provider_last_name"),
  joined_df("nppes_provider_other_first_name"),
  joined_df("nppes_provider_other_last_name"),
  joined_df("nppes_provider_middle_name"),
  joined_df("full_name"),
  joined_df("is_pay_to"),
  joined_df("nppes_entity_type_code")
)
joined_df = joined_df.withColumn("sole_prop_nppes_name_comp", nameComparisonScore)
// Additional lookup tables: sole-proprietor flags and the Cactus
// provider-to-entity assignment mapping.
val sole_prop_df = sqlContext.sql("select * from qnxt.sole_prop")
val cactus_prov_ent_asgn_df = sqlContext.sql("select * from qnxt.cactus_provider_entity_assignment")
// Attach sole-proprietor info (by state + provider id) and the Cactus
// entity assignment (by state + NPI).
// NOTE(review): the original line ended with a dangling ".select(" — the
// projection list was lost; restore it from the source notebook. Also note
// the second join condition references the pre-join `joined_df` columns;
// with duplicated column names (err_state_id, npi) after the first join this
// can become ambiguous — consider aliasing the DataFrames.
joined_df = joined_df
  .join(sole_prop_df, joined_df("err_state_id") === sole_prop_df("err_state_id") && joined_df("prov_id") === sole_prop_df("prov_id"), "left_outer")
  .join(cactus_prov_ent_asgn_df, joined_df("err_state_id") === cactus_prov_ent_asgn_df("err_state_id") && joined_df("npi") === cactus_prov_ent_asgn_df("npi"), "left_outer")
import org.apache.spark.sql.functions.udf
// Uppercase a string column; null/empty values become a single space.
// Fixed: the original line was mangled in the copy — the lambda's opening
// paren and the closing paren of isEmpty were lost, and the import was fused
// onto the same line.
def uppercaseUdf = udf((value: String) => {
  if (value != null && !value.isEmpty) value.toUpperCase() else " "
})
// Normalize the five Cactus name columns to uppercase (nulls -> " ").
// Fixed: the original called `testUdf`, which is never defined in this file;
// the UDF defined just above is `uppercaseUdf` — use it.
joined_df = joined_df
  .withColumn("cactus_long_name", uppercaseUdf(joined_df("cactus_long_name")))
  .withColumn("cactus_first_name", uppercaseUdf(joined_df("cactus_first_name")))
  .withColumn("cactus_last_name", uppercaseUdf(joined_df("cactus_last_name")))
  .withColumn("cactus_other_first_name", uppercaseUdf(joined_df("cactus_other_first_name")))
  .withColumn("cactus_other_last_name", uppercaseUdf(joined_df("cactus_other_last_name")))
/**
 * Jaro-Winkler similarity between the provider's name and its Cactus record.
 *
 * - Pay-to providers (no entity first name, NPI present): compare the
 *   provider full name against the Cactus long (organization) name.
 * - Rendering providers (entity first name and NPI present): compare first
 *   and last names separately against both the regular and "other" Cactus
 *   name entries, average each pair, and keep the higher average.
 *   FUTURE: see if including the middle name improves the score.
 * - Rows with a blank NPI cannot be scored and yield None (SQL NULL).
 *
 * Fixed: the original was declared to return Double but its if-chain was
 * non-exhaustive — when npi was blank no branch produced a value, so the
 * method body typed as Unit and did not compile. Return type is now
 * Option[Double] with an explicit None fallback; `return` statements and the
 * double-negative `!isNotBlank` are gone.
 */
def cactusNameSimilarity(cactus_first_name: String, cactus_last_name: String, cactus_other_first_name: String, cactus_other_last_name: String, cactus_long_name: String, ent_first_name: String, ent_last_name: String, full_name: String, npi: String): Option[Double] = {
  if (StringUtils.isBlank(npi)) {
    None // no NPI: nothing reliable to match on
  } else if (StringUtils.isBlank(ent_first_name)) {
    // Pay-to provider: full name vs. Cactus long name.
    Some(StringUtils.getJaroWinklerDistance(nullToEmpty(full_name), nullToEmpty(cactus_long_name)).doubleValue)
  } else {
    // Rendering provider: average of first/last similarities, for both the
    // regular and the "other" Cactus name; keep the better of the two.
    val regularAvg = (StringUtils.getJaroWinklerDistance(nullToEmpty(ent_first_name), nullToEmpty(cactus_first_name)).doubleValue +
      StringUtils.getJaroWinklerDistance(nullToEmpty(ent_last_name), nullToEmpty(cactus_last_name)).doubleValue) / 2
    val otherAvg = (StringUtils.getJaroWinklerDistance(nullToEmpty(ent_first_name), nullToEmpty(cactus_other_first_name)).doubleValue +
      StringUtils.getJaroWinklerDistance(nullToEmpty(ent_last_name), nullToEmpty(cactus_other_last_name)).doubleValue) / 2
    Some(Math.max(regularAvg, otherAvg))
  }
}
// UDF wrapper around cactusNameSimilarity. Fixed: two placeholders had lost
// their underscores, which does not parse.
val cactusNameSimilarity_udf = udf(cactusNameSimilarity(_: String, _: String, _: String, _: String, _: String, _: String, _: String, _: String, _: String))
// Append the Cactus name-similarity score.
// Fixed: the UDF takes 9 arguments but only 8 columns were passed — the
// 5th argument, cactus_long_name, was missing, so every subsequent argument
// was shifted by one position.
joined_df = joined_df.withColumn("cactus_name_similarity", cactusNameSimilarity_udf(
  joined_df("cactus_first_name"),
  joined_df("cactus_last_name"),
  joined_df("cactus_other_first_name"),
  joined_df("cactus_other_last_name"),
  joined_df("cactus_long_name"),
  joined_df("ent_first_name"),
  joined_df("ent_last_name"),
  joined_df("full_name"),
  joined_df("npi")
))
// External reference tables: OIG LEIE exclusions and the provider-type /
// specialty crosswalk. Fixed: the two val definitions were fused onto one
// line (no separator), which does not parse — split onto separate statements.
val deduped_oig_leie_df = sqlContext.sql("select * from external_db.deduped_oig_leie")
val type_splty_crosswalk_df = sqlContext.sql("select * from external_db.type_specialty_crosswalk")
// Attach OIG exclusion data (by NPI) and the specialty crosswalk twice: once
// keyed on the QNXT primary specialty code, once on the NPPES primary
// taxonomy code.
// NOTE(review): the original line ended with a dangling ".select(" — the
// projection list was lost; restore it from the source notebook. The
// crosswalk DataFrame is joined twice, so it is aliased here to keep the two
// copies' columns distinguishable (un-aliased self-joins are ambiguous).
val crosswalk_by_splty = type_splty_crosswalk_df.as("xwalk_splty")
val crosswalk_by_taxonomy = type_splty_crosswalk_df.as("xwalk_tax")
joined_df = joined_df
  .join(deduped_oig_leie_df, joined_df("npi") === deduped_oig_leie_df("npi"), "left_outer")
  .join(crosswalk_by_splty,
    joined_df("err_state_id") === crosswalk_by_splty("err_state_id") &&
    joined_df("rules_prov_type") === crosswalk_by_splty("rules_prov_type") &&
    joined_df("primary_specialty_code") === crosswalk_by_splty("primary_specialty_code"), "left_outer")
  .join(crosswalk_by_taxonomy,
    joined_df("err_state_id") === crosswalk_by_taxonomy("err_state_id") &&
    joined_df("rules_prov_type") === crosswalk_by_taxonomy("rules_prov_type") &&
    joined_df("nppes_primary_taxonomy_code") === crosswalk_by_taxonomy("nppes_primary_taxonomy_code"), "left_outer")
// Persist the final enriched provider dataset, replacing any prior version.
val providerWriter = joined_df.write.mode("overwrite")
providerWriter.saveAsTable("qnxt_db.provider")