GraphFrame PySpark - cchantra/bigdata.github.io GitHub Wiki

To use Python for graph processing in Spark, we need GraphFrames.

(https://spark-packages.org/package/graphframes/graphframes)

Select the version that matches your Spark installation.

(e.g., my Spark version is 3.1.1)

wget  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.1-s_2.12/graphframes-0.8.2-spark3.1-s_2.12.jar

Unpack it. Assume you are working under /home/hadoop:

jar xf  graphframes-0.8.2-spark3.1-s_2.12.jar

You get the folder graphframes under /home/hadoop. Then move the jar into Spark's jars directory:

mv graphframes-0.8.2-spark3.1-s_2.12.jar /home/hadoop/spark/jars

Next, use the jars.tar.gz given below.

wget https://github.com/cchantra/bigdata.github.io/raw/refs/heads/master/spark/jars.tar.gz

Untar it and move its contents into /home/hadoop/.ivy2/jars:

tar xvf jars.tar.gz
mkdir -p ~/.ivy2/jars
mv jars/* ~/.ivy2/jars/

(~/.ivy2/jars is created by mkdir -p if it does not exist; it is the default cache location where downloaded dependencies are stored.)

Then add the PYTHONPATH entries to /home/hadoop/spark/conf/spark-env.sh.

(If /home/hadoop/spark/conf/spark-env.sh does not exist, create it from the template:

cp /home/hadoop/spark/conf/spark-env.sh.template  /home/hadoop/spark/conf/spark-env.sh)

Obtain the library (the same jar, mirrored in the course repository):

wget https://github.com/cchantra/bigdata.github.io/raw/refs/heads/master/spark/graphframes-0.8.2-spark3.1-s_2.12.jar

and move it to the spark folder:

mv graphframes-0.8.2-spark3.1-s_2.12.jar ~/spark/jars

Then edit /home/hadoop/spark/conf/spark-env.sh (e.g., vi /home/hadoop/spark/conf/spark-env.sh) and add these lines:


export PYTHONPATH=$PYTHONPATH:/home/hadoop/.ivy2/jars:/home/hadoop/spark/jars:.
export PATH=$PATH:/home/hadoop/.local/bin
export SPARK_HOME=/home/hadoop/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip:$PYTHONPATH

(Adjust the py4j zip name to whatever is actually shipped in $SPARK_HOME/python/lib; Spark 3.1.x, for example, ships py4j-0.10.9.)

Invoke pyspark from the directory that contains the graphframes folder extracted above:

pyspark
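Once the shell is up, a quick sanity check that the package is importable (assuming the PYTHONPATH setup above):

>>> import graphframes

>>> spark.version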

OR

you can pass the --jars option explicitly, plus a UI port setting if you run into a port conflict:


pyspark --jars   /home/hadoop/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar --conf spark.ui.port=4099

OR (*** this one no longer works, since the spark-packages repository it pulls from is down ***)

pyspark --packages graphframes:graphframes:0.8.2-spark3.1-s_2.12   --conf spark.ui.port=4099

Following the instructions in https://graphframes.github.io/quick-start.html

We need to prepare two DataFrames, one for edges and one for vertices (nodes). The DataFrames can be constructed from manually typed data points (ideal for testing and small data sets), from a Hive query, or from a CSV (text file) using the approaches explained in the first post

(CSV -> RDD -> DataFrame).
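As a minimal sketch of the CSV route, the vertices could also be loaded with the DataFrame reader (the file name people.csv and its columns are hypothetical):

>>> v_csv = spark.read.csv("people.csv", header=True, inferSchema=True)

>>> v_csv = v_csv.toDF("id", "name", "firstname", "age")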

Try the example from:

(https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5)

To import

>>> from graphframes import *

#create nodes dataframe

>>> v = spark.createDataFrame([('1', 'Carter', 'Derrick', 50),
('2', 'May', 'Derrick', 26),
('3', 'Mills', 'Jeff', 80),
('4', 'Hood', 'Robert', 65),
('5', 'Banks', 'Mike', 93),
('98', 'Berg', 'Tim', 28),
('99', 'Page', 'Allan', 16)],
['id', 'name', 'firstname', 'age'])

#create edges dataframe

>>> e = spark.createDataFrame([('1', '2', 'friend'),
('2', '1', 'friend'),
('3', '1', 'friend'),
('1', '3', 'friend'),
('2', '3', 'follows'),
('3', '4', 'friend'),
('4', '3', 'friend'),
('5', '3', 'friend'),
('3', '5', 'friend'),
('4', '5', 'follows'),
('98', '99', 'friend'),
('99', '98', 'friend')],
['src', 'dst', 'type'])

#construct graphframe

>>> g = GraphFrame(v, e)

The basic graph functions that can be used in PySpark are the following:

vertices

edges

inDegrees

outDegrees

degrees

#show indegree

>>> g.inDegrees.show()


+---+--------+
| id|inDegree|
+---+--------+
|  1|       2|
|  2|       1|
|  3|       4|
|  4|       1|
|  5|       2|
| 98|       1|
| 99|       1|
+---+--------+


#count edges with "follows" label

>>> g.edges.filter("type = 'follows'").count()

2
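Vertex attributes can be filtered the same way; for example, with the ages given above:

#count users older than 50

>>> g.vertices.filter("age > 50").count()

3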

>>> g.vertices.show()

+---+------+---------+---+
| id|  name|firstname|age|
+---+------+---------+---+
|  1|Carter|  Derrick| 50|
|  2|   May|  Derrick| 26|
|  3| Mills|     Jeff| 80|
|  4|  Hood|   Robert| 65|
|  5| Banks|     Mike| 93|
| 98|  Berg|      Tim| 28|
| 99|  Page|    Allan| 16|
+---+------+---------+---+

>>> g.edges.show()

+---+---+-------+
|src|dst|   type|
+---+---+-------+
|  1|  2| friend|
|  2|  1| friend|
|  3|  1| friend|
|  1|  3| friend|
|  2|  3|follows|
|  3|  4| friend|
|  4|  3| friend|
|  5|  3| friend|
|  3|  5| friend|
|  4|  5|follows|
| 98| 99| friend|
| 99| 98| friend|
+---+---+-------+

#sort graph

>>> inDegreeDF=g.inDegrees
>>> outDegreeDF=g.outDegrees
>>> degreeDF=g.degrees


>>> inDegreeDF.sort(['inDegree'],ascending=[0]).show() # Sort and show

+---+--------+
| id|inDegree|
+---+--------+
|  3|       4|
|  5|       2|
|  1|       2|
|  4|       1|
| 99|       1|
| 98|       1|
|  2|       1|
+---+--------+

>>> outDegreeDF.sort(['outDegree'],ascending=[0]).show()

+---+---------+
| id|outDegree|
+---+---------+
|  3|        3|
|  1|        2|
|  2|        2|
|  4|        2|
|  5|        1|
| 98|        1|
| 99|        1|
+---+---------+
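g.degrees (the total of in- and out-degree) is never displayed above; a small sketch that joins it with the vertex names, reusing the degreeDF defined earlier:

>>> degreeDF.join(g.vertices, "id").select("id", "name", "degree").sort("degree", ascending=False).show()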

The graph algorithms that have been introduced to Spark so far, and which are very handy, are the following:

Motif finding

Subgraphs

Breadth-first search (BFS)

Connected components

Strongly connected components

Label Propagation

PageRank

Shortest paths

Triangle count
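Of these, triangle count is not demonstrated in the examples below; a minimal sketch against the graph g built above:

#count triangles passing through each vertex

>>> tri = g.triangleCount()

>>> tri.select("id", "count").show()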

Example: PageRank

#find page rank

>>> results = g.pageRank(resetProbability=0.01, maxIter=20)
>>> results.vertices.select("id", "pagerank").show()

+---+-------------------+
| id|           pagerank|
+---+-------------------+
|  3| 1.9926565142372492|
| 98| 1.0000000000000004|
| 99| 1.0000000000000004|
|  5| 0.9980278491870183|
|  1|  0.890802837490737|
|  4|   0.66757817627641|
|  2|0.45093462280858443|
+---+-------------------+
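Besides the vertex scores, the GraphFrame returned by pageRank also carries normalized edge weights; to inspect them:

>>> results.edges.select("src", "dst", "weight").show()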

# Run PageRank personalized for vertex "1"

>>> results3 = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="1")
>>> results3.vertices.select("id", "pagerank").show()

+---+-------------------+
| id|           pagerank|
+---+-------------------+
|  1|0.30079283470815427|
|  3|0.33891806394017526|
|  2| 0.1272219398157718|
|  4|0.09616868544059574|
| 98|                0.0|
|  5|0.13689847609530284|
| 99|                0.0|
+---+-------------------+

# Run pagerank until convergence

>>> results = g.pageRank(resetProbability=0.15, tol=0.01)

>>> results.vertices.select("id", "pagerank").show()

+---+------------------+
| id|          pagerank|
+---+------------------+
|  1|0.9055074972891308|
|  3| 1.853919642738813|
|  2|0.5377967999474921|
|  4|0.6873519241384106|
| 98|1.0225331112091938|
|  5|0.9703579134677663|
| 99|1.0225331112091938|
+---+------------------+

Example: Motif finding:

http://graphframes.github.io/user-guide.html#motif-finding

Motif finding refers to searching for structural patterns in a graph.

GraphFrame motif finding uses a simple Domain-Specific Language (DSL) for expressing structural queries. For example, graph.find("(a)-[e]->(b); (b)-[e2]->(a)") will search for pairs of vertices a,b connected by edges in both directions. It will return a DataFrame of all such structures in the graph, with columns for each of the named elements (vertices or edges) in the motif.

## motif finding

>>> from graphframes.examples import Graphs
>>> g = Graphs(sqlContext).friends()  # Get example graph
>>> g.edges.show()

 
+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
+---+---+------------+

>>> g.vertices.show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
+---+-------+---+

 # Search for pairs of vertices with edges in both directions between them.

>>> motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")

>>> motifs.show()

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|    {b, Bob, 36}|{b, c, follow}|
|    {b, Bob, 36}|{b, c, follow}|{c, Charlie, 30}|{c, b, follow}|
+----------------+--------------+----------------+--------------+

 # More complex queries can be expressed by applying filters.

>>> motifs.filter("b.age > 30").show()

+----------------+--------------+------------+--------------+
|               a|             e|           b|            e2|
+----------------+--------------+------------+--------------+
|{c, Charlie, 30}|{c, b, follow}|{b, Bob, 36}|{b, c, follow}|
+----------------+--------------+------------+--------------+

We can try to find the mutual friends for any pair of users a and c. In order to be a mutual friend b, b must be a friend of both a and c (and not just followed by c, for example). (Note: this query and its output use the first graph with numeric ids; rebuild g = GraphFrame(v, e) from above before running it.)

>>> mutualFriends = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(b); (b)-[]->(a)").dropDuplicates()
>>> mutualFriends.filter('a.id == 2 and c.id == 3').show()

+--------------------+--------------------+--------------------+
|                   a|                   b|                   c|
+--------------------+--------------------+--------------------+
|[2, May, Derrick,...|[1, Carter, Derri...|[3, Mills, Jeff, 80]|
+--------------------+--------------------+--------------------+

Many motif queries are stateless and simple to express, as in the examples above.

The next examples demonstrate more complex queries which carry state along a path in the motif, combining GraphFrame motif finding with filters on the result, where the filters use sequence operations to construct a series of DataFrame Columns.

For example, suppose one wishes to identify a chain of 4 vertices with some property defined by a sequence of functions. That is, among chains of 4 vertices a->b->c->d, identify the subset of chains matching this complex filter:

1. Initialize state on path.

2. Update state based on vertex a.

3. Update state based on vertex b.

4. Etc. for c and d.

If the final state matches some condition, then the chain is accepted by the filter.

>>> from functools import reduce

>>> from pyspark.sql.functions import col, lit, when

>>> from pyspark.sql.types import IntegerType

>>> chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

Query on sequence, with state (cnt):

(a) Define a method for updating state given the next element of the motif.

>>> sumFriends = lambda cnt, relationship: when(relationship == "friend", cnt + 1).otherwise(cnt)

(b) Use a sequence operation to apply the method to the sequence of elements in the motif. In this case, the elements are the 3 edges.

>>> condition = reduce(lambda cnt, e: sumFriends(cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))

(c) Apply filter to DataFrame.

>>> chainWith2Friends2 = chain4.where(condition >= 2)

>>> chainWith2Friends2.show()

+---------------+--------------+--------------+--------------+--------------+--------------+----------------+
|              a|            ab|             b|            bc|             c|            cd|               d|
+---------------+--------------+--------------+--------------+--------------+--------------+----------------+
| [d, David, 29]|[d, a, friend]|[a, Alice, 34]|[a, b, friend]|  [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
|[e, Esther, 32]|[e, d, friend]|[d, David, 29]|[d, a, friend]|[a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
+---------------+--------------+--------------+--------------+--------------+--------------+----------------+

Example: subgraph()

In GraphX, the subgraph() method takes an edge triplet (edge, src vertex, and dst vertex, plus attributes) and allows the user to select a subgraph based on triplet and vertex filters. GraphFrames offer the same capability.

There are three helper methods for subgraph selection. filterVertices(condition), filterEdges(condition), and dropIsolatedVertices().

- Select the subgraph of users older than 30, and relationships of type "friend".

- Drop isolated vertices (users) which are not contained in any edges (relationships).

>>> g1 = g.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

>>> g1.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
+---+---+------------+

Select a subgraph based on edges "e" of type "follow" pointing from a younger user "a" to an older user "b":

>>> paths = g.find("(a)-[e]->(b)").filter("e.relationship = 'follow'").filter("a.age < b.age")

"paths" contains vertex info. Extract the edges:

>>> e2 = paths.select("e.src", "e.dst", "e.relationship")

In Spark 1.5+, this call may be simplified to:

>>> e2 = paths.select("e.*")

Construct the subgraph:

>>> g2 = GraphFrame(g.vertices, e2)

>>> g2.edges.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  e|  f|      follow|
|  c|  b|      follow|
+---+---+------------+

Example: Breadth-first search (BFS)

The beginning and end vertices are specified as Spark DataFrame expressions.

Search from "Esther" for users of age < 32.

>>> paths = g.bfs("name = 'Esther'", "age < 32")

>>> paths.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|[e, Esther, 32]|[e, d, friend]|[d, David, 29]|
+---------------+--------------+--------------+

Specify edge filters or max path lengths.

>>> path2 = g.bfs("name = 'Esther'", "age < 32", edgeFilter="relationship != 'friend'", maxPathLength=3)

>>> path2.show()

+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|[e, Esther, 32]|[e, f, follow]|[f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
+---------------+--------------+--------------+--------------+----------------+

Example: (Strongly) connected components

NOTE: With GraphFrames 0.3.0 and later releases, the default Connected Components algorithm requires setting a Spark checkpoint directory. Users can revert to the old algorithm using connectedComponents.setAlgorithm("graphx").

#set a Spark checkpoint directory (required by the default algorithm)

>>> sc.setCheckpointDir("/tmp/spark")

>>> result = g.connectedComponents()

>>> result.select("id", "component").orderBy("component").show()

+---+------------+
| id|   component|
+---+------------+
|  b|412316860416|
|  c|412316860416|
|  d|412316860416|
|  a|412316860416|
|  e|412316860416|
|  f|412316860416|
+---+------------+
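To summarize the result, e.g. count the vertices in each component, a small sketch:

>>> result.groupBy("component").count().show()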

For strongly connected components:

>>> result = g.stronglyConnectedComponents(maxIter=10)

>>> result.select("id", "component").orderBy("component").show()

+---+-------------+
| id|    component|
+---+-------------+
|  f| 412316860416|
|  e| 670014898176|
|  d| 807453851648|
|  b|1047972020224|
|  c|1047972020224|
|  a|1460288880640|
+---+-------------+

Example: LPA

Run the static Label Propagation Algorithm for detecting communities in networks.

Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the mode community affiliation of incoming messages.

LPA is a standard community detection algorithm for graphs. It is very inexpensive computationally, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are identified into a single community).

>>> result = g.labelPropagation(maxIter=5)

>>> result.select("id", "label").show()

+---+-------------+
| id|        label|
+---+-------------+
|  b|1047972020224|
|  e| 412316860416|
|  a|1382979469312|
|  f| 670014898176|
|  d| 670014898176|
|  c|1382979469312|
+---+-------------+

Example: Shortest path

Computes shortest paths from each vertex to the given set of landmark vertices, where landmarks are specified by vertex ID.

>>> results = g.shortestPaths(landmarks=["a", "d"])

>>> results.select("id", "distances").show()

+---+----------------+
| id|       distances|
+---+----------------+
|  b|              []|
|  e|[d -> 1, a -> 2]|
|  a|        [a -> 0]|
|  f|              []|
|  d|[d -> 0, a -> 1]|
|  c|              []|
+---+----------------+

Example: saving/loading a GraphFrame

Since GraphFrames are built on DataFrames, they support saving and loading to and from the same set of datasources that DataFrames support.

>>> g.edges.write.parquet("hdfs:///user/edges")

>>> g.vertices.write.parquet("hdfs:///user/vertices")

>>> sameV = sqlContext.read.parquet("hdfs:///user/vertices")

>>> sameE = sqlContext.read.parquet("hdfs:///user/edges")

>>> sameG = GraphFrame(sameV, sameE)
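Because any DataFrame datasource works, the same graph could just as well be stored as, e.g., JSON (the paths here are hypothetical):

>>> g.vertices.write.json("hdfs:///user/vertices_json")

>>> g.edges.write.json("hdfs:///user/edges_json")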

Example: communication via aggregateMessages

GraphFrames provides primitives for developing graph algorithms. The two key components are:

1. aggregateMessages: Send messages between vertices, and aggregate messages for each vertex. GraphFrames provides a native aggregateMessages method implemented using DataFrame operations. This may be used analogously to the GraphX API.

2. joins: Join message aggregates with the original graph. GraphFrames rely on DataFrame joins, which provide the full functionality of GraphX joins (see the sketch after the example below).
>>> from pyspark.sql.functions import sum as sqlsum

>>> from graphframes.lib import AggregateMessages as AM

#For each user, sum the ages of the adjacent users.

>>> msgToSrc = AM.dst["age"]

>>> msgToDst = AM.src["age"]

>>> agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("summedAges"),
    sendToSrc=msgToSrc,
    sendToDst=msgToDst)

>>> agg.show()

+---+----------+
| id|summedAges|
+---+----------+
|  f|        62|
|  e|        65|
|  d|        66|
|  c|       108|
|  b|        94|
|  a|        65|
+---+----------+
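To illustrate the second primitive (joins), a minimal sketch that joins the aggregate back onto the original vertices, reusing the agg DataFrame above:

>>> g.vertices.join(agg, "id").show()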

For invoking Jupyter with PySpark:

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser" pyspark --py-files  graphframes-0.8.2-spark3.1-s_2.12.jar --jars graphframes-0.8.2-spark3.1-s_2.12.jar 

Or, if using jupyter-lab:

PYSPARK_DRIVER_PYTHON=jupyter-lab PYSPARK_DRIVER_PYTHON_OPTS="--no-browser" pyspark --py-files graphframes-0.8.2-spark3.1-s_2.12.jar --jars graphframes-0.8.2-spark3.1-s_2.12.jar

Download the example notebook:

wget https://raw.githubusercontent.com/cchantra/bigdata.github.io/refs/heads/master/spark/graphframe.ipynb

You can tunnel the Jupyter port over SSH (e.g., ssh -L 8888:localhost:8888 hadoop@yourserver, where the host and port are placeholders for your setup) and then open the notebook in your web browser.

References

http://pysparktutorial.blogspot.com/2017/10/graphframes-pyspark.html

https://spark-packages.org/package/graphframes/graphframes