Resumable Sessions - abronte/PysparkProxy GitHub Wiki

Resumable sessions are a configurable option that allows the same code to be run client side without re-computing things server side. Only new code that hasn't run yet will actually get executed in Spark. This gives you a notebook-style computation environment in a regular Python file.

Let's say we have this simple Spark application, example.py:

sc = SparkContext(appName='demo')
sqlContext = SQLContext(sc)

df = sqlContext.read.json('data.json')
print(df.count())

When this runs, each line of code gets executed.

Now let's say we're actively developing it and changing code around, so that example.py now looks like this:

sc = SparkContext(appName='demo')
sqlContext = SQLContext(sc)

df = sqlContext.read.json('data.json')
print(df.count())

filtered_df = df.filter(df.foo == 1)

filtered_df.show()

When this gets run, only the new lines actually get executed on the server:

sc = SparkContext(appName='demo')    # not run
sqlContext = SQLContext(sc)          # not run

df = sqlContext.read.json('data.json') # not run
print(df.count())                      # not run

filtered_df = df.filter(df.foo == 1)   # new code, runs

filtered_df.show()                     # new code, runs

Enabling

Resumable sessions are disabled by default. To enable them, specify the --resumable parameter when starting the Pyspark Proxy server.

Ex: pyspark-proxy-server start --resumable

How It Works

Whenever a Pyspark object gets created, a /create HTTP call is sent to the server with parameters describing what object should be created and what arguments to pass in.

For example, creating a dataframe like this

data = [(1,2,'a'),(3,4,'b'),(5,6,'c')]

df1 = sqlContext.createDataFrame(data, ['foo', 'bar', 'baz'])

generates this json payload:

{'path': 'createDataFrame', 'args': [[[1, 2, 'a'], [3, 4, 'b'], [5, 6, 'c']], ['foo', 'bar', 'baz']], 'id': '4d1a8392-838b-410e-9fa6-22aaf408176f', 'kwargs': {}}
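
For illustration, here's a rough sketch of what the client side does to produce a call like that. The /create path and the payload shape come from the example above; the SERVER_URL value and the helper function name are made up for this sketch and aren't part of the actual client:

import uuid
import requests

SERVER_URL = 'http://localhost:8765'  # hypothetical address of the proxy server

def create_remote_object(path, args, kwargs=None):
    # Describe which object to create and what arguments to pass in.
    payload = {
        'path': path,
        'args': list(args),
        'kwargs': kwargs or {},
        'id': str(uuid.uuid4()),
    }
    # POST it to the /create endpoint and hand back the server's response payload.
    return requests.post(SERVER_URL + '/create', json=payload).json()

# Roughly equivalent to sqlContext.createDataFrame(data, ['foo', 'bar', 'baz'])
data = [(1, 2, 'a'), (3, 4, 'b'), (5, 6, 'c')]
response = create_remote_object('createDataFrame', [data, ['foo', 'bar', 'baz']])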

A SHA-1 hash is then created from this payload and stored alongside the response payload that the server sends back to the client.

Since calling the same piece of code will generate the same payload, we can send the client the cached result instead of re-running the same piece of code.
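
Here's a rough sketch of that caching idea. It assumes the hash is taken over the serialized payload with per-call fields (like the generated id) left out, so identical code maps to identical keys; the function and cache names are illustrative, not the actual server implementation:

import hashlib
import json

response_cache = {}  # payload hash -> previously returned response payload

def cached_create(payload, execute):
    # Assumption for this sketch: fields that differ between otherwise identical
    # calls (such as the generated 'id') are excluded before hashing.
    hashable = {k: v for k, v in payload.items() if k != 'id'}
    key = hashlib.sha1(json.dumps(hashable, sort_keys=True).encode('utf-8')).hexdigest()

    if key in response_cache:
        # Same code seen before: return the cached result instead of re-running it.
        return response_cache[key]

    # New code: actually run it in Spark and remember the response.
    response = execute(payload)
    response_cache[key] = response
    return response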

Shortcomings

Initializing many instances of the same object

The main problem with the current approach is that there are some use cases where you need to create multiple distinct objects that have the same creation params.

For example, specifying column types.

df.select(df.age.cast(StringType()).alias('ages')).collect()

In this case only one StringType() object would actually be created.
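
To make that concrete, continuing the caching sketch above, two separate StringType() calls would produce payloads like these (the ids are made up):

payload_a = {'path': 'StringType', 'args': [], 'kwargs': {}, 'id': 'id-1'}
payload_b = {'path': 'StringType', 'args': [], 'kwargs': {}, 'id': 'id-2'}
# Apart from the generated id, the payloads are identical, so they hash to the
# same key; the second call would resolve to the cached object created for the
# first one instead of producing a new StringType() on the server.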

I haven't really tested this with resumable sessions yet, so I'm not sure what the impact may be. There are probably other use cases I'm not accounting for as well.

Overwriting objects

Currently, objects that get created live forever on the server. This may or may not cause problems, but it's worth pointing out, especially if there is a line of code you keep editing and re-running.

In the future, to help solve this, I think there are two things that need to happen:

  1. The client hooks into the object's destructor to release objects on the server (see the sketch below)
  2. The server keeps track of which objects have been called and releases dead ones
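
For the first idea, here's a rough sketch of what a client-side destructor hook might look like. The /release endpoint, the class name, and the SERVER_URL value are all hypothetical; nothing like this exists in the current API:

import requests

SERVER_URL = 'http://localhost:8765'  # hypothetical address of the proxy server

class RemoteObjectProxy(object):
    """Client-side stand-in for an object that lives on the server."""

    def __init__(self, object_id):
        self.object_id = object_id

    def __del__(self):
        # When the client-side proxy gets garbage collected, tell the server
        # it can drop its copy. /release is a hypothetical endpoint.
        try:
            requests.post(SERVER_URL + '/release', json={'id': self.object_id})
        except Exception:
            # Best effort only: never raise from a destructor.
            pass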