child - Hari-dta/hari GitHub Wiki

```python
import pyspark
import json
import argparse
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.utils import *
import os
from datetime import datetime
import logging
import sys
import subprocess
import re

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.8"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.8"

logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

parser = argparse.ArgumentParser()
parser.add_argument("--uri", required=True)
parser.add_argument("--db", required=True)
parser.add_argument("--collection", required=True)
parser.add_argument("--stgname", required=True)
parser.add_argument("--targetname", required=True)
parser.add_argument("--audittable", required=True)
parser.add_argument("--loadstatustable", required=True)
parser.add_argument("--writetype", required=True)
args = parser.parse_args()

spark = SparkSession.builder \
    .appName("HM01_MongoDB_to_RDV_pyspark") \
    .enableHiveSupport() \
    .master("local[*]") \
    .config("spark.mongodb.read.connection.uri", args.uri) \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .getOrCreate()

database_name = args.db
collection_name = args.collection
staging_table_name = args.stgname
target_table = args.targetname
audit_table_name = args.audittable
load_status_table = args.loadstatustable
write_mode = args.writetype.lower()

start_time = datetime.now()
starting_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
spark_app_id = spark.sparkContext.applicationId
load_status = "SUCCESS"
error_message = None
source_count = 0
target_count = 0
staging_table_count = 0

# Record the start of the run in the load-status table.
spark.sql(f"""
    INSERT INTO {load_status_table}
    (spark_app_id, database_name, collection_name, status, starttime, endtime, errormessage)
    VALUES ('{spark_app_id}', '{database_name}', '{collection_name}', 'STARTED', '{starting_time}', NULL, NULL)
""")


def sanitize_names_with_uniqueness(schema):
    """Sanitize field names (replace non-word characters, lower-case, prefix names that
    start with a digit) and keep them unique, recursing into struct/array/map types."""
    unknown_counter = [0]
    sanitized_top_names = set()
    top_level_names = {}

    def make_unique(name, existing_names):
        base = name
        counter = 1
        while name in existing_names:
            name = f"{base}_{counter}"
            counter += 1
        existing_names.add(name)
        return name

    sanitized_fields = []
    for field in schema.fields:
        raw_name = field.name
        base_name = re.sub(r'\W', '_', raw_name).strip('_').lower()
        if not base_name:
            base_name = f"col_unknown_{unknown_counter[0]}"
            unknown_counter[0] += 1
        elif base_name[0].isdigit():
            base_name = f"col_{base_name}"
        unique_name = make_unique(base_name, sanitized_top_names)
        top_level_names[raw_name] = unique_name
        sanitized_fields.append(StructField(unique_name, field.dataType, field.nullable))

    def recursive_sanitize(field, parent_path=None):
        raw_name = field.name
        base_name = re.sub(r'\W', '_', raw_name).strip('_').lower()
        if not base_name:
            base_name = f"col_unknown_{unknown_counter[0]}"
            unknown_counter[0] += 1
        elif base_name[0].isdigit():
            base_name = f"col_{base_name}"
        prefix = '_'.join(parent_path) if parent_path else ''
        full_name = f"{prefix}_{base_name}" if prefix else base_name
        full_name = make_unique(full_name, sanitized_top_names)
        dtype = field.dataType
        if isinstance(dtype, StructType):
            new_fields = [recursive_sanitize(f, (parent_path or []) + [base_name]) for f in dtype.fields]
            return StructField(full_name, StructType(new_fields), field.nullable)
        elif isinstance(dtype, ArrayType):
            element_field = recursive_sanitize(StructField("element", dtype.elementType),
                                               (parent_path or []) + [base_name])
            return StructField(full_name, ArrayType(element_field.dataType), field.nullable)
        elif isinstance(dtype, MapType):
            value_field = recursive_sanitize(StructField("value", dtype.valueType),
                                             (parent_path or []) + [base_name])
            return StructField(full_name, MapType(StringType(), value_field.dataType), field.nullable)
        else:
            return StructField(full_name, dtype, field.nullable)

    final_fields = []
    for field in sanitized_fields:
        dtype = field.dataType
        if isinstance(dtype, (StructType, ArrayType)):
            final_fields.append(recursive_sanitize(field))
        else:
            final_fields.append(field)
    return StructType(final_fields)


def build_explicit_schema(spark_schema):
    """Map every field to an explicitly supported Spark type; anything unrecognised
    falls back to StringType."""
    def sanitize_and_copy(field):
        name = field.name
        dtype = field.dataType
        nullable = field.nullable
        if isinstance(dtype, StringType):
            return StructField(name, StringType(), nullable)
        elif isinstance(dtype, IntegerType):
            return StructField(name, IntegerType(), nullable)
        elif isinstance(dtype, BooleanType):
            return StructField(name, BooleanType(), nullable)
        elif isinstance(dtype, DoubleType):
            return StructField(name, DoubleType(), nullable)
        elif isinstance(dtype, DecimalType):
            return StructField(name, DecimalType(30, 10), nullable)
        elif isinstance(dtype, TimestampType):
            return StructField(name, TimestampType(), nullable)
        elif isinstance(dtype, DateType):
            return StructField(name, DateType(), nullable)
        elif isinstance(dtype, LongType):
            return StructField(name, LongType(), nullable)
        elif isinstance(dtype, ArrayType):
            element_type = build_explicit_schema(
                StructType([StructField("element", dtype.elementType)]))[0].dataType
            return StructField(name, ArrayType(element_type), nullable)
        elif isinstance(dtype, MapType):
            return StructField(name, MapType(StringType(), StringType()), nullable)
        elif isinstance(dtype, StructType):
            return StructField(name, build_explicit_schema(dtype), nullable)
        else:
            return StructField(name, StringType(), nullable)

    return StructType([sanitize_and_copy(field) for field in spark_schema.fields])


def extract_nested_fields(df, base_table_name, id_col="id", output_type="array"):
    """Split array or struct columns into (table_name, DataFrame) pairs keyed by id_col."""
    nested_tables = []
    for field in df.schema.fields:
        name = field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType) and output_type == "array":
            array_df = df.select(col(id_col), col(name))
            if isinstance(dtype.elementType, StructType):
                flat_cols = [col(id_col)] + [col(f"{name}.{f.name}").alias(f"{name}_{f.name}")
                                             for f in dtype.elementType.fields]
            else:
                flat_cols = [col(id_col), col(name).alias(f"{name}_value")]
            nested_tables.append((f"{base_table_name}_{name}_array", array_df.select(*flat_cols)))
        elif isinstance(dtype, StructType) and output_type == "struct":
            struct_df = df.select(col(id_col),
                                  *[col(f"{name}.{f.name}").alias(f"{name}_{f.name}") for f in dtype.fields])
            nested_tables.append((f"{base_table_name}_{name}_struct", struct_df))
    return nested_tables


# Main logic
array_tables = []
struct_tables = []
try:
    raw_df = spark.read.format("mongodb") \
        .option("database", database_name) \
        .option("collection", collection_name) \
        .load()
    if raw_df.rdd.isEmpty():
        raise ValueError("MongoDB collection is empty.")
    source_count = raw_df.count()

    sanitized_schema = sanitize_names_with_uniqueness(raw_df.schema)
    if len(sanitized_schema.fields) == 0:
        raise ValueError("Sanitized schema has no valid fields.")
    explicit_schema = build_explicit_schema(sanitized_schema)
    sanitized_df = spark.createDataFrame(raw_df.rdd, schema=explicit_schema)

    # Land the sanitized data in the staging table and verify the row count.
    sanitized_df.write.mode("overwrite").format("parquet").saveAsTable(staging_table_name)
    staging_table_count = spark.sql(f"SELECT COUNT(*) FROM {staging_table_name}").first()[0]
    if source_count != staging_table_count:
        raise ValueError("Row count mismatch: MongoDB vs staging")

    try:
        # Target exists: align columns to the target schema, padding missing ones with NULLs.
        spark.catalog.refreshTable(target_table)
        target_df = spark.table(target_table)
        target_columns = target_df.columns
        for field in target_df.schema:
            if field.name not in sanitized_df.columns:
                sanitized_df = sanitized_df.withColumn(field.name, lit(None).cast(field.dataType))
        sanitized_df.select(*target_columns).write.mode(write_mode).format("parquet").saveAsTable(target_table)
        target_count = sanitized_df.select(*target_columns).count()
    except AnalysisException:
        # Target does not exist yet: create it from the sanitized DataFrame.
        sanitized_df.write.mode(write_mode).format("parquet").saveAsTable(target_table)
        target_count = sanitized_df.count()

    # Extract arrays and structs into side tables.
    array_tables = extract_nested_fields(sanitized_df, staging_table_name, output_type="array")
    struct_tables = extract_nested_fields(sanitized_df, staging_table_name, output_type="struct")
    for tbl_name, df in array_tables + struct_tables:
        df.write.mode(write_mode).format("parquet").saveAsTable(tbl_name)
except Exception as e:
    load_status = "FAILURE"
    error_message = str(e)
    logging.error(error_message)

ending_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
load_duration = str(datetime.now() - start_time)

spark.sql(f"""
    INSERT INTO {load_status_table}
    (spark_app_id, database_name, collection_name, status, starttime, endtime, errormessage)
    VALUES ('{spark_app_id}', '{database_name}', '{collection_name}', '{load_status}',
            '{starting_time}', '{ending_time}',
            {f"'{error_message}'" if error_message else "NULL"})
""")

audit_schema = StructType([
    StructField("spark_app_id", StringType(), True),
    StructField("database_name", StringType(), True),
    StructField("collection_name", StringType(), True),
    StructField("start_time", StringType(), True),
    StructField("end_time", StringType(), True),
    StructField("load_status", StringType(), True),
    StructField("load_duration", StringType(), True),
    StructField("error_message", StringType(), True),
    StructField("source_count", IntegerType(), True),
    StructField("staging_table_count", IntegerType(), True),
    StructField("target_table_count", IntegerType(), True)
])

audit_df = spark.createDataFrame([(
    spark_app_id, database_name, collection_name, starting_time, ending_time,
    load_status, load_duration, error_message,
    source_count, staging_table_count, target_count
)], schema=audit_schema)
audit_df.write.mode("append").format("parquet").saveAsTable(audit_table_name)


# Impala invalidate
def invalidate_impala_table(table_name):
    cmd = f"""impala-shell -i ab00-dimpalalb.arabbank.plc:21000 -d default -k --ssl \
--ca_cert=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_cacerts.pem -q 'INVALIDATE METADATA {table_name};'"""
    try:
        subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
        logging.info(f"Impala metadata invalidated for {table_name}")
    except subprocess.CalledProcessError as e:
        logging.warning(f"Impala error: {e.stderr}")


invalidate_impala_table(staging_table_name)
invalidate_impala_table(target_table)
invalidate_impala_table(audit_table_name)
invalidate_impala_table(load_status_table)
for tbl_name, _ in array_tables + struct_tables:
    invalidate_impala_table(tbl_name)

spark.stop()
```
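For reference, a minimal sketch of how the column-name sanitization behaves. The sample field names below are hypothetical, and the expected output assumes the reconstruction of `sanitize_names_with_uniqueness` shown above; run the snippet in the same session, after the definitions above.

```python
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical MongoDB-style field names with characters that are not valid
# in Hive column names (dots, spaces, leading digits/underscores).
sample_schema = StructType([
    StructField("_id", StringType(), True),          # leading underscore stripped -> "id"
    StructField("user.name", StringType(), True),    # dot replaced -> "user_name"
    StructField("user_name", StringType(), True),    # collision -> suffixed "user_name_1"
    StructField("2nd address", StringType(), True),  # leading digit -> prefixed "col_2nd_address"
])

sanitized = sanitize_names_with_uniqueness(sample_schema)  # defined in the script above
print([f.name for f in sanitized.fields])
# Expected (approximately): ['id', 'user_name', 'user_name_1', 'col_2nd_address']
```

Passing the sanitized schema through `build_explicit_schema` afterwards yields the explicitly typed schema that the staging write uses.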
