Sprints: PyCon2012 - ShuaiYAN/ipython GitHub Wiki

Table of Contents

Overview and Scope

  • Where: PyCon 2012, Santa Clara
  • When: March 12-15
  • What: Let's sprint on prototyping memory efficient infrastructure tools suitable for distributed, short-lived CPU-bound iterative numerical analysis on small to medium sized clusters.
  • Who:
    • Olivier Grisel (scikit-learn)
    • Min Ragan-Kelley (IPython)
    • Fernando Perez (IPython)
    • Wes McKinney (Pandas & statsmodels)
    • David Cournapeau (SciPy)
    • Timmy Wilson (scikit-learn)
    • Subhodeep Moitra ([email protected])
    • Clay Woolam
    • Jake Vanderplas (scikit-learn - March 12 only)
Let's focus on small to medium sized clusters (a couple of 10s of nodes) that can be deployed by academic and corporate research teams or transiently provisioned on the cloud with an expected runtime in the order of a few seconds to an hour for the typical job once the input data has been partitioned. The goal would be to experiment with a python / numpy friendly Proof of Concept alternative to Hadoop MapReduce & MPI for iterative numerical analysis with low overhead.

Low overhead means:

  • do not force copy of intermediate data to hard drive if the data can fit in memory and if it would hurt performance of the whole process.
  • do not copy big data chunks from kernel space to user space when not necessary so as to limit GC overhead and working memory exhaustion
  • adding a second node should decrease the runtime of single node (once the machine is up and running)
  • it should bring value when used in interactive data exploration sessions with IPython
The goal is not to design a generic replacement for Hadoop MapReduce. In particular we don't plan to address the following issues during this sprint:
  • long term fault tolerant storage with replicated data and checksummed files
  • fault-tolerance of computational nodes while processing jobs (the duration and number of nodes should be small enough to make this unlikely)
  • support for unstructured (e.g. raw text) or sparse numerical data (e.g. Compressed Sparse Rows) that are not as trivially partitionable and "memmappable" as numpy arrays (would be nice to add support for those later though).
  • support for Pregel-style distributed graph processing (e.g. PageRank, transitive closures...).
Replicated data would still be considered if it can speed-up the execution only.

Use Cases

Let's drive the design of this PoC with some real need expressed by sprint participants:

Generic data manipulation / exploration (e.g. with Pandas) use cases:

  • Distributed time series alignment between 2 (or more) large transaction logs "joined" by time codes.
  • Distributed data aggregation operations, e.g. a distributed variant of Pandas GroupBy
  • Distributed & streaming implementation of computation of means, variance and other statistical moments.
The 2 Pandas use cases would probably require an implementation of distributed shuffle (as in MapReduce): this is very likely that a 3 days sprint won't be enough to implement something workable.

Machine learning related use cases (e.g. with scikit-learn):

  • Embarrassingly parallel implementation of Sparse Coding with a fixed dictionary (for non linear feature extraction)
  • Distributed implementation of (A)SGD on a partitioned training set with non blocking averaging using message passing.

Technical solutions to investigate

  • IPython.parallel & ZeroMQ (suitable for low latency small to medium transfer of data in user space)
  • https://github.com/traviscline/gevent-zeromq/ (more low-level speed)
  • numpy memmapped arrays: let the Linux kernel handle the disk access efficiently and free working memory from unused data
  • the UNIX sendfile syscall for sending partitioned memmapped files around the cluster without loading stuff in the user space.
  • Compressed numpy arrays, for instance using carray to speed up both disk / memory access and network transfers.
  • Some kind of notion of partitioned array datastructure build on top of memmapped arrays + shared metadata for leveraging data locality on the cluster to limit the amount of data transferred over the network. We should steal the good ideas of Spark RDDs (resilient distributed datasets) or the FlumeJava API. However we don't necessary need to address resiliency in this first iteration.

References

Logistics/tasks

  • Get ssh access to a small (~10-20 nodes) cluster for all participants. Fernando will do this.
⚠️ **GitHub.com Fallback** ⚠️