ICP 13 : Graph Frames and GraphX Algorithms - acikgozmehmet/BigDataProgramming GitHub Wiki

ICP 13: Graph Frames and GraphX Algorithms

Objectives

  • Graph frames
  • Algorithms
  • Implementations

Overview

GraphFrames is a package for Apache Spark which provides DataFrame-based Graphs. It provides high-level APIs in Scala, Java, and Python. It aims to provide both the functionality of GraphX and extended functionality taking advantage of Spark DataFrames. This extended functionality includes motif finding, DataFrame-based serialization, and highly expressive graph queries.

What are GraphFrames?

  • GraphX is to RDDs as GraphFrames are to DataFrames
  • GraphFrames represent graphs: vertices (e.g., users) and edges (e.g., relationships between users).
  • GraphFrames are based upon Spark DataFrames
  • GraphX is based upon RDDs

Installation Requirement (Dependency)

GraphX, GraphFrames

Limitations:

The biggest limitation of the Spark GraphX library is that its API is exposed mainly through Scala; it offers no bindings for other popular languages such as Python and R.

In Class Programming

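// Imports needed by the snippets on this page
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit}
import org.graphframes.GraphFrame

// Silence Spark's INFO/WARN logging and start a local session
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().appName("GraphFrameApplication").master("local[*]").getOrCreate()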

1. Import the dataset from CSV files, create DataFrames directly on import, then create a graph out of the DataFrames

val stationDF = spark.read.format("csv").option("header", true).load("data/201508_station_data.csv")
stationDF.show(10, truncate = false)

val tripDF = spark.read.format("csv").option("header", true).load("data/201508_trip_data.csv")
tripDF.show(10, truncate = false)

val vertices = stationDF.select(col("name").as("id"),
                                concat(col("lat"), lit(","), col("long")).as("lat_long"),
                                col("dockcount"),
                                col("landmark"),
                                col("installation")
                               )

println("vertices")
vertices.show(10, truncate = false)

val edges = tripDF.select(col("Start Station").as("src"),
                          col("End Station").as("dst"),
                          col("Trip ID").as("tripId"),
                          col("Duration").as("duration"),
                          col("Bike #").as("bikeNum"),
                          col("Subscriber Type").as("subscriber"),
                          col("Zip Code").as("zipCode")
                          )
println("edges")
edges.show(10, truncate = false)

// Create the graphframe

val g = GraphFrame(vertices, edges)
g.cache()

2. Triangle count

Compute the number of triangles passing through each vertex. The algorithm is relatively straightforward and can be computed in three steps:

  • Compute the set of neighbors for each vertex.
  • For each edge, compute the intersection of the neighbor sets of its two endpoints and send the count to both vertices.
  • Compute the sum at each vertex and divide by two, since each triangle is counted twice.

val triangleCount = g.triangleCount.run()

println("Triangles")

triangleCount.select(col("id"), col("landmark"),col("count")).show(10, false)
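The three steps described above can be sketched in plain Scala collections, without Spark. This is only an illustration of the idea, not the GraphX or GraphFrames implementation; the object name `TriangleCountSketch` and the tiny example graph are assumptions made up for this sketch.

```scala
object TriangleCountSketch {
  // Hypothetical toy graph: one triangle a-b-c plus a pendant edge c-d.
  val edges: Seq[(String, String)] =
    Seq(("a", "b"), ("b", "c"), ("a", "c"), ("c", "d"))

  def triangleCounts(edges: Seq[(String, String)]): Map[String, Int] = {
    // Treat the graph as undirected: drop self-loops and duplicate edges.
    val canonical = edges
      .filter { case (s, d) => s != d }
      .map { case (s, d) => if (s < d) (s, d) else (d, s) }
      .distinct

    // Step 1: neighbor set of every vertex.
    val neighbors: Map[String, Set[String]] =
      canonical
        .flatMap { case (s, d) => Seq(s -> d, d -> s) }
        .groupBy(_._1)
        .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

    // Step 2: per edge, count the common neighbors and send that count to both endpoints.
    val messages = canonical.flatMap { case (s, d) =>
      val common = (neighbors(s) intersect neighbors(d)).size
      Seq(s -> common, d -> common)
    }

    // Step 3: sum per vertex and halve, because each triangle reaches a vertex via two edges.
    messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).sum / 2 }
  }
}
```

Calling `TriangleCountSketch.triangleCounts(TriangleCountSketch.edges)` gives a count of 1 for `a`, `b`, and `c`, and 0 for `d`.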

3. Shortest path

In graph theory, the shortest path problem is the problem of finding a path between two vertices (or nodes) in a graph such that the sum of the weights of its constituent edges is minimized.

The problem of finding the shortest path between two intersections on a road map may be modeled as a special case of the shortest path problem in graphs, where the vertices correspond to intersections and the edges correspond to road segments, each weighted by the length of the segment.

val shortPath = g.shortestPaths.landmarks(Seq("2nd at Folsom", "Townsend at 7th")).run()
shortPath.show(false)
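Note that `shortestPaths` in GraphFrames does not use edge weights: for each vertex it reports the number of edges (hops) on the shortest directed path to each landmark. A minimal plain-Scala sketch of that hop counting follows, assuming a breadth-first expansion backwards from the landmark. The object name `HopCountSketch` and the function `hopsTo` are hypothetical names for this illustration, not part of any library.

```scala
object HopCountSketch {
  /** Hop count from every vertex that can reach `landmark` to `landmark`, following edge direction. */
  def hopsTo(landmark: String, edges: Seq[(String, String)]): Map[String, Int] = {
    // Reverse adjacency: for each destination, the vertices with an edge into it.
    val incoming: Map[String, Seq[String]] =
      edges.groupBy(_._2).map { case (d, es) => d -> es.map(_._1).distinct }

    // Expand one layer at a time; a vertex keeps the depth at which it is first seen.
    @annotation.tailrec
    def loop(frontier: Set[String], dist: Map[String, Int], depth: Int): Map[String, Int] = {
      val next = frontier
        .flatMap(v => incoming.getOrElse(v, Seq.empty))
        .filterNot(dist.contains)
      if (next.isEmpty) dist
      else loop(next, dist ++ next.map(_ -> (depth + 1)), depth + 1)
    }

    loop(Set(landmark), Map(landmark -> 0), 0)
  }
}
```

For the edges `a->b`, `b->c`, `a->c`, `c->d` and landmark `d`, the result maps `d` to 0, `c` to 1, and both `a` and `b` to 2. Vertices that cannot reach the landmark are simply absent from the map.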

4. PageRank

PageRank (PR) is an algorithm used by Google Search to rank web pages in its search engine results. It is named after Larry Page, one of the founders of Google, and is a way of measuring the importance of website pages.

PageRank is a link analysis algorithm and it assigns a numerical weighting to each element of a hyperlinked set of documents, such as the World Wide Web, with the purpose of "measuring" its relative importance within the set. The algorithm may be applied to any collection of entities with reciprocal quotations and references.

// Display resulting pageranks and final edge weights
val stationPageRank = g.pageRank.resetProbability(0.15).tol(0.01).run()
stationPageRank.vertices.select("id", "pagerank").show(false)
stationPageRank.edges.select("src", "dst", "weight").show(false)
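To make the role of `resetProbability` concrete, here is a simplified PageRank power iteration in plain Scala. It is a sketch under stated assumptions, not the GraphFrames implementation: it runs a fixed number of iterations instead of using a tolerance, leaves ranks unnormalized, and ignores the special handling of vertices without outgoing edges. The name `PageRankSketch` and its parameters are hypothetical.

```scala
object PageRankSketch {
  def pageRank(
      edges: Seq[(String, String)],
      resetProb: Double = 0.15,
      iterations: Int = 20
  ): Map[String, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDegree = edges.groupBy(_._1).map { case (s, es) => s -> es.size }

    // Every vertex starts with rank 1.0.
    var ranks: Map[String, Double] = vertices.map(_ -> 1.0).toMap

    for (_ <- 1 to iterations) {
      // Each vertex splits its current rank evenly across its outgoing edges.
      val contribs: Map[String, Double] = edges
        .map { case (s, d) => d -> ranks(s) / outDegree(s) }
        .groupBy(_._1)
        .map { case (d, cs) => d -> cs.map(_._2).sum }

      // New rank = reset share + damped sum of incoming contributions.
      ranks = vertices
        .map(v => v -> (resetProb + (1 - resetProb) * contribs.getOrElse(v, 0.0)))
        .toMap
    }
    ranks
  }
}
```

On a directed cycle `a->b->c->a` every vertex keeps rank 1.0, because each one receives exactly what it gives away. A vertex with no incoming edges ends up with rank `resetProb`.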

5. Save the generated graph to files.

g.vertices.write.mode("overwrite").parquet("icp13_output_vertices")
g.edges.write.mode("overwrite").parquet("icp13_output_edges")

Bonus

1. Apply Label Propagation Algorithm

Label propagation is a semi-supervised machine learning algorithm that assigns labels to previously unlabeled data points. At the start of the algorithm, a (generally small) subset of the data points have labels (or classifications). These labels are propagated to the unlabeled points throughout the course of the algorithm.

This algorithm is mainly used for community detection. In our data set, stations that are connected with each other form a community, and stations with the same label belong to the same community.

val lpa = g.labelPropagation.maxIter(5).run()
lpa.select("id", "label").show()
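In the community-detection variant that GraphFrames provides, every vertex starts with its own unique label and, at each step, adopts the label that is most common among its neighbors. The plain-Scala sketch below illustrates that update rule. It is an assumption-laden simplification: edges are treated as undirected, updates are synchronous, and ties are broken by picking the smallest label, which is a choice made here for determinism. `LabelPropagationSketch` is a hypothetical name.

```scala
object LabelPropagationSketch {
  def labelPropagation(edges: Seq[(String, String)], maxIter: Int): Map[String, String] = {
    // Undirected neighbor lists (duplicates kept so parallel edges count more than once).
    val neighbors: Map[String, Seq[String]] = edges
      .flatMap { case (s, d) => Seq(s -> d, d -> s) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2) }

    // Initially every vertex is its own community.
    var labels: Map[String, String] = neighbors.keys.map(v => v -> v).toMap

    for (_ <- 1 to maxIter) {
      // Synchronous update: all vertices read the labels of the previous step.
      labels = neighbors.map { case (v, ns) =>
        val counts = ns.map(labels).groupBy(identity).map { case (l, ls) => l -> ls.size }
        // Most frequent label wins; ties go to the lexicographically smallest label.
        v -> counts.minBy { case (label, n) => (-n, label) }._1
      }
    }
    labels
  }
}
```

With two disconnected triangles `a-b-c` and `d-e-f` and `maxIter = 5`, the vertices `a`, `b`, `c` all end with label `a`, and `d`, `e`, `f` all end with label `d`. As with the GraphFrames version, convergence is not guaranteed in general, which is why the iteration count is capped.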

2. Apply BFS algorithm

Breadth-first search (BFS) finds the shortest path(s) from one vertex (or a set of vertices) to another vertex (or a set of vertices). The beginning and end vertices are specified as Spark DataFrame expressions.

BFS is an algorithm for traversing or searching tree or graph data structures. It starts at the tree root (or some arbitrary node of a graph, sometimes referred to as a 'search key'), and explores all of the neighbor nodes at the present depth prior to moving on to the nodes at the next depth level.

In our dataset, BFS starts from the root node (the beginning station) and visits all the neighboring nodes (stations) first, before moving on to the next layer of nodes (stations), until the target condition is met. It is basically a graph traversal technique.

val pathBFS = g.bfs.fromExpr("id = '2nd at Folsom'").toExpr("dockcount < 15").run()
pathBFS.show(false)
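The layer-by-layer traversal can also be sketched without Spark. The following plain-Scala example finds a shortest path from a start vertex to the first vertex that satisfies a predicate, which loosely mirrors the `fromExpr` / `toExpr` pair used above. It is only an illustration; `BfsSketch`, `bfs`, and `isTarget` are hypothetical names, and the GraphFrames implementation works differently (it is built on DataFrame joins).

```scala
import scala.collection.immutable.Queue

object BfsSketch {
  /** Returns a shortest path (as a list of vertices) from `start` to a vertex matching `isTarget`. */
  def bfs(
      edges: Seq[(String, String)],
      start: String,
      isTarget: String => Boolean
  ): Option[List[String]] = {
    val outgoing: Map[String, Seq[String]] =
      edges.groupBy(_._1).map { case (s, es) => s -> es.map(_._2) }

    // The queue holds partial paths in reverse order (most recent vertex first).
    @annotation.tailrec
    def loop(queue: Queue[List[String]], visited: Set[String]): Option[List[String]] =
      queue.dequeueOption match {
        case None => None // nothing left to explore: no matching vertex is reachable
        case Some((path, rest)) =>
          val current = path.head
          if (isTarget(current)) Some(path.reverse)
          else {
            // Enqueue unvisited neighbors; they are explored after the current layer.
            val next = outgoing.getOrElse(current, Seq.empty).distinct.filterNot(visited)
            loop(rest ++ next.map(_ :: path), visited ++ next)
          }
      }

    loop(Queue(List(start)), Set(start))
  }
}
```

For the edges `a->b`, `a->c`, `b->d`, `c->d`, `d->e`, the call `BfsSketch.bfs(edges, "a", _ == "d")` returns `Some(List("a", "b", "d"))`, and a target that cannot be reached yields `None`.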

References

  • https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/graphx/lib/TriangleCount.html
  • https://en.wikipedia.org/wiki/PageRank
  • https://en.wikipedia.org/wiki/Shortest_path_problem
  • https://en.wikipedia.org/wiki/Label_propagation_algorithm
  • https://en.wikipedia.org/wiki/Breadth-first_search
  • https://graphframes.github.io/graphframes/docs/_site/index.html