spark core examples: - veeraravi/Spark-notes GitHub Wiki

scala> months.collect res2: Array[(String, Int)] = Array((a,100), (b,200), (a,100), (b,200))

reduceByKey() Vs aggregateByKey() Vs groupByKey

aggregateByKey()

scala> val months = sc.parallelize(List(("a",100),("b",200),("a",100),("b",200))) months: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at :27

scala> months.aggregateByKey(0)((a,b)=> a+b ,(x,y)=> x+y) res6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at aggregateByKey at :30

scala> months.aggregateByKey(0)((a,b)=> a+b ,(x,y)=> x+y).collect res7: Array[(String, Int)] = Array((b,400), (a,200))

List((a,100), (b,200), (a,100), (b,200)) ===> List((a, List(100,100)), (b, List(200,200)))

scala> months.aggregateByKey(List[Any]())( (a,b) => a:::b::Nil, (x,y) => x ::: y).collect() res11: Array[(String, List[Any])] = Array((b,List(200, 200)), (a,List(100, 100)))

scala> months.aggregateByKey(List[Any]())( (a,b) => a:::b::Nil, (x,y) => x ::: y).collect().toMap res12: scala.collection.immutable.Map[String,List[Any]] = Map(b -> List(200, 200), a -> List(100, 100))

// Bazic aggregateByKey example in scala // Creating PairRDD studentRDD with key value pairs val studentRDD = sc.parallelize(Array( ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3)

//Defining Sequential Operation and Combiner Operations //Sequence operation : Finding Maximum Marks from a single partition def seqOp = (accumulator: Int, element: (String, Int)) => if(accumulator > element._2) accumulator else element._2

//Combiner Operation : Finding Maximum Marks out Partition-Wise Accumulators def combOp = (accumulator1: Int, accumulator2: Int) => if(accumulator1 > accumulator2) accumulator1 else accumulator2

//Zero Value: Zero value in our case will be 0 as we are finding Maximum Marks val zeroVal = 0 val aggrRDD = studentRDD.map(t => (t._1, (t._2, t._3))).aggregateByKey(zeroVal)(seqOp, combOp)

//Check the Output aggrRDD.collect foreach println

// Output // (Tina,87) // (Thomas,93) // (Jackeline,83) // (Joseph,91) // (Juan,69) // (Jimmy,97) // (Cory,71)

/////////////////////////////////////////////////////// // Let's Print Subject name along with Maximum Marks // ///////////////////////////////////////////////////////

//Defining Sequential Operation and Combiner Operations def seqOp = (accumulator: (String, Int), element: (String, Int)) => if(accumulator._2 > element._2) accumulator else element

def combOp = (accumulator1: (String, Int), accumulator2: (String, Int)) => if(accumulator1._2 > accumulator2._2) accumulator1 else accumulator2

//Zero Value: Zero value in our case will be tuple with blank subject name and 0 val zeroVal = ("", 0)

val aggrRDD = studentRDD.map(t => (t._1, (t._2, t._3))) .aggregateByKey(zeroVal)(seqOp, combOp) //Check the Output aggrRDD.collect foreach println

// Check the Output // (Tina,(Biology,87)) // (Thomas,(Physics,93)) // (Jackeline,(Biology,83)) // (Joseph,(Chemistry,91)) // (Juan,(Physics,69)) // (Jimmy,(Chemistry,97)) // (Cory,(Chemistry,71))

reduceByKey()

scala> months.reduceByKey(_+_).collect() res4: Array[(String, Int)] = Array((b,400), (a,200))

scala> months.reduceByKey(_+_).collect() res8: Array[(String, Int)] = Array((b,400), (a,200))

groupByKey()

scala> months.groupByKey().collect() res9: Array[(String, Iterable[Int])] = Array((b,CompactBuffer(200, 200)), (a,CompactBuffer(100, 100)))

scala> val nosRdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",5),("b",5),("b",5)),5) nosRdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at parallelize at :27

scala> nosRdd.getNumPartitions res25: Int = 5

scala> nosRdd.reduceByKey{case(x,y) => x + (y+5) }.collect res26: Array[(String, Int)] = Array((a,16), (b,25))

scala> val nosRdd = sc.parallelize(List(("a",1),("a",2),("a",3),("b",5),("b",5),("b",5),("c",5),("c",5)),5) nosRdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at :27

scala> nosRdd.reduceByKey{case(x,y) => x + (y+5) }.collect res27: Array[(String, Int)] = Array((a,16), (b,25), (c,15))

scala> nosRdd.collect() res28: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,5), (b,5), (b,5), (c,5), (c,5))

scala> nosRdd.reduceByKey(_+_).collect() res29: Array[(String, Int)] = Array((a,6), (b,15), (c,10))

scala> nosRdd.aggregateByKey(0)( (a,b)=> a+b , (x,y) => x+y) res30: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at aggregateByKey at :30

scala> nosRdd.aggregateByKey(0)( (a,b)=> a+b , (x,y) => x+y).collect res31: Array[(String, Int)] = Array((a,6), (b,15), (c,10))

scala> nosRdd.groupByKey().collect() res32: Array[(String, Iterable[Int])] = Array((a,CompactBuffer(1, 2, 3)), (b,CompactBuffer(5, 5, 5)), (c,CompactBuffer(5, 5)))

scala> nosRdd.groupByKey().map{case(x,y)=> x.y._2.sum}.collect :30: error: value y is not a member of String nosRdd.groupByKey().map{case(x,y)=> x.y._2.sum}.collect ^

scala> nosRdd.groupByKey().map{case(x,y)=> (x,y._2.sum)}.collect :30: error: value _2 is not a member of Iterable[Int] nosRdd.groupByKey().map{case(x,y)=> (x,y._2.sum)}.collect ^

scala> nosRdd.groupByKey().map{case(x,y)=> (x,y.sum)}.collect() res35: Array[(String, Int)] = Array((a,6), (b,15), (c,10))

scala>

========================================================================= import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

object Demo1 {

  /**
   * Counts the total number of comma-separated tokens in the
   * student_basic.txt file and prints the result to stdout.
   */
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val logFile = "/user/cloudera/cca175/spark/data/input/student_basic.txt"

    // minPartitions = 2; cache() keeps the lines in memory if reused
    val logData = sc.textFile(logFile, 2).cache()

