1.5 Glob pattern, and the filter_joiner - NEONScience/NEON-IS-data-processing GitHub Wiki

Probably the two most important aspects of glob patterns for NEON's new data processing system are 1) how Pachyderm chooses each datum to process and 2) where to find datums in the container.

What data makes up a datum?

The glob pattern in the Pachyderm pipeline specification defines how data in the input repository (or repositories) is divided into discrete units (datums) and sent into each Docker container for processing. See the Pachyderm documentation on glob patterns for a great introduction. Please do read that intro - this Wiki touches only briefly on glob patterns.

The glob pattern is specified in the pipeline spec, and will be indicated for every input repo. For example,

- pfs:
      name: DATA_PATH
      repo: hmp155_calibration_group_and_convert
      glob: /hmp155/*/*/*/*

The * in the glob pattern means "each and every", so this glob pattern is saying to Pachyderm "Each and every file or folder that is 4 levels deep beyond the /hmp155 directory is a datum." A datum is essentially one thing that you want to process, and Pachyderm is going to send that thing into a container along with your code to act on.

For example, the input repo hmp155_calibration_group_and_convert follows the sensor-focused structure (see 1.0-Pipeline-&-repo-structure,-pipeline-naming,-terms), like this:

/SOURCE_TYPE
   /YEAR
      /MONTH
         /DAY
            /SOURCE_ID
               /DATA_TYPE

So, the glob pattern above is saying "each /hmp155/YEAR/MONTH/DAY/SOURCE_ID path is a datum." When your pipeline runs, it will send each iteration of /hmp155/YEAR/MONTH/DAY/SOURCE_ID (and all deeper subdirectories and files of SOURCE_ID) into a container running the image you specified in the image: portion of the pipeline spec. (A container is simply one instance of an image.) If you have data for 4 source IDs of the hmp155 source type for 3 days in January of 2020, you have 12 datums. Note that a single datum is sent into each container, so your code will have zero knowledge of the other datums in the input repo.
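To make the partitioning concrete, here is a hypothetical Python sketch that counts datums for the 4-source-ID, 3-day example above. The file paths and source IDs are invented for illustration; Pachyderm performs this matching internally.

```python
# Sketch of how a glob pattern partitions a repo into datums.
# All paths and source IDs below are invented for illustration.
from fnmatch import fnmatch

# Paths in the input repo: 4 source IDs x 3 days, as in the text
files = [
    f"/hmp155/2020/01/{day:02d}/{sid}/data"
    for day in (1, 2, 3)
    for sid in ("10001", "10002", "10003", "10004")
]

def datums(paths, glob):
    """Each distinct path prefix matching the glob is one datum.

    Note: fnmatch's * can also match '/', unlike a true glob, but that
    does not matter for the fixed-depth prefixes compared here.
    """
    depth = glob.count("/")  # /hmp155/*/*/*/* spans 5 path components
    found = set()
    for p in paths:
        prefix = "/".join(p.split("/")[: depth + 1])
        if fnmatch(prefix, glob):
            found.add(prefix)
    return sorted(found)

print(len(datums(files, "/hmp155/*/*/*/*")))  # 12 (one per day x source ID)
print(len(datums(files, "/hmp155/*/*/*")))    # 3 (one per day)
```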

What if you changed the glob pattern to end at the DAY? Like this:

- pfs:
      name: DATA_PATH
      repo: hmp155_calibration_group_and_convert
      glob: /hmp155/*/*/*

Note that there are 3 stars instead of 4 in the glob pattern. You are telling Pachyderm "each /hmp155/YEAR/MONTH/DAY is a datum." Thus, Pachyderm will send the data for all source IDs for one day into the container. This is completely fine; your code just needs to know how to handle the amount of data you could be sending it.
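As a sketch of what "handling the amount of data" means in practice, the container code can simply loop over every SOURCE_ID directory inside the DAY-level datum. This Python example builds a toy directory tree to stand in for /pfs; all names and source IDs are illustrative.

```python
# Build a toy DAY-level datum with three source IDs (values are invented)
import os
import tempfile

root = tempfile.mkdtemp()
day = os.path.join(root, "pfs", "DATA_PATH", "hmp155", "2020", "01", "01")
for sid in ("10001", "10002", "10003"):
    os.makedirs(os.path.join(day, sid, "data"))

def process_day(datum_path):
    """Loop over every SOURCE_ID directory inside a DAY-level datum."""
    processed = []
    for source_id in sorted(os.listdir(datum_path)):
        # ... calibrate / convert the files for this source ID here ...
        processed.append(source_id)
    return processed

print(process_day(day))  # ['10001', '10002', '10003']
```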

Where's my data?

For your code to operate properly, you'll probably want to know where the data is going to be placed in the container. Pachyderm will always place your data at the following path:

/pfs/[name]

where [name] matches the entry for the name: field for the pfs input in the pipeline spec. Looking at the example above, each datum will be in

/pfs/DATA_PATH

If the name: field is omitted in the pipeline spec, the default will be the value of the repo: field. In the example above, this would be:

/pfs/hmp155_calibration_group_and_convert

See 1.0-Pipeline-&-repo-structure,-pipeline-naming,-terms for why it's better to use the name: field in your pipeline spec.

The path structure beyond this point will be identical to the path structure in the input repo, but limited to each datum. Following the example above (including the name: field in the pfs block of the pipeline spec), the data for each datum will follow the path structure:

/pfs/
   /DATA_PATH/
      /hmp155
         /YEAR
            /MONTH
               /DAY
                  /SOURCE_ID
                     /DATA_TYPE

How many YEAR, MONTH, DAY, and SOURCE_ID directories end up in a single container depends on your glob pattern, but the data will still follow this path structure.

Your code might want to know exactly which datum was sent into the container. This is found in the environment variable with the same name as the name: field in the pipeline spec. In the example above, the environment variable will be DATA_PATH. The value of this environment variable is a character string equal to the exact path to the datum in the container. Continuing our example, if

- pfs:
      name: DATA_PATH
      repo: hmp155_calibration_group_and_convert
      glob: /hmp155/*/*/*/*

then:

DATA_PATH=/hmp155/YEAR/MONTH/DAY/SOURCE_ID

where YEAR, MONTH, DAY, and SOURCE_ID will be the exact values for the datum. If instead the input block looked like:

- pfs:
      name: DATA_PATH
      repo: hmp155_calibration_group_and_convert
      glob: /hmp155/*/*

(note the change in glob pattern), then the environment variable will be:

DATA_PATH=/hmp155/YEAR/MONTH

If the name: field is not specified in the pipeline spec, the environment variable will be called the value of the repo: field. So if the input block looked like:

- pfs:
      repo: hmp155_calibration_group_and_convert
      glob: /hmp155/*/*/*/*

(note the missing name: field), then:

hmp155_calibration_group_and_convert=/hmp155/YEAR/MONTH/DAY/SOURCE_ID

again, filling in YEAR, MONTH, DAY, and SOURCE_ID with the exact values for the datum.

Tip: In R, you can get the value of an environment variable by using the Sys.getenv() command.
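If your container code is written in Python, the equivalent is os.environ. A minimal sketch, with the DATA_PATH value set by hand to mimic what Pachyderm would provide for one datum (the value is illustrative):

```python
import os

# Mimic the variable Pachyderm sets in the container (value is illustrative)
os.environ["DATA_PATH"] = "/hmp155/2019/01/03/44761"

# Inside the container: read the exact datum path and pick it apart
datum = os.environ["DATA_PATH"]
source_type, year, month, day, source_id = datum.strip("/").split("/")
print(source_id)  # 44761
```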

What if I have more than one input repo?

If you have multiple pfs blocks in the pipeline specification, the same principles apply. Depending on how you configure multiple inputs in the pipeline spec, each datum will be placed in the container in the same place described above, and the path to each datum will be indicated in one or more environment variables named for the name: or repo: fields in the pipeline spec, as appropriate.

Want to know more about how to configure multiple inputs? Links to the descriptions of the union, join, cross, and group input types can be found at the bottom of the Pachyderm documentation on datums.

The filter-joiner module

Most of the time we process data at a glob pattern that is coarser than what we would typically consider a datum. For example, a single unit of processing in the calibration module is the data from a single source ID. However, sending each and every source ID into its own container for processing ends up being really inefficient. So, we typically specify the glob pattern at the DAY level and send all available source IDs for the day into the container to be looped through.

The join method of specifying multiple inputs for a pipeline allows you to match inputs based on their glob patterns. See the Pachyderm documentation on joins linked above for a thorough explanation. In a real-world example, the input block to the hmp155_location_group_and_restructure pipeline looks like this:

input:
  join:
  - pfs:
      name: DATA_PATH
      repo: hmp155_calibration_group_and_convert
      glob: /hmp155/(*)/(*)/(*)
      joinOn: $1/$2/$3
      outer_join: true
  - pfs:
      name: LOCATION_PATH
      repo: hmp155_location_asset_assignment
      glob: /hmp155/(*)/(*)/(*)
      joinOn: $1/$2/$3

Note that the three directories after hmp155 in the glob pattern are stars enclosed in parentheses. The parentheses, combined with the joinOn: field, indicate that each match of these captured sub-directories across the two inputs will be sent into a container together for processing. In the example above, data will be joined up to the DAY directory, since the path structure of these inputs follows the /SOURCE_TYPE/YEAR/MONTH/DAY/... convention. Going back to the Wiki contents above, there will be environment variables DATA_PATH and LOCATION_PATH in the container that provide the exact paths to the matched datums placed in the container.
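As a sketch of how the capture groups and the joinOn: field combine into a join key, here is a hypothetical Python analogue. The real matching is done by Pachyderm; the regex below simply mirrors the /hmp155/(*)/(*)/(*) glob, and the path values are illustrative.

```python
import re

# Mirror of glob /hmp155/(*)/(*)/(*): each ([^/]+) is one capture group
glob_re = re.compile(r"^/hmp155/([^/]+)/([^/]+)/([^/]+)$")

def join_key(path):
    """Build the joinOn key $1/$2/$3 from a datum path, or None if no match."""
    m = glob_re.match(path)
    return "/".join(m.groups()) if m else None

# Datums from the two repos are joined when their keys match
print(join_key("/hmp155/2019/01/03"))  # 2019/01/03
```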

Because we process data at a datum granularity that is coarser than the actual unit of processing, we developed a module that allows further joining of data from multiple inputs within the code. This module is called the filter-joiner (more on the filtering part later), and it is typically packaged together with other modules like calibration (calibration_group_and_restructure) or location restructuring (location_group_and_restructure).

The configuration for doing additional joining is specified in the pipeline spec in the env block of this module. For example:

env:
    # Environment variables for filter-joiner
    CONFIG: |
      ---
      # In Pachyderm root will be index 0, 'pfs' index 1, and the repo name index 2.
      # Metadata indices will typically begin at index 3.
      input_paths:
        - path:
            name: DATA_PATH
            # Filter for data directory
            glob_pattern: /pfs/DATA_PATH/hmp155/*/*/*/*/**
            # Join on named location (already joined below by day)
            join_indices: [7]
            outer_join: true
        - path:
            name: LOCATION_PATH
            # Filter for data directory
            glob_pattern: /pfs/LOCATION_PATH/hmp155/*/*/*/*/**
            # Join on named location (already joined below by day)
            join_indices: [7]

Here, the path position(s) at which to perform additional joining are specified in the join_indices: field. In this case, a value of 7 corresponds to the source ID, because the path structure in the container is /pfs/[name]/SOURCE_TYPE/YEAR/MONTH/DAY/SOURCE_ID... and SOURCE_ID is in the 7th position (counting the root as index 0, per the comment in the config above). So in this example, Pachyderm joins up to the DAY level, and our code further joins at the SOURCE_ID level. Additional path positions to join on can be indicated as a comma-separated list (e.g. [7,8]). Note that the /** at the end of the glob pattern means "everything below that".
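The index arithmetic can be sketched in Python by splitting a container path on "/" (the path below is illustrative): the root is index 0, pfs is index 1, the repo name is index 2, and SOURCE_ID lands at index 7.

```python
# Sketch: extracting the join key at join_indices [7] from a container path.
# The path value is illustrative.
path = "/pfs/DATA_PATH/hmp155/2019/01/03/44761/data/file.parquet"
parts = path.split("/")
# parts: ['', 'pfs', 'DATA_PATH', 'hmp155', '2019', '01', '03', '44761', ...]

join_indices = [7]
key = "/".join(parts[i] for i in join_indices)
print(key)  # 44761
```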

Simple joins vs. outer joins

When a simple join is used, a match must be present in all the input repos for a datum to proceed. So if we were using a simple join above, the same SOURCE_ID must be present in both the DATA_PATH and the LOCATION_PATH input repos to make it through to further processing.

There is a small modification to this behavior in the env block above: the outer_join: true specified for the DATA_PATH repo. When an outer join is used, each datum in the repo with the outer join will always make it to the output for further processing, regardless of whether there is a corresponding match in the other input repos.

For example, if the path /pfs/DATA_PATH/hmp155/2019/01/03/44761 is present in the DATA_PATH repo above, but /pfs/LOCATION_PATH/hmp155/2019/01/03/44761 is NOT in the LOCATION_PATH repo, then /pfs/DATA_PATH/hmp155/2019/01/03/44761 will still constitute a valid datum and proceed in processing even though there is not a corresponding match in the LOCATION_PATH repo. Conversely, if /pfs/LOCATION_PATH/hmp155/2019/01/03/44761 exists in the LOCATION_PATH repo, but /pfs/DATA_PATH/hmp155/2019/01/03/44761 does NOT exist in the DATA_PATH repo, /pfs/LOCATION_PATH/hmp155/2019/01/03/44761 will NOT proceed in processing, because this input is a simple join and not an outer join.
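The two behaviors can be sketched in Python as a filter over two sets of paths. This is a hypothetical illustration (the real logic lives in the filter-joiner module, and the paths and source IDs are invented):

```python
# Invented path sets: 44761 appears in both repos, the others in only one
data_paths = {"/hmp155/2019/01/03/44761", "/hmp155/2019/01/03/55555"}
location_paths = {"/hmp155/2019/01/03/44761", "/hmp155/2019/01/03/66666"}

def joined(primary, secondary, outer_join):
    """Keep a primary path only if matched, or always when outer_join is True."""
    return {p for p in primary if outer_join or p in secondary}

# DATA_PATH has outer_join: true -> both of its datums proceed
print(sorted(joined(data_paths, location_paths, outer_join=True)))
# LOCATION_PATH uses a simple join -> only the matched datum proceeds
print(sorted(joined(location_paths, data_paths, outer_join=False)))
```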

Path filtering with the filter-joiner

The filter-joiner has one final capability: filtering for specific directories using the glob_pattern: field in the configuration above. Any path position in the glob_pattern: that is not a * must be an exact match. For example, the 3rd path position above must match hmp155 for a path to make it through the filter-joiner.
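This filtering can be sketched in Python by comparing paths component by component against the pattern. The sketch simplifies the configured pattern by dropping the trailing /** and checks only fixed-depth paths; the example values come from the configuration above.

```python
# Sketch of component-wise path filtering: every non-* position must
# match exactly. Simplified: no trailing /**, fixed-depth paths only.
from fnmatch import fnmatch

pattern = "/pfs/DATA_PATH/hmp155/*/*/*/*"

def passes_filter(path, pattern):
    """True if every component of path matches the same component of pattern."""
    p, g = path.split("/"), pattern.split("/")
    return len(p) == len(g) and all(fnmatch(a, b) for a, b in zip(p, g))

print(passes_filter("/pfs/DATA_PATH/hmp155/2019/01/03/44761", pattern))  # True
print(passes_filter("/pfs/DATA_PATH/prt/2019/01/03/44761", pattern))     # False
```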