Ontology join assignment - veeraravi/Spark-notes GitHub Wiki

import org.apache.spark.sql.functions.udf

/**
 * Row filter for the error-provider table: rejects (returns false for) rows that have
 * neither a provider nor an affiliate affiliation AND whose full name starts with "SERVICE".
 * Every other row is kept.
 *
 * A null `full_name` cannot start with "SERVICE", so such rows are kept instead of throwing
 * a NullPointerException inside the UDF.
 */
def fillter_column(prov_all: String, affil_all: String, full_name: String): Boolean = {
  val hasNoAffiliations = prov_all == null && affil_all == null
  val isServiceName = full_name != null && full_name.startsWith("SERVICE")
  !(hasNoAffiliations && isServiceName)
}

// Spark UDF wrapper around fillter_column (prov_all, affil_all, full_name) => keep-row flag.
val columnFilter_udf = udf(fillter_column(_: String, _: String, _: String))

import org.apache.commons.lang3.StringUtils

 /**
  * Null guard for name strings passed to the Jaro-Winkler comparisons in this script.
  * Despite the name, a null input becomes a single space (" "), not ""; any non-null value,
  * including "", is returned unchanged. Changing the sentinel would change the similarity
  * scores computed from its output.
  */
def nullToEmpty(name: String): String = Option(name).getOrElse(" ")
/**
 * Name-similarity score between a provider's `full_name` and its NPPES names.
 *
 * Only scored for pay-to rows whose `nppes_entity_type_code` is "1". For those rows, the
 * result is the best commons-lang3 Jaro-Winkler score of `full_name` against three NPPES
 * name variants:
 *   - first + last
 *   - other first + other last
 *   - first + middle + last
 *
 * Null or blank NPPES name parts are skipped when a variant is built. Plain string
 * concatenation would otherwise compare against the literal text "null".
 *
 * @return None (a null value in the UDF output column) when the row is not pay-to or the
 *         entity type code is not "1"; otherwise Some(best score). The original declared
 *         `Double` but returned null, which does not compile for a Scala `Double`.
 */
def newColumn(nppes_provider_first_name: String, nppes_provider_last_name: String,
              nppes_provider_other_first_name: String, nppes_provider_other_last_name: String,
              nppes_provider_middle_name: String, full_name: String,
              is_pay_to: Boolean, nppes_entity_type_code: String): Option[Double] = {
  // Join only the non-blank parts, separated by single spaces.
  def joinName(parts: String*): String = parts.filter(p => StringUtils.isNotBlank(p)).mkString(" ")

  if (!is_pay_to || !"1".equals(nppes_entity_type_code)) None
  else {
    val target = nullToEmpty(full_name)
    val candidates = Seq(
      joinName(nppes_provider_first_name, nppes_provider_last_name),
      joinName(nppes_provider_other_first_name, nppes_provider_other_last_name),
      joinName(nppes_provider_first_name, nppes_provider_middle_name, nppes_provider_last_name))
    Some(candidates.map(c => StringUtils.getJaroWinklerDistance(target, c)).max)
  }
}

// Spark UDF wrapper around newColumn: five NPPES name parts, full_name, is_pay_to, entity type code.
val newColumn_udf = udf(newColumn(_: String, _: String, _: String, _: String, _: String, _: String, _: Boolean, _: String))

// Source tables: error-provider rows and the NPPES NPI registry (two separate statements).
val err_prov_df = sqlContext.sql("select * from qnxt.err_provider")
val npi_reg_prov_df = sqlContext.sql("select * from qnxt.npi_registry_provider")

// Keep only the error-provider rows accepted by columnFilter_udf.
val filtered_df = err_prov_df.where(
  columnFilter_udf($"affils_as_provider_all", $"affils_as_affiliate_all", $"full_name"))

