User Tutorial : distpy on cloud - Schlumberger/distpy GitHub Wiki

Creating Noise Logs using distpy on Azure Databricks and Datalake Gen 2

In this tutorial we cover using distpy in a cloud environment. We have chosen Azure for the example; the source data are in SEG-Y format and stored on a Datalake Gen 2 storage account.

We assume familiarity with setting up a Databricks Apache Spark cluster and starting a Workspace.

There are 3 steps:

  1. Attach the Datalake Gen 2 to the Databricks cluster
  2. Create or locate configuration files
  3. Run the basic script

Step 1: Attach the Datalake Gen 2 to the Databricks Cluster

We mount the Datalake Gen 2 store so that it appears at dbfs:/mnt/segy/ on the Databricks cluster. Details on mounting the storage can be found in Microsoft's Azure documentation.
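A typical mount, run once from a notebook, follows the pattern sketched below. This is a sketch assuming OAuth service-principal access; the storage account, container, and credential values are placeholders to be replaced with your own.

```python
# Sketch: mount an ADLS Gen 2 container at /mnt/segy from a Databricks
# notebook using a service principal. All <...> values are placeholders.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret": "<client-secret>",
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}

# dbutils is only available inside a Databricks notebook:
# dbutils.fs.mount(
#     source="abfss://<container>@<storage-account>.dfs.core.windows.net/",
#     mount_point="/mnt/segy",
#     extra_configs=configs)
```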

Step 2: Create or locate configuration files

On Azure, for the SEG-Y ingestion step, the mounted datalake will be the in_drive for ingesting SEG-Y. The name must include the prefix /dbfs/, for the Databricks File System, even though you will have apparently mounted to /mnt/ in the previous step.

When generating the resulting WITSML files, the in_drive is the temporary storage, and the out_drive will be our datalake.

distpy has a convention of project naming, so the datalake directory should be {projectname}/{segy_directory} and we will create a similar {projectname}/{ingested_directory} structure on the temporary storage. In this example we will assume that this is the first project at Schlumberger Cambridge Research in 2020, so our local naming convention gives us a project name of 2020scr0001. You are free to choose any project naming convention you want; this is just a root directory name for your project.
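Concretely, with the drives used in this tutorial, the convention yields paths like the following sketch (the sgy and sgy_data directory names match the data and results entries of the configurations below):

```python
import os

# Project naming convention: {drive}/{projectname}/{directory}
in_drive = "/dbfs/mnt/segy/"   # mounted datalake (SEG-Y source)
out_drive = "/dbfs/tmp/hdvs/"  # temporary storage (ingested data)
project = "2020scr0001"

segy_dir = os.path.join(in_drive, project, "sgy")
ingested_dir = os.path.join(out_drive, project, "sgy_data")
```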

We are running in parallel and, for this example, we started a cluster with 8 CPUs. You can adjust parallel_flag and ncpu to suit your own requested cluster.

The complete configuration JSON for the ingestion part looks like:

{
 "in_drive"  : "/dbfs/mnt/segy/",
 "out_drive" : "/dbfs/tmp/hdvs/",
 "project"   : "2020scr0001",
 "data"      : "sgy",
 "results"   : "sgy_data",
 "parallel_flag"  : 1,
 "ncpu"      : 8
}

and for the processing part we have:

{
 "in_drive"  : "/dbfs/tmp/hdvs/",
 "out_drive" : "/dbfs/mnt/segy/",
 "project"   : "2020scr0001",
 "data"      : "sgy_data",
 "results"   : "witsml",
 "parallel_flag"  : 1,
 "ncpu"      : 8
}
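Because the ingestion output becomes the processing input, the two configurations must chain together. A quick sanity check, sketched with the two JSON documents above inlined as strings:

```python
import json

ingest = json.loads("""{
 "in_drive": "/dbfs/mnt/segy/", "out_drive": "/dbfs/tmp/hdvs/",
 "project": "2020scr0001", "data": "sgy", "results": "sgy_data",
 "parallel_flag": 1, "ncpu": 8}""")

process = json.loads("""{
 "in_drive": "/dbfs/tmp/hdvs/", "out_drive": "/dbfs/mnt/segy/",
 "project": "2020scr0001", "data": "sgy_data", "results": "witsml",
 "parallel_flag": 1, "ncpu": 8}""")

# The ingested results on temporary storage feed the processing step:
assert ingest["out_drive"] == process["in_drive"]
assert ingest["results"] == process["data"]
assert ingest["project"] == process["project"]
```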

Additionally, we need to provide a set of commands corresponding to the signal processing graph. For noise logs we have high pass filters on total energy (see the paper by McKinley et al. (1973) or the Wikipedia entry on Spectral Noise Logging for an introduction). Noise log bands are not the only choice, and there are reasons to choose other schemes for automated detection systems; a discussion of this topic in a machine-learning context can be found in the SPE Journal paper by van der Spek et al. (1999).

For the standard noise log attributes, the command JSON for the lowest 3 bands looks like:

{
"document" : 0,
"name" : "Standard 3-band Noise Log",
"description" : "Creates total energy outputs for the 200*, 600* and 1000* bands of a conventional Noise Log.",
"command_list" :
[
 { "name" : "fft",            "uid" :  1, "in_uid" :  0 },
 { "name" : "rms_from_fft",   "uid" :  2, "in_uid" :  1, "low_freq" : 0, "high_freq" : -1 },
 { "name" : "multiple_calcs", "uid" :  3, "in_uid" :  1,   "func" : "te_from_fft",     "low_freq" : [200,600,1000], "high_freq" : [-1,-1,-1] },
 { "name" : "write_witsml",   "uid" :  4, "in_uid" :  3, "directory_out" : "NoiseLog", "low_freq" : [200,600,1000], "high_freq" : [-1,-1,-1],  "gather_uids" : [2], "data_style" : "NoiseLog" }
]
}
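The te_from_fft calculations in this flow sum the spectral energy from each low_freq cutoff up to Nyquist (a high_freq of -1). The idea can be sketched in plain Python with a naive DFT; this is illustrative only and is not distpy's implementation:

```python
import math

def band_energy(signal, fs, low_freq, high_freq=-1):
    """Total spectral energy between low_freq and high_freq (Hz),
    computed with a naive DFT. high_freq = -1 means up to Nyquist.
    Illustration only; not distpy's te_from_fft."""
    n = len(signal)
    hi = fs / 2.0 if high_freq < 0 else high_freq
    energy = 0.0
    for k in range(1, n // 2 + 1):
        freq = k * fs / n
        if low_freq <= freq <= hi:
            re = sum(signal[t] * math.cos(2 * math.pi * k * t / n)
                     for t in range(n))
            im = sum(signal[t] * math.sin(2 * math.pi * k * t / n)
                     for t in range(n))
            energy += re * re + im * im
    return energy

# 100 Hz and 300 Hz tones sampled at 2 kHz: only the 300 Hz tone
# contributes to the 200 Hz high-pass band.
fs, n = 2000.0, 200
sig = [math.sin(2 * math.pi * 100 * t / fs)
       + 0.5 * math.sin(2 * math.pi * 300 * t / fs) for t in range(n)]
```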

A few key features that are generic to many distpy processing flows are worth highlighting in this example.

Every command step has a unique identifier, uid; in a purely linear flow these simply increment. Each command receives the output of a previously executed command via in_uid, and can receive output from multiple additional commands via the gather_uids list. There is an unlisted command with "uid" : 0, corresponding to the reading of a data chunk. This root node of the signal processing tree is assumed.
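A minimal sketch (assuming nothing about distpy's internals) of how such a command list could be executed: results are stored per uid, with uid 0 seeded by the data chunk, and each step gathers its inputs from in_uid and gather_uids.

```python
def run_flow(command_list, chunk):
    """Toy interpreter for a distpy-style command list (sketch only)."""
    results = {0: chunk}          # implicit root node: the raw data chunk
    order = []
    for cmd in command_list:      # a purely linear flow runs in list order
        inputs = [results[cmd["in_uid"]]]
        inputs += [results[u] for u in cmd.get("gather_uids", [])]
        results[cmd["uid"]] = (cmd["name"], inputs)
        order.append(cmd["name"])
    return results, order

flow = [
    {"name": "fft",            "uid": 1, "in_uid": 0},
    {"name": "rms_from_fft",   "uid": 2, "in_uid": 1},
    {"name": "multiple_calcs", "uid": 3, "in_uid": 1},
    {"name": "write_witsml",   "uid": 4, "in_uid": 3, "gather_uids": [2]},
]
results, order = run_flow(flow, "chunk0")
```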

To help clarify the command system we describe each command in turn. This information can be generated by distpy for a given flow as a LaTeX appendix, together with a dot graph picture of the overall flow, by setting "document" : 1 in the command JSON.


We suggest putting files containing the two configurations and the signal processing flow in the temporary storage. In what follows we assume that the files are respectively:

/dbfs/tmp/hdvs/config/sgyConfig.json
/dbfs/tmp/hdvs/config/strainrate2noiselogConfig.json
/dbfs/tmp/hdvs/config/strainrate2noiselog.json

Step 3: Run the script

The distpy python script for Noise Logging includes a final averaging step, because conventionally a Noise Log is a 5 minute average per station. The averageFBE() function constructs an extending average window, returning both average and standard deviation results. This allows review of the standard deviation to determine when sufficient time has been averaged for the Noise Log.

import distpy.ingesters.parallel_ingest_sgy
import distpy.controllers.parallel_strainrate_processing
import distpy.io_help.witsmlfbe
import os

def main(config_segy, config_segy2witsml, witsmldir, outdir, outdir_std):
    # ingest SEGY
    distpy.ingesters.parallel_ingest_sgy.main(config_segy)

    # SEGY to Noise Logs
    distpy.controllers.parallel_strainrate_processing.main(config_segy2witsml)

    # Make sure that the additional output directories exist to hold the long-term average and standard deviation
    if not os.path.exists(outdir):
        os.makedirs(outdir)
    if not os.path.exists(outdir_std):
        os.makedirs(outdir_std)

    # Average up the Noise Logs
    for root, dirs, files in os.walk(witsmldir):
        distpy.io_help.witsmlfbe.averageFBE(files, root, outdir, outdir_std)

if __name__ == '__main__':
    # Some paths
    TMP_PATH = '/dbfs/tmp/hdvs'
    STORAGE_PATH = '/dbfs/mnt/segy'
    PROJECT = '2020scr0001'
    CONFIG = 'config'
    WITSML = 'witsml'

    config_segy = os.path.join(TMP_PATH,CONFIG,'sgyConfig.json')
    config_segy2witsml = os.path.join(TMP_PATH,CONFIG,'strainrate2noiselogConfig.json')
    witsmldir = os.path.join(TMP_PATH,PROJECT,WITSML,'NoiseLog')
    outdir = os.path.join(STORAGE_PATH,PROJECT,WITSML,'NoiseLog_avg')
    outdir_std = os.path.join(STORAGE_PATH,PROJECT,WITSML,'NoiseLog_std')

    main(config_segy, config_segy2witsml, witsmldir, outdir, outdir_std)
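The extending average window behaviour can be illustrated with a cumulative mean and standard deviation over successive samples; this is a sketch of the idea, not the actual averageFBE implementation:

```python
import statistics

def extending_average(samples):
    """Cumulative mean and population std over an extending window.
    Sketch of the extending-average idea, not distpy's averageFBE."""
    means, stds, window = [], [], []
    for s in samples:
        window.append(s)
        means.append(statistics.fmean(window))
        stds.append(statistics.pstdev(window))
    return means, stds

# As more time is included, the std settles, indicating when enough
# time has been averaged for a stable Noise Log value.
means, stds = extending_average([10.0, 12.0, 11.0, 11.0])
```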