hiveql to df 31 08 - veeraravi/Spark-notes GitHub Wiki
set initial.processing.date=2018-07-04; set extraction.ardw.year.month=201806; set extraction.ardw.feedkeys=2018152005,2018152006,2018152007,2018152008,2018153000,2018153001,2018153002,2018153003,2018153004,2018154000,2018154001,2018154002; set extraction.gdr.feedkeys=2018156007,2018157000,2018157001,2018157002,2018157003,2018157004,2018157005,2018157006,2018157007,2018158000,2018158001,2018158002,2018158003,2018158004; use pmdlg;
import org.apache.spark.sql.functions.udf import org.apache.spark.sql.functions.{trim, length, when} import org.apache.spark.sql.Column
val ardw_df = sqlContext.sql("select * from cstonedb3.ardw_trans_unbilled_keys"); ardw_df.createOrReplaceTempView("ardw_trans_unbilled_keys")
val gms_merchant_char = sqlContext.sql("select se_sic_cd,se_mer_ctgy_cd,se10 from cstonedb3.gms_merchant_char"); gms_merchant_char.createOrReplaceTempView("gms_merchant_char")
val triumph_demographics = sqlContext.sql("select gaicodeacctstatd,gaidatelaststatchgd,gaidateplstexprd,gaidateplstissuedd,CM13 from cstonedb3.triumph_demographics"); triumph_demographics.createOrReplaceTempView("triumph_demographics_temp")
val gstar_account_details = sqlContext.sql("select acc_int_status,acc_date_last_stat_chg,acc_dt_opened,CM13 from cstonedb3.gstar_account_details"); gstar_account_details.createOrReplaceTempView("gstar_account_details_temp")
val gstar_card_details = sqlContext.sql("select crd_amed_date_expire,CM13 from cstonedb3.gstar_card_details"); gstar_card_details.createOrReplaceTempView("gstar_card_details_temp")
val crt_currency = sqlContext.sql("select iso_alpha_cd,iso_no_cd from crt_currency"); crt_currency.createOrReplaceTempView("crt_currency_temp")
val crt_country = sqlContext.sql("select ctry_id,iso_2_pos_cd from crt_country"); crt_country.createOrReplaceTempView("crt_country_temp")
val gdr_se_characteristics = sqlContext.sql("select iso_mer_ctgy_cd,se_no from cstonedb3.gdr_se_characteristics"); gdr_se_characteristics.createOrReplaceTempView("gdr_se_characteristics_temp")
val mns_demographics_ar = sqlContext.sql("select sta_cd1,plstc_expr_dt,acct_creat_dt,CM13 from cstonedb3.mns_demographics_ar"); mns_demographics_ar.createOrReplaceTempView("mns_demographics_ar")
val apa_ar_demographic = sqlContext.sql("select sta_cd,card_expr_mo_yr,card_estb_mo_yr,CM13 from cstonedb3.apa_ar_demographic"); apa_ar_demographic.createOrReplaceTempView("apa_ar_demographic")
val gdr_corp_trans = sqlContext.sql("select * from cstonedb3.gdr_corp_trans_dtl_unbilled"); gdr_corp_trans.createOrReplaceTempView("gdr_corp_trans_temp")
val gdr_corp_trans_add_dtl = sqlContext.sql("select intr_trans_sub_type_cd,vpay_proxy_acct_no,intr_db_cr_cd,intr_trans_type_cd,intr_trans_sub_type_cd,intr_fin_ctgy_cd,trans_srce_type_cd,trans_plan_type_cd,clnt_orgn_id,bus_proc_dt,btch_no,trans_seq_no,base_acct_id from cstonedb3.gdr_corp_trans_add_dtl"); gdr_corp_trans_add_dtl.createOrReplaceTempView("gdr_corp_trans_add_dtl_temp")
val gdr_card_acct = sqlContext.sql("select card_account_number,acct_sta_cd,acct_sta_dt,plstc_expr_dt,mbr_since_dt,card_iss_dt,acct_type_cd,base_acct_id,basic_supp_no,prod_id from cstonedb3.gdr_card_acct"); gdr_card_acct.createOrReplaceTempView("gdr_card_acct_temp")
val psu_snapshot = sqlContext.sql("select trim_inst_id,ctry_id from psu_snapshot"); psu_snapshot.createOrReplaceTempView("psu_snapshot_temp")
val gdr_bill_cycle = sqlContext.sql("select bill_cycle_id,bill_dt from cstonedb3.gdr_bill_cycle"); gdr_bill_cycle.createOrReplaceTempView("gdr_bill_cycle_temp")
val gdr_corp_product = sqlContext.sql("select prod_id from cstonedb3.gdr_corp_product"); gdr_corp_product.createOrReplaceTempView("gdr_corp_product_temp")
val gdr_corp_product_ext = sqlContext.sql("select prtr_type_cd,prod_id from cstonedb3.gdr_corp_product_ext"); gdr_corp_product_ext.createOrReplaceTempView("gdr_corp_product_ext_temp")
val corp_prod_id_mapping = sqlContext.sql("select ar_sor,prod_id from corp_prod_id_mapping"); corp_prod_id_mapping.createOrReplaceTempView("corp_prod_id_mapping_temp")
val ardw_trans_gms_merchant_Query = s"SELECT cast(trim(t1.trans_id) as bigint) as pmd_ar_trans_id,trim(t1.acqr_refer_no) as pmd_ar_arn,t1.cm11 as pmd_ar_cm_11,concat(substr(t1.CM15,1,11),substr(t1.CM15,13,2)) as pmd_ar_cm_13, trim(t1.bill_curr_cd) as bill_curr_cd,trim(t1.subm_curr_cd) as subm_curr_cd, trim(t1.iss_ctry_cd) as iss_ctry_cd,trim(t1.se10) as se10, "+ s"t1.cm15 AS pmd_ar_cm_15, '0' as pmd_ar_clnt_orgn_id, trim(t1.prod_ia_cd) as pmd_ar_prod_id,trim(nvl(t1.post_dt,'')) as pmd_ar_bus_proc_dt,trim(nvl(t1.trans_dt,'')) as pmd_ar_trans_dt, "+ s"case when trim(t1.trans_cd) IN ('0410C','0410P') then t1.trans_am*-1 else t1.trans_am end as pmd_ar_bill_am,t1.subm_trans_am as pmd_ar_subm_trans_am,0 as pmd_ar_bill_usd_am,trim(t1.se10) as pmd_ar_se_acct_no, "+ s"trim(t2.se_sic_cd) as pmd_ar_se_sic_cd,trim(t2.se_mer_ctgy_cd) as pmd_ar_se_mcc_cd,trim(t1.trans_cd) as pmd_ar_trans_type_cd,'' as pmd_ar_trans_subtype_cd,trim(t1.btch_no) as pmd_ar_btch_no,trim(nvl(t1.bill_thru_dt,'')) as pmd_ar_bill_thru_dt, "+ s"trim(t1.ardw_sor_org) as pmd_ar_mkt_cd,trim(t1.device_pan) as pmd_cons_ar_dpan,0 as pmd_opp_id_se,0 as pmd_opp_id_corp,trim(t1.logo_grp) as pmd_ar_logo_grp,trim(t1.plan_grp) as pmd_ar_plan_grp,trim(t1.flm_seq_no) as pmd_ar_flm_sq_no, "+ s"'' as pmd_cm_chan,trim(t1.config_id) as pmd_ar_config_id,case when trim(t1.srce_sys)='TRIUMPH' then 'CONSTRI' when trim(t1.srce_sys)='GLOBESTAR' then 'CONSGSTAR' when trim(t1.srce_sys)='MNS' then 'CONSMNS' when trim(t1.srce_sys)='APAR' then 'CONSAPAR' else 'CONSUNK' end pmd_ar_sor, "+ s"trim(t1.list_idx) as pmd_ar_list_idx,'${hiveconf:initial.processing.date}' AS pmd_ar_init_proc_dt,'' as pmd_ar_trans_cd_cat_cd,'' as pmd_ar_trans_seq_no,'' as pmd_ar_btch_sub_cd,'' as pmd_ar_db_cr_cd,'' as pmd_ar_fin_ctgy_cd,'' as pmd_ar_srce_trans_cd,'' as pmd_ar_srce_trans_type_cd, "+ s"'' as pmd_ar_srce_lgc_modl_cd,'' as pmd_ar_trans_plan_type_cd,'' as pmd_ar_base_acct_id "+ s"FROM ardw_trans_unbilled_keys t1 LEFT OUTER JOIN cstonedb3.gms_merchant_char t2 ON trim(t1.se10) = trim(t2.se10) "+ s"WHERE trim(t1.srce_sys) in ('TRIUMPH','GLOBESTAR','MNS','APAR') AND t1.cstone_feed_key IN (${hiveconf:extraction.ardw.feedkeys}) AND upper(trim(NVL(t1.spnd_nonspnd_in,""))) <> "NS" AND bus_yr_mo IN (${hiveconf:extraction.ardw.year.month})"
ardw_trans_gms_merchant_Query.createOrReplaceTempView("ardw_trans_gms_temp")
val joinDf1 = s"SELECT atg.*,trim(t8.iso_2_pos_cd) as pmd_ar_ctry_id,trim(t6.iso_alpha_cd) as pmd_ar_bill_am_curr_cd, trim(t7.iso_alpha_cd) as pmd_ar_subm_trans_am_curr_cd,trim(t9.iso_mer_ctgy_cd) as pmd_seller_se_mcc, "+ s"COALESCE(t3.gaicodeacctstatd,t4.acc_int_status,t10.sta_cd1,t11.sta_cd) as pmd_ar_acct_sta_cd, "+ s"trim(nvl(COALESCE(t3.gaidatelaststatchgd,t4.acc_date_last_stat_chg),'')) as pmd_ar_acct_sta_dt, "+ s"trim(nvl(COALESCE(t3.gaidateplstexprd,t5.crd_amed_date_expire,t10.plstc_expr_dt,t11.card_expr_mo_yr),'')) as pmd_ar_plstc_expr_dt, "+ s"trim(nvl(COALESCE(t3.gaidateplstissuedd,t4.acc_dt_opened,t10.acct_creat_dt,t11.card_estb_mo_yr),'')) as pmd_ar_cm_since_dt, "+ s"trim(nvl(COALESCE(t3.gaidateplstissuedd,t4.acc_dt_opened,t10.acct_creat_dt,t11.card_estb_mo_yr),'')) as pmd_ar_card_iss_dt,"+ s"FROM ardw_trans_gms_temp atg LEFT OUTER JOIN triumph_demographics_temp t3 ON s"concat(substr(atg.pmd_ar_cm_15,1,11),substr(atg.pmd_ar_cm_15,13,2)) = t3.CM13 "+ s"LEFT OUTER JOIN gstar_account_details_temp t4 ON concat(substr(atg.pmd_ar_cm_15,1,11),substr(atg.pmd_ar_cm_15,13,2)) = t4.CM13 "+ s"LEFT OUTER JOIN gstar_card_details_temp t5 ON concat(substr(atg.pmd_ar_cm_15,1,11),substr(atg.pmd_ar_cm_15,13,2)) = t5.CM13 "+ s"LEFT OUTER JOIN crt_currency_temp t6 ON trim(t6.iso_no_cd)=trim(atg.bill_curr_cd) "+ s"LEFT OUTER JOIN crt_currency_temp t7 ON trim(t7.iso_no_cd)=trim(atg.subm_curr_cd) "+ s"LEFT OUTER JOIN crt_country_temp t8 ON trim(t8.ctry_id)=trim(atg.iss_ctry_cd) "+ s"LEFT OUTER JOIN gdr_se_characteristics_temp t9 ON trim(atg.se10)=trim(t9.se_no) "+ s"LEFT OUTER JOIN mns_demographics_ar_temp t10 ON concat(substr(atg.pmd_ar_cm_15,1,11),substr(atg.pmd_ar_cm_15,13,2))=t10.CM13 "+ s"LEFT OUTER JOIN apa_ar_demographic_temp t11 ON concat(substr(atg.pmd_ar_cm_15,1,11),substr(atg.pmd_ar_cm_15,13,2))=t11.CM13 "+
val joinDf2 = s"select trim(t1.src_ref_no) as pmd_ar_arn, t1.clnt_orgn_id as pmd_ar_clnt_orgn_id,trim(t1.prod_id) as pmd_ar_prod_id,trim(t1.roc_id) as roc_id,trim(t1.trans_id) as trans_id"+ s"trim(nvl(t1.bus_proc_dt ,'')) as pmd_ar_bus_proc_dt,trim(nvl(t1.trans_dt,'')) as pmd_ar_trans_dt,t1.bill_dcml_am as pmd_ar_bill_am,trim(t1.bill_curr_cd) as pmd_ar_bill_am_curr_cd,t1.subm_curr_dcml_am as pmd_ar_subm_trans_am, "+ s"trim(t1.subm_curr_cd) as pmd_ar_subm_trans_am_curr_cd,t1.bill_usd_am as pmd_ar_bill_usd_am,trim(t1.se_acct_no) as pmd_ar_se_acct_no,trim(t1.sic_cd) as pmd_ar_se_sic_cd, t2.intr_trans_type_cd as intr_trans_type_cd,"+ s"trim(t1.trans_type_cd) as pmd_ar_trans_type_cd,trim(t2.intr_trans_sub_type_cd) as pmd_ar_trans_subtype_cd,trim(t1.btch_no) as pmd_ar_btch_no,trim(t1.mkt_cd) as pmd_ar_mkt_cd, trim(t2.intr_db_cr_cd) as intr_db_cr_cd"+ s"trim(t2.vpay_proxy_acct_no) as pmd_cons_ar_dpan,0 as pmd_opp_id_se,0 as pmd_opp_id_corp,'' as pmd_ar_logo_grp,'' as pmd_ar_plan_grp,'' as pmd_ar_flm_sq_no,'' as pmd_cm_chan,'' as pmd_ar_config_id, "+ s" '' as pmd_ar_list_idx,'${hiveconf:initial.processing.date}' AS pmd_ar_init_proc_dt, t1.basic_supp_no as basic_supp_no, "+ s"t1.trans_seq_no as pmd_ar_trans_seq_no,t1.btch_sub_cd as pmd_ar_btch_sub_cd, "+ s"case when trim(t2.intr_db_cr_cd) IN ('001','003') then 'DR' "+ s"when trim(t2.intr_db_cr_cd) IN ('002','004') then 'CR' end pmd_ar_db_cr_cd,t2.intr_fin_ctgy_cd as pmd_ar_fin_ctgy_cd,t1.srce_trans_cd as pmd_ar_srce_trans_cd,t2.trans_srce_type_cd as pmd_ar_srce_trans_type_cd, t1.opt_cd as opt_cd,t1.cstone_feed_key as cstone_feed_key"+ s"t1.bill_cycle_id as bill_cycle_id,trim(t1.srce_lgc_modl_cd) as pmd_ar_srce_lgc_modl_cd, "+ s"trim(t2.trans_plan_type_cd) as pmd_ar_trans_plan_type_cd,trim(t1.base_acct_id) as pmd_ar_base_acct_id, " + s"t1.gcp_entitlement as gcp_entitlement"+ s"from gdr_corp_trans_temp t1 LEFT OUTER JOIN gdr_corp_trans_add_dtl_temp t2 ON (t1.clnt_orgn_id=t2.clnt_orgn_id AND t1.bus_proc_dt=t2.bus_proc_dt AND t1.btch_no=t2.btch_no AND t1.trans_seq_no=t2.trans_seq_no AND t1.base_acct_id=t2.base_acct_id)"
joinDf2.createOrReplaceTempView("joinDf2")
val joinDf3 = s" jd.,CASE WHEN trim(t12.ar_sor)='TSYS' then cast(trim(jd.roc_id) as bigint) else cast(trim(jd.trans_id) as bigint) end as pmd_ar_trans_id,"+ s"substr(t3.card_account_number,1,11) as pmd_ar_cm_11,"+ s"concat(substr(t3.card_account_number,1,11),substr(t3.card_account_number,13,2)) as pmd_ar_cm_13, " + s"t3.card_account_number AS pmd_ar_cm_15, " + trim(t8.iso_2_pos_cd) as pmd_ar_ctry_id,trim(t5.se_mer_ctgy_cd) as pmd_ar_se_mcc_cd,trim(nvl(t6.bill_dt,'')) as pmd_ar_bill_thru_dt, "+ s"trim(t3.acct_sta_cd) pmd_ar_acct_sta_cd,trim(t3.acct_sta_dt) as pmd_ar_acct_sta_dt,trim(t3.plstc_expr_dt) as pmd_ar_plstc_expr_dt, trim(t3.mbr_since_dt) as pmd_ar_cm_since_dt,trim(t3.card_iss_dt) as pmd_ar_card_iss_dt, "+ s"case when trim(t12.ar_sor)='GLOBESTAR' then 'CORPGSTAR' when trim(t12.ar_sor)='TSYS' then 'CORPTSYS' when trim(t12.ar_sor)='CARS' then 'CORPCARS' when trim(t12.ar_sor)='APA_AR' then 'CORPAPAR' when trim(t12.ar_sor)='CARE' then 'CORPCARE' when trim(t12.ar_sor)='MNS' then 'CORPMNS' else 'CORPUNK' end pmd_ar_sor, "+ s"case when trim(t12.ar_sor)='CARS' and jd.intr_db_cr_cd IS NOT NULL then trim(nvl(COALESCE(CONCAT('CA',jd.pmd_ar_btch_no,jd.pmd_ar_btch_sub_cd,jd.intr_db_cr_cd,jd.intr_trans_type_cd,jd.pmd_ar_trans_subtype_cd,jd.pmd_ar_fin_ctgy_cd,jd.pmd_ar_trans_type_cd)),'CA')) "+ s"when trim(t12.ar_sor)='GLOBESTAR' and jd.intr_db_cr_cd IS NOT NULL then trim(nvl(COALESCE(CONCAT(jd.pmd_ar_srce_trans_type_cd,jd.pmd_ar_srce_trans_cd,jd.intr_db_cr_cd,jd.intr_trans_type_cd,jd.pmd_ar_trans_subtype_cd,jd.pmd_ar_fin_ctgy_cd,jd.pmd_ar_trans_type_cd)),'G')) "+ s"when trim(t12.ar_sor)='TSYS' and jd.intr_db_cr_cd IS NOT NULL then trim(nvl(COALESCE(CONCAT(jd.pmd_ar_srce_trans_type_cd,jd.pmd_ar_srce_trans_cd,jd.intr_db_cr_cd,jd.intr_trans_type_cd,jd.pmd_ar_trans_subtype_cd,jd.pmd_ar_fin_ctgy_cd,jd.pmd_ar_trans_type_cd)),'TSYS')) "+ s"when trim(t12.ar_sor) IN ('APA_AR','CARE','MNS') then trim(nvl(COALESCE(CONCAT(t12.ar_sor,jd.pmd_ar_trans_type_cd)),t12.ar_sor)) "+ s"when trim(t12.ar_sor)='TSYS' and trim(jd.pmd_ar_prod_id) IN ('GWR','JA6') and trim(t3.acct_type_cd)='000008' then trim(nvl(COALESCE(CONCAT('TSYSLEG',jd.pmd_ar_trans_type_cd)),t12.ar_sor)) end as pmd_ar_trans_cd_cat_cd, "+ s"trim(t11.iso_mer_ctgy_cd) as pmd_ar_seller_se_mcc "+ s" from joinDf2 jd LEFT OUTER JOIN gdr_card_acct_temp t3 ON jd.pmd_ar_base_acct_id=t3.base_acct_id AND jd.basic_supp_no=t3.basic_supp_no s"LEFT OUTER JOIN psu_snapshot_temp t4 ON substr(t3.card_account_number,1,6)=t4.trim_inst_id s"LEFT OUTER JOIN cstonedb3.gms_merchant_char t5 ON trim(jd.pmd_ar_se_acct_no)=trim(t5.se10) ---t2 s"LEFT OUTER JOIN gdr_bill_cycle_temp t6 ON jd.bill_cycle_id=t6.bill_cycle_id s"LEFT OUTER JOIN crt_country_temp t8 ON trim(t8.ctry_id)=trim(t4.ctry_id) ---t8 s"LEFT OUTER JOIN gdr_corp_product_temp t9 ON trim(t9.prod_id)=trim(t3.prod_id) s"LEFT OUTER JOIN gdr_corp_product_ext_temp t10 ON trim(t9.prod_id)=trim(t10.prod_id) s"LEFT OUTER JOIN gdr_se_characteristics_temp t11 ON trim(jd.pmd_ar_se_acct_no)=trim(t11.se_no) ---t9 s"LEFT OUTER JOIN corp_prod_id_mapping_temp t12 ON trim(jd.pmd_ar_prod_id)=trim(t12.prod_id) s"WHERE trim(t10.prtr_type_cd)!='002' AND jd.gcp_entitlement NOT LIKE '%REX%' AND jd.pmd_ar_clnt_orgn_id NOT LIKE '%RX%' AND trim(jd.opt_cd)='I' AND jd.cstone_feed_key IN (${hiveconf:extraction.gdr.feedkeys})";
joinDf1.unionAll(joinDf3)