    // Emit a 1 per token, then sum them all with reduce.
    val total = logData.flatMap(_.split(",")).map(x => 1).reduce((a, b) => (a + b))

    println("Total no.of words in the given file is " + total)

    sc.stop() // FIX: the SparkContext was never stopped in the original
  }
}

import org.apache.spark.SparkContext import org.apache.spark.SparkConf

object RddAggregation {

  /**
   * Parses per-student marks (sid,subj1,subj2,subj3) from a CSV file and
   * prints each record, then each student's total and average marks.
   */
  def main(args: Array[String]){

    val conf = new SparkConf().setAppName("RddAggregation")
    val sc = new SparkContext(conf)

    // Record types for the raw marks row and the derived report row.
    case class marks(sid: Int, subj1: Int, subj2: Int, subj3: Int)
    case class report(sid: Int, total: Int, avg: Double)

    val inputRdd = sc.textFile("/user/cloudera/cca175/spark/data/input/student_marks.txt").map { x =>
      val y = x.split(",")
      // case classes do not need `new`
      marks(y(0).toInt, y(1).toInt, y(2).toInt, y(3).toInt)
    }

    // NOTE: println inside foreach runs on the executors; output appears on
    // the driver console only in local mode.
    inputRdd.foreach { stu =>
      println("sid = " + stu.sid + "subj1 = " + stu.subj1 + "subj2 = " + stu.subj2 + "subj3 = " + stu.subj3)
    }

    inputRdd.foreach { stu =>
      val total = stu.subj1 + stu.subj2 + stu.subj3
      // FIX: `total / 3` was integer division, truncating the average even
      // though report.avg is declared Double; divide by 3.0 instead.
      val avg = total / 3.0
      println("SID = " + stu.sid + " Total = " + total + " Average = " + avg)
    }

    sc.stop() // FIX: the SparkContext was never stopped in the original
  }
}

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

object RddFilterDemo{

  /**
   * Splits student_basic.txt into male/female subsets, saves each subset to
   * HDFS, and prints the count of each.
   */
  def main(args: Array[String]){

    val conf = new SparkConf().setAppName("RddFilterDemo")
    val sc = new SparkContext(conf)

    // CSV-backed record types; toString re-emits the original CSV line so
    // saveAsTextFile writes the rows back in their source format.
    case class basic(sid: Int, sname: String, sage: Int, ssex: String){
      override def toString = s"$sid,$sname,$sage,$ssex"
    }
    case class address(sid: Int, scity: String, szip: Int, sstate: String){
      override def toString = s"$sid,$scity,$szip,$sstate"
    }

    val basicRdd = sc.textFile("/user/cloudera/cca175/spark/data/input/student_basic.txt").
      map{x => val y = x.split(","); new basic(y(0).toInt, y(1), y(2).toInt, y(3))}

    // NOTE: addressRdd is built but never used below — kept for parity with
    // the companion join examples elsewhere in these notes.
    val addressRdd = sc.textFile("/user/cloudera/cca175/spark/data/input/student_address.txt").
      map{x => val y = x.split(","); new address(y(0).toInt, y(1), y(2).toInt, y(3))}

    val maleRdd = basicRdd.filter(_.ssex == "Male")
    val femaleRdd = basicRdd.filter(_.ssex == "Female")

    maleRdd.saveAsTextFile("/user/cloudera/cca175/spark/data/output/maleStudents")
    femaleRdd.saveAsTextFile("/user/cloudera/cca175/spark/data/output/femaleStudents")

    println("Total No.of Male Students = " + maleRdd.count())
    println("Total No.of Female Students = " + femaleRdd.count())

    sc.stop() // FIX: the SparkContext was never stopped in the original
  }
}
================================================================================. import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

import org.apache.log4j.Logger import org.apache.log4j.Level

object RddMap {

  /** Demonstrates map() on a numeric RDD and on an RDD of month names. */
  def main(args: Array[String]) {
    // Quiet the framework loggers so only our output shows.
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("RddMap")
    val sc = new SparkContext(conf)

    val numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
    val numbersRdd = sc.parallelize(numbers)

    // Shift every element up by 10 and print each result.
    numbersRdd.map(n => n + 10).foreach(println)

    println("Key-Value Pair")
    // Pair each element with the constant 10.
    numbersRdd.map(n => (n, 10)).foreach(println)

    // NOTE(review): "June" is absent from this list in the source data.
    val monthsRdd = sc.parallelize(List("January","February","March","April","May","July","August","September","October","November","December"))

    // Print the character count of every month name.
    monthsRdd.map(name => name.length()).foreach(println)

    sc.stop()
  }
}
================================================================================. import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

import org.apache.log4j.Logger import org.apache.log4j.Level

object RddParallelize {

  /** Walks through parallelize, filter, map, pair RDDs and reduce on 1..15. */
  def main(args: Array[String]) {
    // Quiet the framework loggers so only our output shows.
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("RddParalleize")
    val sc = new SparkContext(conf)

    val numbers: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
    val inputRdd = sc.parallelize(numbers)

    println("inputRDD")
    inputRdd.foreach(println)

    println("Multiples of 5 only ")
    inputRdd.filter(n => n % 5 == 0).foreach(println)

    println("Multiplied by 10")
    inputRdd.map(n => n * 10).foreach(println)

    println("Key-Value Pair ")
    inputRdd.map(n => (n, 10000)).foreach(println)

    val sum = inputRdd.reduce((a, b) => a + b)
    println("Sum is " + sum)

    // A method and an equivalent function value — both add 10.
    def add10(n: Int): Int = n + 10
    val increase10 = (n: Int) => n + 10

    println("add10() ")
    inputRdd.map(add10).foreach(println)

    println("Increase10")
    inputRdd.map(increase10).foreach(println)

    sc.stop()
  }
}
================================================================================. import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

object RddReduceByKeyOrderItems {

