prob and solutions - veeraravi/Spark-notes GitHub Wiki
Recursively Joining DataFrames
Below is sample code for joining two DataFrames on different key columns, and for joining multiple DataFrames on the same key column.
join {
  join-keys = ["id", "accountid"]
  join-type = "inner"
}
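How the `join` block is loaded is not shown here. One possibility, sketched below, assumes the Typesafe Config library is on the classpath and uses a hypothetical `JoinConfig` case class with the `keys` and `joinType` fields that `Joiner` reads. The names `JoinConfigSketch` and `load` are illustrative only.

```scala
import com.typesafe.config.ConfigFactory
// JavaConverters works on Scala 2.12 and 2.13 (deprecated in 2.13 in favour of scala.jdk.CollectionConverters).
import scala.collection.JavaConverters._

object JoinConfigSketch {
  // Hypothetical shape of the config object; only the two fields used by Joiner are modelled.
  final case class JoinConfig(keys: List[String], joinType: String)

  // Parse a HOCON string and read the "join" block into a JoinConfig.
  def load(hocon: String): JoinConfig = {
    val join = ConfigFactory.parseString(hocon).getConfig("join")
    JoinConfig(
      keys = join.getStringList("join-keys").asScala.toList,
      joinType = join.getString("join-type")
    )
  }
}
```

In an application the string would more likely come from `ConfigFactory.load()`, which reads `application.conf` from the classpath.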
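import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, LeftSemi, RightOuter}

class Joiner(joinConfig: JoinConfig) {
  val joinType = joinConfig.joinType

  // Exactly two key columns are expected: (left key, right key).
  val keyTuple = joinConfig.keys match {
    case List(left, right) => (left, right)
    case _ =>
      throw new IllegalArgumentException("Should be two key columns")
  }

  /** Join two DataFrames on different key columns, dropping the right-hand key. */
  def joinTwoDFs(leftDf: DataFrame, rightDf: DataFrame): DataFrame = {
    JoinType(joinType) match {
      case RightOuter | LeftOuter | Inner | LeftSemi =>
        leftDf
          .join(rightDf, leftDf(keyTuple._1) === rightDf(keyTuple._2), joinType)
          .drop(rightDf(keyTuple._2))
      case _ =>
        throw new IllegalArgumentException(s"Unsupported join type '$joinType'. ")
    }
  }

  /** Join multiple DataFrames on the same column key. */
  def joinMultiDFs(dfs: Seq[Seq[DataFrame]], commonKey: String): DataFrame = {
    // Joining on the shared column name keeps a single copy of the key column.
    dfs.flatten
      .reduceLeft((left, right) => left.join(right, Seq(commonKey), joinType))
  }
}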
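To try the two join patterns end to end, here is a self-contained sketch that runs against a local SparkSession. It does not reuse `Joiner` itself. `JoinSketch`, the sample DataFrames, and the `JoinConfig` case class are assumptions made for illustration, not part of the original notes.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinSketch {
  // Hypothetical stand-in for the JoinConfig used by Joiner.
  final case class JoinConfig(keys: List[String], joinType: String)

  // Join on differently named key columns, then drop the right-hand key column.
  def joinOnDifferentKeys(left: DataFrame, right: DataFrame, cfg: JoinConfig): DataFrame =
    cfg.keys match {
      case List(leftKey, rightKey) =>
        left
          .join(right, left(leftKey) === right(rightKey), cfg.joinType)
          .drop(right(rightKey))
      case _ =>
        throw new IllegalArgumentException("Should be two key columns")
    }

  // Fold any number of DataFrames together on one shared key column.
  def joinOnCommonKey(dfs: Seq[DataFrame], key: String, joinType: String): DataFrame =
    dfs.reduceLeft((l, r) => l.join(r, Seq(key), joinType))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("join-sketch").getOrCreate()
    import spark.implicits._

    // Sample data, invented for this example.
    val users    = Seq((1, "ann"), (2, "bob")).toDF("id", "name")
    val accounts = Seq((1, 100.0), (3, 50.0)).toDF("accountid", "balance")
    val ages     = Seq((1, 30), (2, 41)).toDF("id", "age")
    val cities   = Seq((1, "Oslo"), (2, "Lima")).toDF("id", "city")

    // Different key names: users.id matched against accounts.accountid.
    joinOnDifferentKeys(users, accounts, JoinConfig(List("id", "accountid"), "inner")).show()

    // Same key name in every DataFrame.
    joinOnCommonKey(Seq(users, ages, cities), "id", "inner").show()

    spark.stop()
  }
}
```

Joining on a column name (`Seq(key)`) rather than on an equality expression means the key appears once in the result, so no explicit `drop` is needed in the common-key case.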
Extracting the same columns from a list of DataFrames
We have a list of DataFrames, each with a single row and all with the same schema.
We want to extract two specific columns from every DataFrame and return the values as two lists.
val listPair: (List[String], List[String]) = headerDfs.map { header =>
  val row = header
    .select(field1, field2)
    .head
  (row(0).toString, row(1).toString)
}.unzip[String, String]
Another approach is to union all the DataFrames first and then select the columns once.
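A minimal sketch of the union approach follows. `UnionSketch`, `extractPair`, and the sample column names are hypothetical. It assumes every DataFrame has the same schema (since `union` matches columns by position) and that neither selected column contains nulls.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object UnionSketch {
  // Union all DataFrames, select the two columns once, then collect and unzip.
  def extractPair(dfs: Seq[DataFrame], field1: String, field2: String): (List[String], List[String]) =
    dfs
      .reduce(_ union _)
      .select(field1, field2)
      .collect()
      .toList
      .map(row => (row(0).toString, row(1).toString))
      .unzip

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("union-sketch").getOrCreate()
    import spark.implicits._

    // Two single-row DataFrames with the same schema, invented for this example.
    val headerDfs = List(
      Seq(("a", "b", "x")).toDF("f1", "f2", "f3"),
      Seq(("c", "d", "y")).toDF("f1", "f2", "f3")
    )

    val (firsts, seconds) = extractPair(headerDfs, "f1", "f2")
    println(firsts)
    println(seconds)

    spark.stop()
  }
}
```

This runs a single Spark job instead of one `head` call per DataFrame. If the order of the resulting lists must match the order of the input list, it is safer to verify that on real data or carry an explicit index column, since row order after a union is not a documented guarantee.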