// Left-outer join the filtered provider rows to the NPPES registry on npi. Every selected
// registry column is exposed as "nppes_" + its source name. The original spelled these out
// by hand, and several used a doubled qualifier ("npi_reg_prov_table.npi_reg_prov_table.x")
// that cannot resolve.
var joined_df = {
  val nppesColumns = Seq(
    "npi", "npi_last_update_date", "npi_deactivation_reason_code", "npi_deactivation_reason",
    "npi_deactivation_date", "npi_reactivation_date", "provider_enumeration_date",
    "provider_gender_code", "entity_type_code", "entity_type",
    "provider_organization_name_legal_business_name", "provider_other_organization_name",
    "provider_other_organization_name_type_code", "provider_other_organization_name_type",
    "provider_last_name", "provider_first_name", "provider_middle_name",
    "provider_other_last_name", "provider_other_first_name", "provider_other_middle_name",
    "authorized_official_last_name", "authorized_official_first_name",
    "authorized_official_middle_name", "authorized_official_title_or_position",
    "authorized_official_telephone_number", "is_sole_proprietor", "is_organization_subpart",
    "parent_organization_lbn", "parent_organization_tin",
    "provider_first_line_business_mailing_address", "provider_second_line_business_mailing_address",
    "provider_business_mailing_address_city_name", "provider_business_mailing_address_state_name",
    "provider_business_mailing_address_postal_code",
    "provider_business_mailing_address_telephone_number",
    "provider_business_mailing_address_fax_number", "provider_address_1", "provider_address_2",
    "provider_city", "provider_state", "provider_zip",
    "provider_business_practice_location_address_telephone_number",
    "provider_business_practice_location_address_fax_number", "provider_taxonomies",
    "primary_taxonomy_code", "provider_grouping", "provider_class", "provider_specialty")

  filtered_df.as("filteredTable")
    .join(npi_reg_prov_df.as("npi_reg_prov_table"),
      $"filteredTable.npi" === $"npi_reg_prov_table.npi",
      "left_outer")
    .select(($"filteredTable.*" +: nppesColumns.map(c => $"npi_reg_prov_table.$c".alias(s"nppes_$c"))): _*)
}

// Add sole_prop_nppes_name_comp: newColumn_udf's score of full_name against the NPPES names.
joined_df = {
  val src = joined_df
  src.withColumn("sole_prop_nppes_name_comp", newColumn_udf(
    src("nppes_provider_first_name"),
    src("nppes_provider_last_name"),
    src("nppes_provider_other_first_name"),
    src("nppes_provider_other_last_name"),
    src("nppes_provider_middle_name"),
    src("full_name"),
    src("is_pay_to"),
    src("nppes_entity_type_code")))
}

// Lookup tables for the sole-proprietor flag and the CACTUS provider-entity assignments.
val (sole_prop_df, cactus_prov_ent_asgn_df) = (
  sqlContext.sql("select * from qnxt.sole_prop"),
  sqlContext.sql("select * from qnxt.cactus_provider_entity_assignment"))

// Left-outer join the sole_prop flag on (err_state_id, prov_id). The original select mixed a
// plain String ("$joined_table.*", with the $ inside the literal) and a Column, which does not
// compile; both arguments are now Columns.
joined_df = {
  joined_df.as("joined_table")
    .join(sole_prop_df.as("solePropTable"),
      $"joined_table.err_state_id" === $"solePropTable.err_state_id" &&
        $"joined_table.prov_id" === $"solePropTable.prov_id",
      "left_outer")
    .select($"joined_table.*", $"solePropTable.sole_prop")
}