  /**
   * Sums order_item quantities per order id (sorted by order id) and writes
   * the result to HDFS.
   */
  def main(args: Array[String]){

    val conf = new SparkConf().setAppName("RddReduceByKeyOrderItems")
    val sc = new SparkContext(conf)

    val orderItemsRdd = sc.textFile("/user/cloudera/cca175/retail_db/order_items")

    println("Total No.of Orders in the Order_items.txt are ")

    // FIX: `results` was previously bound to the return of saveAsTextFile,
    // which is Unit — so the commented foreach below could never have run.
    // Keep the RDD in `results` and invoke the save action separately.
    // Columns: (1) = order_item_order_id, (3) = order_item_quantity.
    val results = orderItemsRdd
      .map { x => (x.split(",")(1).toInt, x.split(",")(3).toInt) }
      .reduceByKey(_ + _)
      .sortByKey(true)
    results.saveAsTextFile("/user/cloudera/cca175/spark/data/output/order_items_grouped_by_orders_id")
    //results.foreach{case(x,y)=> println("Order ID " + x + " have " + y + " items")}

    sc.stop()
  }
}
================================================================================.

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

import org.apache.log4j.Logger import org.apache.log4j.Level

object RddSumOfNumbers {

  /** Shows three ways to sum 1..10: reduce(), an accumulator, and aggregate(). */
  def main(args: Array[String]){

    // Silence framework logging entirely.
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("RddSumOfNumbers")
    val sc = new SparkContext(conf)

    val numbersRdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

    // 1) reduce over the RDD.
    val result = numbersRdd.reduce(_ + _)
    println("Sum of 1 to 10 by using reduce() " + result)

    // 2) accumulator, incremented on the executors inside foreach.
    val sumAcc = sc.accumulator(0, "Sum")
    numbersRdd.foreach { n => sumAcc += n }

    println("Sum Of 1 to 10 by using Accumulator " + sumAcc)

    // 3) aggregate — note the zero value is applied per partition AND once
    // when merging, so aggregate(5) yields more than (sum + 5).
    val sumAggregate = numbersRdd.aggregate(0)(_ + _, _ + _)
    val sumAggregate5 = numbersRdd.aggregate(5)(_ + _, _ + _)
    println("Sum Of 1 to 10 by using aggregate() " + sumAggregate)
    println("Sum Of 1 to 10 by using aggregate(5) " + sumAggregate5)

    sc.stop()
  }
}

================================================================================.

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

object RddUnionDemo {

  /** Writes three number RDDs to HDFS, then their union as one dataset. */
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    // Source RDDs with 3, 3 and 4 partitions respectively.
    val rddA = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
    val rddB = sc.parallelize(List(11, 12, 13, 14, 15), 3)
    val rddC = sc.parallelize(List(16, 17, 18, 19, 20), 4)

    rddA.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/numbers_01_To_10")
    rddB.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/numbers_11_To_15")
    rddC.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/numbers_16_To_20")

    // union() preserves all input partitions, so the output directory gets
    // 3 + 3 + 4 = 10 part files.
    rddA.union(rddB.union(rddC)).saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/numbers_1_To_20")
  }
}
================================================================================. import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.log4j.{Logger, Level}

object Rdd_products_not_in_order_items {

  /**
   * Finds product ids that appear in the products table but are never
   * referenced from order_items, prints how many there are, and saves them.
   */
  def main(args: Array[String]){

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("RddProudctsNotInOrderItems")
    val sc = new SparkContext(conf)

    val products = sc.textFile("/user/cloudera/cca175/retail_db/products")
    val orderItems = sc.textFile("/user/cloudera/cca175/retail_db/order_items")

    // Key both datasets by product id with a dummy 0 value so they join.
    // products col 0 = product_id; order_items col 2 = order_item_product_id.
    val productsRdd = products.map { x => (x.split(",")(0).toInt, 0) }.distinct
    val orderItemsRdd = orderItems.map { x => (x.split(",")(2).toInt, 0) }.distinct

    // leftOuterJoin keeps every product; the right side is None when the
    // product was never ordered.
    val joinedRdd = productsRdd.leftOuterJoin(orderItemsRdd)

    // FIX: use Option.isEmpty rather than comparing an Option to None with ==
    val onlyInProducts = joinedRdd.filter { case (k, (y, z)) => z.isEmpty }.map { case (k, (y, z)) => k }
    println(onlyInProducts.count() + " products are not present in the Order_items")

    onlyInProducts.saveAsTextFile("/user/cloudera/cca175/spark/data/output/products_not_in_order_items")

    sc.stop()
  }
}
================================================================================. import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf

object SimpleWordCountDemo {

  /**
   * Classic word count over the comma-separated tokens of student_basic.txt,
   * written back to HDFS as (token, count) pairs.
   */
  def main(args: Array[String]){

    val conf = new SparkConf().setAppName("SimpleWordCountDemo")
    val sc = new SparkContext(conf)

    val inputRdd = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_basic.txt")

    // token -> 1, then sum counts per token
    val wordRdd = inputRdd.flatMap(_.split(",")).map(x => (x, 1)).reduceByKey(_ + _)

    wordRdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/student_basic_wordCount")

    sc.stop() // FIX: the SparkContext was never stopped in the original
  }
}
================================================================================. val prodFile = "/user/email2dgk/review/spark/data/prod_sales_rows.txt" val prodKvRdd = sc.textFile(prodFile).map{x=> val z= x.split(","); (z(0), z(1).toInt)}

val prodGroupedRdd= prodKvRdd.aggregateByKey((0,0))((x,y)=> (x._1 + y, x._2 +1), (x,y) => (x._1 + y._1, x._2 + y._2)) prodGroupedRdd.foreach{ case(p,(t,c)) => val avg = t.toInt/ c.toInt; println(s"Product: $p Average $avg")}

val prodFile = "/user/email2dgk/review/spark/data/prod_sales_rows.txt" val prodKvRdd = sc.textFile(prodFile).map{x=> val z= x.split(","); (z(0), z(1).toInt)}

val prodCountRdd = prodKvRdd.map{case(p,s) => (p,1)}.reduceByKey(_+_) val prodSalesRdd = prodKvRdd.reduceByKey(_+_) prodSalesRdd.leftOuterJoin(prodCountRdd)

prodSalesRdd.leftOuterJoin(prodCountRdd).foreach{ case(k,(s,someC))=> val count = someC.fold(1)(_.toInt); val avg = s.toInt/count.toInt; println(k + ":"+ avg)}

from pyspark import SparkConf,SparkContext,SQLContext

