Spark Day 2

Pair RDDs

// Count hits per user: field 2 of each weblog line is the user id
val logdataall = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*.log")

val userids = logdataall.map(line => (line.split(" ")(2), 1))

val idcount = userids.reduceByKey((v1, v2) => v1 + v2)

idcount.take(5)
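As a quick sanity check, the same per-user counts can also be collected to the driver with countByKey, which returns a local Map (a sketch, fine for the small course data set but not for large key sets):

// Assumption: userids is the (userid, 1) pair RDD built above
val localcounts = userids.countByKey()
localcounts.take(5).foreach(println)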

// Top 10 users by hit count: swap each (userid, count) pair to (count, userid), then sort descending
val logdataall = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*.log")

val userids = logdataall.map(line => (line.split(" ")(2), 1))

val idcount = userids.reduceByKey((v1, v2) => v1 + v2)

val revid = idcount.map { case (key, value) => (value, key) }

revid.sortByKey(false).top(10)
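Note that top(n) selects the n largest elements by the tuple ordering itself, so the explicit sortByKey above is redundant; either of the following (a sketch against the revid RDD above) returns the ten busiest users:

revid.top(10)

revid.sortByKey(false).take(10)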

// Collect the list of IP addresses (field 0) requested by each user (field 2)
val logdataall = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*.log")

val userids = logdataall.map(line => (line.split(" ")(2), line.split(" ")(0)))

val iplist = userids.groupByKey()

iplist.take(5)
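Each grouped value is an Iterable of IP addresses; a small sketch to print them one address per line, per user:

for ((userid, ips) <- iplist.take(5)) {
  println(userid + ":")
  for (ip <- ips) println("\t" + ip)
}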

// Join the per-user hit counts with the account data, keyed by account id (field 0 of accounts.csv)
val logdataall = sc.textFile("file:/home/training/training_materials/sparkdev/data/weblogs/*.log")

val accountdata = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv")

val userids = logdataall.map(line => (line.split(" ")(2), 1))

val idcount = userids.reduceByKey((v1, v2) => v1 + v2)

val accountrdd = accountdata.map(line => (line.split(",")(0), line.split(",")))

val joineddata = accountrdd.join(idcount)

// Print the account id, hit count, and the name fields (columns 3 and 4 in the course data)
for (record <- joineddata.take(5)) println(record._1 + " " + record._2._2 + " " + record._2._1(3) + " " + record._2._1(4))
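The same report can be written with a destructuring pattern, which makes the (accountFields, hitCount) shape of each joined value explicit (a sketch, equivalent to the loop above):

joineddata.take(5).foreach { case (acctid, (fields, hits)) =>
  println(acctid + " " + hits + " " + fields(3) + " " + fields(4))
}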

Challenges

1. val accountdata = sc.textFile("file:/home/training/training_materials/sparkdev/data/accounts.csv")

// Key each account line by field 8 (the postal code column in the course data)
val accountrdd = accountdata.keyBy(line => line.split(",")(8))

2. val pdetails = accountrdd.map { case (k, v) => (k, (v.split(",")(4), v.split(",")(3))) }

// Equivalent, transforming only the values
val pdetails = accountrdd.mapValues(v => (v.split(",")(4), v.split(",")(3)))

3. val postgrp = pdetails.groupByKey().sortByKey()

for (record <- postgrp.take(5)) { println("\n-------" + record._1); for (name <- record._2) println(name._1 + "," + name._2) }

HDFS

// Filter one day's weblog for JPG requests and write the result back to HDFS
val logs = sc.textFile("hdfs://localhost/user/training/weblogs/2014-03-08.log")

val jpglogs = logs.filter(x => x.contains(".jpg"))

// The target directory must not already exist
jpglogs.saveAsTextFile("hdfs://localhost/user/training/jpgs")
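Once the job finishes, the output can be checked from the shell (a sketch, relative to /user/training; the part-file name assumes the default output naming):

hdfs dfs -ls jpgs

hdfs dfs -cat jpgs/part-00000 | head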

Partitions

// Parse activation XML files partition by partition and count activations per device model
val act = sc.textFile("hdfs://localhost/user/training/activations/*")

// getactivations and getmodel are helper functions from the course stub (see the sketch below)
val xmlrdd = act.mapPartitions(xml => getactivations(xml))

val modelrdd = xmlrdd.map(xml => getmodel(xml))

val countrdd = modelrdd.map(line => (line, 1))

val reducerdd = countrdd.reduceByKey((v1, v2) => v1 + v2)

val revid = reducerdd.map { case (key, value) => (value, key) }

revid.sortByKey(false).top(10)
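getactivations and getmodel are not defined above; they come from the course stub file. A minimal sketch of what they are assumed to do, parsing each partition's text as XML with scala.xml and pulling out the device model:

import scala.xml._

// Parse one partition's lines as a single XML document and return its <activation> nodes
def getactivations(lines: Iterator[String]): Iterator[Node] = {
  val doc = XML.loadString(lines.mkString)
  (doc \\ "activation").iterator
}

// Extract the <model> text from one activation record
def getmodel(activation: Node): String = (activation \ "model").text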

Checkpointing

// persist() caches to disk but keeps the full lineage; checkpoint() (below) truncates it
modelrdd.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

// The checkpoint directory must be set before checkpoint() is called
sc.setCheckpointDir("checkpoints")

var mydata = sc.parallelize(Array(1, 2, 3, 4, 5))

// Checkpoint every third iteration; the count() forces the checkpoint to be materialized
for (i <- 1 to 200) { mydata = mydata.map(myInt => myInt + 1); if (i % 3 == 0) { mydata.checkpoint(); mydata.count() } }

for (x <- mydata.collect()) println(x)
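To confirm that checkpointing is keeping the lineage short, the RDD's debug string can be printed after the loop (sketch):

println(mydata.toDebugString)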

spark-submit --class stubs.CountJPGs target/countjpgs-1.0.jar weblogs/*

spark-submit --class stubs.CountJPGs --master spark://localhost:7077 target/countjpgs-1.0.jar weblogs/*

spark-submit --class stubs.CountJPGs --master spark://localhost:7077 --name 'Count JPGs' target/countjpgs-1.0.jar weblogs/*

spark-submit --class stubs.CountJPGs --properties-file myspark.conf target/countjpgs-1.0.jar weblogs/*
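The contents of myspark.conf are not shown; a minimal sketch of a properties file equivalent to the --master and --name flags above (spark.ui.port is only an illustrative extra setting) could be:

spark.master     spark://localhost:7077
spark.app.name   Count JPGs
spark.ui.port    4141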
