Spark - lshhhhh/note GitHub Wiki

Spark-shell

  • with cluster
    • spark2-shell --master yarn

GraphX

  • VD: vertex data type
  • ED: edge data type
  • graph.pregel(list1)(list2)
    • list1: [the initial message, the maximum number of iterations, the edge direction in which to send messages]
    • list2: [receiving messages (the vertex program, vprog), computing messages (sendMsg), combining messages (mergeMsg)]
      • vprog: message를 수신하고 새 vertex 값을 계산하는 vertex program
      • sendMsg: 다른 vertex로 message를 보내는데 사용하는 함수
      • mergeMsg: 두 개의 수신 message를 가져다가 하나의 message로 merge
      (vertex j  ) -> [sendMsg] -> (vertex i [mergeMsg] -> [vprog]) -> vertex k
      (vertex j+1) -> [sendMsg] -┘                                  └> vertex k+1
      
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random

// Vertex table: each entry is (vertex id, (name, list of doubles)).
// Note that vertex 4 carries a single-element list while all others carry two.
val users: RDD[(VertexId, (String, List[Double]))] = sc.parallelize(
  Array(
    (1L, ("a1", List(1.1, 1.1))),
    (2L, ("a2", List(1.2, 1.2))),
    (3L, ("a3", List(1.3, 1.3))),
    (4L, ("a4", List(1.4))),
    (5L, ("a5", List(1.5, 1.13))),
    (6L, ("a6", List(1.6, 1.2))),
    (7L, ("a7", List(1.6, 5.5))),
    (8L, ("a8", List(1.6, 4.2))),
    (9L, ("a9", List(1.6, 2.1))),
    (10L, ("a10", List(1.6, 32.1))),
    (11L, ("b1", List(2.1, 1.2))),
    (12L, ("b2", List(2.2, 2.2))),
    (13L, ("b3", List(2.3, 3.3)))
  )
)

// Edge table. Every edge carries the same attribute "1", so the edges are
// written as (source id, destination id) pairs and turned into Edge objects
// in one place. Each connection is listed in both directions.
val relationships: RDD[Edge[String]] = sc.parallelize(
  Seq(
    // component around vertex 1 and vertex 3
    (1L, 2L), (1L, 3L), (1L, 4L), (3L, 5L), (3L, 6L),
    (2L, 1L), (3L, 1L), (4L, 1L), (5L, 3L), (6L, 3L),
    // chain 6 - 7 - 8 - 9 - 10
    (6L, 7L), (7L, 8L), (8L, 9L), (9L, 10L),
    (7L, 6L), (8L, 7L), (9L, 8L), (10L, 9L),
    // separate chain 11 - 12 - 13
    (11L, 12L), (12L, 13L),
    (12L, 11L), (13L, 12L)
  ).map { case (src, dst) => Edge(src, dst, "1") }
)

// Attribute given to any vertex id that appears in an edge but has no entry
// in `users`.
val defaultUser: (String, List[Double]) = ("z?", List(-0.0))

// Assemble the property graph from the vertex and edge tables.
val graph: Graph[(String, List[Double]), String] =
  Graph(users, relationships, defaultUser)

// Simple usage
// Count the vertices whose name is not "b1". A vertex is
// (id, (name, values)), so the attribute tuple has to be destructured to
// reach the name; comparing the whole tuple with a String is always unequal
// and would keep every vertex.
graph.vertices.filter { case (_, (name, _)) => name != "b1" }.count()
// Count the edges that point from a smaller vertex id to a larger one.
graph.edges.filter(e => e.srcId < e.dstId).count()

// Pregel
// Pregel is later run with walkLen - 1 as its maximum number of iterations.
val walkLen: Int = 5
// Message delivered to every vertex before the first iteration:
// a zero counter and an empty list of vertex attributes.
val initialMsg: (Long, List[(String, List[Double])]) = (0L, Nil)
// Starting state of every vertex: a zero counter and a one-element list
// holding the vertex's own attribute.
val initialGraph = graph.mapVertices((_, attr) => (0L, List(attr)))

/** Vertex program: computes a vertex's new state from an incoming message.
  *
  * The new counter is the message's counter plus one. The new list is the
  * first element of the vertex's current list followed by the whole list
  * carried by the message.
  *
  * @param vertexId id of the vertex being updated (not used in the computation)
  * @param value    current vertex state; its list must be non-empty, since
  *                 `head` is taken from it
  * @param message  merged incoming message (counter, list of attributes)
  * @return the updated (counter, list of attributes) state
  */
def vprog(vertexId: VertexId, value: (Long, List[(String, List[Double])]), message: (Long, List[(String, List[Double])])):
(Long, List[(String, List[Double])]) = {
  val (receivedCount, receivedPath) = message
  val ownAttr = value._2.head
  (receivedCount + 1, ownAttr :: receivedPath)
}

// Run Pregel on the initial graph for at most walkLen - 1 iterations.
//  - sendMsg: each edge triplet sends the source vertex's current state to
//    the destination vertex.
//  - mergeMsg: when two messages meet, one of them is kept at random, so the
//    result is not reproducible between runs.
// NOTE(review): the name `sssp` suggests single-source shortest paths, but the
// random choice in mergeMsg looks more like a random walk - confirm intent.
val sssp = initialGraph.pregel(
  initialMsg,
  maxIterations = walkLen - 1,
  activeDirection = EdgeDirection.Out)(
  vprog,
  triplet => {
    val toDestination = (triplet.dstId, triplet.srcAttr)
    Iterator(toDestination)
  },
  (a, b) => Random.shuffle(List(a, b)).head)

// Print the final state of every vertex, one per line.
println(sssp.vertices.collect().mkString("\n"))