orders ================================================================================. case class basic(sid:Int, sname:String, sage:Int, ssex:String){ override def toString = s"$sid,$sname,$sage,$ssex" } case class address(sid:Int, scity:String, szip:Int, sstate:String){ override def toString = s"$sid,$scity,$szip,$sstate" }

val basicRdd ={ sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_basic.txt"). map{x=> val y = x.split(","); new basic(y(0).toInt, y(1),y(2).toInt, y(3))}}

val addressRdd = {sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_address.txt"). map{x=> val y = x.split(","); new address(y(0).toInt, y(1),y(2).toInt, y(3))}}

val basicPairRdd = basicRdd.map{x=> (x.sid, x)} val addressPairRdd = addressRdd.map{x=> (x.sid, x)}

val basicRDD ={ sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_basic.txt"). map{x=> val y = x.split(","); (y(0).toInt, (y(0).toInt, y(1),y(2).toInt, y(3)))}}

val addressRDD = {sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_address.txt"). map{x=> val y = x.split(","); (y(0).toInt, (y(0).toInt, y(1),y(2).toInt, y(3)))}}

basicRDD.join(addressRDD).collect()

basicRDD.leftOuterJoin(addressRDD).collect()

val maleRdd = basicRdd.filter(_.ssex == "Male") val femaleRdd = basicRdd.filter(_.ssex == "Female") maleRdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/maleStudents") femaleRdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/femaleStudents")

================================================================================. //Load data from HDFS

val inputRDD = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_basic.txt")

inputRDD.count()

inputRDD.foreach(println)

//store results back to HDFS using Spark

inputRDD.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output")

==================================================================================================================== case class orders(order_id:Int, order_date:String, order_customer_id:Int, order_status:String) case class order_items(order_item_id:Int, order_item_order_id:Int, order_item_product_id:Int, order_item_quantity:Int, order_item_subtotal:Double, order_item_product_price:Double)

val orderItemsFile = "/user/email2dgk/review/mysql/data/retail_db/order_items" val orderItemsRdd = sc.textFile(orderItemsFile).map(_.split(","))

val orderItemsRDD = orderItemsRdd.map(x=> new order_items(x(0).toInt,x(1).toInt,x(2).toInt,x(3).toInt, x(4).toDouble,x(5).toDouble))

val totalPerOrderId = orderItemsRdd.map(x=> (x(1).toInt, x(4).toDouble)).reduceByKey(_+_)

val ordersFile = "/user/email2dgk/review/mysql/data/retail_db/orders" val orderRdd= sc.textFile(ordersFile).map(_.split(",")) val ordersRDD = orderRdd.map(x=> new orders(x(0).toInt,x(1),x(2).toInt, x(3)))

val orderItemsKvRdd = orderItemsRDD.map(x=> (x.order_item_order_id, x)) val ordersKvRdd = ordersRDD.map(x=> (x.order_id, x))

val joinedRdd = ordersKvRdd.leftOuterJoin(orderItemsKvRdd) ================================================================================. prod100,200 prod100,200 prod100,200 prod200,100 prod200,100 prod200,100 prod200,100 ================================================================================. val rddA = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3) val rddB = sc.parallelize(List(11,12,13,14,15),3) val rddC = sc.parallelize(List(16,17,18,19,20),4)

rddA.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/numbers_01_To_10") rddB.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/numbers_11_To_15") rddC.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/numbers_16_To_20")

//It will create a file with 10 partitions since the input RDDS have 3,3 and 4 respectively rddA.union(rddB.union(rddC)).saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/numbers_1_To_20")

//It will create a file with 2 partitons since coalesce as 2 rddA.union(rddB.union(rddC)).coalesce(2).saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/numbers_1_To_20")

spark-submit --class "RddMap" --master local[2] /home/cloudera/myprojects/target/scala-2.10/simple-project_2.10-1.0.jar spark-submit --class "RddFlatMap" --master local[2] /home/cloudera/myprojects/target/scala-2.10/simple-project_2.10-1.0.jar spark-submit --class "RddSumOfNumbers" --master local[2] /home/cloudera/myprojects/target/scala-2.10/simple-project_2.10-1.0.jar spark-submit --class "Rdd_products_not_in_order_items" --master local[2] /home/cloudera/myprojects/target/scala-2.10/simple-project_2.10-1.0.jar ================================================================================. //wordCount

val inputRdd = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/input/student_basic.txt")

val wordRdd = inputRdd.flatMap(_.split(",")).map(x=> (x,1)).reduceByKey(_ + _)

wordRdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/cca175/spark/data/output/student_basic_wordCount")

//Cumulative word count

inputRdd.flatMap(_.split(",")).map(x=> 1).reduce( (a,b) => (a+b)) Int=68

inputRdd.flatMap(_.split(",")).map(x=> 1).fold(0)(_ + _) Int=68

=============================students.csv===================================================. eid,ename,estatecd 100,Albert,NC 110,Benjamin,NY 120,Charlie,SC 130,David,SC 140,Elvikis,CA 150,Frankie,CA 160,Ferdie,SC 170,Gabriel,NJ 180,Mark,FL 190,Mint,FL 200,Jess,NC 210,Jazz,SC 220,Jill,SC 230,Jim,NC 240,Miller,FL 250,Martin,NJ ================================================================================. import org.apache.spark.{SparkConf, SparkContext}

/**

  • Created by gdevaraj on 1/18/17. */
object order_items {

  /** For each order id, prints the highest order_item_subtotal as "id:subtotal". */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("order_items")
    val sc = new SparkContext(conf)

    val orderItemsFile = "/Users/gdevaraj/pcdng/pcd-workflow/etl-pcd/src/test/scala/com/premierinc/renaissance/spark/exploreNew/input/order_items.txt"

    // Split each CSV row into its columns.
    val orderItemsRdd = sc.textFile(orderItemsFile).map(_.split(","))

    // Key by order id (col 1); the value carries (order id, subtotal from col 4).
    val orderIdsRdd = orderItemsRdd.map { cols => (cols(1).toInt, (cols(1).toInt, cols(4).toDouble)) }.sortByKey(true)

    // Keep whichever pair has the larger subtotal per key, then print it.
    orderIdsRdd
      .reduceByKey((left, right) => if (left._2 > right._2) left else right)
      .foreach { case (orderId, best) => println(orderId + ":" + best._2) }
  }
}
================================================================================. val ordersFile = "/user/cloudera/cca175/review/mysql/retail_db_all_tables/orders" val ordersRdd = sc.textFile(ordersFile)

