1.0 Pipeline & repo structure, pipeline naming, terms - NEONScience/NEON-IS-data-processing GitHub Wiki

Pipeline structure & processing workflow (DAG)

This diagram shows a model product DAG that depicts a typical processing workflow for a product in Pachyderm, with descriptions of the components. The naming convention for repositories and processing modules is shown in red text below the descriptions. The DAGs for many products will look very similar to this, but the exact components and workflow may vary according to the details of the product. Check out the draft DAGs for other products and add your own as a subpage here.

Pipeline naming

Pipelines and repositories are named with a prefix of either the source type or the data product (DP) short name translated to camelCase.

The general rule is that pipeline modules before the group_path stage use the source type (e.g. prt_), while modules after this step use the DP short name (e.g. tempSoil_).

After the prefix is a descriptor of the repo or module in snake case (delimited with underscores). Be brief but easily understood (e.g. prt_calibration_group_and_convert) and consistent with other DAGs that use the same modules. In other words, if your product pipeline uses the same module as another pipeline, the prefix should be the only difference in its name.

Finding the source type to use: The source type is an Engineering-defined term. The avro-schemas repo uses source types in the file names and inside the schema files themselves. The schema files also list the full name of the sensor, so if you're unsure which source type to use, verify that the sensor name is the one you need.

Finding the data product short name: The data product short name is a controlled term, viewable through the Data Product Manager in the SOM portal. Use the search feature to find the L1 version of your product. Under your product listing, there will be a field called 'short name'. Use this value in your pipeline name, modified from dash-case to camelCase (e.g. temp-air-single should become tempAirSingle in your pipeline).
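
The dash-case to camelCase conversion described above can be sketched in a few lines of Python. This is a standalone illustrative helper, not part of any NEON module:

```python
def short_name_to_prefix(short_name: str) -> str:
    """Convert a DP short name in dash-case to a camelCase pipeline prefix."""
    first, *rest = short_name.split("-")
    return first + "".join(part.capitalize() for part in rest)

# The L1 short name 'temp-air-single' becomes the prefix 'tempAirSingle'
print(short_name_to_prefix("temp-air-single"))  # tempAirSingle
```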

Repository structure

Repositories in Pachyderm are akin to the hard drive on your computer, logically organized into directories that hold data. The directory structure differs somewhat depending on where you are in the DAG, but the data are typically organized by source type, date, group ID (if applicable), source ID or location ID, and data type (e.g. data, quality flags, location information, etc).

Sensor focus

All product DAGs start out with data separated by source type, one repository per source type. After conversion from Trino into parquet format (the data_source module), the repository is organized like this:

/SOURCE_TYPE
   /YEAR
      /MONTH
         /DAY
            /SOURCE_ID
               /DATA_TYPE

Each YEAR folder is nested within each SOURCE_TYPE folder, each MONTH folder is nested within each YEAR folder, and so on. For example, the (abbreviated) output repository of the prt_data_source_trino module looks like this:

/prt
   /2019
      /01
         /01
            /13417
               /data
                  /prt_13417_2019-01-01.parquet
            /14491
               /data
                  /prt_14491_2019-01-01.parquet
         /02
            /13417
               /data
                  /prt_13417_2019-01-02.parquet
            /14491
               /data
                  /prt_14491_2019-01-02.parquet
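
As a sketch of how this layout can be navigated programmatically, a sensor-focus path splits cleanly into its named components. The key names below are illustrative, not an official API:

```python
from pathlib import PurePosixPath

def parse_sensor_path(path: str) -> dict:
    """Split a sensor-focus repo path into named components:
    /SOURCE_TYPE/YEAR/MONTH/DAY/SOURCE_ID/DATA_TYPE/FILE."""
    parts = PurePosixPath(path).parts[1:]  # drop the leading '/'
    keys = ("source_type", "year", "month", "day", "source_id", "data_type", "file")
    return dict(zip(keys, parts))

parsed = parse_sensor_path("/prt/2019/01/01/13417/data/prt_13417_2019-01-01.parquet")
print(parsed["source_id"], parsed["data_type"])  # 13417 data
```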

This nested structure allows relevant metadata like calibration or location information to be added to the existing structure, so that all of the data and information needed to process each day of data for each source ID is nested within the folder for that day and source ID. For example, after grouping and applying calibration information in the prt_calibration_group_and_convert module, additional outputs for uncertainty information and quality flags are produced. The output repo looks like this (again, abbreviated):

/prt
   /2019
      /01
         /01
            /13417
               /data
                  /prt_13417_2019-01-01.parquet
               /uncertainty_data
                  /prt_13417_2019-01-01_uncertaintyData.parquet
               /uncertainty_coef
                  /prt_13417_2019-01-01_uncertaintyCoef.json
               /flags
                  /prt_13417_2019-01-01_flagsCal.json
            /14491
               /data
                  /prt_14491_2019-01-01.parquet
               /uncertainty_data
                  /prt_14491_2019-01-01_uncertaintyData.parquet
               /uncertainty_coef
                  /prt_14491_2019-01-01_uncertaintyCoef.json
               /flags
                  /prt_14491_2019-01-01_flagsCal.json

Location focus

Although L0 data is received by HQ and stored by sensor (for good reasons we won't go into here), it is published on the NEON Data Portal by location (e.g. soil temperature at soil plot 1, depth 1). Thus, relatively early in the DAG, after any calibrations are applied, the structure of the repository changes to be organized by day and location ID instead of day and source ID. The location ID replaces the source ID in the repo structure and in the file names:

/SOURCE_TYPE
   /YEAR
      /MONTH
         /DAY
            /LOCATION_ID
               /DATA_TYPE

In a real-world example, the (abbreviated) output repository of the prt_location_group_and_restructure module is shown below. Note the addition of the location folder, which holds metadata about the specific named location for use in downstream processing. The source ID is retained in the location file name because if multiple source IDs were installed at that location within the same day (i.e. a sensor swap), location files for both source IDs are retained to show the time of the swap. Data in the other files has been merged for all sensors at that location over the course of the day.

/prt
   /2019
      /01
         /01
            /CFGLOC100238
               /data
                  /prt_CFGLOC100238_2019-01-01.parquet
               /flags
                  /prt_CFGLOC100238_2019-01-01_flagsCal.parquet
               /location 
                  /prt_3144_locations.json
               /uncertainty_coef
                  /prt_CFGLOC100238_2019-01-01_uncertaintyCoef.json 
               /uncertainty_data
                  /prt_CFGLOC100238_2019-01-01_uncertaintyData.parquet
            /CFGLOC100241
               /data
                  /prt_CFGLOC100241_2019-01-01.parquet
               /flags
                  /prt_CFGLOC100241_2019-01-01_flagsCal.parquet
               /location 
                  /prt_3119_locations.json
               /uncertainty_coef
                  /prt_CFGLOC100241_2019-01-01_uncertaintyCoef.json 
               /uncertainty_data
                  /prt_CFGLOC100241_2019-01-01_uncertaintyData.parquet
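
The location-focus file names follow a consistent pattern (source type, location ID, date, optional data-type suffix) that can be picked apart with a regular expression. This pattern is a sketch inferred from the examples above; it covers the CFGLOC-named files but not the location metadata files, which use the source ID instead:

```python
import re

# Illustrative pattern: SOURCE_TYPE_LOCATION-ID_DATE[_SUFFIX].EXTENSION
FILE_PATTERN = re.compile(
    r"(?P<source_type>[a-z0-9]+)_"
    r"(?P<location_id>CFGLOC\d+)_"
    r"(?P<date>\d{4}-\d{2}-\d{2})"
    r"(?:_(?P<suffix>\w+))?"
    r"\.(?P<ext>\w+)$"
)

m = FILE_PATTERN.match("prt_CFGLOC100238_2019-01-01_flagsCal.parquet")
print(m.group("location_id"), m.group("suffix"))  # CFGLOC100238 flagsCal
```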

Group focus

If each location of each source type resulted in one instance of a data product, there would be no need to group or differentiate sensors after converting to location focus. However, many data products involve multiple source types and/or multiple sensor locations. One final adjustment to the repository structure adds the group ID to the directory structure. The group ID is the identifier used to group together the specific named locations involved in the computation of each instance of a data product (see the Wiki page on Groups and Context for more info). Keeping with the concept above, all the data and information needed to process each instance of the data product for each day is nested within the folder for the day and group ID:

/YEAR
   /MONTH
      /DAY
         /GROUP_ID
            /SOURCE_TYPE
               /LOCATION_ID
                  /DATA_TYPE
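
Assembling a group-focus path from its components can be sketched as follows (a hypothetical helper for illustration, not a NEON module):

```python
from pathlib import PurePosixPath

def group_path(year, month, day, group_id, source_type, location_id, data_type):
    """Assemble a group-focus repo path:
    /YEAR/MONTH/DAY/GROUP_ID/SOURCE_TYPE/LOCATION_ID/DATA_TYPE."""
    return str(PurePosixPath("/", year, month, day, group_id,
                             source_type, location_id, data_type))

print(group_path("2019", "01", "01", "temp-air-single-114",
                 "prt", "CFGLOC101255", "data"))
# /2019/01/01/temp-air-single-114/prt/CFGLOC101255/data
```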

Note that a folder for the group ID has been inserted into the path, and the source type folder has moved from the root of the repo to within the folder for each group ID. This is because a group may include data from multiple source types and even other groups. Even if a data product is derived from a single sensor location of a single source type, a group is still used in order to create consistency across DAGs and traceability from sensors to locations to products. In a real-world example, below is the (abbreviated) output repo for the tempAirSingle_group module. The Single Aspirated Air Temperature product requires data from the prt and dualfan sensors located in the same aspirated shield, as well as the windobserverii from the same measurement level.

/2019
   /01
      /01
         /temp-air-single-114
            /prt
               /CFGLOC101255
                  /data
                     /prt_CFGLOC101255_2019-01-01.parquet
                  /flags
                     /prt_CFGLOC101255_2019-01-01_flagsCal.parquet
                  /location
                     /CFGLOC101255.json 
                     /prt_20182_locations.json 
                  /uncertainty_coef
                     /prt_CFGLOC101255_2019-01-01_uncertaintyCoef.json
                  /uncertainty_data
                     /prt_CFGLOC101255_2019-01-01_uncertaintyData.parquet
            /dualfan
               /CFGLOC101255
                  /data
                     /dualfan_CFGLOC101255_2019-01-01.parquet 
                  /location
                     /dualfan_35666_locations.json
            /windobserverii
               /CFGLOC101251
                  /data
                     /windobserverii_CFGLOC101251_2019-01-01.parquet  
                  /location
                     /windobserverii_42735_locations.json
         /temp-air-single-152
            /prt
               /CFGLOC110722
                  /data
                     /prt_CFGLOC110722_2019-01-01.parquet
                  /flags
                     /prt_CFGLOC110722_2019-01-01_flagsCal.parquet
                  /location
                     /CFGLOC110722.json 
                     /prt_20187_locations.json 
                  /uncertainty_coef
                     /prt_CFGLOC110722_2019-01-01_uncertaintyCoef.json
                  /uncertainty_data
                     /prt_CFGLOC110722_2019-01-01_uncertaintyData.parquet
            /dualfan
               /CFGLOC110722
                  /data
                     /dualfan_CFGLOC110722_2019-01-01.parquet 
                  /location
                     /dualfan_42290_locations.json
            /windobserverii
               /CFGLOC110718
                  /data
                     /windobserverii_CFGLOC110718_2019-01-01.parquet  
                  /location
                     /windobserverii_8859_locations.json

Terms

At various stages of pipeline construction, you'll want to specify the term in a module or your own code. In nearly all cases, terms found in data files are controlled by schemas. Schemas specify the column (term) naming for the data in a data file and also contain documentation and units for each term.

L0 schemas are created by Engineering for each source type (a.k.a. sensor type, e.g. prt). They are located in the Engineering/avro_schemas Git repo.

The L0 schema for the prt source type (prt.avsc):

{
    "type": "record",
    "name": "prt",
    "namespace": "org.neonscience.schema.device",
    "doc": "100 Ohm Platinum Resistance Thermometer",
    "__version": "1.0",
    "__neon_parts": [
        "CA00110002",
        "AB03950006",
        "AB03960000",
        "CF00610000",
        "CF00620000",
        "AB03950012",
        "AB03950075",
        "CD03220010",
        "CD03220000"
    ],
    "fields": [
        {
            "name": "source_id",
            "type": "string",
            "doc": "Source serial number or MAC address"
        },
        {
            "name": "site_id",
            "type": "string",
            "doc": "NEON site identifier"
        },
        {
            "name": "readout_time",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            },
            "doc": "Timestamp of readout expressed in milliseconds since epoch",
            "__neon_units": "millisecond"
        },
        {
            "name": "resistance",
            "type": "float",
            "doc": "Measured resistance of the platinum resistance thermometer",
            "__neon_units": "ohm",
            "__neon_stream_id": "0"
        }
    ]
}
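
Because the schema is plain JSON, the terms and their units can be pulled out with a short script. The snippet below operates on a trimmed copy of the fields shown above rather than reading the .avsc file from disk:

```python
import json

# A trimmed copy of the prt L0 schema fields shown above
schema_json = """
{
  "type": "record",
  "name": "prt",
  "fields": [
    {"name": "source_id", "type": "string"},
    {"name": "readout_time",
     "type": {"type": "long", "logicalType": "timestamp-millis"},
     "__neon_units": "millisecond"},
    {"name": "resistance", "type": "float", "__neon_units": "ohm"}
  ]
}
"""

def list_terms(schema: dict) -> dict:
    """Map each term (field name) to its units, if any."""
    return {f["name"]: f.get("__neon_units") for f in schema["fields"]}

print(list_terms(json.loads(schema_json)))
# {'source_id': None, 'readout_time': 'millisecond', 'resistance': 'ohm'}
```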

In the calibration module, L0 terms that undergo calibration change to L0' terms. For example, resistance changes to temp in the prt pipeline. To change the terms from input to output, you will create a schema that specifies the new terms and pass it as an input to the module (note that some terms may not change). L0' and higher schemas are created by Science and housed in the NEONScience/NEON-IS-avro-schemas Git repo.

The L0' schema for the prt source type (prt_calibrated.avsc):

{
  "type": "record",
  "name": "prt_calibrated",
  "namespace": "org.neonscience.schema.dp0p",
  "doc": "Calibrated Platinum Resistance Thermometer.",
  "fields": [
    {
      "name": "source_id",
      "type": "string",
      "doc": "Source serial number or mac address"
    },
    {
      "name": "site_id",
      "type": [
          "null",
          {
            "type": "fixed",
            "size": 4,
            "name": "utf8"
          }
      ],
      "doc": "NEON site identifier"
    },
    {
      "name": "readout_time",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "Timestamp of readout expressed in milliseconds since epoch",
      "__neon_units": "millisecond"
    },
    {
      "name": "temp",
      "type": [
        "null",
        "float"
        ],
      "default": null,
      "doc": "The temperature measured by the PRT as determined by applying the calibration polynomial coefficients to the measured resistance.",
      "__neon_units": "celcius"
    }
    ]
}
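
The doc string for temp describes applying calibration polynomial coefficients to the measured resistance. That transformation from the L0 term (resistance) to the L0' term (temp) can be sketched as below. The coefficients here are made up for illustration; real coefficients come from each sensor's calibration files:

```python
def apply_calibration(resistance_ohm, coefs):
    """Evaluate a calibration polynomial c0 + c1*x + c2*x^2 + ..."""
    return sum(c * resistance_ohm ** i for i, c in enumerate(coefs))

fake_coefs = [-245.0, 2.35, 0.001]  # hypothetical polynomial coefficients
temp_c = apply_calibration(100.0, fake_coefs)
print(round(temp_c, 2))  # 0.0
```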

Christine Laney's Workbook Checker App or the pub workbook is an excellent tool to look up the L1 terms used in your data product. One thing to keep in mind when using these tools is what kind of data your module is working on. For example, prior to aggregation of the data to 1-, 2-, 5-, or 30-minute averages, your terms will not have 'Mean' in them (e.g. linePAR), while after the aggregation step 'Mean' is appended to them (e.g. linePARMean).

Another important thing to keep in mind is that the L0 terms (and L0' terms) used in your data product will likely change in the new processing system! This is because data products that use data from the same source type share a common portion of the pipeline that applies calibration, so all products that use that source type must use the same L0 term name. This is not the case in the existing processing system. For example, the soil temperature and single-aspirated air temperature data products both use prt data. The existing L0 term for soil temperature in the terms database is soilPRTResistance, whereas the existing L0 term for single-aspirated air temperature is PRTResistance. In the new system, the L0 term for prts across all product pipelines is resistance, and the L0' term for prts across all product pipelines is temp.

If you're unsure what term to invoke in the input parameters of the pipeline spec for your module, it's usually helpful to look at the upstream output repo to see what terms are used in those files.

IMPORTANT: In the QA/QC module, the term(s) listed in the thresholds.json must match the terms in the data file. These may not automatically match! The existing processing system uses L0 terms for assignment of thresholds. In the new processing system, the L0' terms are used. You will need to work with CI to change the terms assigned to thresholds on INT to the new L0' terms for the source type. For example, the thresholds for soil temperature and single-aspirated air temperature in the current processing system are associated with the L0 terms soilPRTResistance and PRTResistance, respectively. In the new system, we changed the threshold terms on INT for both products to temp, which is the new L0' term for prt sensors. But then how do we differentiate between thresholds for soil temperature vs. air temperature? Context! The 'soil' context is used in the threshold filter for soil temperature, and the 'single-aspirated' context is used in the threshold filter for single-aspirated air temperature. See the Wiki pages on Groups and Context and Thresholds for more information on these topics.
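
The term-plus-context matching described above can be sketched as a simple filter. The threshold records below are made up for illustration; the real structure lives in thresholds.json and is described on the Thresholds Wiki page:

```python
# Hypothetical threshold records keyed by L0' term and context
thresholds = [
    {"term": "temp", "context": ["soil"], "range_max": 40.0},
    {"term": "temp", "context": ["single-aspirated"], "range_max": 50.0},
]

def select_thresholds(records, term, context):
    """Keep thresholds matching both the data term and the required context."""
    return [r for r in records if r["term"] == term and context in r["context"]]

print(select_thresholds(thresholds, "temp", "soil"))
# [{'term': 'temp', 'context': ['soil'], 'range_max': 40.0}]
```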