import os
import subprocess

from pyspark.sql import SparkSession

# Set Python environment for PySpark
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.8"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.8"
# Initialize Spark session with Hive support
spark = SparkSession.builder \
    .appName("MongoToRDV Control Table Reader") \
    .enableHiveSupport() \
    .getOrCreate()
# Required JARs
jars = [
    "/home/acabes_dev_team/mongo/mongo-spark-connector_2.12-10.2.1.jar",
    "/home/acabes_dev_team/mongo/bson-4.11.1.jar",
    "/home/acabes_dev_team/mongo/mongodb-driver-sync-4.11.1.jar",
    "/home/acabes_dev_team/mongo/mongodb-driver-core-4.11.1.jar",
    "/home/hariharan-m/mongodb-driver-sync-4.11.1.jar",
    "/home/hariharan-m/hive-serde-4.0.1.jar",
    "/home/hariharan-m/hive-hcatalog-core-0.13.0.jar",
    "/home/hariharan-m/json-serde-1.3.8-jar-with-dependencies.jar"
]
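# These JARs are shipped to each launched job via spark-submit's --jars flag (built below as a
# comma-separated list). The paths are edge-node specific and are only sanity-checked with
# os.path.exists before each run; missing files produce a warning, not a failure.
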
# Read all job IDs from control table
control_df = spark.sql("""
SELECT * FROM acabes_training.mongo_to_rdv_control
""")
# Iterate through each job
for row in control_df.collect():
    job_id = row.job_id.lower()
    audittable = row.audittable

    # Check if job_id already exists in audit table
    audit_check_query = f"""
        SELECT COUNT(*) AS count FROM acabes_training.mongo_to_rdv_v2_audit WHERE job_id = '{job_id}'
    """
    audit_df = spark.sql(audit_check_query)
    if audit_df.first()['count'] > 0:
        print(f"Job ID {job_id} already audited. Skipping.")
        continue

    # Extract job parameters
    executor_memory = row.executor_memory
    num_executors = row.num_executors
    driver_memory = row.driver_memory
    driver_cores = row.driver_cores
    db_user = row.db_user
    db_pass = row.db_pass
    db_host = row.db_host
    db_port = row.db_port
    db_name = row.db_name
    db = row.db
    collection = row.collection
    stgname = row.stgname
    targetname = row.targetname
    loadstatustable = row.loadstatustable
    writetype = row.writetype

    # Validate JARs and build the comma-separated --jars argument
    for jar in jars:
        if not os.path.exists(jar):
            print(f"Warning: JAR file does not exist: {jar}")
    jars_cmd = ",".join(jars)
    # Construct spark-submit command.
    # Note: --supervise is only honored in standalone/Mesos cluster mode, so it is omitted here;
    # spark.yarn.maxAppAttempts already covers retries on YARN.
    command = f"""
    spark3-submit --master yarn --deploy-mode client --executor-memory {executor_memory} --num-executors {num_executors} \\
        --driver-memory {driver_memory} --driver-cores {driver_cores} \\
        --conf spark.shuffle.compress=true --conf spark.io.compression.codec=snappy \\
        --conf spark.streaming.backpressure.enabled=true --conf spark.yarn.maxAppAttempts=2 \\
        --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -Djavax.net.ssl.trustStore=/home/hariharan-m/truststore.jks -Djavax.net.ssl.trustStorePassword=changeit" \\
        --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -Djavax.net.ssl.trustStore=/home/hariharan-m/truststore.jks -Djavax.net.ssl.trustStorePassword=changeit" \\
        --jars {jars_cmd} \\
        --files /home/hariharan-m/mongodb-shared-chain.pem \\
        "/home/hariharan-m/mongo_to_rdv_final/mongo_to_rdv_v2.py" \\
        --uri "mongodb://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}?ssl=true" \\
        --db "{db_name}" --collection "{collection}" --stgname "{stgname}" \\
        --targetname "{targetname}" --audittable "{audittable}" \\
        --loadstatustable "{loadstatustable}" --writetype "{writetype}" \\
        --jobid "{job_id}"
    """

    print(f"Executing job for Job ID: {job_id}")
    result = subprocess.run(command, shell=True)
    if result.returncode != 0:
        print(f"Warning: job for Job ID {job_id} exited with code {result.returncode}")
print("All eligible jobs have been processed.")