Dev Notes : Architecture - Schlumberger/distpy GitHub Wiki

An architecture for asynchronous multiprocessing

Distributed Acoustic Sensing (DAS) data sets are very large and are currently recorded in a wide range of formats. Typical DAS signal processing can be described by a directed graph and, in general, the processing applied to the first record should be applied to every record. At the same time, compute hardware and storage, particularly at large scale, are in a period of transition: sometimes the data is on network attached storage (NAS) connected to a laptop, and other times we have cloud compute with blob storage. The architecture for distpy needed to reflect these needs.

  • To handle the wide range of archive storage types and file formats we chose ingestion to an intermediate format of stored numpy arrays.

  • To allow mixed storage types we use a {project} that maps across the set of storage types.

  • To handle the wide range of hardware types we chose python and defined a coding style that is cross-platform compatible

  • To provide directed-graph signal processing chains we use a Command Pattern

  • To allow data processing at scale we encapsulate the processing of a single data chunk in a Worker and duplicate it across all data chunks

  • To allow ingestion across large multifile data archives, we encapsulate the ingestion of a single file or blob in a Worker and duplicate it across all files or blobs

  • To provide an initial capability for parallel execution we use python's multiprocessing in parallel controllers

  • To provide GPU-enabled signal processing we chose the python cupy package

In distpy we break the workflow into ingestion and processing. These can be executed in a single script as in the CASE00.py example, or they can be separated in time and space, executed independently on different hardware.

The structure of the code

The folders encapsulate the separate ideas in the architecture.

  • calc - all the commands, algorithms, plotting functions and the link to cupy. This is the code that does the compute tasks.

  • controllers - multiprocessing controllers for the signal-processing and plotting workers.

  • ingesters - multiprocessing controllers for the ingesters

  • io_help - file readers and writers. The equivalent of calc for I/O.

  • workers - encapsulation of a single task, defined as the execution of a directed graph of commands
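The relationship between commands and a worker can be sketched in a few lines. This is a minimal illustration of a Command Pattern driving a directed graph, not distpy's own classes: the names Command, run_graph and GRAPH, and the three toy operations, are hypothetical. It assumes the commands are listed so that each one comes after the commands it depends on.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Sequence

# Hypothetical names throughout; this is not distpy's API.


@dataclass
class Command:
    """One node of the graph: a named operation plus the names of its inputs."""
    name: str
    func: Callable[..., List[float]]
    inputs: Sequence[str] = ()


def run_graph(commands: Sequence[Command], chunk: List[float]) -> Dict[str, List[float]]:
    """Worker body: execute every command, in order, on a single data chunk.

    Assumes the commands are listed so that each follows its inputs.
    """
    results = {"data": chunk}
    for cmd in commands:
        args = [results[name] for name in cmd.inputs]
        results[cmd.name] = cmd.func(*args)
    return results


def demean(x):
    mean = sum(x) / len(x)
    return [v - mean for v in x]


def absolute(x):
    return [abs(v) for v in x]


def add(a, b):
    return [p + q for p, q in zip(a, b)]


# A small graph: "combined" depends on two earlier results.
GRAPH = [
    Command("demeaned", demean, ("data",)),
    Command("magnitude", absolute, ("demeaned",)),
    Command("combined", add, ("demeaned", "magnitude")),
]

# The same graph is applied unchanged to every chunk.
chunks = [[1.0, 2.0, 3.0], [10.0, 20.0, 60.0]]
outputs = [run_graph(GRAPH, chunk)["combined"] for chunk in chunks]
```

Because run_graph depends only on the graph and one chunk, the list comprehension at the end could be swapped for a parallel map over chunks without changing the commands themselves.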

Ingestion to numpy arrays

The example multiprocessing controllers for the ingestion are in the ingesters directory.

To handle the wide range of archive storage types and file formats we chose ingestion to an intermediate format of stored numpy arrays.
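As a rough sketch of what ingestion to stored numpy arrays can look like, the example below splits one in-memory record into fixed-length chunks and saves each as a .npy file. The function name ingest_record, the (channels, samples) layout, and the chunk_NNNN.npy naming are assumptions made for illustration; they are not taken from the distpy ingesters.

```python
import os
import tempfile

import numpy as np


def ingest_record(record: np.ndarray, out_dir: str, chunk_len: int) -> list:
    """Split a (channels, samples) record along time; store each chunk as .npy.

    Hypothetical helper: the file naming and array layout are assumptions.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, start in enumerate(range(0, record.shape[1], chunk_len)):
        path = os.path.join(out_dir, f"chunk_{i:04d}.npy")
        np.save(path, record[:, start:start + chunk_len])
        paths.append(path)
    return paths


# Stand-in for data decoded from a vendor file format.
record = np.arange(40, dtype=np.float64).reshape(4, 10)

out_dir = tempfile.mkdtemp()
paths = ingest_record(record, out_dir, chunk_len=4)

# Any later processing step only needs np.load, whatever the original format was.
restored = np.concatenate([np.load(p) for p in paths], axis=1)
```

The point of the intermediate format is visible in the last line: once the chunks exist, downstream code no longer needs to know which archive format the data came from.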

The project

In distpy the {project} can be viewed as an intermediate directory. It sits between a {storage_location} and a directory {location}. The structures are captured in the JSON configuration for hardware and storage, as explained in the dissection of CASE00.py.

Because ingestion is separated from the signal processing, it can have an in_drive with a data location and an out_drive with a results location. These can be on separate filestores. A similar approach is used in the JSON configuration for hardware and storage for the signal processing. In most situations the in_drive and data for the signal processing would match the out_drive and results from an earlier ingestion. This also means that you can use distpy just for signal processing, if you have your own ingestion code that provides the data in suitable chunks.

The function distpy.io_helpers.systemConfig reads a JSON configuration for hardware and storage.
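To make the drive, project and location idea concrete, here is a small sketch of such a configuration and of how full paths might be resolved from it. The key names in_drive, out_drive, project, data and results follow the terms used above, but the example values, the exact JSON layout, and the helper resolve_paths are assumptions; this is not the behaviour of systemConfig, whose signature is not shown here.

```python
import json
import posixpath

# Hypothetical configuration; the values and the exact layout are assumptions.
CONFIG_TEXT = """
{
    "in_drive": "/mnt/nas",
    "out_drive": "/mnt/scratch",
    "project": "my_project",
    "data": "ingested",
    "results": "strainrate"
}
"""


def resolve_paths(config: dict) -> dict:
    """Place the project between the storage location and the directory.

    Hypothetical helper, assuming the ordering drive / project / location.
    """
    return {
        "data_path": posixpath.join(
            config["in_drive"], config["project"], config["data"]),
        "results_path": posixpath.join(
            config["out_drive"], config["project"], config["results"]),
    }


config = json.loads(CONFIG_TEXT)
resolved = resolve_paths(config)
```

With this layout, moving the results to a different filestore only means changing out_drive; the project and location names stay the same.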

Coding style for compatibility

Python is a cross-platform scripting language with very good support for most features.

Several compromises were made to ensure, in particular, that distpy could be used on cloud VMs, run in parallel via multiprocessing, and be GPU-enabled using cupy.

After testing on several cloud platforms, it was found that using '__init__.py' files that effectively restrict the user to explicit imports was the most robust approach. This means that a general 'import distpy' is not guaranteed to give access to the whole package, and you will see extended forms such as:

import distpy.controllers.parallel_strainrate_processing

To use the multiprocessing module, we define the behaviour for the case where __name__ == "__main__". For more information on this see the multiprocessing documentation. The short version is that multiprocessing can start additional Python processes that re-import the same script (this happens on platforms that spawn new processes rather than fork), so the script must detect when it is running as the primary call, where __name__ is "__main__"; the later passes through the script belong to the child processes, where it is not.
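A minimal sketch of this guard is shown below. The function names process_chunk and run_parallel and the ncpu parameter are hypothetical stand-ins, not distpy's controllers; only the multiprocessing.Pool calls are standard library.

```python
import multiprocessing


def process_chunk(chunk_id: int) -> int:
    """Stand-in for a worker; a real one would load and process one data chunk."""
    return chunk_id * chunk_id


def run_parallel(chunk_ids, ncpu=2):
    """Hypothetical controller: hand one chunk at a time to a pool of processes."""
    with multiprocessing.Pool(processes=ncpu) as pool:
        return pool.map(process_chunk, chunk_ids)


if __name__ == "__main__":
    # Only the primary invocation reaches this point. A child process that
    # re-imports this script sees a different __name__, so it defines the
    # functions above but does not start a pool of its own.
    print(run_parallel(range(4)))
```

Without the guard, a child process that re-imports the script would try to create its own pool, which is why everything that launches work sits under the __main__ check.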

The cupy module implements a portion of the numpy and scipy modules on CUDA. The implementation is incomplete, and cupy can only be installed where CUDA is available on the hardware. distpy handles this with a facade that stands in for cupy where it is not installed. To keep the structure manageable, all access to cupy goes through agnostic.py, using calculations defined in extra_numpy.py.
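The general shape of such a facade can be sketched as follows. This is not the content of agnostic.py or extra_numpy.py; the names xp, HAS_GPU, to_numpy and rms are hypothetical, and the fallback to numpy on any failure is an assumption about how one might do it.

```python
import numpy as np

# Try cupy first; fall back to numpy if it is missing or unusable.
try:
    import cupy as cp
    cp.zeros(1)  # probe: fails if no usable CUDA device is present
    xp = cp
    HAS_GPU = True
except Exception:
    xp = np
    HAS_GPU = False


def to_numpy(a):
    """Return a host-side numpy array whichever backend produced `a`."""
    return cp.asnumpy(a) if HAS_GPU else np.asarray(a)


def rms(x) -> float:
    """Root-mean-square written once against the shared xp namespace."""
    arr = xp.asarray(x, dtype=xp.float64)
    return float(xp.sqrt(xp.mean(arr * arr)))
```

Calculation code written against xp runs unchanged on either backend, so the decision about whether a GPU is present is made in one place only.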

TO BE COMPLETED...

  • To provide directed-graph signal processing chains we use a Command Pattern

  • To allow data processing at scale we encapsulate the processing of a single data chunk in a Worker and duplicate it across all data chunks

  • To allow ingestion across large multifile data archives, we encapsulate the ingestion of a single file or blob in a Worker and duplicate it across all files or blobs

  • To provide an initial capability for parallel execution we use python's multiprocessing in parallel controllers

  • To provide GPU-enabled signal processing we chose the python cupy package