Howto DataSets
Find min and max values
scala> input.agg(min("value"), max("value")).show
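A minimal self-contained sketch (the column name "value" and the sample numbers are assumptions; min and max come from org.apache.spark.sql.functions):
import spark.implicits._
import org.apache.spark.sql.functions.{min, max}
// hypothetical single-column input
val input = Seq(1, 5, 3).toDF("value")
// the output columns are named min(value) and max(value)
input.agg(min("value"), max("value")).show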
Filter values on a null condition
scala> orc.filter(orc.col("ss_sold_date_sk").isNull).count
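A hedged sketch on a made-up two-column frame, with the isNotNull counterpart for comparison:
import spark.implicits._
// hypothetical data: one row with a null ss_sold_date_sk
val orc = Seq((Some(1), "a"), (None, "b")).toDF("ss_sold_date_sk", "ss_item")
orc.filter(orc.col("ss_sold_date_sk").isNull).count     // 1
orc.filter(orc.col("ss_sold_date_sk").isNotNull).count  // 1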
Sum of a column in a dataset
scala> dataset.agg(sum("col_name")).show
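A runnable sketch (the dataset contents and column name are made up):
import spark.implicits._
import org.apache.spark.sql.functions.sum
val dataset = Seq(1, 2, 3).toDF("col_name")
dataset.agg(sum("col_name")).show  // sum(col_name) = 6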
How to rename columns in a DS
val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)
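Here newNames must have exactly as many entries as df has columns. A quick sketch with a made-up df:
import spark.implicits._
val df = Seq((1, 2.0, "a", true)).toDF  // default names: _1, _2, _3, _4
df.toDF("id", "x1", "x2", "x3").columns  // Array(id, x1, x2, x3)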
How to make an alias for a specific column
df.select($"_1".alias("x1"))
For multiple columns
val lookup = Map("_1" -> "foo", "_3" -> "bar")
df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
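As a runnable sketch (the three-column input is hypothetical):
import spark.implicits._
import org.apache.spark.sql.functions.col
val df = Seq((1, 2, 3)).toDF  // columns: _1, _2, _3
val lookup = Map("_1" -> "foo", "_3" -> "bar")
df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*).columns
// Array(foo, _2, bar)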
or withColumnRenamed:
df.withColumnRenamed("_1", "x1")
which can be used with foldLeft to rename multiple columns:
lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))
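With the same hypothetical df and lookup as in the select-based sketch above:
val renamed = lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))
renamed.columns  // Array(foo, _2, bar)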
ref: http://stackoverflow.com/questions/35592917/renaming-column-names-of-a-data-frame-in-spark-scala
How to add a new index column with values in the range [0, n-1]
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// your input data set
val input = spark.read.parquet("/sql/data1.pq")
// your input data set with zip index
val inputWithZipIndex = input.rdd.zipWithIndex
// now we need to convert it back to a DataFrame of Rows; currently it is an RDD of Tuple2(Row, Long). While doing so, we can add a separate column with the zip index and a new schema
// new schema = old schema + a new column
val newSchema = StructType(input.schema.fields :+ StructField("myIndex", LongType, false))
val inputRowRdd = inputWithZipIndex.map{case (row, zipIndex) => Row.fromSeq(row.toSeq :+ zipIndex)}
// now we convert the RDD back to a dataframe
val inputWithIndex = spark.createDataFrame(inputRowRdd, newSchema)
// at this point we have a Dataset with a new column named "myIndex" which ranges over [0, elements_in_rdd - 1]
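Note that RDD.zipWithIndex triggers a Spark job when the RDD has more than one partition, since it must first count the elements in each partition. A quick sanity check:
inputWithIndex.select("myIndex").orderBy("myIndex").show(5)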
How to check if a DS is cached or not
Assuming q2 is an existing Dataset:
scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
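After caching, the same lookup should flip to true (a sketch following the referenced page; Dataset.cache registers the plan with the CacheManager):
scala> q2.cache
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined  // now true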
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-caching.html