OMOP Pipeline User Guide

Table of Contents

0. Background

1. File Discovery

2. File Conversion

3. File Validation

4. File Normalization

5. CDM Standardization

6. Vocabulary Harmonization

7. Derived Table Generation

8. BigQuery Loading

9. Reporting

0. Background

The OMOP Pipeline is an API-driven, cloud-based application designed to process OMOP (Observational Medical Outcomes Partnership) Common Data Model files and execute standardized analytics/quality packages against them. The vast majority of data processing is completed against individual files stored in Google Cloud Storage (GCS) using the DuckDB database engine. Fully processed files are loaded into BigQuery for use by analysts.

The application consists of multiple repositories that work together to complete a single 'pipeline run':

  • The ccc-omop-file-processor repository provides APIs to validate, standardize, and load files into BigQuery
  • The ccc-omop-analyzer repository provides APIs to execute OHDSI analytics packages against OMOP datasets in BigQuery
  • The ccc-orchestrator repository provides functionality to automate the execution of a pipeline run

0.1 Incoming Data Files Requirements

OMOP data files must be provided as flat files in .csv or .parquet formats with the following specifications:

CSV File Requirements:

  • UTF-8 encoding
  • Comma delimiter
  • Properly escaped quotes
  • Either Windows or Unix line endings
  • No line breaks within fields

File Naming and Storage Conventions:

  • File names must match OMOP table names (e.g., the person table must be delivered as person.csv)
  • Files must not contain any additional prefixes or suffixes
  • The person_id value in data tables should match a subject's Connect_ID from the Connect for Cancer Prevention study
  • Files must be stored in a Google Cloud Storage (GCS) bucket folder named with the delivery date in YYYY-MM-DD format

Schema Requirements:

  • File structure should match v5.3 or v5.4 of the OMOP Common Data Model (CDM) in terms of columns, column names, data types, and content.
  • Vocabulary files can be from any vocabulary release version, but should be from one of the three most recent vocabulary releases to ensure compatibility

0.2 Pipeline Configuration Options

The pipeline has several configuration options available across different components:

Storage Configuration:

  • Vocabulary GCS bucket: Stores OMOP vocabulary data downloaded from Athena. The GCS path is configured as an environment variable in Cloud Run.
    • Vocabulary data should be stored in a folder named with the version (e.g., gs://ehr_vocab_bucket/v5.0 30-AUG-24/)
  • DuckDB GCS bucket: Used to store temporary files generated by DuckDB during pipeline execution. The GCS path is configured as an environment variable in Cloud Run.

Version Control:

  • Target vocabulary version:
    • Configured in the constants.py file of the Airflow DAG project
    • Determines which version of the vocabulary data deliveries will be harmonized to
    • Supports harmonizing to/from any vocabulary version
  • Target CDM version:
    • Configured in the constants.py file of the Airflow DAG project
    • Determines which version of the CDM data deliveries will be standardized to
    • Currently supports 5.4

Performance and Logging:

  • BigQuery logging table: A table in BigQuery that stores historical pipeline runs and their status. The table path is configured as an environment variable in Cloud Run.
  • cpu: Number of CPU cores to dedicate to the omop-file-processor API endpoint (default: 4). Configured in the cloudbuild.yml file of the omop-file-processor.
  • memory: Amount of memory to dedicate to the omop-file-processor API endpoint (default: 12GB). Configured in the cloudbuild.yml file of the omop-file-processor.

0.3 Study Site Configuration

Site-specific information is stored in the site_config.yml file, which is part of the Airflow DAG. The file contains the version of OMOP the site is using, the file delivery format, and where data should be stored in GCP. It's structured as follows:

site:
  'site_1':
    display_name: 'New Synthea Synthetic Data'
    gcs_bucket: 'synthea_54_bucket'
    file_delivery_format: '.csv'
    project_id: 'connect-dev'
    bq_dataset: 'synthea_54'
    omop_version: '5.4'
    post_processing: ['custom_syn_54_fix']
  'site_2':
    display_name: 'Synthea Synthetic Data'
    gcs_bucket: 'synthea_test_bucket'
    file_delivery_format: '.parquet'
    project_id: 'connect-dev'
    bq_dataset: 'synthea_test'
    omop_version: '5.3'
    post_processing: ['']

Update this file when new partners join, or if existing site configurations (OMOP version, GCS bucket, etc.) change.
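For reference, the following is a minimal sketch of how an orchestrator could read this configuration, assuming PyYAML; the helper name load_site_config and the iteration are illustrative, not the actual DAG code.

```python
# Minimal sketch of parsing site_config.yml with PyYAML.
# load_site_config is a hypothetical helper, not the actual DAG code.
import yaml

def load_site_config(path: str = "site_config.yml") -> dict:
    """Return the per-site configuration keyed by site identifier."""
    with open(path) as f:
        config = yaml.safe_load(f)
    return config["site"]

# Example: list each site's GCS bucket and OMOP version.
for site_id, settings in load_site_config().items():
    print(site_id, settings["gcs_bucket"], settings["omop_version"])
```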

0.4 Pipeline Operation

The pipeline follows a serverless API architecture, in which API calls are made to various endpoints to process OMOP data files. These API calls can be made manually, but are automated through Airflow.

Airflow Integration:

Airflow is a platform used to programmatically author, schedule, and monitor workflows (in this case, a series of API calls made in a particular order). In this application:

  • Workflows are defined as Directed Acyclic Graphs (DAGs) in Python
  • Each DAG represents a complete pipeline execution
  • DAGs orchestrate the sequence of API calls needed to process OMOP files
  • Tasks within the DAG handle specific operations like validation, conversion, and loading

Processing Flow:

  1. File discovery: The pipeline scans configured GCS buckets for new data deliveries
  2. Conversion: Each file is converted to a standardized Parquet format
  3. Validation: Each file is checked against OMOP schema requirements
  4. Normalization: Each file is updated to meet OMOP schema requirements
  5. CDM standardization: Each file's structure is transformed to the target CDM version
  6. Vocabulary harmonization: Each file's content is harmonized to the target vocabulary version
  7. Derived table generation: Standardized derived data tables are created
  8. BigQuery loading: Processed data is loaded into BigQuery datasets
  9. Reporting: Processing results and validation findings are stored

These steps will be described in further detail below.

Pipeline Execution:

The pipeline automatically executes on a daily basis to perform file discovery. It parses the site_config.yml file to identify GCS buckets that may contain delivery files; searches each bucket for delivery folders; finds the date of the latest delivery folder; and then queries the BigQuery logging table to determine whether that delivery has already been processed.

If the latest data deliveries for all sites have been processed, the pipeline will skip the remaining tasks of the Airflow DAG; otherwise, the tasks will be executed to process the data. The pipeline can process multiple files from multiple sites in parallel.

The pipeline can be manually executed through the Airflow UI to trigger this process at any time.

Data artifact files are generated by the pipeline during execution, and are stored in an artifacts/ directory within the GCS delivery folder.

0.5 Pipeline Logging

The Airflow UI provides logs describing the API calls being made and their return values. Click a task within the Airflow UI to view its logging output. Google Cloud Run logs contain more detailed information about the functions triggered by an API call from Airflow.

In addition to being available in the Airflow DAG and Google Cloud Run logs, pipeline execution information is stored in a BigQuery logging table. The table is queried by the file discovery process to determine if a delivery needs processing. Each delivery from each partner is tracked separately in the table.

During execution, the table reflects the pipeline's started/running status, and any errors encountered during processing are also written to it.

The pipeline will execute when the BigQuery logging table:

  • Does not contain an entry for the latest delivery from a site, or
  • Reflects an error status for the latest delivery

The pipeline will not execute when the BigQuery logging table:

  • Reflects a completed, running, or started status for the latest delivery
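Taken together, the execution decision can be sketched as follows; this is a hypothetical illustration, as the real check is a query against the BigQuery logging table.

```python
# Hypothetical sketch of the execution decision; the pipeline's actual
# implementation queries the BigQuery logging table for the latest delivery.
def needs_processing(latest_status: str | None) -> bool:
    """Return True if a site's latest delivery should be (re)processed.

    latest_status is None when no log entry exists for the delivery;
    otherwise it is the status recorded in the logging table.
    """
    if latest_status is None:      # no entry: never processed
        return True
    if latest_status == "error":   # previous attempt failed
        return True
    return False                   # completed, running, or started: skip
```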

Troubleshooting tips:

  • To 'skip' processing a data delivery, create an empty YYYY-MM-DD folder in a site's GCS bucket (where YYYY-MM-DD has a later date than the delivery)
  • To reprocess a site's delivery, delete the delivery's information from the BigQuery table and execute the pipeline DAG
  • BigQuery limits concurrent DML writes to a table. Because multiple files are processed at once, and each file results in BigQuery logging, concurrent write attempts may occur and cause a task to fail with the error 'Too many DML statements outstanding against table'. These failures should resolve upon automatic task retries.

0.6 OMOP Pipeline Directory Structure

The OMOP File Processor creates the following directory structure in Google Cloud Storage for each data delivery:

gs://{site_bucket}/{YYYY-MM-DD}/
│
└── artifacts/
    ├── fixed_files/             # Files with fixed encoding or quoting issues
    │   └── table_name_pipeline_fix_formatting.csv
    │
    ├── converted_files/         # Standardized Parquet versions of input files
    │   ├── person.parquet
    │   ├── condition_occurrence.parquet
    │   └── ...
    │
    ├── invalid_rows/            # Rows that failed normalization
    │   ├── person.parquet
    │   ├── condition_occurrence.parquet
    │   └── ...
    │
    ├── harmonized_files/        # Files after vocabulary harmonization
    │   ├── condition_occurrence/
    │   │   ├── condition_occurrence_source_target_remap.parquet
    │   │   ├── condition_occurrence_target_remap.parquet
    │   │   ├── condition_occurrence_target_replacement.parquet
    │   │   ├── condition_occurrence_domain_check.parquet
    │   │   └── transformed/     # Data moved to different tables due to domain changes
    │   └── ...
    │
    ├── created_files/           # Generated derived tables
    │   ├── condition_era.parquet
    │   ├── drug_era.parquet
    │   ├── observation_period.parquet
    │   └── ...
    │
    ├── delivery_report/         # Data delivery report location
    │   ├── delivery_report_{site}_{date}.csv  # Final consolidated report
    │   └── tmp/                 # Individual report artifacts
    │       ├── delivery_report_part_{uuid1}.parquet
    │       ├── delivery_report_part_{uuid2}.parquet
    │       └── ...
    │
    ├── dqd/                     # Reserved for Data Quality Dashboard artifacts
    │
    └── achilles/                # Reserved for Achilles data characterization artifacts

Key Directories:

  • fixed_files: Contains corrected versions of CSV files that had encoding issues or unescaped quotes
  • converted_files: Contains all data files converted to Parquet format with standardized column names
  • invalid_rows: Contains rows that could not be normalized (e.g., required fields with invalid values)
  • harmonized_files: Contains the results of vocabulary harmonization, including concepts that moved between tables
  • created_files: Contains derived tables that were generated from other tables (e.g., drug_era from drug_exposure)
  • delivery_report: Contains the final data delivery report and temporary report artifacts

This directory structure is automatically created during pipeline execution through the create_artifact_buckets endpoint.

1. File Discovery

The pipeline contains automated functionality to identify and process new data deliveries from study sites without manual intervention. This section describes how the pipeline discovers files and determines which deliveries need processing.

1.1 Folder Structure Requirements

For automatic discovery to function correctly, data deliveries must be stored following this structure:

gs://{site_bucket}/{YYYY-MM-DD}/{files}

Where:

  • {site_bucket} is the GCS bucket configured for the site in site_config.yml
  • {YYYY-MM-DD} is a folder named with the delivery date in ISO format (e.g., 2025-01-01)
  • {files} are the OMOP data files (e.g., person.csv, condition_occurrence.csv)

The pipeline relies on this consistent folder naming convention to determine which deliveries are new and need processing.

1.2 Discovery Process

The file discovery process begins with the prepare_for_run task, which completes several steps:

  1. Retrieve Site Information: The pipeline reads the site_config.yml file to get configuration details for all sites.

  2. Vocabulary Preparation: Before scanning for new deliveries, the pipeline ensures that optimized vocabulary files are available for the target vocabulary version. This step runs once per DAG execution, regardless of which deliveries need processing; the resulting files are used when processing every delivery from every site and only need to be generated once.

  3. Identify Latest Deliveries: For each site, the pipeline:

    • Scans the site's GCS bucket for folders matching the date format pattern
    • Identifies the most recent date-formatted folder (representing the latest delivery)
  4. Check Processing Status: The pipeline queries the BigQuery logging table to determine if each site's latest delivery has already been processed:

    • If no log entry exists for the site and delivery date combination, the delivery is marked for processing
    • If a log entry exists with an error status, the delivery is marked for reprocessing
    • If a log entry exists with completed, running, or started status, the delivery is skipped
  5. Compilation: The pipeline creates a list of unprocessed deliveries, where each entry is a tuple containing the site name and delivery date. The list is returned as output by the prepare_for_run task.
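As an illustration of step 3, the sketch below picks the latest date-named delivery folder; the helper is hypothetical (the pipeline lists the GCS bucket directly) and assumes folder names contain valid YYYY-MM-DD dates.

```python
# Hypothetical helper: pick the most recent YYYY-MM-DD folder from a list
# of folder names. Assumes date-named folders contain valid calendar dates.
import re
from datetime import date

DATE_FOLDER = re.compile(r"^(\d{4})-(\d{2})-(\d{2})$")

def latest_delivery(folder_names: list[str]) -> str | None:
    """Return the most recent date-named folder as YYYY-MM-DD, or None."""
    found = []
    for name in folder_names:
        m = DATE_FOLDER.match(name.strip("/").split("/")[-1])
        if m:
            found.append(date(int(m.group(1)), int(m.group(2)), int(m.group(3))))
    return max(found).isoformat() if found else None

# latest_delivery(["2024-12-01/", "2025-01-01/", "artifacts/"]) -> "2025-01-01"
```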

1.3 Processing Decision

After compiling the list of unprocessed deliveries:

  1. The check_for_unprocessed task evaluates whether any unprocessed deliveries were found:

    • If the list is empty (no new deliveries to process), all downstream tasks are skipped
    • If the list contains at least one delivery, the pipeline continues to the processing tasks
  2. When processing continues, the get_files task executes and:

    • Logs the start of processing in the BigQuery logging table for each delivery
    • Creates artifact bucket folders to store processing outputs
    • Retrieves the list of files within each delivery folder
    • Creates a configuration object for each file, containing necessary metadata

This file discovery mechanism allows the pipeline to run on a scheduled basis (e.g., daily) while only processing new or previously failed deliveries.

Subsequent tasks in the pipeline execute against the file configuration objects returned by get_files and the list returned by prepare_for_run.

2. File Conversion

To enable consistent and predictable file processing, all incoming data is converted to Parquet format in which all columns are explicitly defined as strings. This is completed by the process_file Airflow task. Converted Parquet files are stored in the artifacts/converted_files/ folder of the delivery's GCS location.

The pipeline currently supports incoming CSV and Parquet files. To support additional incoming file formats, only new functionality for converting those formats to Parquet needs to be added.

Note: The note_nlp table of the OMOP CDM contains a column called "offset". offset is a reserved keyword in DuckDB, BigQuery, and other SQL engines, and requires special handling that differs based on the incoming file format.

2.1 Processing Parquet Input

Incoming Parquet files are copied to artifacts/converted_files/. During the copying process, file names and column names are converted to lower case, and all column types are saved as string. The implementation uses DuckDB's COPY command with a SELECT statement that explicitly aliases each column to ensure consistent naming and casing.
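A sketch of what that conversion can look like is shown below; the paths and function name are illustrative, and the actual implementation may differ in detail.

```python
# Illustrative sketch of the Parquet pass-through conversion: lower-case
# every column name, cast every column to VARCHAR, and rewrite the file.
# Paths and the function name are hypothetical.
import duckdb

def convert_parquet(in_path: str, out_path: str) -> None:
    con = duckdb.connect()
    # DESCRIBE returns one row per column; row[0] is the column name.
    cols = [row[0] for row in con.execute(
        f"DESCRIBE SELECT * FROM read_parquet('{in_path}')").fetchall()]
    select_list = ", ".join(
        f'CAST("{c}" AS VARCHAR) AS "{c.lower()}"' for c in cols)
    con.execute(
        f"COPY (SELECT {select_list} FROM read_parquet('{in_path}')) "
        f"TO '{out_path}' (FORMAT PARQUET)")

# convert_parquet("PERSON.parquet", "artifacts/converted_files/person.parquet")
```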

2.2 Processing CSV Input

CSV files can be problematic. They lack a unified standard, resulting in inconsistent implementations across tools that handle encoding, delimiters, and quoting differently. Quoting mechanisms create parsing challenges where a single misplaced quote can affect data interpretation throughout a file. Regional variations exist, with applications like Excel using different delimiters based on locale settings. Inconsistent line endings and character encodings commonly disrupt data exchange between systems.

The pipeline heavily relies on the robust CSV parser implemented by DuckDB to automatically handle these potential issues when converting files to Parquet, but it cannot resolve every problem.

If a file is not in an encoding supported by DuckDB (as of this writing, UTF-8, UTF-16, and Latin-1), the pipeline will attempt to convert the file to UTF-8 and then reprocess it. Similarly, if an unescaped quote error is detected, the pipeline will attempt to escape the offending quote character and then reprocess the file. Both corrections are applied row by row, which limits performance. Fixed files are stored in artifacts/fixed_files/ before being reprocessed. Upon reprocessing, the DuckDB CSV reader parameter store_rejects is set to True to allow more permissive CSV parsing.

All file and column names are converted to lower case, and all column types in the final Parquet file are saved as string.
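The following is a hedged sketch of the permissive re-read described above; file paths are illustrative, and store_rejects and all_varchar are standard read_csv options in recent DuckDB versions.

```python
# Sketch of re-reading a fixed CSV with permissive parsing. With
# store_rejects = true, rows DuckDB cannot parse are kept in its
# reject_errors table instead of aborting the conversion.
import duckdb

con = duckdb.connect()
con.execute("""
    COPY (
        SELECT * FROM read_csv(
            'artifacts/fixed_files/person_pipeline_fix_formatting.csv',
            header = true,
            all_varchar = true,
            store_rejects = true)
    ) TO 'artifacts/converted_files/person.parquet' (FORMAT PARQUET)
""")
rejected = con.execute("SELECT * FROM reject_errors").fetchall()
```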

3. File Validation

Converted Parquet files are evaluated for their adherence to OMOP CDM standards by the validate_file task. In this task, the file name is used to identify which OMOP table the file represents (e.g., person.csv represents the OMOP person table). A lookup is performed against a JSON representation of the OMOP schema to determine if the table is present in the OMOP CDM. If the table exists in OMOP, the columns within the file are evaluated against the expected columns from the CDM.

3.1 Table Name Validation

Verifies that the filename (without extension) matches one of the OMOP CDM tables defined in the schema.json file. This function returns a boolean indicating whether the table name is valid.

3.2 Column Name Validation

Examines each column in the file to ensure it is a valid column in the CDM schema for that table. This function also identifies:

  • Valid columns (present in both the file and schema)
  • Invalid columns (present in the file but not in the schema)
  • Missing columns (present in the schema but not in the file)
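A minimal sketch of this check follows; the schema.json structure shown here is an assumption, not the pipeline's actual layout.

```python
# Hypothetical column validation against a JSON schema representation.
# The schema[table]["columns"] shape is assumed for illustration.
import json

def validate_columns(table: str, file_columns: list[str],
                     schema_path: str = "schema.json") -> dict:
    with open(schema_path) as f:
        schema = json.load(f)
    expected = {c.lower() for c in schema[table]["columns"]}
    provided = {c.lower() for c in file_columns}
    return {
        "valid": sorted(provided & expected),
        "invalid": sorted(provided - expected),
        "missing": sorted(expected - provided),
    }
```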

Report artifacts are created during the file validation process and are stored in a temporary location. Later, these artifacts are compiled to generate a final data delivery report.

4. File Normalization

The file normalization process, executed by the normalize_file task, transforms the converted Parquet files to ensure they strictly adhere to OMOP CDM requirements. This step actively modifies files to conform to the expected schema structure and data types.

4.1 Core Normalization Functions

The normalization process performs several key operations:

  1. Data Type Conversion: Converts all columns to their appropriate OMOP CDM data types (e.g., VARCHAR, DATE, BIGINT) using the TRY_CAST function, which attempts conversion without failing on invalid values.

  2. Schema Completion: Adds any missing columns required by the OMOP schema with appropriate default values:

    • Missing _concept_id columns are populated with value 0
    • Missing required columns are populated with type-appropriate defaults (e.g., empty strings for text, '1970-01-01' for dates, -1 for numeric fields)
    • Missing non-required columns are populated with NULL values
    • Columns not present in the OMOP CDM are removed
  3. Column Standardization: Ensures consistent column ordering and naming conventions:

    • All column names are converted to lowercase
    • Columns are arranged in the order specified by the OMOP CDM
  4. Primary Key Management: For tables with surrogate primary keys (like condition_occurrence_id), generates deterministic composite keys based on the values in all other columns. At this step in the pipeline, primary keys identify the combination of values within a row rather than uniquely identifying rows: if two rows have identical values across all columns, both rows receive the same primary key. The keys are made unique per row later, during the vocabulary harmonization step.

  5. Special Field Handling: Implements special handling for reserved keywords like the offset column in the note_nlp table.

The normalization is implemented through a SQL script generated by the get_normalization_sql_statement function, which customizes the transformation based on the specific table being processed and its schema requirements.
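The generated SQL takes roughly the following shape. This is a hedged sketch covering a few condition_occurrence columns; the real statement from get_normalization_sql_statement covers every column defined in the CDM schema and writes back to the original location.

```python
# Illustrative shape of a normalization statement; the column list is
# truncated and the output path is a temporary location chosen for clarity.
import duckdb

normalize_sql = """
    COPY (
        SELECT
            TRY_CAST(condition_occurrence_id AS BIGINT) AS condition_occurrence_id,
            TRY_CAST(person_id AS BIGINT) AS person_id,
            COALESCE(TRY_CAST(condition_concept_id AS BIGINT), 0) AS condition_concept_id,
            COALESCE(TRY_CAST(condition_start_date AS DATE), DATE '1970-01-01') AS condition_start_date
            -- ... remaining CDM columns, in schema order ...
        FROM read_parquet('artifacts/converted_files/condition_occurrence.parquet')
    ) TO 'artifacts/converted_files/condition_occurrence_normalized.parquet' (FORMAT PARQUET)
"""
duckdb.connect().execute(normalize_sql)
```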

4.2 Invalid Row Processing

A significant feature of the normalization process is its handling of rows that don't conform to schema requirements:

  1. Invalid Row Detection: The pipeline identifies invalid rows, i.e., rows in which a required column contains a value that cannot be cast to its correct type.

    Note: If a required column is NULL, or a required column is not provided in the delivery file, a default value is used and the row is not considered invalid.

  2. Row Segregation: Invalid rows are:

    • Removed from the main dataset
    • Stored in a separate Parquet file in artifacts/invalid_rows/ with the same table name
    • Preserved with their original values for later examination
  3. Row Counts: The process generates row count artifacts for both valid and invalid rows, which are used in the final delivery report to document data quality issues.

4.3 Connect ID Handling

As part of normalization, the pipeline:

  1. Detects if a special Connect ID field has been provided in the data
  2. Maps this Connect ID to the person_id field if present
  3. Ensures consistent person identification across all tables

The detection is performed by searching for column names containing 'connectid' or 'connect_id' (case-insensitive), and when found, this value is used as the person identifier.
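A sketch of that detection follows; the helper is hypothetical but matches the substring rule described above.

```python
# Hypothetical Connect ID column detection, case-insensitive substring match.
def find_connect_id_column(columns: list[str]) -> str | None:
    """Return the first column whose name contains 'connectid' or 'connect_id'."""
    for col in columns:
        name = col.lower()
        if "connectid" in name or "connect_id" in name:
            return col
    return None
# When a match is found, its values are mapped to person_id downstream.
```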

4.4 Normalization Output

The normalization process produces:

  1. Normalized Data Files: The primary output is a compliant Parquet file stored in the original location with the same name
  2. Invalid Row Files: A secondary output containing rejected rows in artifacts/invalid_rows/
  3. Row Count Artifacts: Metadata about the number of valid and invalid rows processed

This normalization step is crucial for ensuring that downstream processing can operate on consistently structured data that adheres to OMOP conventions, greatly reducing errors in later stages of the pipeline.

5. CDM Standardization

The CDM standardization process, executed by the cdm_upgrade task, transforms data files from their original OMOP Common Data Model version to the target version configured in the pipeline. This standardization step ensures that all data files from all partners conform to a single CDM version, providing a consistent foundation for subsequent processing steps or downstream analytics, regardless of the original delivery version.

Currently, the pipeline primarily supports upgrading from OMOP CDM v5.3 to v5.4, though it is designed to be extensible for future version upgrades.

5.1 Upgrade Determination

The process begins by comparing the source CDM version (the version used by the site for data delivery) with the target CDM version configured in the pipeline:

  1. If the source and target versions match, no upgrade is necessary, and the task completes without modifications
  2. If the versions differ, the pipeline identifies which tables need modification

This determination is made for each file individually.

5.2 Table Handling During Upgrade

When upgrading from CDM v5.3 to v5.4, tables are handled in one of four ways:

  1. Tables Removed in v5.4: Some tables that existed in v5.3 have been removed in v5.4. In this case, the file is deleted from the processed dataset. For example, the attribute_definition table from v5.3 is not present in v5.4.

  2. Tables Changed in v5.4: Many tables have structural changes between versions, such as new columns, changed data types, or modified constraints. For these tables, version-specific SQL transformation scripts are applied. Tables with structural changes include:

    • visit_occurrence
    • visit_detail
    • procedure_occurrence
    • device_exposure
    • measurement
    • observation
    • note
    • location
    • metadata
    • cdm_source
  3. Tables Added in v5.4: New tables introduced in v5.4 (like episode, episode_event, and cohort) are created as empty tables in BigQuery during later stages of the pipeline.

  4. Unchanged Tables: Tables with no structural changes between versions are passed through without modification.

5.3 Upgrade Implementation

The actual upgrade process uses version-specific SQL scripts stored in the pipeline's repository:

  1. The appropriate SQL upgrade script is selected based on the source table and version transition (e.g., 5.3_to_5.4/visit_occurrence.sql)

  2. The SQL script is applied to the Parquet file using DuckDB, which:

    • Adds new required columns with appropriate default values
    • Removes deprecated columns
    • Modifies data types and constraints as needed
    • Transforms data values where semantics have changed
  3. The transformed data is saved back to the original file location, overwriting the source file

The SQL scripts are carefully designed to maintain data integrity during the transition, preserving all meaningful information while adapting to the new structural requirements. The scripts are stored in a separate directory structure organized by version transition for easy maintenance and future extension.
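As a rough sketch of how such a script might be applied: the script root directory and the @SOURCE_TABLE placeholder below are assumptions; only the 5.3_to_5.4/<table>.sql naming convention comes from the text above.

```python
# Hypothetical application of a version-specific upgrade script with DuckDB.
# The script root and the @SOURCE_TABLE placeholder are illustrative.
import duckdb
from pathlib import Path

def upgrade_table(table: str, parquet_path: str,
                  script_root: str = "sql/cdm_upgrade/5.3_to_5.4") -> None:
    sql = Path(script_root, f"{table}.sql").read_text()
    # Point the script's source placeholder at the delivered Parquet file.
    sql = sql.replace("@SOURCE_TABLE", f"read_parquet('{parquet_path}')")
    duckdb.connect().execute(sql)

# upgrade_table("visit_occurrence",
#               "artifacts/converted_files/visit_occurrence.parquet")
```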

6. Vocabulary Harmonization

Vocabulary harmonization, executed by the harmonize_vocab task, ensures that all concept codes in the data conform to a standardized target vocabulary version. This critical step enables accurate and consistent data analysis across sites that might be using different vocabulary versions in their source data.

6.1 Overview and Purpose

OMOP vocabularies evolve over time as medical knowledge advances. Changes include:

  • New concepts being added
  • Deprecated concepts being retired
  • Concept mappings being updated
  • Concept domains being reassigned

The harmonization process transforms the data from its source vocabulary version to the target vocabulary version specified in the pipeline configuration. This ensures that when querying for a particular concept across multiple sites' data, the results are consistent and comparable regardless of what vocabulary version was used during data preparation.

6.2 Harmonization Process

The vocabulary harmonization process applies several transformations in sequence, each implemented as a method in the VocabHarmonizer class in vocab_harmonization.py:

  1. Source Concept Harmonization (source_target_remapping method): Maps source_concept_id values to their updated target codes based on the target vocabulary version. This ensures that non-standard source codes point to the most current standard concept.

  2. Target Concept Remapping (check_new_targets method with TARGET_REMAP): Identifies cases where a non-standard target_concept_id has a mapping to a standard concept in the target vocabulary. This captures cases where previously non-standard concepts now have standard mappings.

  3. Target Concept Replacement (check_new_targets method with TARGET_REPLACEMENT): Identifies cases where a target_concept_id has been replaced by a newer concept, updating to the replacement.

  4. Domain Validation (domain_table_check method): Checks if the domain of each concept is appropriate for the table it appears in, and updates target concepts accordingly.

Each transformation step creates a separate output file in the harmonized files directory, preserving a record of which mappings were applied to each row. Each output file is then written to BigQuery to generate a final, complete OMOP table.

The harmonization process relies on an optimized vocabulary file created in the prepare_for_run task, which pre-computes the necessary mapping relationships to speed up processing.
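The sequence of steps can be summarized with the following sketch. The VocabHarmonizer constructor arguments and the exact form of the step constants are assumptions; only the method names come from vocab_harmonization.py as described above.

```python
# Hedged sketch of the harmonization sequence. Constructor arguments and
# the step-constant arguments are assumed; method names follow the text.
from vocab_harmonization import VocabHarmonizer

def harmonize(table_path: str, target_vocab_version: str) -> None:
    harmonizer = VocabHarmonizer(table_path, target_vocab_version)  # assumed signature
    harmonizer.source_target_remapping()                 # 1. source -> standard remap
    harmonizer.check_new_targets("TARGET_REMAP")         # 2. non-standard targets now mapped
    harmonizer.check_new_targets("TARGET_REPLACEMENT")   # 3. replaced target concepts
    harmonizer.domain_table_check()                      # 4. domain validation / reassignment
```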

6.3 Domain-Based Table Reassignment

A powerful feature of vocabulary harmonization is its ability to handle domain changes between vocabulary versions:

  1. Domain Detection: The pipeline checks if a concept's domain in the target vocabulary matches the expected domain for the table it appears in, using the domain_table_check method.

  2. Data Movement: If a concept's domain has changed (e.g., a concept previously classified as an Observation is now a Measurement), the data is:

    • Extracted from the original table
    • Transformed to match the structure of the appropriate target table using the omop_etl method
    • Loaded into the correct target table
  3. Primary Key Management: During this process, primary keys are regenerated to ensure uniqueness within the destination table using the handle_duplicate_primary_keys method.

This ensures that all concepts are stored in the tables appropriate for their domains according to the target vocabulary version.

6.4 Measurement Value Handling

Special handling is implemented for the Measurement domain:

  1. Value Concept Mapping: value_as_concept_id fields are updated to use the target vocabulary concepts
  2. Associated Mappings: When a measurement concept is moved between OMOP tables, its associated value concepts are also appropriately transformed

This is handled through custom SQL logic in the harmonization process that identifies measurement value concepts and ensures they are properly associated with their parent measurement concepts.

6.5 Implementation Details

The harmonization process:

  1. Uses an optimized vocabulary file that contains all necessary mapping relationships in a pre-computed format. This is generated during the initial file discovery process.
  2. Executes SQL transformations through DuckDB against the Parquet files
  3. Loads harmonized data to BigQuery tables directly

This process may significantly change the distribution of data across tables compared to the source data, particularly if there have been substantial domain reassignments between vocabulary versions. The resulting dataset will reflect the latest concept hierarchies and relationships defined in the target vocabulary version.

7. Derived Table Generation

The derived_data_tables task generates OMOP CDM tables that are derived from other tables rather than being directly populated from source data. These derived tables are required by OHDSI analytical tools.

7.1 Purpose of Derived Tables

Derived tables in OMOP serve several important functions:

  1. Data Aggregation: They aggregate individual clinical events into meaningful episodes or periods, making analysis more efficient
  2. Standardized Analytics: They enable consistent analytical approaches across different datasets
  3. Temporal Analysis: They support time-based analysis by defining periods of observation or exposure

These tables are considered part of the standard OMOP CDM, but they're derived from base tables rather than being directly populated from source data.

7.2 Generated Tables

The pipeline currently generates three key derived tables:

  1. condition_era: Combines individual condition occurrences into continuous periods of a given condition, collapsing closely occurring condition records into a single era. The logic to populate this table is provided by OHDSI, but rewritten for use in DuckDB/BigQuery.

  2. drug_era: Combines individual drug exposures into continuous periods of treatment, accounting for expected persistence of medications. The logic to populate this table is provided by OHDSI, but rewritten for use in DuckDB/BigQuery.

  3. observation_period: While this table is not generally considered one of the derived data tables, the pipeline standardizes the definition of an observation period across all OMOP datasets. The observation_period table provided by a site is ignored and replaced with a table populated by a SQL script bespoke to this pipeline, written to meet the OHDSI definition of an EHR observation period. There are three SQL scripts to generate the observation_period records, depending on the contents of a data delivery:

    • If both the visit_occurrence and death tables are delivered, the later of the latest visit_end_date or death_date is used as the end of the observation period.

    • If the visit_occurrence table is delivered but death is not, the latest visit_end_date is used as the end of the observation period.

    • If neither the visit_occurrence nor the death table is delivered, a generic observation period from 1970-01-01 through the date of pipeline execution is used.

7.3 Implementation Approach

The derived table generation is implemented in the generate_derived_data function in omop_client.py. The process follows these steps:

  1. Dependency Check: The function first checks if the required source tables for a derived table are present in the delivery. For example, condition_era requires condition_occurrence, and drug_era requires drug_exposure. If the required tables are missing, the function logs a warning and skips generating that derived table.

  2. SQL Script Selection: The appropriate SQL script is selected based on the derived table and the available source data. For observation_period, the function selectively chooses between three different scripts (observation_period_vod.sql, observation_period_vo.sql, or observation_period.sql) based on the availability of visit_occurrence and death tables.

  3. Parameter Substitution: The SQL scripts contain placeholders for file paths and other parameters. The placeholder_to_file_path function replaces these placeholders with actual GCS paths to the appropriate files, including:

    • Clinical data tables (e.g., @CONDITION_OCCURRENCE becomes the path to the condition_occurrence.parquet file)
    • Vocabulary tables (e.g., @CONCEPT_ANCESTOR becomes the path to the concept_ancestor.parquet file)
    • Site name and current date
  4. Execution: The prepared SQL is executed using DuckDB to:

    • Read the source data from Parquet files
    • Apply the transformation logic
    • Write the results to a new Parquet file in the artifacts/created_files/ directory
  5. BigQuery Loading: Finally, the newly created derived table is loaded into BigQuery using the load_parquet_to_bigquery function, allowing it to be queried alongside the other OMOP tables.
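The following is a hypothetical sketch of the placeholder substitution in step 3. Only the @CONDITION_OCCURRENCE and @CONCEPT_ANCESTOR placeholders appear in the text above; the mapping and function shape below are assumptions, not the actual placeholder_to_file_path implementation.

```python
# Hypothetical placeholder substitution; the site name and current date
# would be substituted in the same manner.
def substitute_placeholders(sql: str, delivery_prefix: str, vocab_prefix: str) -> str:
    replacements = {
        "@CONDITION_OCCURRENCE":
            f"{delivery_prefix}/artifacts/converted_files/condition_occurrence.parquet",
        "@CONCEPT_ANCESTOR": f"{vocab_prefix}/concept_ancestor.parquet",
    }
    for placeholder, gcs_path in replacements.items():
        sql = sql.replace(placeholder, gcs_path)
    return sql
```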

The SQL scripts for derived table generation are stored in the reference/sql/derived_tables/ directory. The drug_era script is split into two parts (drug_era_create.sql and drug_era.sql) due to its resource-intensive nature, with the first part creating intermediate tables to offload data from memory to disk.

This implementation ensures that derived tables are consistently generated across all site deliveries, regardless of whether the site provided these tables in their original data.

8. BigQuery Loading

The final stage of the OMOP pipeline transfers the processed and standardized data from GCS Parquet files into BigQuery tables where they can be queried and analyzed. This section explains how data is loaded into BigQuery and what happens during this process.

8.1 Data Loading Sequence

Before loading any new data, the prepare_bq task removes all existing tables from the dataset to prevent mixing old and new data.

The pipeline loads different types of data in a specific sequence:

  1. Harmonized Data: The harmonized data files are the first data loaded into BigQuery; this occurs as part of the harmonize_vocab task.

  2. Vocabulary Tables: Standard vocabulary tables (concept, concept_relationship, etc.) are then loaded into BigQuery by the load_target_vocab task.

    • If the constants.LOAD_ONLY_TARGET_VOCAB value in the DAG is set to True (the default), the target vocabulary tables will be loaded to BigQuery, replacing any site-provided vocabulary tables.
    • If the value is set to False, the site's vocabulary tables will be loaded to BigQuery, and any vocabulary tables not provided by the site will come from the target vocabulary.
  3. Clinical Data Tables: Any remaining non-vocabulary or non-harmonized data files are loaded by the load_to_bq task. These include tables like person, care_site, and others that didn't require vocabulary harmonization.

  4. Derived Tables: Generated tables like condition_era and drug_era are loaded last by the derived_data_tables task. These tables depend on the clinical data tables already being loaded.

8.2 Loading Behaviors

Different tables are loaded with different behaviors, controlled by the write type parameter:

| Table Type | Loading Behavior | Write Type | Purpose |
|---|---|---|---|
| Standard Vocabulary | Replace | SPECIFIC_FILE | Ensures consistent vocabulary across all sites |
| OMOP Clinical Data | Replace | PROCESSED_FILE | Provides clean, standardized data for each delivery |
| Harmonized Data Fragments | Append | ETLed_FILE | Preserves data moved between tables during harmonization |
| Derived Tables | Replace | SPECIFIC_FILE | Regenerates derived data based on latest source data |

These behaviors ensure data integrity while handling the complexity of vocabulary harmonization and derived table generation. The different write types are implemented in the constants.BQWriteTypes enum and control how the BigQuery load jobs interact with existing tables.
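A sketch of what that enum might look like follows; member names match the table above, but the exact definition in constants.py may differ.

```python
# Hypothetical shape of the write-type enum; the values are illustrative.
from enum import Enum

class BQWriteTypes(Enum):
    SPECIFIC_FILE = "SPECIFIC_FILE"      # truncate-and-replace (vocabulary, derived tables)
    PROCESSED_FILE = "PROCESSED_FILE"    # replace with the processed clinical file
    ETLed_FILE = "ETLed_FILE"            # append harmonized fragments to an existing table
```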

8.3 Final Cleanup

After all tables are loaded, the pipeline performs final cleanup tasks through the final_cleanup task:

  1. Missing Table Creation: The create_missing_omop_tables function creates empty tables for any standard OMOP tables not included in the delivery, ensuring a complete CDM schema.

  2. CDM Source Population: The populate_cdm_source function adds a record to the cdm_source table with delivery metadata if not already provided. This information includes:

    • Source name and abbreviation
    • CDM version and vocabulary version
    • Data delivery date
    • Processing date
  3. Completion Logging: The bq_log_complete function marks the delivery as complete in the pipeline logging table, allowing the pipeline to skip this delivery in future runs.

The BigQuery dataset is now ready for querying. Analysts can use standard SQL to query the data, and the dataset can be accessed through the BigQuery console, connected to visualization tools, or queried programmatically through the BigQuery API.

9. Reporting

Throughout the pipeline's execution, information about the data delivery and processing is captured as report artifacts. These artifacts are combined into a comprehensive data delivery report during the final cleanup phase. This section explains how the reporting system works and what information is included in the final report.

9.1 Report Artifact Generation

A report artifact is a single piece of information that describes an aspect of the data delivery or processing. It is implemented via the ReportArtifact class in report_artifact.py, and artifacts are created at various stages throughout the pipeline.

Each report artifact is saved as a small, individual Parquet file in the artifacts/delivery_report/tmp/ directory within the site's GCS bucket. The files are named with random UUIDs to prevent collisions when multiple artifacts are created in parallel.

9.2 When Artifacts Are Created

Report artifacts are generated throughout the pipeline execution at various stages:

  1. File Validation:

    • Valid/invalid table name detection
    • Valid/invalid/missing column detection
    • Schema compatibility checks
  2. Normalization:

    • Row count reporting for valid rows
    • Row count reporting for invalid rows that couldn't be normalized
    • Data type conversion results
  3. Vocabulary Harmonization:

    • Source concept mappings
    • Target concept remappings
    • Domain changes
    • Concept replacements
  4. Final Delivery Information:

    • Delivery date
    • Site name
    • File delivery format
    • Delivered CDM version
    • Delivered vocabulary version
    • Target CDM version
    • Target vocabulary version
    • Pipeline version
    • Processing date

Each component of the pipeline that creates these artifacts uses the ReportArtifact class's save_artifact() method to write the information to a temporary Parquet file.

9.3 Report Compilation

During the final cleanup phase, the generate_report() function in utils.py compiles all the individual report artifacts into a single comprehensive CSV file:

  1. The function first creates final delivery information artifacts (metadata about the delivery itself)
  2. It then identifies all temporary artifact files in the artifacts/delivery_report/tmp/ directory
  3. A DuckDB SQL statement with multiple UNION ALL clauses is constructed to combine all artifacts
  4. The results are written to a single CSV file named delivery_report_{site}_{delivery_date}.csv in the artifacts/delivery_report/ directory
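A hedged sketch of the compilation query is shown below; the artifact paths and output file name are illustrative, and the real generate_report() enumerates whatever tmp/ files it finds.

```python
# Illustrative compilation of report artifacts into one CSV with DuckDB.
import duckdb

artifact_paths = [
    "artifacts/delivery_report/tmp/delivery_report_part_aaa.parquet",
    "artifacts/delivery_report/tmp/delivery_report_part_bbb.parquet",
]
union_sql = " UNION ALL ".join(
    f"SELECT * FROM read_parquet('{p}')" for p in artifact_paths)

duckdb.connect().execute(
    f"COPY ({union_sql}) TO "
    "'artifacts/delivery_report/delivery_report_site_1_2025-01-01.csv' "
    "(HEADER, DELIMITER ',')"
)
```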

This consolidated report provides a complete record of the delivery processing, including:

  • All validation results
  • Data quality metrics
  • Transformation statistics
  • Processing metadata

9.4 Report Structure

The final delivery report CSV file contains these columns:

  • metadata_id: Unique identifier for each row in the report
  • metadata_concept_id: OMOP concept ID related to the information
  • metadata_type_concept_id: Fixed value (32880) indicating this is pipeline metadata
  • name: Description of what the artifact represents
  • value_as_string: String representation of the artifact's value
  • value_as_concept_id: OMOP concept ID representation of the value
  • value_as_number: Numeric representation of the value
  • metadata_date: Date when the artifact was created
  • metadata_datetime: Timestamp when the artifact was created

This structure allows users to filter, group, and query the report data to understand the delivery processing results.

9.5 Using the Report

The delivery report serves several important purposes:

  1. Data Quality Assessment: Identifies issues with the delivered data, such as:

    • Missing required tables or columns
    • Invalid column values
    • Schema inconsistencies
  2. Processing Transparency: Documents all transformations applied to the data:

    • Number of rows processed
    • Number of invalid rows
    • Vocabulary mappings applied
  3. Delivery Documentation: Provides metadata about the delivery itself:

    • Original CDM and vocabulary versions
    • Target CDM and vocabulary versions
    • Processing date and pipeline version
  4. Audit Trail: Creates a permanent record of the processing for compliance and reproducibility

The report can be downloaded from GCS and opened in any CSV-compatible application. For data quality monitoring across multiple deliveries, the reports can be loaded into a database or analytics tool to track trends and improvements over time.