// Left-outer join the CACTUS provider-entity assignments on (err_state_id, npi) and rename
// the selected CACTUS columns (source name -> output name). The original selected
// "cpeaTable.joined_table.*" and doubled "cpeaTable.cpeaTable.x" qualifiers, neither of
// which resolves.
joined_df = {
  val cactusColumns = Seq(
    "long_name" -> "cactus_long_name",
    "first_name" -> "cactus_first_name",
    "last_name" -> "cactus_last_name",
    "other_name_first" -> "cactus_other_first_name",
    "other_name_last" -> "cactus_other_last_name",
    "active" -> "cactus_active",
    "caqh" -> "cactus_caqh",
    "provider_k" -> "cactus_id",
    "hippa_taxonomy_rtk" -> "cactus_specialty_code",
    "date_of_birth" -> "cactus_dob",
    "sex" -> "cactus_sex",
    "medicare_number" -> "cactus_medicare_number",
    "medicaid_number" -> "cactus_medicaid_number",
    "taxid_number" -> "cactus_fedid",
    "entity_description" -> "cactus_credentialing_body",
    "credentialing_status" -> "cactus_credentialing_status")

  joined_df.as("joined_table")
    .join(cactus_prov_ent_asgn_df.as("cpeaTable"),
      $"joined_table.err_state_id" === $"cpeaTable.err_state_id" &&
        $"joined_table.npi" === $"cpeaTable.npi",
      "left_outer")
    .select(($"joined_table.*" +: cactusColumns.map { case (src, dst) => $"cpeaTable.$src".alias(dst) }): _*)
}

import org.apache.spark.sql.functions.udf

/**
 * Upper-cases a name column value; null or "" becomes a single space (" ").
 * Locale.ROOT keeps the result independent of the JVM default locale (e.g. Turkish dotted
 * and dotless i). Defined as a val so the UDF is created once rather than on every reference.
 */
val uppercaseUdf = udf((value: String) =>
  if (value != null && !value.isEmpty) value.toUpperCase(java.util.Locale.ROOT) else " ")

// Normalize the CACTUS name columns in place. The original called the undefined `testUdf`.
joined_df = Seq("cactus_long_name", "cactus_first_name", "cactus_last_name",
  "cactus_other_first_name", "cactus_other_last_name").foldLeft(joined_df) { (df, name) =>
  df.withColumn(name, uppercaseUdf(df(name)))
}

/**
 * Name similarity between a provider row and its CACTUS record (commons-lang3 Jaro-Winkler;
 * null inputs are replaced via nullToEmpty).
 *
 *  - Pay-to providers (blank ent_first_name): full_name vs cactus_long_name.
 *  - Rendering providers (non-blank ent_first_name): the mean of the first-name and
 *    last-name scores, computed against both the regular and the "other" CACTUS names;
 *    the higher mean is returned.
 *    FUTURE: see if including the middle name improves the score.
 *
 * @return None (a null value in the UDF output column) when npi is blank. The original had
 *         no result for that case, so the `Double` method did not type-check.
 */
def cactusNameSimilarity(cactus_first_name: String, cactus_last_name: String,
                         cactus_other_first_name: String, cactus_other_last_name: String,
                         cactus_long_name: String, ent_first_name: String, ent_last_name: String,
                         full_name: String, npi: String): Option[Double] = {
  def jw(a: String, b: String): Double =
    StringUtils.getJaroWinklerDistance(nullToEmpty(a), nullToEmpty(b))

  if (StringUtils.isBlank(npi)) None
  else if (StringUtils.isBlank(ent_first_name)) Some(jw(full_name, cactus_long_name))
  else {
    val regularAvg = (jw(ent_first_name, cactus_first_name) + jw(ent_last_name, cactus_last_name)) / 2
    val otherAvg = (jw(ent_first_name, cactus_other_first_name) + jw(ent_last_name, cactus_other_last_name)) / 2
    Some(math.max(regularAvg, otherAvg))
  }
}

// Spark UDF wrapper around cactusNameSimilarity (nine String arguments).
val cactusNameSimilarity_udf = udf(cactusNameSimilarity(_: String, _: String, _: String, _: String, _: String, _: String, _: String, _: String, _: String))