ordersRdd.take(5)

val ordersKvStatusRdd = ordersRdd.map{x=> val z = x.split(","); (z(0).toInt, z(3))}

scala> ordersKvStatusRdd.map{case(x,y)=> (y,x)}.aggregateByKey(0)((a,v)=> a+1, (a,v) => a+v).collect() res6: Array[(String, Int)] = Array((PENDING_PAYMENT,15030), (PAYMENT_REVIEW,729), (PENDING,7610), (ON_HOLD,3798), (PROCESSING,8275), (SUSPECTED_FRAUD,1558), (COMPLETE,22899), (CANCELED,1428), (CLOSED,7556))

val ordersKvStatusCustIdRdd = ordersRdd.map{x=> val z = x.split(","); ((z(2) + z(3), z(0)))}

scala> val ordersKvStatusCustIdRdd = ordersRdd.map{x=> val z = x.split(","); ((z(2) + z(3), z(0)))}

scala> ordersKvStatusCustIdRdd.aggregateByKey(0)((a,v)=> a+1, (a,v)=> a+v).collect()

scala> ordersKvStatusCustIdRdd.aggregateByKey(0)((a,v)=> a+1, (a,v)=> a+v).filter(_._1.contains("7664")).collect()

res8: Array[(String, Int)] = Array((7664PENDING,2), (7664PROCESSING,1), (7664ON_HOLD,1), (7664COMPLETE,2), (7664PENDING_PAYMENT,2), (7664CLOSED,2))

scala> ordersKvStatusCustIdRdd.map{case(x,y)=> (x,1)}.reduceByKey(_+_).filter(_._1.contains("7664")).collect() res9: Array[(String, Int)] = Array((7664PENDING,2), (7664PROCESSING,1), (7664ON_HOLD,1), (7664COMPLETE,2), (7664PENDING_PAYMENT,2), (7664CLOSED,2))

val ordersKvDateStatusRdd = ordersRdd.map{x=> val z = x.split(","); (z(1) + ","+z(3), z)}

scala> ordersKvDateStatusRdd.aggregateByKey(0)((a,v)=> a+1 ,(a,v)=> a+v).filter(_._1.contains("2013-07-25")).collect() res13: Array[(String, Int)] = Array((2013-07-25 00:00:00.0,PROCESSING,16), (2013-07-25 00:00:00.0,PAYMENT_REVIEW,3), (2013-07-25 00:00:00.0,PENDING,13), (2013-07-25 00:00:00.0,COMPLETE,42), (2013-07-25 00:00:00.0,ON_HOLD,5), (2013-07-25 00:00:00.0,CANCELED,1), (2013-07-25 00:00:00.0,SUSPECTED_FRAUD,2), (2013-07-25 00:00:00.0,CLOSED,20), (2013-07-25 00:00:00.0,PENDING_PAYMENT,41))

================================================================================. scala> val customersFile = "/user/cloudera/cca175/retail_db/customers"

scala> val customersRdd = sc.textFile(customersFile)

======================================================================================================================================================== //Find Out the stats of Customers per state

scala> val customersKvRdd = customersRdd.map(x=> (x.split(",")(0).toInt, x))

scala> customersKvRdd.map{case(x,y)=> (y.split(",")(7),0)}.aggregateByKey(0)((a,b)=>a+1, (a,b)=> a+b).sortBy{case(x,y)=> -y}.take(5) res10: Array[(String, Int)] = Array((PR,4771), (CA,2012), (NY,775), (TX,635), (IL,523))

mysql> select customer_state,count(*) from customers group by customer_state order by 2 desc LIMIT 5; +----------------+----------+ | customer_state | count(*) | +----------------+----------+ | PR | 4771 | | CA | 2012 | | NY | 775 | | TX | 635 | | IL | 523 | +----------------+----------+ 5 rows in set (0.01 sec)

//Find out the city of each State scala> customersRdd.map{x=> (x.split(",")(6), x.split(",")(7))}

scala> customersRdd.map{x=> (x.split(",")(7), x.split(",")(6))}.distinct.aggregateByKey(0)((a,b)=>a+1, (a,b)=> a+b).sortBy{case(x,y)=> -y}.collect() scala> customersRdd.map{x=> (x.split(",")(7), x.split(",")(6))}.distinct.aggregateByKey(0)((a,b)=>a+1, (a,b)=> a+b).sortBy(-_._2).collect() res24: Array[(String, Int)] = Array((CA,151), (TX,38), (NY,38), (IL,32), (OH,26), (NJ,25), (PR,22), (MI,21), (PA,18), (FL,17), (GA,17), (MD,16), (MA,15), (NC,14), (VA,11), (WA,10), (CT,9), (AZ,9), (TN,9), (CO,8), (MO,8), (OR,8), (IN,6), (HI,6), (WI,6), (UT,6), (SC,5), (LA,5), (NM,5), (NV,4), (KY,4), (KS,3), (OK,2), (AR,2), (MN,2), (DE,2), (ND,2), (RI,2), (WV,2), (IA,1), (DC,1), (ID,1), (AL,1), (MT,1))

mysql> select customer_state, count(distinct customer_city) from customers group by 1 order by 2 desc; +----------------+-------------------------------+ | customer_state | count(distinct customer_city) | +----------------+-------------------------------+ | CA | 151 | | TX | 38 | | NY | 38 | | IL | 32 | | OH | 26 | | NJ | 25 | | PR | 22 | | MI | 21 | | PA | 18 | | FL | 17 | | GA | 17 | | MD | 16 | | MA | 15 | | NC | 14 | | VA | 11 | | WA | 10 | | CT | 9 | | TN | 9 | | AZ | 9 | | OR | 8 | | MO | 8 | | CO | 8 | | HI | 6 | | WI | 6 | | UT | 6 | | IN | 6 | | NM | 5 | | LA | 5 | | SC | 5 | | NV | 4 | | KY | 4 | | KS | 3 | | AR | 2 | | RI | 2 | | ND | 2 | | DE | 2 | | OK | 2 | | MN | 2 | | WV | 2 | | DC | 1 | | MT | 1 | | AL | 1 | | ID | 1 | | IA | 1 | +----------------+-------------------------------+ 44 rows in set (0.03 sec)

