# Data ingestion

Run the data-ingestion script non-interactively with:

```
spark-shell -i script_load.scala
```

To pull in the Teradata JDBC and spark-csv dependencies, start the shell with:

```
spark-shell --jars /tmp/jars/terajdbc4.jar,/tmp/jars/tdgssconfig.jar,/tmp/Spark_Jars/commons-csv-1.6.jar,/tmp/Spark_Jars/spark-csv_2.10-1.5.0.jar
```

Create the two Hive metadata tables and load them from local CSV files:

```sql
-- Source-level metadata: where each feed lives and how it is formatted
DROP TABLE IF EXISTS poc.sourceInfo;

CREATE TABLE poc.sourceInfo (
  SourceId        STRING,
  SourceName      STRING,
  SourceType      STRING,
  filePath        STRING,
  FileType        STRING,
  ColumnDelimiter STRING,
  RowDelimiter    STRING,
  Schedule        STRING,
  IsItHasHeader   INT,
  rawdatabase     STRING,
  rawtablename    STRING,
  stagedatabase   STRING,
  stagetablename  STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/ravi/SourceInfo.csv' INTO TABLE poc.sourceInfo;

-- Column-level metadata: source and target schema for each feed
DROP TABLE IF EXISTS poc.columnInfo;

CREATE TABLE poc.columnInfo (
  SourceId         STRING,
  SourceColumnName STRING,
  SourceColumnOrder STRING,
  SourceDataType   STRING,
  SourcePrecision  INT,
  SourceScale      INT,
  Format           STRING,
  TargetColumnName STRING,
  TargetDataType   STRING,
  TargetPrecision  INT,
  TargetScale      INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/ravi/ColumnsInfo.csv' INTO TABLE poc.columnInfo;
```
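For reference, a hypothetical SourceInfo.csv row matching this layout (every value below is illustrative, not taken from the original data) could look like:

```
1,customers,file,/data/landing/customers.csv,csv,|,\n,daily,1,poc_raw,customers_raw,poc_stage,customers_stage
```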

For the load itself only the CSV jars are needed:

```
spark-shell --jars /tmp/Spark_Jars/commons-csv-1.6.jar,/tmp/Spark_Jars/spark-csv_2.10-1.5.0.jar
```

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._ // col and regexp_replace used below
import scala.collection.mutable.ListBuffer
import sqlContext.implicits._           // enables the $"column" syntax used below
```

```scala
// Create a delimited Hive table (if needed) and write the DataFrame into it
def writeIntoTable(df: DataFrame, dataBaseName: String, tableName: String,
                   tableColumns: String, delimiter: String, fileFormat: String): Unit = {
  // stripSuffix handles column lists both with and without a trailing comma
  sqlContext.sql(
    "CREATE TABLE IF NOT EXISTS " + dataBaseName + "." + tableName +
      " (" + tableColumns.stripSuffix(",") + ")" +
      " row format delimited fields terminated by '" + delimiter + "'" +
      " stored as " + fileFormat)
  // write the DataFrame data to the HDFS location for the created Hive table
  df.write.format("com.databricks.spark.csv")
    .option("delimiter", delimiter)
    .mode("overwrite")
    .saveAsTable(dataBaseName + "." + tableName)
}
```
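A minimal usage sketch in the same shell session; the DataFrame contents, table name, and column list here are illustrative, not from the original notes:

```scala
// a tiny demo DataFrame with two string columns
val demoDF = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row("123456789", "ALICE"))),
  StructType(Seq(StructField("EinNbr", StringType), StructField("FirstNm", StringType))))

// creates poc.demo_raw as a ','-delimited text table and writes demoDF into it
writeIntoTable(demoDF, "poc", "demo_raw", "EinNbr string,FirstNm string", ",", "textfile")
```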

```scala
// Return a copy of the DataFrame with leading/trailing whitespace trimmed from every column
def getTrimmedDf(sourceDF: DataFrame): DataFrame = {
  sourceDF.columns.foldLeft(sourceDF) { (df, colName) =>
    df.withColumn(colName, regexp_replace(col(colName), "^\\s+|\\s+$", ""))
  }
}
```
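For example, with an illustrative DataFrame holding padded values:

```scala
val padded = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row("  alice  ", " NY"))),
  StructType(Seq(StructField("name", StringType), StructField("state", StringType))))

getTrimmedDf(padded).show()
// +-----+-----+
// | name|state|
// +-----+-----+
// |alice|   NY|
// +-----+-----+
```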

```scala
// Build a comma-separated string of "name datatype" pairs from a DataFrame's schema
def getSchema(sourceDF: DataFrame): String = {
  var fieldStr = ""
  for (x <- sourceDF.schema.fields) {
    fieldStr += x.name + " " + x.dataType.typeName + ","
  }
  fieldStr // note: keeps a trailing comma; writeIntoTable strips it
}
```
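With the `padded` DataFrame from the previous example, this yields:

```scala
getSchema(padded)
// res: String = name string,state string,
```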

```scala
// Alternative: read directly with spark-csv and an inferred schema
// val sourceDF = sqlContext.read.format("com.databricks.spark.csv")
//   .option("header", "true").option("inferSchema", "true")
//   .load("/dev/veera/blocklist.CSV")

// case class Bids(bid_id: Double, bidder_id: String, auction: String, device: String,
//                 time: String, country: String, ip: String, url: String)

// Drop the header row (the first line of the first partition) from a text RDD
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex { (idx, lines) =>
    if (idx == 0) lines.drop(1) else lines
  }
}
```
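Usage sketch, assuming a delimited file whose first line is a header (the path is hypothetical):

```scala
val rawLines = sc.textFile("/data/landing/customers.csv") // hypothetical path
val withoutHeader = dropHeader(rawLines)
```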

```scala
// Build the Hive column list for a source, adding precision/scale where the metadata supplies them
def getColumnsAsString(columnInfoDF: DataFrame, sourceId: String): String = {
  val sourceSchema = columnInfoDF
    .filter($"sourceid" === sourceId)
    .select($"sourceColumnName", $"SourceDataType", $"SourcePrecision", $"SourceScale")
    .na.fill(0, Seq("SourcePrecision", "SourceScale"))
    .rdd.map { x =>
      if (x.getInt(2) > 0 && Array("char", "varchar").contains(x.getString(1)))
        x.getString(0) + " " + x.getString(1) + "(" + x.getInt(2) + ")"
      else if (x.getInt(2) > 0)
        x.getString(0) + " " + x.getString(1) + "(" + x.getInt(2) + "," + x.getInt(3) + ")"
      else
        x.getString(0) + " " + x.getString(1)
    }.collect
  sourceSchema.mkString(",")
}
```
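A quick check against hypothetical column metadata (the rows below are illustrative and cover only the columns this function reads):

```scala
val demoCols = sqlContext.createDataFrame(Seq(
  ("1", "name",   "varchar", 50, 0),
  ("1", "amount", "decimal", 10, 2)
)).toDF("sourceid", "sourceColumnName", "SourceDataType", "SourcePrecision", "SourceScale")

getColumnsAsString(demoCols, "1")
// res: String = name varchar(50),amount decimal(10,2)
```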

```scala
// Build a StructType for the raw load. Every column is read as StringType;
// typeMapping is kept for reference but is not applied here.
def getSchemaForDf(columnInfoDF: DataFrame, sourceId: String): StructType = {
  val typeMapping = Map(
    "double" -> DoubleType, "integer" -> IntegerType, "string" -> StringType,
    "date" -> StringType, "boolean" -> BooleanType, "decimal" -> DoubleType,
    "varchar" -> StringType, "char" -> StringType, "smallint" -> IntegerType,
    "timestamp" -> StringType, "int" -> IntegerType)

  var rddSchema = new StructType

  val listOfColsForSrc = columnInfoDF
    .filter($"sourceid" === sourceId)
    .select($"sourceColumnName", $"SourceDataType")
    .rdd.map(x => x.getString(0) + " " + x.getString(1))
    .collect.toList

  listOfColsForSrc.foreach { ele =>
    rddSchema = rddSchema.add(ele.split(" ")(0), StringType, true)
  }
  rddSchema
}
```
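Against the same hypothetical `demoCols` metadata as above, every column comes back as a string:

```scala
getSchemaForDf(demoCols, "1")
// res: StructType = StructType(StructField(name,StringType,true), StructField(amount,StringType,true))
```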

```scala
// Drive the ingestion: for each source registered in poc.sourceInfo, parse the file,
// land it in the raw table, then trim it and land it in the stage table.
def run(sourceInfoDf: DataFrame): Unit = {
  // collect the metadata rows to the driver first: sqlContext cannot be used
  // inside an RDD transformation running on the executors
  sourceInfoDf.select($"SourceId", $"filepath", $"ColumnDelimiter", $"rawdatabase",
    $"rawtablename", $"stagedatabase", $"stagetablename", $"IsItHasHeader")
    .collect.foreach { ele =>
      val sourceId = ele.getString(0)
      val filePath = ele.getString(1)
      val delimiter = ele.getString(2)
      val rwdb = ele.getString(3)
      val rwtbl = ele.getString(4)
      val stgdb = ele.getString(5)
      val stgtbl = ele.getString(6)
      val isItHasHeader = ele.getInt(7)

      println("SourceId " + sourceId)

      // SourceId is a string column, so quote it in the predicate
      val columnInfoDF = sqlContext.sql(s"select * from poc.columnInfo where SourceId = '$sourceId'")

      val tableColumns = getColumnsAsString(columnInfoDF, sourceId)
      val listOfColsForSrc = getSchemaForDf(columnInfoDF, sourceId)
      val fileFormat = "parquet"

      var sourceDF = sc.textFile(filePath)
      if (isItHasHeader != 0) sourceDF = dropHeader(sourceDF)

      // quote the delimiter so regex metacharacters such as '|' split literally
      val row_rdd = sourceDF
        .map(x => x.split(java.util.regex.Pattern.quote(delimiter), -1))
        .map(x => Row.fromSeq(x))
      val dataFrame = sqlContext.createDataFrame(row_rdd, listOfColsForSrc)

      // ****** schema validation
      val schema = dataFrame.schema.simpleString
      var isValidSchema = schema.contains(
        "struct<EinNbr:int,FirstNm:string,CurCityStFraudCd:string,CurPstlCd:string,PrevStCd:string")
      isValidSchema = true // forces the check to pass; remove once real validation is wired in

      if (isValidSchema) {
        writeIntoTable(dataFrame, rwdb, rwtbl, tableColumns, delimiter, fileFormat)
      } else {
        // data is not valid, move raw files to the failure location
        dataFrame.write.format("parquet").save("/data/failure/")
      }

      val trimmedDF = getTrimmedDf(dataFrame)
      if (trimmedDF.count > 0) {
        val stageColumns = getSchema(trimmedDF)
        val stageDelimiter = "|"
        writeIntoTable(trimmedDF, stgdb, stgtbl, stageColumns, stageDelimiter, fileFormat)
      }
    }
}
```

```scala
val sourceInfoDf = sqlContext.sql("select * from poc.SourceInfo")
run(sourceInfoDf)
```
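Saved as script_load.scala, the whole pipeline runs end to end with the `spark-shell -i script_load.scala` invocation shown at the top of this page.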