| api_ts_utc | metric_ts_utc | epg_channel_id | fonse_session_id | metric_ts_dt |
|---|---|---|---|---|
| 2018-03-09 17:49:48 | 2018-03-09 17:38:48 | CNNHD | 20 | 20180309 |
| 2018-03-09 17:49:48 | 2018-03-09 17:38:48 | GLFHD | 12 | 20180309 |
| 2018-03-09 17:49:48 | 2018-03-09 17:38:48 | CP24H | 8 | 20180309 |
```scala
import org.apache.spark.sql.functions.{col, from_unixtime, udf, unix_timestamp}

// Pull the "dd MMM yyyy HH:mm:ss" part out of an HTTP Date header such as
// "Fri, 09 Mar 2018 17:49:48 GMT": take the part after the comma, trim it,
// and drop the trailing " GMT".
def stringTrim = udf((value: String) => {
  if (value != null && !value.isEmpty) value.split(',')(1).trim.dropRight(4)
  else " "
})

// Shift cr_dt back by 5 minutes. Note the pattern: "yyyy" (year), not "YYYY"
// (week-year), which silently gives wrong dates around year boundaries.
df.select(from_unixtime(unix_timestamp(col("cr_dt")).minus(5 * 60), "yyyy-MM-dd HH:mm:ss"))
```
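A quick sanity check of the UDF on a sample Date-header value (hypothetical data, assuming the header follows the usual "EEE, dd MMM yyyy HH:mm:ss zzz" shape):

```scala
import spark.implicits._

val sample = Seq("Fri, 09 Mar 2018 17:49:48 GMT").toDF("api_ts_utc")
sample.withColumn("api_ts_utc", stringTrim($"api_ts_utc")).show(false)
// +--------------------+
// |api_ts_utc          |
// +--------------------+
// |09 Mar 2018 17:49:48|
// +--------------------+
```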
```scala
val date3 = df2.withColumn("api_ts_utc", stringTrim($"api_ts_utc"))
val ts = unix_timestamp(date3("api_ts_utc"), "dd MMM yyyy HH:mm:ss").cast("timestamp")
val date4 = date3.withColumn("api_ts_utc", ts)
```
```scala
// wholeTextFiles reads each file as a single record, so multi-line JSON parses
val df1 = spark.read.json(spark.sparkContext.wholeTextFiles("file:///home/datalake/rawdata.json").values)
```
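Since Spark 2.2 the wholeTextFiles detour can be skipped with the reader's multiLine option (a sketch, same file path assumed):

```scala
// Spark 2.2+: parse a JSON document that spans multiple lines directly
val df1Alt = spark.read.option("multiLine", true).json("file:///home/datalake/rawdata.json")
```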
```scala
// One-shot attempt -- this does NOT work: Spark allows only one generator
// (explode) per select clause, and an alias created in a select can't be
// referenced from the same select. Kept for reference; the step-by-step
// version below is the working one.
val df2 = df1.select(
  explode($"batchesOutput").as("batches"),
  explode($"batches.output.HTTPClient_01OutputLane15121525507550").as("output"),
  explode($"output.header.values.date").as("api_ts_utc"),
  explode($"output.value.value.facets.value").as("facet"),
  explode($"facet.value.name.value").as("epg_channel_id"),
  explode($"facet.value.results.value").as("chncount"),
  explode($"chncount.value.uniqueCount.value").as("fonse_session_id"))
```
```scala
// Step-by-step version: one explode per select
// (df2 here is just the first explode: df1.select(explode($"batchesOutput").as("batches")))
val df3 = df2.select(explode($"batches.output.HTTPClient_01OutputLane15121525507550").as("output"))
df3.select($"output.header.values.date").show

// Extract the Date header and parse it into a timestamp
var date = df3.select(explode($"output.header.values.date").as("api_ts_utc"))
date = date.withColumn("api_ts_utc", stringTrim($"api_ts_utc"))
val ts = unix_timestamp(date("api_ts_utc"), "dd MMM yyyy HH:mm:ss").cast("timestamp")
date = date.withColumn("api_ts_utc", ts)

// Drill into the facets: channel ids and per-channel unique session counts
//var channelid = df3.select(explode($"output.value.value").as("facets"))
var facets = df3.select(explode($"output.value.value.facets.value").as("facet"))
val chn = facets.select(explode($"facet.value.name.value").as("epg_channel_id"))
val counts = facets.select(explode($"facet.value.results.value").as("chncount"))
val cnts = counts.select(explode($"chncount.value.uniqueCount.value").as("fonse_session_id"))
```
```scala
// Broken attempt at assembling the final table: withColumn cannot take a
// column that belongs to a *different* DataFrame (date, chn, cnts here), so
// this throws an AnalysisException. See the join sketch below.
val res = Seq((1L, "05/26/2016 01:01:01"), (2L, "$@@#")).toDF("id", "dts")
val res1 = res.withColumn("api_ts_utc", date("api_ts_utc"))
  .withColumn("metric_ts_utc", lit(1))
  .withColumn("epg_channel_id", chn("epg_channel_id"))
  .withColumn("fonse_session_id", cnts("fonse_session_id"))
  .withColumn("metric_ts_dt", lit(1))
  .show
```
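To actually line the three pieces up, each DataFrame needs a shared key to join on. A minimal sketch, assuming date, chn, and cnts have the same row count and row order (the withRowIdx helper and row_idx column are made-up names):

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a 0-based row index to a DataFrame via zipWithIndex
def withRowIdx(df: DataFrame): DataFrame = {
  val rdd = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField("row_idx", LongType, nullable = false))
  df.sparkSession.createDataFrame(rdd, schema)
}

// Join the exploded pieces positionally on the row index
val assembled = withRowIdx(date)
  .join(withRowIdx(chn), "row_idx")
  .join(withRowIdx(cnts), "row_idx")
  .drop("row_idx")
```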
df.select($"header.values.date" as "api_ts_utc",$"value.value.metadata.value.facet.value" as "epg_channel_id",count($"value.value.metadata.value.contents.value.contents.value.value.attribute.value" as "fonse_session_id_count")).show(2,false)
df.select($"value.value.metadata").printSchema
date.withColumn("api_ts_utc",uppercaseUdf($"api_ts_utc")).show
unix_timestamp returns null for strings that don't match the pattern:

```
scala> val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]

scala> val ts = unix_timestamp($"dts", "dd MMM yyyy HH:mm:ss").cast("timestamp")
ts: org.apache.spark.sql.Column = cast(unix_timestamp(dts, dd MMM yyyy HH:mm:ss) as timestamp)

scala> val df = Seq((1L, "26 May 2016 01:01:01"), (2L, "$@@#")).toDF("id", "dts")
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]

scala> val ts = unix_timestamp($"dts", "dd MMM yyyy HH:mm:ss").cast("timestamp")
ts: org.apache.spark.sql.Column = cast(unix_timestamp(dts, dd MMM yyyy HH:mm:ss) as timestamp)

scala> df.withColumn("ts", ts).show(2, false)
+---+--------------------+---------------------+
|id |dts                 |ts                   |
+---+--------------------+---------------------+
|1  |26 May 2016 01:01:01|2016-05-26 01:01:01.0|
|2  |$@@#                |null                 |
+---+--------------------+---------------------+
```
```scala
import org.apache.spark.sql.functions.{lit, to_timestamp, unix_timestamp}

// Turn the date text from a header string into a timestamp column.
// Note: wrap the extracted text in lit(), not col() -- col() would treat the
// date string as a column *name* and fail to resolve.
def convertDate(date: String) = {
  val x = date.split(',')
  unix_timestamp(lit(x(1).trim.dropRight(4)), "dd MMM yyyy HH:mm:ss").cast("timestamp")
}
```
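Since to_timestamp is imported above, the unix_timestamp/cast pair can also be written directly (Spark 2.2+):

```scala
// Equivalent parse using to_timestamp
val parsed = date.withColumn("api_ts_utc", to_timestamp($"api_ts_utc", "dd MMM yyyy HH:mm:ss"))
```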
```scala
import java.io.PrintWriter

// PrintWriter expects a plain filesystem path, not a file:// URI
val filePath = "/home/datalake/schema_file"
new PrintWriter(filePath) { write(df1.schema.treeString); close() }
```
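treeString is meant for humans and can't be parsed back. If the schema needs to be reloaded later, the JSON form round-trips (a sketch, with a made-up .json path):

```scala
import java.io.PrintWriter
import org.apache.spark.sql.types.{DataType, StructType}

// Write the schema as JSON, then rebuild a StructType from it
new PrintWriter("/home/datalake/schema_file.json") { write(df1.schema.json); close() }

val restored = DataType
  .fromJson(scala.io.Source.fromFile("/home/datalake/schema_file.json").mkString)
  .asInstanceOf[StructType]
```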