General: Other Processes: Incremental Heal Logic

Bypass incremental logic

Summary:

When adding new data to an incremental model, or updating existing rows/columns, a full-refresh of the model is typically required, because the incremental logic and model configuration prevent it from loading historical records (e.g. _inserted_timestamp >= (SELECT MAX(_inserted_timestamp) FROM {{ this }})). We can avoid the full-refresh by bypassing the incremental logic on a specific CTE with the HEAL_MODELS variable.

Best Practices, Tips & Tricks:

  1. The variable is set as an array, so it can take multiple STRING values at once.
  2. It is recommended to match the variable name with the name of the CTE and, where possible, to use unique, standardized names across models. This makes it easy for engineers unfamiliar with the model to reload records for a specific protocol or function without deep domain knowledge.
  3. Approach usage with caution and test thoroughly against the model's incremental_strategy. This implementation relies on delete+insert to ensure that all rows are properly replaced; see the config sketch after this list.
  4. Using this variable on an incremental run doesn't actually run a "full-refresh" of the model; it simply negates the "if is_incremental" jinja block for the specified CTEs.
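
As a minimal sketch of the pairing described in point 3 (the unique_key here is illustrative, not the exact fsc-evm config), a delete+insert model configuration might look like:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = '_id'
) }}

With delete+insert, every existing row matching the unique_key of the reloaded period is removed before the new rows are inserted, so bypassing the incremental filter replaces records cleanly rather than duplicating them.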

Implementation Steps, Variables & Notes:

Use a variable to negate incremental logic:

The recommended implementation is to add this variable as a check within individual CTEs. The check looks for the STRING we set; if that STRING is included in the variable, the is_incremental logic is negated for that CTE. See the example below.

  • Variable: HEAL_MODELS
    • Default is an empty array [] (see the inline-default sketch after this list)

    • When an item is included in the array, incremental logic will be skipped for that CTE / code block

    • When an item is not included in the array, or does not match the string specified in the model, incremental logic will apply as normal

    • Example set up: {% if is_incremental() and 'axelar' not in var('HEAL_MODELS') %}

    • Example use case: reload records in a curated complete table without a full-refresh, such as silver_bridge.complete_bridge_activity

    • Usage:

      • Single CTE: dbt run --vars '{"HEAL_MODELS":"axelar"}' -m ...
      • Multiple CTEs: dbt run --vars '{"HEAL_MODELS":["axelar","across","celer_cbridge"]}' -m ...
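
Note that var('HEAL_MODELS') with no second argument will fail to compile if the variable is not declared at the project level. If a model needs to stand alone, a hedged alternative (a sketch, not necessarily how fsc-evm declares it) is to supply an inline default:

{% if is_incremental() and 'axelar' not in var('HEAL_MODELS', []) %}
    -- incremental filter goes here
{% endif %}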

Examples, References & Sources:

polygon.silver_bridge.axelar_contractcallwithtoken

...
axelar AS (
    SELECT
        block_number,
        tx_hash,
        event_index,
        bridge_address,
        event_name,
        platform,
        ...
        destination_chain,
        token_address,
        token_symbol,
        amount_unadj,
        _id,
        _inserted_timestamp
    FROM
        {{ ref('silver_bridge__axelar_contractcallwithtoken') }}
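-- skip the incremental filter for this CTE when 'axelar' is passed via HEAL_MODELS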

{% if is_incremental() and 'axelar' not in var('HEAL_MODELS') %}
WHERE
    _inserted_timestamp >= (
        SELECT
            MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
        FROM
            {{ this }}
    )
{% endif %}
),
...

Handle late-arriving data with "Heal" logic

Summary:

When curating data into a model that joins multiple sources and runs incrementally, portions of the data may arrive late. This can occur for a variety of reasons, but a common one is that the underlying tables that make up the joins run at different intervals. Often, this results in NULL values where data should otherwise exist. To resolve this, we apply "Heal" logic: an additional CTE that looks for NULL values in {{ this }}, re-joins the underlying tables, and unions the corrected rows back into the FINAL model results.

Best Practices, Tips & Tricks:

  1. When applying transformations or healing columns in the "heal_model" CTE, it is recommended to use new column names, such as adding a _heal suffix to the alias. Otherwise, the model may compile with an alias conflict or invalid identifier and produce inconsistent results.
  2. Depending on the incremental_strategy defined in the dbt model configuration (delete+insert, merge, etc.) and the unique_key, the WHERE statement will need to pull in all of the rows applicable to the incremental run. E.g. if the model uses delete+insert on block_number, then the "heal_model" CTE will need to include all transactions within the impacted blocks, not just the impacted transaction or event; see the sketch after this list.
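
As a minimal sketch of point 2 (column names are hypothetical), widening the heal CTE's WHERE clause from impacted rows to whole impacted blocks can look like:

WHERE t0.block_number IN (
    SELECT DISTINCT t1.block_number
    FROM {{ this }} t1
    WHERE t1.token_symbol IS NULL -- any column targeted for healing
)

Because delete+insert removes every existing row that matches the incoming keys, selecting the full block here ensures that untouched transactions in that block are re-inserted rather than dropped.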

Implementation Steps, Variables & Notes:

Use a variable to heal a model incrementally:

The recommended implementation is not to run the "Heal" logic on every incremental run. Instead, the model should include tags = 'heal' in its config, which allows it to be triggered by the dbt_run_heal_models workflow. See below for details on setting the variable.

  • Variable: HEAL_MODEL
    • Default is FALSE (Boolean)

    • When FALSE, heal logic will be skipped

    • When TRUE, heal logic will apply

    • Include heal in the model's tags within the config block for inclusion in the dbt_run_heal_models workflow, e.g. tags = 'heal' (see the config sketch after this list)

    • Usage: dbt run --vars '{"HEAL_MODEL":True}' -m ...
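
A hedged sketch of a matching config block (the materialization settings are illustrative; the heal tag is what the workflow selects on):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'block_number',
    tags = ['heal']
) }}

The workflow can then target these models with standard tag selection, e.g. dbt run -s tag:heal --vars '{"HEAL_MODEL":True}' (the exact selector used by dbt_run_heal_models is an assumption here).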

Examples, References & Sources:

polygon.silver_bridge.complete_bridge_activity

...
{% if is_incremental() and var(
  'HEAL_MODEL'
) %}
heal_model AS (
  SELECT
    block_number,
    tx_hash,
    event_index,
    ...
    t0.token_address,
    C.symbol AS token_symbol,
    C.decimals AS token_decimals,
    amount_unadj,
    CASE
        WHEN C.decimals IS NOT NULL THEN (amount_unadj / pow(10, C.decimals))
        ELSE amount_unadj
    END AS amount_heal,
    CASE
        WHEN C.decimals IS NOT NULL THEN ROUND(
             amount_heal * p.price,
              2
            )
        ELSE NULL
    END AS amount_usd_heal,
    ...
    FROM
        {{ this }}
        t0
        LEFT JOIN {{ ref('silver__contracts') }} C
        ON t0.token_address = C.address
        LEFT JOIN {{ ref('price__ez_prices_hourly') }}
        p
        ON t0.token_address = p.token_address
        AND DATE_TRUNC(
            'hour',
            block_timestamp
        ) = p.hour
    WHERE
        CONCAT(
            t0.block_number,
            '-',
            t0.platform,
            '-',
            t0.version
        ) IN (
            SELECT
                CONCAT(
                    t1.block_number,
                    '-',
                    t1.platform,
                    '-',
                    t1.version
                )
            FROM
                {{ this }}
                t1
            WHERE
                t1.token_decimals IS NULL
                AND t1._inserted_timestamp < (
                    SELECT
                        MAX(
                            _inserted_timestamp
                        ) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
                    FROM
                        {{ this }}
                )
                AND EXISTS (
                    SELECT
                        1
                    FROM
                        {{ ref('silver__contracts') }} C
                    WHERE
                        C._inserted_timestamp > DATEADD('DAY', -14, SYSDATE())
                        AND C.decimals IS NOT NULL
                        AND C.address = t1.token_address)
                    GROUP BY
                        1
                )
             OR ... (repeat WHERE logic for every column you want to heal)
),
    {% endif %}

    FINAL AS (
        SELECT
            *
        FROM
            complete_bridge_activity
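-- on heal runs, append the corrected rows from heal_model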

{% if is_incremental() and var(
    'HEAL_MODEL'
) %}
UNION ALL
SELECT
    block_number,
    tx_hash,
    event_index,
    ...
    token_address,
    token_symbol,
    token_decimals,
    amount_unadj,
    amount_heal AS amount,
    amount_usd_heal AS amount_usd,
    ...
FROM
    heal_model
{% endif %}
)
...

Dynamic lookbacks on incremental runs

Summary:

When running an incremental dbt model, we often add logic that "looks back" at the last few hours of data from the source. This ensures we capture all new rows and 100% of the data we need since the source's prior run, and it helps handle late-arriving data. However, this lookback period is typically hardcoded and cumbersome to change without a PR or macro (see [Tech Specs for Lookback Macros](https://github.com/FlipsideCrypto/fsc-evm/wiki/Macros:-Utility:-Lookbacks) for more detail), and in some cases engineers may need to reload records that fall outside this period (debugging, outages, etc.). To resolve this, we use the LOOKBACK variable to allow new time intervals to be passed in on a one-off basis within dbt run commands.

Best Practices, Tips & Tricks:

  • Records will load incrementally based on the period of time that the lookback window is set to. Extending the window may result in duplicate rows or other issues, depending on the model's incremental_strategy and unique_key. Please test thoroughly prior to deploying to PROD.

Implementation Steps, Variables & Notes:

Use a variable to extend the incremental lookback period:

  • Variable: LOOKBACK
    • Default is a string representing the model's standard time interval, passed as the second argument to var(), e.g. '12 hours', '7 days', etc.

    • Example set up: SELECT MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'

    • Usage: dbt run --vars '{"LOOKBACK":"36 hours"}' -m ...

Examples, References & Sources:

polygon.silver_bridge.complete_bridge_activity
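
That model's incremental filter follows the same pattern shown earlier on this page; as a brief recap of where LOOKBACK lands in the model code:

{% if is_incremental() %}
WHERE _inserted_timestamp >= (
    SELECT MAX(_inserted_timestamp) - INTERVAL '{{ var("LOOKBACK", "4 hours") }}'
    FROM {{ this }}
)
{% endif %}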