# combineByKey Examples in Spark

combineByKey is one of the more versatile functions in Spark; the inconvenient part is its number of parameters. Take transforming an `RDD[(K, V)]` into an `RDD[(K, C)]` as an example:

```scala
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
```
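In practice the trailing parameters can usually be left at their defaults; both examples below use the three-argument overload, which fills in the default partitioner and keeps map-side combining enabled:

```scala
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]
```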

- `createCombiner`: called the first time a key appears in a partition; turns that key's initial value `V` into a combiner of type `C`
- `mergeValue`: merges a subsequent value `V` into the existing combiner `C` within the same partition
- `mergeCombiners`: merges the combiners `C` produced by different partitions into the final result per key
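As a minimal sketch of how the three functions cooperate (assuming a `SparkContext` named `sc` and hypothetical sample data), the following collects every value of a key into a `List`, which is roughly what `groupByKey` does:

```scala
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))

val grouped = pairs.combineByKey(
  (v: Int) => List(v),                                 // createCombiner: the first value of a key starts a list
  (acc: List[Int], v: Int) => v :: acc,                // mergeValue: prepend later values within a partition
  (acc1: List[Int], acc2: List[Int]) => acc1 ::: acc2  // mergeCombiners: join the per-partition lists
)
// grouped.collect() => Array((a,List(1, 3)), (b,List(2))) -- element order may vary
```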


## Example 1: Computing an average per key

```scala
// input is assumed to be an RDD[(String, Int)] of (key, number) pairs
val result = input.combineByKey(
  (v) => (v, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (key, value) => (key, value._1 / value._2.toFloat) }
```
  1. Build the initial counter: the first value of a key becomes a (sum, count) pair of (v, 1)
  2. Add each later value into the running (sum, count) within a partition
  3. Add the per-partition (sum, count) results together, then divide sum by count
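A sketch of a concrete run, with a hypothetical input RDD (any `RDD[(String, Int)]` behaves the same):

```scala
val input = sc.parallelize(List(("coffee", 1), ("coffee", 2), ("panda", 3), ("coffee", 9)))

// result.collect() then contains:
//   (coffee, (1 + 2 + 9) / 3.0f) = (coffee, 4.0)
//   (panda, 3 / 1.0f)            = (panda, 3.0)
```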

## Example 2: Computing juice volume

Here every Fruit of the same kind is pressed into Juice, and combineByKey totals the juice per kind:

```scala
case class Juice(volumn: Int) {
  def add(j: Juice): Juice = Juice(volumn + j.volumn)
}
case class Fruit(kind: String, weight: Int) {
  // pressing a fruit yields 100 units of juice per unit of weight
  def makeJuice: Juice = Juice(weight * 100)
}

val apple1  = Fruit("apple", 5)
val apple2  = Fruit("apple", 8)
val orange1 = Fruit("orange", 10)

val fruit = sc.parallelize(List(("apple", apple1), ("orange", orange1), ("apple", apple2)))
val juice = fruit.combineByKey(
  f => f.makeJuice,                             // createCombiner: press the first fruit of each kind
  (j: Juice, f: Fruit) => j.add(f.makeJuice),   // mergeValue: press each later fruit and pour it in
  (j1: Juice, j2: Juice) => j1.add(j2)          // mergeCombiners: merge juice across partitions
)
```
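Collecting the result shows the total juice per kind of fruit (a sketch; `collect()` gives no ordering guarantee):

```scala
juice.collect().foreach(println)
// (apple,Juice(1300))   <- (5 + 8) * 100
// (orange,Juice(1000))  <- 10 * 100
```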