Spark Day3 - TechCruncher/SparkCode GitHub Wiki
package stubs
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds

import scala.util.Try
/**
 * Spark Streaming job that reads text lines from a TCP socket and tracks the
 * lines containing "KBDOC".
 *
 * For every batch it prints the number of matching lines and saves them as
 * text; it also prints a sliding-window count of matching lines.
 *
 * Usage: StreamingLogs <hostname> <port>
 */
object StreamingLogs {

  // Spark requires window and slide durations to be multiples of the batch
  // interval. A 20 s batch with a 10 s window / 2 s slide fails that check
  // when the context starts, so the batch interval matches the slide.
  private val BatchInterval = Seconds(2)
  private val WindowDuration = Seconds(10)
  private val SlideDuration = Seconds(2)

  private val Usage = "Usage: StreamingLogs <hostname> <port>"

  /**
   * Entry point.
   *
   * @param args args(0) is the hostname of the log source and args(1) its
   *             TCP port (an integer in 1-65535). Exits with status 1 on a
   *             missing or invalid argument.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(Usage)
      sys.exit(1)
    }
    val hostname = args(0)
    val port = parsePort(args(1)).getOrElse {
      System.err.println(s"Invalid port '${args(1)}': expected an integer in 1-65535. $Usage")
      sys.exit(1)
    }

    // Keep an app name supplied by spark-submit; only fill it in when absent.
    val conf = new SparkConf().setIfMissing("spark.app.name", "StreamingLogs")
    val ssc = new StreamingContext(conf, BatchInterval)
    // The windowed count below needs a checkpoint directory.
    ssc.checkpoint("checkpoints")

    val logStream = ssc.socketTextStream(hostname, port)
    val kbdocs = logStream.filter(line => line.contains("KBDOC"))

    // Runs on the driver once per batch.
    kbdocs.foreachRDD(rdd => println("Number of KB requests: " + rdd.count()))

    // saveAsTextFiles treats its argument as a prefix: each batch is written to
    // its own directory named "<prefix>-<batch time in ms>".
    kbdocs.saveAsTextFiles("file:/home/training/streaming_test/kblogs.txt")

    kbdocs.countByWindow(WindowDuration, SlideDuration).print()

    ssc.start()
    ssc.awaitTermination()
  }

  /** Parses a TCP port, returning None unless it is an integer in 1-65535. */
  private def parsePort(raw: String): Option[Int] =
    Try(raw.trim.toInt).toOption.filter(p => p > 0 && p <= 65535)
}
# Run StreamingLogs locally with two threads: the socket receiver occupies one,
# leaving the other to process batches.
# Arguments are <hostname> <port>; the port must be numeric because the app
# parses it as an integer. 1234 is assumed to be the log source's port -- adjust
# to match. The master URL is quoted so the shell cannot glob "local[2]".
spark-submit --class stubs.StreamingLogs --master 'local[2]' target/streamlog-1.0.jar localhost 1234
// Count the web-log lines that mention .html, .css and .jpg resources
// (a line mentioning several types is counted once for each of them).
val logs = sc.textFile("weblogs/*")
val htmlwords = sc.accumulator(0)
val csswords = sc.accumulator(0)
val jpgwords = sc.accumulator(0)

// Update the accumulators inside an action (foreach). Spark applies each task's
// accumulator update exactly once only for actions; updates made inside a
// transformation such as map may be applied again if a task or stage is
// re-executed, inflating the counts.
logs.foreach { line =>
  if (line.contains(".html")) htmlwords += 1
  if (line.contains(".css")) csswords += 1
  if (line.contains(".jpg")) jpgwords += 1
}

// Accumulator values are only meaningful on the driver, after the action ran.
println(s"HTML files: ${htmlwords.value}\nCSS files: ${csswords.value}\nJPG files: ${jpgwords.value}")
# Pull the movie and movierating tables from the MySQL "movielens" database
# into HDFS as tab-separated text. By default Sqoop writes each table to a
# directory named after the table, which the Spark code reads as movie/* and
# movierating/*.
# NOTE(review): --password on the command line is visible in process listings;
# Sqoop's -P (prompt) or --password-file avoid that.
for table in movie movierating; do
  sqoop import \
    --connect jdbc:mysql://localhost/movielens \
    --username training \
    --password training \
    --fields-terminated-by '\t' \
    --table "$table"
done
// Average rating per movie, reported with the movie's id and title.
// Input: tab-separated Sqoop exports. movie rows are (id, title, ...) and
// movierating rows are (user, movieId, rating, ...).
val movies = sc.textFile("movie/*")
val movieratings = sc.textFile("movierating/*")

// (movieId, title)
val mdata = movies.map { line =>
  val fields = line.split("\t")
  (fields(0), fields(1))
}

// (movieId, rating)
val mratings = movieratings.map { line =>
  val fields = line.split("\t")
  (fields(1), fields(2).toInt)
}

// Average per movie *id*. Keying by title instead would merge distinct movies
// that share a title, and re-joining on title would then duplicate rows.
// Summing (sum, count) pairs avoids materialising every rating per key the way
// groupByKey does; the Long sum avoids Int overflow.
val avgById = mratings
  .mapValues(rating => (rating.toLong, 1L))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum.toDouble / count }

// (movieId, (title, average)); cached because it feeds several actions below.
val joined = mdata.join(avgById).cache()

// (title, average)
val avg = joined.values
avg.take(10).foreach(println)

// (movieId, title, average)
val res = joined.map { case (id, (title, rating)) => (id, title, rating) }
res.saveAsTextFile("movieavgs")
avg.take(10).foreach(println)