manual scala sql 1 — veeraravi/Spark-notes GitHub Wiki
// Classify a caught JDBC exception by its driver class name. Matching on the
// class-name string (rather than the exception type) avoids a compile-time
// dependency on the PostgreSQL/H2 driver jars.
// SQLState "23505" is the SQL-standard code for a unique-constraint violation.
val className = e.getClass.getName
className match {
  // PostgreSQL and H2 both report unique-key violations with SQLState 23505,
  // so the two previously duplicated cases share one body.
  case "org.postgresql.util.PSQLException" | "org.h2.jdbc.JdbcSQLException" =>
    e.getSQLState() match {
      case "23505" => throw new UniqueConstraintException(e)
      case _       => throw e
    }
  // Anything else is not ours to translate — rethrow unchanged.
  case _ => throw e
}
import java.sql.SQLException
import java.util.Properties

import scala.concurrent.{Await}
import scala.concurrent.duration._

import com.databricks.WorkflowException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
%scala
// Preload the Microsoft SQL Server JDBC driver so any missing-jar problem
// surfaces here, before a connection is attempted. A missing driver is only
// logged, not fatal — the read below will fail loudly anyway.
try {
  Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
} catch {
  case _: ClassNotFoundException =>
    println("SQLServerDriver class is not found to load")
  case e: Exception =>
    e.printStackTrace
}
// Azure SQL endpoint details. Credentials are resolved from the Databricks
// secret scope "jdbc" so nothing sensitive is hard-coded in the notebook.
val jdbcHostname = "datsqlsv01hub01dv01.database.windows.net"
val jdbcPort     = 1433
val jdbcDatabase = "datsqldb02fmhub01dv01"
val jdbcUsername = dbutils.secrets.get(scope = "jdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "jdbc", key = "password")

// SQL Server connection string: jdbc:sqlserver://host:port;database=name;
val jdbcUrl = s"jdbc:sqlserver://$jdbcHostname:$jdbcPort;database=$jdbcDatabase;"
import java.util.Properties import org.apache.spark.sql.SaveMode
// Connection properties handed to spark.read.jdbc / DataFrame.write.jdbc.
val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
// Bug fix: Spark's JDBC data source reads the driver class from the
// lowercase "driver" key; the previous capitalized "Driver" key was
// silently ignored (the read only worked because of the Class.forName above).
connectionProperties.setProperty("driver", driverClass)
// Runtime identifiers supplied by the orchestrating job through notebook widgets.
val jobid       = dbutils.widgets.get("job_id")
val notebookid  = dbutils.widgets.get("notebook_id")
val executionid = dbutils.widgets.get("job_execution_id")

// Make sure the curation Hive database exists at the expected mount point.
val hivedbname = "zade"
val curate_db = "CREATE DATABASE If Not Exists " + hivedbname + " LOCATION " + "'/mnt/zade/curation/zeacuration/db'"
spark.sql(curate_db)

// Per-run config table: zeacuration.config_<job><notebook><execution>.
// NOTE(review): the three ids are joined with empty-string separators, so
// e.g. (1, 23) and (12, 3) collide — confirm this naming is intentional.
val sourcedbname    = "zeacuration"
val sourcetablename = sourcedbname + ".config_" + jobid.toString() + "" + notebookid.toString() + "" + executionid.toString()
// Pull this run's config rows from Azure SQL.
// Bug fix: in the original, `df` was declared INSIDE the try block, so it was
// out of scope for the Delta write below (a compile error). The try is now an
// expression whose value is bound at this scope.
val df =
  try {
    spark.read.jdbc(jdbcUrl, sourcetablename, connectionProperties)
  } catch {
    case e: SQLException => throw e
    case e: Exception    => throw e
  }

// Snapshot the config to Delta so run-status updates can be made in place.
df.write
  .format("delta")
  .option("overwriteSchema", "true")
  .mode("overwrite")
  .save("/mnt/zade/curation/zeacuration/config/" + jobid + "/" + notebookid + "/" + executionid)
// Mirror the config table into the Hive curation database (replacing any
// previous copy) and mark this notebook's run as started.
// configtable is the source table name with the source db swapped for hive db;
// computing it once serves both the DROP and the CREATE below.
val configtable = sourcetablename.replace(sourcedbname, hivedbname)
spark.sql("DROP TABLE IF EXISTS " + configtable)
spark.sql("CREATE TABLE " + configtable + " USING DELTA LOCATION " +
  "'/mnt/zade/curation/zeacuration/config/" + jobid + "/" + notebookid + "/" + executionid + "'")
spark.sql("UPDATE " + configtable +
  " SET notebook_run_status = 'Started', notebook_run_status_dt= current_timestamp(), notebook_run_start_dt = current_timestamp() " +
  " WHERE job_execution_id = " + executionid + " AND notebook_id = " + notebookid)
// Child notebooks of this master notebook, ordered by run order.
// Bug fix: the original read `df.where$"parent_notebook_id" === notebookid.sort(...)`,
// which is a syntax error (`.sort` attached to the widget string, and `where`
// applied without its argument). Parenthesized correctly below.
val childrendf = df.where($"parent_notebook_id" === notebookid).sort($"notebook_run_order".asc)
childrendf.createOrReplaceTempView("childrennb")

// Distinct child-notebook rows (GROUP BY over all selected columns), ordered
// by run order for sequential batch dispatch.
val nbdf = spark.sql("SELECT job_execution_id, notebook_execution_id, notebook_id, notebook_name, notebook_path, parent_notebook_id, notebook_run_order FROM childrennb GROUP BY job_execution_id, notebook_execution_id, notebook_id, notebook_name, notebook_path, parent_notebook_id, notebook_run_order ORDER BY notebook_run_order ASC")

// Distinct run-order values; notebooks sharing one value run in parallel.
val nborder = nbdf.select("notebook_run_order").distinct.map(r => r.getInt(0)).collect.toList
// Count of blocking prior notebooks; mutated per batch inside the loop below.
var priorcount: Long = 0
// Dispatch each run-order batch of child notebooks, gating every batch on the
// outcome of the ones before it. Any unexpected error marks the master run
// FAILED, pushes the log back to the metastore, and exits the notebook.
try {
  nborder.foreach { x =>
    // Check if the notebooks with lower run_order have run successfully.
    if (nborder.indexOf(x) > 0) {
      // Later batches: any earlier notebook not yet SUCCESS blocks this batch.
      val priorres = spark.sql("SELECT notebook_id FROM " + configtable +
        " WHERE notebook_run_status <> 'SUCCESS' AND notebook_run_order<" + x)
      priorcount = priorres.count()
    } else {
      // First batch: skip notebooks already marked SUCCESS (e.g. on a re-run).
      val priorres = spark.sql("SELECT notebook_id FROM " + configtable +
        " WHERE notebook_run_status = 'SUCCESS' AND notebook_run_order=" + x)
      priorcount = priorres.count()
    }

    if (priorcount == 0) {
      val subnbdf = nbdf.where($"notebook_run_order" === x)
      val nbid    = subnbdf.select("notebook_id").map(r => r.getInt(0)).collect.toList
      val nbpath  = subnbdf.select("notebook_path").map(r => r.getString(0)).collect.toList
      val nbexeid = subnbdf.select("notebook_execution_id").map(r => r.getInt(0)).collect.toList

      // One (path, timeoutSeconds, arguments) tuple per child notebook.
      // Replaces the original var/while accumulation with an index map.
      val parallelnb = nbid.indices.map { i =>
        (nbpath(i), 3600, Map(
          "job_id" -> jobid,
          "master_notebook_id" -> notebookid,
          "notebook_id" -> nbid(i),
          "job_execution_id" -> executionid,
          "notebook_execution_id" -> nbexeid(i)))
      }

      // Each task returns Success or Failure based on Notebook Exit
      // Conditions as a Sequence. Max 4 in parallel.
      val res = parallelNotebooks(parallelnb)
      val outcome = Await.result(res, 2.minutes)
      // Bug fix: the original tested `res.value != "SUCCESS"` — res.value is
      // an Option[Try[_]], never equal to a String, so EVERY batch was marked
      // FAILED. Compare the awaited result instead.
      // NOTE(review): assumes parallelNotebooks resolves to the literal
      // "SUCCESS" when all tasks succeed — confirm against its definition.
      if (outcome != "SUCCESS")
        spark.sql("UPDATE " + configtable + " SET notebook_run_status = 'FAILED', notebook_run_status_dt = current_timestamp()" + " WHERE job_execution_id = " + executionid + " AND notebook_id = " + notebookid )
    }
  }
} catch {
  case ie: InterruptedException =>
    println("Interrupted: " + ie)
  case e: Exception =>
    println("Error: " + e)
    try {
      // Mark the master run FAILED, push the execution log back to the SQL
      // metastore, then signal failure to the caller.
      spark.sql("UPDATE " + configtable + " SET notebook_run_status = 'FAILED', notebook_run_status_dt = current_timestamp()" + " WHERE job_execution_id = " + executionid + " AND notebook_id = " + notebookid )
      //Populate execution logs back to meta store
      val logdf = spark.table(configtable)
      logdf.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, sourcetablename, connectionProperties)
      dbutils.notebook.exit("FAILED")
    } catch {
      case e: Exception =>
        // Even the cleanup failed — still report FAILED to the caller.
        dbutils.notebook.exit("FAILED")
    }
}
// Final status: the master run succeeds only if neither it nor any of its
// child notebooks is marked FAILED for this execution. Always writes the
// execution log back to the SQL metastore before exiting.
try {
  var priorcount: Long = 0
  // UNION of: this master notebook marked FAILED, or any child of it FAILED.
  val priorres = spark.sql("SELECT notebook_id FROM " + configtable + " WHERE job_execution_id = " + executionid + " AND notebook_id = " + notebookid + " AND notebook_run_status = 'FAILED' UNION SELECT notebook_id FROM " + configtable + " WHERE job_execution_id = " + executionid + " AND parent_notebook_id = " + notebookid + " AND notebook_run_status = 'FAILED'")
  priorcount = priorres.count()
  if (priorcount == 0)
    spark.sql("UPDATE " + configtable + " SET notebook_run_status = 'SUCCESS', notebook_run_status_dt = current_timestamp(), notebook_run_end_dt = current_timestamp()" + " WHERE job_execution_id = " + executionid + " AND notebook_id = " + notebookid )
  // Populate execution logs back to the meta store.
  // Bug fix: in the original these two statements had been collapsed onto the
  // trailing "//" comment line above them and were dead code.
  val logdf = spark.table(configtable)
  logdf.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, sourcetablename, connectionProperties)
  if (priorcount == 0)
    dbutils.notebook.exit("SUCCESS")
  else
    dbutils.notebook.exit("FAILED")
} catch {
  case ie: InterruptedException =>
    println("Interrupted: " + ie)
  case e: Exception =>
    println("Error: " + e)
    dbutils.notebook.exit("FAILED")
}