
ICP 12: Graph Frames and GraphX

Objectives

  • GraphFrames
  • GraphX vs. GraphFrames
  • PySpark and Scala environment setup
  • Basic commands for creating data frames
  • Basic commands for GraphFrame algorithms
  • Loading and saving data to files

Overview

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.
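
As a brief, hedged illustration of the RDD-based GraphX API described above (the users and the follower-counting task are made up for this example and are not part of the ICP), the aggregateMessages operator can compute each vertex's follower count:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("GraphXSketch").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Property graph: (id, name) vertices and directed "follows" edges
val users = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val follows = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows"), Edge(1L, 3L, "follows")))
val graph = Graph(users, follows)

// aggregateMessages: each edge sends the message 1 to its destination vertex;
// messages are summed per vertex, yielding each user's follower count (the in-degree)
val followers = graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
followers.collect().foreach { case (id, n) => println(s"vertex $id has $n followers") }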

A directed graph is a graph whose edges have a direction associated with them. An example is the Twitter follower relationship: if Alice follows Bob, Bob does not necessarily follow Alice back. The vertices are the objects and the edges are the relationships between them.

An undirected graph, by contrast, is a graph whose edges have no direction; the relationship is symmetric. An example is Facebook friendship: if Bob is a friend of Carol, then Carol is also a friend of Bob.
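
GraphFrames (used in the walkthrough below) always stores edges with a direction, so a symmetric relationship like friendship can be modeled by adding one edge in each direction. A minimal sketch, with illustrative names:

import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

val spark = SparkSession.builder().appName("TinyGraph").master("local[*]").getOrCreate()
import spark.implicits._

val people = Seq(("1", "Bob"), ("2", "Carol")).toDF("id", "name")
// A symmetric friendship becomes two directed edges
val friendships = Seq(("1", "2", "friend"), ("2", "1", "friend")).toDF("src", "dst", "relationship")
val tiny = GraphFrame(people, friendships)
tiny.edges.show()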

Installation requirements

Required dependencies: GraphX (bundled with Spark) and the GraphFrames package.
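
A minimal sketch of the build setup; the version numbers below are assumptions, so choose the ones matching your Spark and Scala installation:

// build.sbt -- versions are illustrative assumptions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "3.1.2",
  "org.apache.spark" %% "spark-graphx" % "3.1.2",
  "graphframes"      %  "graphframes"  % "0.8.1-spark3.0-s_2.12"
)
// GraphFrames is published to the Spark Packages repository
resolvers += "SparkPackages" at "https://repos.spark-packages.org/"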

Limitations:

The biggest limitation of the Spark GraphX library is that its API is not fully supported in popular programming languages such as Python and R. (GraphFrames, by contrast, exposes a DataFrame-based API that is also usable from Python.)

In-Class Programming:

Part – 1

Please click on the link to reach the source code

Let's look at the code in detail ...
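
For reference, the snippets below rely on these imports (the GraphFrame import assumes the graphframes package is on the classpath, as set up above):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, desc, lit}
import org.graphframes.GraphFrame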

Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().appName("GraphFrameApplication").master("local[*]").getOrCreate()

1. Import the dataset as a CSV file and create data frames directly on import, then create a graph out of the data frames created.

val station_df = spark.read.format("csv").option("header", true).load("data/201508_station_data.csv")
station_df.show(10)

val trip_df = spark.read.format("csv").option("header", true).load("data/201508_trip_data.csv")
trip_df.show(10)

2. Concatenate chunks into a list & convert to a data frame: here, the lat and long columns are combined into a single lat_long field.

station_df.select(concat(col("lat"), lit(","), col("long")).as("lat_long")).show(10, false)

3. Remove duplicates

val stationDF = station_df.dropDuplicates()
val tripDF = trip_df.dropDuplicates()

4. Name Columns

val renamed_tripDF = tripDF.withColumnRenamed("Trip ID", "tripId")
  .withColumnRenamed("Start Date", "StartDate")
  .withColumnRenamed("Start Station", "StartStation")
  .withColumnRenamed("Start Terminal", "src")
  .withColumnRenamed("End Date", "EndDate")
  .withColumnRenamed("End Station", "EndStation")
  .withColumnRenamed("End Terminal", "dst")
  .withColumnRenamed("Bike #", "BikeNum")
  .withColumnRenamed("Subscriber Type", "SubscriberType")
  .withColumnRenamed("Zip Code", "ZipCode")

5. Output Data Frame

stationDF.show(10, false)
renamed_tripDF.show(10, false)

6. Create vertices

val vertices = stationDF.select(col("station_id").as("id"),
  col("name"),
  concat(col("lat"), lit(","), col("long")).as("lat_long"),
  col("dockcount"),
  col("landmark"),
  col("installation"))


val edges = renamed_tripDF.select("src", "dst", "tripId", "StartDate", "StartStation", "EndDate", "EndStation", "BikeNum", "SubscriberType", "ZipCode")
edges.show(10, false)

val g = GraphFrame(vertices, edges)

7. Show some vertices

g.vertices.select("*").orderBy("landmark").show()

8. Show some edges

g.edges.groupBy("src", "StartStation", "dst", "EndStation").count().orderBy(desc("count")).show(10)

9. Vertex in-Degree

val in_Degree = g.inDegrees
in_Degree.orderBy(desc("inDegree")).show(8, false)

10. Vertex out-Degree

val out_Degree = g.outDegrees
out_Degree.show(10)
vertices.join(out_Degree, Seq("id")).show(10)

11. Apply motif finding.

val motifs = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[ca]->(a)")
motifs.show(10, false)
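
Since the result of find is an ordinary DataFrame in which a, b, c, ab, bc, and ca are struct columns, it can be filtered like any other DataFrame. For instance (an illustrative extra step, not required by the ICP), keeping only triangles whose three stations share a landmark:

motifs
  .filter(col("a.landmark") === col("b.landmark") && col("b.landmark") === col("c.landmark"))
  .show(5, false)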

Bonus

1. Vertex degree

      g.degrees.show(10)

2. What are the most common destinations in the dataset, from location to location?

      g.edges.groupBy("src", "dst").count().orderBy(desc("count")).show(10)

3. What is the station with the highest ratio of in-degrees to out-degrees? In other words, which station acts almost as a pure trip sink: a station where trips end but rarely start?

      val df1 = in_Degree.orderBy(desc("inDegree"))
      val df2 = out_Degree.orderBy("outDegree")
      val df = df1.join(df2, Seq("id"))
                       .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
      df.orderBy(desc("degreeRatio")).limit(10).show(5, false)

4. Save the generated graphs to files.

      g.vertices.write.mode("overwrite").parquet("output_vertices")
      g.edges.write.mode("overwrite").parquet("output_edges")
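
To load the saved graph back (a sketch; GraphFrames has no single-file graph format, so the vertex and edge DataFrames written above are read separately and recombined):

      // Read the Parquet output back and rebuild the GraphFrame
      val loadedVertices = spark.read.parquet("output_vertices")
      val loadedEdges = spark.read.parquet("output_edges")
      val loadedGraph = GraphFrame(loadedVertices, loadedEdges)
      loadedGraph.vertices.show(5)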


spark.stop()

References:

  • https://spark.apache.org/docs/latest/graphx-programming-guide.html
  • https://www.edureka.co/blog/spark-graphx/
  • https://mapr.com/blog/how-get-started-using-apache-spark-graphx-scala/
  • https://hub.packtpub.com/working-with-sparks-graph-processing-library-graphframes/
  • https://pysparktutorial.blogspot.com/2017/10/graphframes-pyspark.html