Spark - studiofu/brain GitHub Wiki

Quick Start

# Make the PySpark shell use IPython as the driver's Python interpreter.
export PYSPARK_DRIVER_PYTHON=ipython

# Launch an interactive shell (relative paths; presumably run from the Spark installation directory).
bin/pyspark #python shell
bin/spark-shell #scala shell

RDD Basic

// Word count: read a text file, split it into words, and count each word.
val lines = sc.textFile("README.md")
// flatMap flattens the per-line word arrays into a single RDD of words.
val words = lines.flatMap(line => line.split(" "))
// Pair every word with 1, then sum the ones per word.
val counts = words.map(word => (word, 1)).reduceByKey((x, y) => x + y)
// Write the (word, count) pairs as text files under the "output" directory.
counts.saveAsTextFile("output")

Create RDD - Simple For Testing

// Build small in-memory RDDs for experiments with sc.parallelize.
val words = sc.parallelize(List("a", "b"))
val a = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7))
val b = a.map(x => x + 1)
// reduce is the simplest form: it combines the elements with one binary function.
val c = a.reduce((acc, value) => acc + value)
// fold is a simplified version of aggregate: like reduce, but with a zero value.
val d = a.fold(0)((acc, value) => acc + value)
val data = sc.parallelize(List(1, 2, 3, 4, 5, 6))
// The zero value is applied once per partition and once more when the partition
// results are merged, so the result is 21 + 10 * numPartitions + 10
// (111 when the RDD has 8 partitions).
val result = data.fold(10)(_ + _)

// aggregate takes a zero value, a function that folds each element into an
// accumulator, and a function that merges accumulators (in Spark: the partial
// results computed on different nodes).
val nums = List(1, 2, 3, 4, 5, 6, 7)
// p1 and p2 are partial results calculated from different partitions.
val v = nums.aggregate(0)((acc, number) => acc + number, (p1, p2) => p1 + p2)
// Compute (sum, count) in one pass; the zero value is passed as an explicit tuple
// instead of relying on argument auto-tupling.
val (x, y) = nums.aggregate((0, 0))(
  (acc, number) => (acc._1 + number, acc._2 + 1),
  (p1, p2) => (p1._1 + p2._1, p1._2 + p2._2)
)

Create Pair RDD

val a = sc.parallelize(List(1, 2, 3, 4, 5, 1, 2))
// Turn every element into a (key, 1) pair.
val b = a.map(n => (n, 1))
// Group the ones by key and bring the groups back to the driver.
val c = b.groupByKey.collect
// An RDD has no toSeq, so collect first; then order the groups by how many
// values each key has (smallest group first).
val d = b.groupByKey.collect.toSeq.sortBy(group => group._2.size)
// The last group belongs to the key that occurs most often.
val e = d.last

Update Tuple

val a = sc.parallelize(List(1, 2, 3, 4, 5, 1, 2))
val b = a.map(x => (x, 1))
// Replace every value with the tuple (1, 2, 3).
val c = b.mapValues(_ => (1, 2, 3))
// Tuples are immutable: copy returns a new tuple with only the first field changed.
val d = c.mapValues(t => t.copy(_1 = 99))

Append to Set and Array

// Array(3) is a one-element array containing 3 (not an array of length 3).
val a = Array(3)
// :+ returns a new array with the element appended; a itself is unchanged.
val b = a :+ 999
// Give the empty set an element type; a bare Set() is inferred as Set[Nothing].
val c = Set.empty[Int]
// ++ returns a new set that also contains all elements of the array.
val d = c ++ Array(10, 20)

GroupBy

// Sample data: a plain Scala array with repeated words.
val words = Array("one", "two", "two", "three", "three", "three")
// Concatenate all words into one space-separated string.
words.reduce((left, right) => left + " " + right)
// Turn each word into a (word, 1) pair.
val wp = words.map(word => (word, 1))
// Local collection: group by the whole pair, then by the word only.
wp.groupBy(pair => pair)
wp.groupBy { case (word, _) => word }
// Same data as an RDD: group by key, collect to the driver, and sum the ones per word.
val wpRdd = sc.parallelize(wp)
wpRdd.groupByKey.collect.map { case (word, ones) => (word, ones.sum) }

Combine List

// ::: concatenates two Lists.
List(1, 2, 3) ::: List(4, 5, 6)
// ++ returns a Set that also contains every element of the given collection.
Set.empty[Int] ++ List(1, 2, 3)

SortBy

// Two RDDs with five elements each: names and the numbers 1 to 5.
val a = sc.parallelize(Seq("joe", "doris", "john", "vis", "mary"))
val b = sc.parallelize(1 to 5)
// Pair the i-th name with the i-th number, giving (name, number) tuples.
val simpleKV = a.zip(b)
// Ascending by key (the name).
simpleKV.sortByKey().collect
// Descending by value (the number).
simpleKV.sortBy(_._2, ascending = false).collect

CombineByKey

val data = sc.parallelize(List(("a", 1), ("a", 2), ("a", 3), ("b", 10), ("b", 20)))
// Build a (sum, count) accumulator for every key.
val res = data.combineByKey(
  // createCombiner: first value seen for a key in a partition
  (v: Int) => (v, 1),
  // mergeValue: fold another value of the same key into the accumulator
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  // mergeCombiners: merge the accumulators of two partitions
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
res.collect
// res1: Array[(String, (Int, Int))] = Array((a,(6,3)), (b,(30,2)))

// Average per key = sum / count.
res.map { case (key, value) => (key, value._1 / value._2.toFloat) }.collect
// Array[(String, Float)] = Array((a,2.0), (b,15.0))

Join - Outer Join

// Two pair RDDs with two partitions each; keys "A" and "C" appear in both.
var rdd1 = sc.makeRDD(Seq(("A", "1"), ("B", "2"), ("C", "3")), numSlices = 2)
var rdd2 = sc.makeRDD(Seq(("A", "a"), ("C", "c"), ("D", "d")), numSlices = 2)
// cogroup keeps every key from either RDD and pairs it with the values found on each side.
var rdd3 = rdd1.cogroup(rdd2)

Resources

Spark Tutorial from ithelp

https://ithelp.ithome.com.tw/articles/10198318

Spark in Scala from ithelp

https://ithelp.ithome.com.tw/articles/10186152

Broadcast

https://www.kancloud.cn/kancloud/spark-internals/45238

Scala Set

http://www.runoob.com/scala/scala-sets.html

Match Case

http://cuipengfei.me/blog/2013/12/29/desugar-scala-8/

Scala For Yield

https://blog.csdn.net/qq_36330643/article/details/76227051