// Arguments follow cactusNameSimilarity's parameter order. cactus_long_name (5th) was
// previously omitted, which made the call one argument short.
joined_df = joined_df.withColumn("cactus_name_similarity", cactusNameSimilarity_udf(
  joined_df("cactus_first_name"), joined_df("cactus_last_name"),
  joined_df("cactus_other_first_name"), joined_df("cactus_other_last_name"),
  joined_df("cactus_long_name"),
  joined_df("ent_first_name"), joined_df("ent_last_name"),
  joined_df("full_name"), joined_df("npi")))

// External reference tables: deduplicated OIG LEIE records and the type/specialty crosswalk
// (two separate statements).
val deduped_oig_leie_df = sqlContext.sql("select * from external_db.deduped_oig_leie")
val type_splty_crosswalk_df = sqlContext.sql("select * from external_db.type_specialty_crosswalk")

// Left-outer join the OIG LEIE records on npi; each selected OIG column becomes "oig_" + name.
joined_df = {
  val oigColumns = Seq("excl_type", "excl_date", "rein_date", "waiver_date", "wvr_state")
  val withOig = joined_df.as("joined_table").join(
    deduped_oig_leie_df.as("dol"),
    joined_df("npi") === deduped_oig_leie_df("npi"),
    "left_outer")
  withOig.select(($"joined_table.*" +: oigColumns.map(c => $"dol.$c".alias(s"oig_$c"))): _*)
}

// Left-outer join the type/specialty crosswalk on (state, QNXT provider type, primary
// specialty code). Each selected crosswalk column is exposed as "rules_" + name.
// Fixes from the original:
//  - the "." before select was missing
//  - "tscw.tscw.baseline_crosswalk_code" had a doubled qualifier
// Join keys go through the "tscw" alias because the same crosswalk DataFrame is joined again
// below.
joined_df = {
  val crosswalkColumns = Seq("baseline_crosswalk_code", "baseline_crosswalk_description",
    "record_type", "can_be_sole_proprietor", "credentialing_required",
    "direct_affil", "group_affil", "notes")

  joined_df.as("joined_table")
    .join(type_splty_crosswalk_df.as("tscw"),
      $"joined_table.err_state_id" === $"tscw.state_id" &&
        $"joined_table.rules_prov_type" === $"tscw.qnxt_type_code" &&
        $"joined_table.primary_specialty_code" === $"tscw.qnxt_specialty_code",
      "left_outer")
    .select(($"joined_table.*" +: crosswalkColumns.map(c => $"tscw.$c".alias(s"rules_$c"))): _*)
}

// Second left-outer join of the type/specialty crosswalk, this time keyed on the NPPES
// primary taxonomy code. Each selected column is exposed as "nppes_rules_" + name.
// joined_df may already carry this crosswalk DataFrame's lineage from an earlier join, so
// columns taken straight from type_splty_crosswalk_df("...") can resolve ambiguously (a
// self-join). Resolving them through the "tpcw" alias avoids that.
joined_df = {
  val crosswalkColumns = Seq("baseline_crosswalk_code", "baseline_crosswalk_description",
    "record_type", "can_be_sole_proprietor", "credentialing_required",
    "direct_affil", "group_affil", "notes")

  joined_df.as("joined_table")
    .join(type_splty_crosswalk_df.as("tpcw"),
      $"joined_table.err_state_id" === $"tpcw.state_id" &&
        $"joined_table.rules_prov_type" === $"tpcw.qnxt_type_code" &&
        $"joined_table.nppes_primary_taxonomy_code" === $"tpcw.qnxt_specialty_code",
      "left_outer")
    .select(($"joined_table.*" +: crosswalkColumns.map(c => $"tpcw.$c".alias(s"nppes_rules_$c"))): _*)
}

// Persist the enriched provider rows as table qnxt_db.provider in overwrite mode.
val providerWriter = joined_df.write.mode("overwrite")
providerWriter.saveAsTable("qnxt_db.provider")

⚠️ **GitHub.com Fallback** ⚠️