spark csv support - veeraravi/Spark-notes GitHub Wiki
References: loading a CSV file as a DataFrame — https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe ; adding columns to an existing DataFrame with withColumn — https://stackoverflow.com/questions/40959655/adding-two-columns-to-existing-dataframe-using-withcolumn
import org.apache.spark.sql.functions._ val newsdf = sdf.withColumn("make", when(col("make") === "Tesla", "S").otherwise(col("make")))
// Build a small DataFrame from an in-memory RDD, then rewrite the "make"
// column by mapping over the rows directly.
val rdd = sc.parallelize(List(
  (2012, "Tesla", "S"),
  (1997, "Ford",  "E350"),
  (2015, "Chevy", "Volt")))
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame (enables .toDF).
import sqlContext.implicits._
// Name the columns explicitly; a bare toDF() would produce _1/_2/_3, which
// is inconsistent with the other examples that address columns by name.
val dataframe = rdd.toDF("year", "make", "model")
// NOTE(review): on a real cluster foreach(println) prints on the executors,
// not the driver — fine in local mode / spark-shell only.
dataframe.foreach(println)
dataframe.map { row =>
  // Column 1 is "make"; replace Tesla (case-insensitive) with "S".
  val origMake = row.getAs[String](1)
  val make = if (origMake.toLowerCase == "tesla") "S" else origMake
  Row(row(0), make, row(2))
}.collect().foreach(println)
import org.apache.spark.sql.functions._
val sqlcont = new org.apache.spark.sql.SQLContext(sc)
// FIX: the original read `val inputfile = val df1 = ...`, which is not valid
// Scala (a `val` definition cannot be the right-hand side of another `val`).
// Build the DataFrame from inline JSON records.
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
  """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""",
  """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""",
  """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}""")))
// UDF: map the make "Tesla" to "S", leave everything else unchanged.
val makeSIfTesla = udf { (make: String) => if (make == "Tesla") "S" else make }
// UDF: replace empty strings in the "blank" column with "veera".
val blanktoVeera = udf { (blank: String) => if (blank.length == 0) "veera" else blank }
val newdf = df1
  .withColumn("make", makeSIfTesla(df1("make")))
  .withColumn("blank", blanktoVeera(df1("blank")))
newdf.show
import org.apache.spark.sql.functions._
val sqlcont = new org.apache.spark.sql.SQLContext(sc)
// FIX: the original defined `sqlcont` but then called `sqlContext.read`,
// which is undefined in this snippet — read through the context we created.
val inputfile = sqlcont.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "false")
  .option("dateFormat", "yyyyMMdd")
  .load("/home/datalake/emp.csv")
// Replace the "00000000" placeholder with the string "null".
// NOTE(review): this emits the literal string "null", not a SQL NULL —
// confirm that is what the downstream consumer expects; use `lit(null)`
// via when/otherwise if a real NULL is wanted.
val datecheck = udf { (strDate: String) => if (strDate.equals("00000000")) "null" else strDate }
val newdf = inputfile
  .withColumn("excl_date", datecheck(inputfile("excl_date")))
  .withColumn("rein_date", datecheck(inputfile("rein_date")))
  .withColumn("waiver_date", datecheck(inputfile("waiver_date")))
  .withColumn("npi", datecheck(inputfile("npi")))
newdf.show()
newdf.write.format("com.databricks.spark.csv").option("header", "true").save("outputpath")
====alternatve=== def parseDate(d:String):Date = { val format = new java.text.SimpleDateFormat("yyyyMMdd") format.parse(d) }
====alternate2===
newdf.withColumn("excel_date", to_date(unix_timestamp(