customersRdd.map{x=> (x.split(",")(7), x.split(",")(6))}.distinct.aggregateByKey(0)((a,b)=>a+1, (a,b)=> a+b).filter(_._1=="TX").collect() res22: Array[(String, Int)] = Array((TX,38))

mysql> select customer_state, count(distinct customer_city) from customers where customer_state="TX"; +----------------+-------------------------------+ | customer_state | count(distinct customer_city) | +----------------+-------------------------------+ | TX | 38 | +----------------+-------------------------------+ 1 row in set (0.01 sec)

scala> customersRdd.map{x=> (x.split(",")(7), x.split(",")(6))}.distinct.countByKey().foreach(println)

//Note: the return type is Map(key -> value)

================================================================================. val categoriesFile = "/user/cloudera/cca175/retail_db/categories" val ordersFile = "/user/cloudera/cca175/retail_db/orders" val order_itemsFile = "/user/cloudera/cca175/retail_db/order_items" val productsFile = "/user/cloudera/cca175/retail_db/products" val departmentsFile = "/user/cloudera/cca175/retail_db/departments" val customersFile = "/user/cloudera/cca175/retail_db/customers"

val categoriesRdd = sc.textFile(categoriesFile) val ordersRdd = sc.textFile(ordersFile) val order_itemsRdd = sc.textFile(order_itemsFile) val productsRdd = sc.textFile(productsFile) val departmentsRdd = sc.textFile(departmentsFile) val customersRdd = sc.textFile(customersFile)

println("Total No.of categories " + categoriesRdd.count())
println("Total No.of orders " + ordersRdd.count())
println("Total No.of order_items " + order_itemsRdd.count())
println("Total No.of products " + productsRdd.count())
println("Total No.of departments " + departmentsRdd.count())
println("Total No.of customers " + customersRdd.count())

//Get all the fitness departments categories

val departmentsKvRdd = departmentsRdd.map{x=> val z= x.split(","); (z(0).toInt, (z(0).toInt, z(1)))} val fitnessDepartmentRdd = departmentsKvRdd.filter{case(x,y) => y._2 == "Fitness"}

val categoriesKvRdd = categoriesRdd.map{x=> val z=x.split(","); (z(0).toInt, (z(0).toInt, z(1).toInt,z(2)))} val categoriesDeptIdKvRdd = categoriesKvRdd.map{case(x,y) => (y._2, y)}

categoriesDeptIdKvRdd.filter{case(x,y) => (x ==2)}.count() // returns 8 categoriesDeptIdKvRdd.filter{_._2._2 == 2}.count() // returns 8

val fitnessDeptCategoryRdd = fitnessDepartmentRdd.join(categoriesDeptIdKvRdd) fitnessDeptCategoryRdd.count() //Returns 8

====================================================================================================================================================== //Get the counts of No.of Categories per Departments val categoriesKvGroupRdd = categoriesDeptIdKvRdd.map{case(x,y)=> (x,1)}.reduceByKey(_+_) Array[(Int, Int)] = Array((4,6), (8,10), (5,7), (6,12), (2,8), (3,8), (7,7))

//By Using .join()

departmentsKvRdd.join(categoriesKvGroupRdd).foreach{case(k,(d,c)) => println("Department : " + d._2 + " has " + c + " categories")}

Department : Apparel has 6 categories Department : Golf has 7 categories Department : Outdoors has 12 categories Department : Fitness has 8 categories Department : Footwear has 8 categories Department : Fan Shop has 7 categories

//By using .leftOuterJoin() departmentsKvRdd.leftOuterJoin(categoriesKvGroupRdd).foreach{ case(k,(d,somecgy))=> val z= somecgy.fold(0)(_.get); println(d._2 + " has " + z + " categories") }

Apparel has 6 categories Golf has 7 categories Outdoors has 12 categories Fitness has 8 categories Footwear has 8 categories Fan Shop has 7 categories

====================================================================================================================================================== //Get the distinct category name from categories file

categoriesRdd.map{_.split(",")(2)}.distinct.count() categoriesRdd.map{_.split(",")(2)}.distinct.collect() categoriesRdd.map{_.split(",")(2)}.distinct.foreach(println)

//Get the Product counts from categories

categoriesKvRdd val productsKvRdd = productsRdd.map{x=> (x.split(",")(0).toInt,x)} val productsKvCgyIdRdd = productsKvRdd.map{case(x,y) => val z = y.split(","); (z(1).toInt, z(0).toInt)}

val joinedRdd = categoriesKvRdd.join(productsKvCgyIdRdd)

joinedRdd.aggregateByKey(0)((x,y) => x+1 , (x,y) => (x+y)).sortByKey(true).collect()

//other ways (Not recommended)

joinedRdd.map{case(k,(c,p)) => (k,1)}.aggregateByKey(0)((a,b)=> (a+b), (x,y) => (x+y)).sortByKey(true).foreach(println)

joinedRdd.map{case(k,(c,p)) => (k,1)}.reduceByKey(_ + _).sortByKey(true).collect()

// Get the orders total per customer_id val order_itemsTotalRdd = order_itemsRdd.map{x=> val z= x.split(","); (z(1).toInt, z(4).toFloat)}.reduceByKey(_ + _)

val ordersPerCustIdRdd = ordersRdd.map{x=> val z=x.split(","); (z(0).toInt, z(2).toInt)}

val joinedRdd = order_itemsTotalRdd.join(ordersPerCustIdRdd) val totalPerCustIdRdd = joinedRdd.map{case(oid,(total,custid))=> (custid,(total,oid))}

val resultRdd = totalPerCustIdRdd.aggregateByKey((0.0,0))((a,v)=> (a._1 + v._1, a._2+1), (a,v) => (a._1 + v._1, a._2 + v._2))

scala> resultRdd.filter(_._1==9762).collect() res60: Array[(Int, (Double, Int))] = Array((9762,(2149.5999908447266,3)))

cross verified in MYsql for the customer_id 9762

