RDD的操作 - twilighthook/BigDataNote GitHub Wiki
RDD主要分成兩種操作,轉換(Transformation)和行動(Actions)操作。轉換操作指的是對已存在的RDD或資料操作,並回傳一個新的RDD( 例如map()或是filter() ),行動操作則是做檔案中的運算( 例如first()或是count() ),並回傳RDD以外的資料型態。 對於Spark來講處理轉換和操作機制差別很大,有些會關係到資源被占用的多寡,所以分別兩者之間的差異是蠻重要的。
RDD建立
用既有數列生成RDD
val num = sc.parallelize(1 to 10 , 3)
建立1~10的數列,並指定分布在3個分布點,第二個參數可省略
用既有檔案生成RDD(以hdfs為例)
val file = sc.textFile("hdfs://localhost/~")
轉換(Transfomations)操作
轉換操作是對既有的資料或RDD操作並回傳RDD,例如下者:
val inputRDD = sc.textFile("text.txt")
val filterRDD = inputRDD.filter( line => line.contains("temperature") )
inputRDD, filterRDD都是RDD型態的配置
其他也有一些function回傳的是RDD
- map(func):經由func函式處理後,回傳新的數據結構
- flatMap(func):和map()相似,但回傳數據只有扁化後的型態,因為是先做map在做flatten
val input : {"hello world" , "rdd"}
經過filter(line => line.split(" "))<br>
map回傳{ [ "hello" , "world" ] , [ "rdd" ] }<br>
flatMap回傳{ "hello" , "world" , "rdd" }
- filter(func):經由func函式,找到對應為true的資料回傳
val test1 = List.range(1, 10)
val test2 = test.filter(_ % 2 == 0)
test2 = (2,4,6,8)
- sample(withReplacement, frac, seed):根據給的seed,回傳其中的frac數據 withReplacement:樣本是否可以重複抽取而重複、frac:取樣比例、seed:亂數種子
val test = input.sample(false , 0.5)
test會得到input隨機50%左右的數據(因為是用 Bernoulli sampling的算法,不能確定會取樣多少個,每個樣本都是50%被抽取機率)
- takeSample(withReplacement, num, seed) withReplacement:樣本是否可以重複抽取而重複、num:取樣數量、seed:亂數種子 在ETL數據要進行先行測試時可以使用,避免浪費大量轉換、抽取時間
val test = input.sample(false , 100)
test會取到100個input中的數據,並回傳陣列型態
- union() 會進行RDD的聯集
a:{1,2.3}
b:{3,4,5}
a.union(b) : {2,3,1,3,4,5}
- groupByKey([taskNum]) tackNum : 可傳入任務數量 可以將數組依據相同的key進行value合併
- reduceByKey(func , [taskNum]) 可以將數組依據相同的key進行value的function操作
- reduceByKeyLocally() 和上面的reduceByKey相似,但回傳的是MAP型態並不是RDD,對於避免分布式資料的負荷可以使用此函式
- join(otherDataset , [taskNum]) 加入其他資料集,(K,M).join((K,N)) = (K,(M+N)),並且依照key合併
- groupWith(otherDataset , [taskNum]) (K,M).join((K,N)) = (K,Seq(M),Seq(N))
- cartesian(otherDataset) 算出兩個資料集的笛卡爾積
動作(Actions)操作
- collect() 以collection的方式回傳物件,通常皆在filter之使用
- take(n) 取出數列前N個,可超過數列數量(只回傳上限)
- first() 回傳數列第一個
- saveAsTextFile(path) 存數據用toString轉換後到指定位置,可支援hdfs
- saveAsSequenceFile(path) 存數據到指定位置,只限定key-value型態,可支援hdfs
Value或是Key-Value的Transformation運算不會提交Job Action算式會觸發SparkContext並提交Job
資料集處理範例
map1 = Map(1 -> 2 , 2 -> 4 , 3 -> 8)
map2 = Map(1 -> 3 , 2 -> 5 , 4 -> 2)
依照相同key做sum
(map1.toList ++ map2.toList).groupBy(_._1).map(case(k,v) => k -> v.map(_._2).sum)
(map1.toSeq ++ map2.toSeq).groupBy(_._1).map(case(k,v) => k -> v.map(_._2).sum)