mysql> select * from orders where order_customer_id = 9762; +----------+---------------------+-------------------+--------------+ | order_id | order_date | order_customer_id | order_status | +----------+---------------------+-------------------+--------------+ | 18624 | 2013-11-17 00:00:00 | 9762 | CLOSED | | 19432 | 2013-11-22 00:00:00 | 9762 | PROCESSING | | 19520 | 2013-11-23 00:00:00 | 9762 | PENDING | +----------+---------------------+-------------------+--------------+ 3 rows in set (0.38 sec)

mysql> select sum(order_item_subtotal) from order_items where order_item_order_id in ( 18624, 19432, 19520); +--------------------------+ | sum(order_item_subtotal) | +--------------------------+ | 2149.60001373291 | +--------------------------+ 1 row in set (0.42 sec)

//Get the No.of orderders Placed by each customer_id val resultRdd = totalPerCustIdRdd.aggregateByKey(0)((a,v)=> a+1, (a,v)=> a+v) resultRdd.filter(_._1==9762).collect()

res67: Array[(Int, Int)] = Array((9762,3))

val ordersFile = "/user/cloudera/cca175/retail_db/orders" val ordersRdd = sc.textFile(ordersFile)

//Top 5 (Ascending)

ordersRdd.takeOrdered(5)(Ordering[Int].on(x=> (x.split(",")(0).toInt)))

res31: Array[String] = Array( 1,2013-07-25 00:00:00,11599,CLOSED, 2,2013-07-25 00:00:00,256,PENDING_PAYMENT, 3,2013-07-25 00:00:00,12111,COMPLETE, 4,2013-07-25 00:00:00,8827,CLOSED, 5,2013-07-25 00:00:00,11318,COMPLETE)

scala> ordersRdd.takeOrdered(5)(Ordering[Int].reverse.on(x=> x.split(",")(0).toInt)) res32: Array[String] = Array( 68883,2014-07-23 00:00:00,5533,COMPLETE, 68882,2014-07-22 00:00:00,10000,ON_HOLD, 68881,2014-07-19 00:00:00,2518,PENDING_PAYMENT, 68880,2014-07-13 00:00:00,1117,COMPLETE, 68879,2014-07-09 00:00:00,778,COMPLETE)

val productsFile = "/user/cloudera/cca175/retail_db/products" val productsRdd = sc.textFile(productsFile)

val productsKvRdd = productsRdd.map{x=> (x.split(",")(1).toInt, x)}.filter{case(x,y)=> (y.split(",")(4) != "")}

val productsGroupRdd = productsKvRdd.groupByKey()

val products52GroupRdd = productsGroupRdd.filter(_._1 == 52)

products52GroupRdd.flatMap{rec => (rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)).take(2)} products52GroupRdd.flatMap{rec => (rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)).take(2)}

productsGroupRdd.flatMap{rec=> (rec._2.toList.sortBy(k=> -k.split(",")(4).toFloat)).take(3)}.saveAsTextFile("/user/cloudera/cca175/review/spark/data/output/top_3_price_products")

val ordersFile = "/user/cloudera/cca175/retail_db/orders" val ordersRdd = sc.textFile(ordersFile)

val order_itemsFile = "/user/cloudera/cca175/retail_db/order_items" val order_itemsRdd = sc.textFile(order_itemsFile)

==================================

productsFile = "/user/cloudera/cca175/retail_db/products" productsRdd = sc.textFile(productsFile) proudctsKvRdd= productsRdd.map(lambda x: (int(x.split(",")[1]),x)) productsKvRdd = proudctsKvRdd.filter(lambda (x,y): y.split(",")[4] != "") ================================================================================. spark-submit --master local[2]
--class read_a_hdfs_file
/home/cloudera/spark-scala/handson/target/scala-2.10/spark-scala-hands-on_2.10-1.0.jar

spark-submit --master local[2]
--class list_wordCount
/home/cloudera/spark-scala/handson/target/scala-2.10/spark-scala-hands-on_2.10-1.0.jar

spark-submit --master local[2]
--class groupOfTwo
/home/cloudera/spark-scala/handson/target/scala-2.10/spark-scala-hands-on_2.10-1.0.jar

aRdd = Hello my Friend. How are

aRdd.map(_.split(" ")) => Array(Array(Hello, my, Friend, How, are), Array(you, today, my, Friend))

aRdd.map(_.split(" ")).map(x=> x.sliding(2).toArray)

aRdd.map(_.split(" ")).map(x=> x.sliding(2).toArray).flatMap(x=> x) Array(Array(Hello, my), Array(my, Friend), Array(Friend, How), Array(How, are), Array(you, today), Array(today, my), Array(my, Friend))

aRdd.map(_.split(" ")).map(x=> x.sliding(2).toArray).flatMap(_.split(",")) ================================================================================.

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. master("local"). appName("Spark Structured Streaming Demo"). getOrCreate

spark.sparkContext.setLogLevel("ERROR")

val orders = spark. readStream. schema("order_id INT, order_date STRING, order_customer_id INT, order_status STRING"). csv("/mnt/c/data/retail_db/orders")

val query = orders. writeStream. queryName("orders"). format("memory"). start

spark.sql("select * from orders").show

============================================================================== import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._

val spark = SparkSession. builder. master("local"). appName("Get Department Traffic"). getOrCreate

import spark.implicits._ spark.sparkContext.setLogLevel("ERROR")

val lines = spark.readStream. format("socket"). option("host", "localhost"). option("port", "9999"). load

val departmentTraffic = lines. where(split(split($"value", " ")(6), "/")(1) === "department"). select(split(split($"value", " ")(6), "/")(2).alias("department_name")). groupBy($"department_name"). agg(count($"department_name").alias("department_count"))

val query = departmentTraffic. writeStream. queryName("department_count"). outputMode("complete"). format("memory"). start

spark.sql("select * from department_count").show

============================================================================= import com.typesafe.config._

val conf = ConfigFactory.load.getConfig("dev")

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. appName("Get Streaming Department Traffic"). master(conf.getString("execution.mode")). getOrCreate

spark.sparkContext.setLogLevel("ERROR") spark.conf.set("spark.sql.shuffle.partitions", "2")

import spark.implicits._

val lines = spark. readStream. format("kafka"). option("kafka.bootstrap.servers", conf.getString("bootstrap.servers")). option("subscribe", "retail"). load. select($"value".cast("string").alias("value"))

import org.apache.spark.sql.functions.{split, to_timestamp, ltrim}

val departmentLines = lines. where(split(split($"value", " ")(6), "/")(1) === "department"). withColumn("department_name", split(split($"value", " ")(6), "/")(2)).
withColumn("visit_time", to_timestamp(ltrim(split($"value", " ")(3), "["), "dd/MMM/yyyy:HH:mm")). drop($"value")

import org.apache.spark.sql.functions.count

val departmentTraffic = departmentLines. groupBy("visit_time", "department_name"). agg(count("department_name").alias("department_count"))

import org.apache.spark.sql.streaming.Trigger

val query = departmentTraffic. writeStream. format("console"). option("truncate", "false"). outputMode("update"). trigger(Trigger.ProcessingTime("20 seconds")). start

query.awaitTermination

import com.typesafe.config._

val conf = ConfigFactory.load.getConfig("dev")

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. appName("Get Streaming Department Traffic"). master(conf.getString("execution.mode")). getOrCreate

spark.sparkContext.setLogLevel("ERROR") spark.conf.set("spark.sql.shuffle.partitions", "2")

import spark.implicits._

val lines = spark. readStream. format("kafka"). option("kafka.bootstrap.servers", conf.getString("bootstrap.servers")). option("subscribe", "retail"). load. select($"value".cast("string").alias("value"))

import org.apache.spark.sql.functions.{split, to_timestamp, ltrim}

val departmentLines = lines. where(split(split($"value", " ")(6), "/")(1) === "department"). withColumn("department_name", split(split($"value", " ")(6), "/")(2)).
withColumn("visit_time", to_timestamp(ltrim(split($"value", " ")(3), "["), "dd/MMM/yyyy:HH:mm")). drop($"value")

import org.apache.spark.sql.functions.count

val departmentTraffic = departmentLines. groupBy("visit_time", "department_name"). agg(count("department_name").alias("department_count"))

============================================================================= import com.typesafe.config._

val conf = ConfigFactory.load.getConfig("dev")

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. appName("Get Streaming Department Traffic"). master(conf.getString("execution.mode")). getOrCreate

spark.sparkContext.setLogLevel("ERROR") spark.conf.set("spark.sql.shuffle.partitions", "2")

import spark.implicits._

val lines = spark.read.text("/opt/gen_logs/logs/access.log")

import org.apache.spark.sql.functions.{split, to_timestamp, ltrim}

val departmentLines = lines. where(split(split($"value", " ")(6), "/")(1) === "department"). withColumn("department_name", split(split($"value", " ")(6), "/")(2)).
withColumn("visit_time", to_timestamp(ltrim(split($"value", " ")(3), "["), "dd/MMM/yyyy:HH:mm")). drop($"value")

import org.apache.spark.sql.functions.count

val departmentTraffic = departmentLines. groupBy("visit_time", "department_name"). agg(count("department_name").alias("department_count"))

departmentTraffic.show

import com.typesafe.config._

val conf = ConfigFactory.load.getConfig("dev")

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. appName("Get Streaming Department Traffic"). master(conf.getString("execution.mode")). getOrCreate

spark.sparkContext.setLogLevel("ERROR") spark.conf.set("spark.sql.shuffle.partitions", "2")

import spark.implicits._

val lines = spark. readStream. format("kafka"). option("kafka.bootstrap.servers", conf.getString("bootstrap.servers")). option("subscribe", "retail"). load(). selectExpr("CAST(value AS STRING)"). as[(String)]

import org.apache.spark.sql.streaming.Trigger

val query = lines. writeStream. format("console"). outputMode("append"). trigger(Trigger.ProcessingTime("10 seconds")). start

import com.typesafe.config._

val conf = ConfigFactory.load.getConfig("dev")

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. appName("Get Streaming Department Traffic"). master(conf.getString("execution.mode")). getOrCreate

spark.sparkContext.setLogLevel("ERROR") spark.conf.set("spark.sql.shuffle.partitions", "2")

import spark.implicits._

import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. master("local"). appName("Get Department Traffic"). getOrCreate

spark.sparkContext.setLogLevel("ERROR") import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "2")

val lines = spark.readStream. format("socket"). option("host", "localhost"). option("port", "9999"). load

import org.apache.spark.sql.functions._

val departmentLines = lines. where(split(split($"value", " ")(6), "/")(1) === "department"). withColumn("department_name", split(split($"value", " ")(6), "/")(2)). withColumn("visit_time", to_timestamp(ltrim(split($"value", " ")(3), "["), "dd/MMM/yyyy:HH:mm:ss")). drop($"value")

val departmentTraffic = departmentLines. groupBy(window($"visit_time", "60 seconds", "20 seconds"), $"department_name"). agg(count("department_name").alias("department_count"))

import org.apache.spark.sql.streaming.Trigger spark.conf.set("spark.sql.shuffle.partitions", "2")

val query = departmentTraffic. writeStream. queryName("department_traffic"). format("memory"). outputMode("update"). trigger(Trigger.ProcessingTime("20 seconds")). start

spark.sql("SELECT * FROM department_traffic").show(false)

==================================================================== import org.apache.spark.sql.SparkSession

val spark = SparkSession. builder. master("local"). appName("Get Department Traffic"). getOrCreate

spark.sparkContext.setLogLevel("ERROR") import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "2")

val lines = spark.readStream. format("socket"). option("host", "localhost"). option("port", "9999"). load

lines.createTempView("lines")

val departmentTraffic = spark.sql(s""" SELECT to_timestamp(split(value, ' ')[3], '[dd/MMM/yyyy:HH:mm') visit_time, split(split(value, ' ')[6], '/')[2] department_name, count(1) department_count FROM lines WHERE split(split(value, ' ')[6], '/')[1] = 'department' GROUP BY to_timestamp(split(value, ' ')[3], '[dd/MMM/yyyy:HH:mm'), department_name""")

import org.apache.spark.sql.streaming.Trigger

val query = departmentTraffic. writeStream. queryName("department_traffic"). format("memory"). outputMode("update"). trigger(Trigger.ProcessingTime("20 seconds")). start

spark.sql("SELECT * FROM department_traffic").show(false)

================================================================================

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

================================================================================.

⚠️ **GitHub.com Fallback** ⚠️