End Product Sync - department-of-veterans-affairs/caseflow GitHub Wiki

Overview

The existing end product (EP) sync process has been replaced (but not deprecated) by a new process that leverages claim data replicated from VBMS to identify out-of-sync EPs in Caseflow. The new process prioritizes syncing EndProductEstablishments (EPEs) that have been cleared ("CLR") or canceled ("CAN") in VBMS but not in Caseflow.

Priority EPs are flagged by the update_claim_status_trigger on the vbms_ext_claim table. This trigger is created by add_pepsq_populate_trigger_to_vbms_ext_claim.rb and finds EPEs whose status differs from that of their associated VbmsExtClaim record. Each EPE with such a delta gets a corresponding PriorityEndProductSyncQueue (PEPSQ) record.

From here, up to 100 PEPSQ records will be assigned to a unique batch in the BatchProcesses table. The PriorityEpSyncBatchProcessJob will then attempt to sync and process these EPEs. If the record fails to sync after being batched three separate times, it will be inserted into the CaseflowStuckRecords table where it is intended to be checked and synced manually.

If an unexpected error causes a batch to fail while it is being processed, the entire batch will be picked up by the BatchProcessRescueJob.

Sections:

  1. Old End Product Sync Process
  2. New End Product Sync Implementation
    1. Database Tables
    2. Implementation Logic
      1. 🔴 Adding Priority EPE Records to the PEPSQ Table 🔴
      2. Creating Batches & Processing PEPSQ Records
      3. Batch Reprocessing
    3. Record State
      1. Priority End Product Sync Queue
      2. Batch Process
    4. Metabase Views
    5. Kibana Dashboard
    6. Relevant Files
      1. Models
      2. Jobs
      3. Scripts

Old End Product Sync Process

SyncReviewsJob kicks off every 2 minutes and attempts to sync 100 priority EPEs with their associated VBMS claim objects, one at a time.

sync_reviews_job.rb

The sync! method is called on the EPE after retrieving it from the database.

end_product_sync_job.rb

If the EPE is active, Caseflow calls VBMS to get the claim object. Unless the VBMS object's status is cancelled, any contentions associated with the claim will also be retrieved. The EPE's synced_status is updated to the VBMS object's status and last_synced_at is updated to the current date/time.

end_product_establishment.rb
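The sync flow above can be sketched in plain Ruby. This is a simplified, illustrative model — `FakeVbmsClient`, `Claim`, and the `Epe` class here are stand-ins, not Caseflow's actual classes (and the "is the EPE active?" guard is omitted):

```ruby
# Illustrative stand-ins for VBMS objects -- not Caseflow's real classes.
Claim = Struct.new(:status, :contentions)

class FakeVbmsClient
  def initialize(claims_by_reference_id)
    @claims = claims_by_reference_id
  end

  def fetch_claim(reference_id)
    @claims.fetch(reference_id)
  end
end

class Epe
  attr_reader :synced_status, :last_synced_at

  def initialize(reference_id:, vbms:)
    @reference_id = reference_id
    @vbms = vbms
  end

  # Mirrors the documented flow: fetch the VBMS claim, skip retrieving
  # contentions when the claim is canceled, then record the new status
  # and the sync time.
  def sync!
    claim = @vbms.fetch_claim(@reference_id)
    contentions = claim.status == "CAN" ? [] : claim.contentions
    @synced_status = claim.status
    @last_synced_at = Time.now.utc
    contentions
  end
end
```

The key invariant is that `synced_status` always ends up matching the VBMS status, whether or not contentions were retrieved.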

New End Product Sync Implementation

Database Tables

PriorityEndProductSyncQueue

The priority_end_product_sync_queue table is the main record-holding table for this sync implementation. It holds records, in priority order, that point back to the EPE table; these records form a queue of EPEs that need to be synced ahead of all others and are organized into batched groups via the batch_processes table. When an EPE is determined to be out of sync, a new PriorityEndProductSyncQueue (PEPSQ) record is created with the associated end_product_establishment_id.

| Key Data Point | Description | Constraints |
| --- | --- | --- |
| End Product Establishment ID | A link back to the end_product_establishment record to be synced | Not null; ID must exist in the EPE table |
| Batch ID | A unique UUID for the batch the record is executed with | Not null; ID must exist in the batch table |
| Created At | The date/time the record entered the queue | |
| Status | Indicates the state of the record, such as PROCESSING, PROCESSED, or ERROR | Not null; defaults to "NOT_PROCESSED" |
| Error Messages | An array of error message(s) containing the batch ID and the specific error if a failure occurs | Defaults to an empty array |
| Last Batched At | The date/time the record was last assigned to a batch | |

BatchProcesses

The batch_processes table houses execution records of batches run in Caseflow for different sync processes. Its model acts as a base class and is mainly used for inheritance: it contains the find_records, create_batch!, and process_batch! methods that are to be overridden by inheriting classes (see priority_ep_sync_batch_process.rb). This class is also responsible for handling errors for records within a batch via the error_out_record! method. Every table intending to use this table must contain batch_id and last_batched_at columns.

| Key Data Point | Description | Constraints |
| --- | --- | --- |
| Batch ID (PK) | A unique UUID generated to identify the batch and all records associated with it | Not null; must be a UUID |
| State | The state of the batch, such as PROCESSING or PROCESSED | Not null; defaults to "PRE_PROCESSING" |
| Batch Type | Identifies which process the batch was run for, such as Priority Sync | Not null; must be an existing process |
| Started At | Timestamp of when the batch began processing | |
| Ended At | Timestamp of when the batch finished processing | |
| Records Attempted | The number of records in the batch attempting to be processed | Defaults to 0 |
| Records Completed | The number of records in the batch that completed processing successfully | Defaults to 0 |
| Records Failed | The number of records in the batch that failed processing | Defaults to 0 |
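The inheritance pattern described above can be sketched as follows. The method bodies here are illustrative stand-ins (the real implementations live in batch_process.rb and priority_ep_sync_batch_process.rb and work against ActiveRecord):

```ruby
# Base class holding shared behavior; subclasses override the hooks.
class BatchProcess
  def self.find_records
    raise NotImplementedError, "subclasses must implement find_records"
  end

  def self.create_batch!(_records)
    raise NotImplementedError, "subclasses must implement create_batch!"
  end

  def process_batch!
    raise NotImplementedError, "subclasses must implement process_batch!"
  end
end

# Priority EP sync subclass, overriding one hook as an example.
class PriorityEpSyncBatchProcess < BatchProcess
  BATCH_LIMIT = 100 # max PEPSQ records assigned to one batch

  # In Caseflow this queries PriorityEndProductSyncQueue with the filters
  # described below; here it just takes the first BATCH_LIMIT entries of
  # an in-memory queue.
  def self.find_records(queue)
    queue.first(BATCH_LIMIT)
  end
end
```

This template-method design lets other sync processes reuse the batch bookkeeping (state, counts, error handling) while supplying their own record selection and processing logic.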

CaseflowStuckRecords

The caseflow_stuck_records table is a polymorphic table intended to hold records that multiple batches have attempted to execute for a particular process, such as Priority EP Sync. Priority EP Sync records are currently given 3 attempts to sync (constant: ERROR_LIMIT). Upon erroring out for the third time, the declare_record_stuck! method is called, updating the record's status and creating an associated record in the caseflow_stuck_records table. This table is meant to aid support in reporting and remediating stuck executions.

| Key Data Point | Description | Constraints |
| --- | --- | --- |
| Stuck Record ID | The primary key of the record that is stuck | Not null; must exist in the related process table |
| Stuck Record Type | The type of process / where the record came from | Not null; must be an existing process table |
| Error Messages | An array of the error message(s) containing the batch ID and the specific error if a failure occurs | Defaults to an empty array |
| Determined Stuck At | The date/time at which the record was determined to be stuck | Not null |
| Remediated | Boolean reflecting whether the stuck record has been reviewed and fixed | Not null; defaults to false |
| Remediation Notes | Brief description of the encountered issue and remediation strategy | |
| Updated At | The time an update occurred on the record | |
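The three-strike logic above can be sketched in plain Ruby. `PepsqRecord` and `record_sync_failure!` are illustrative names for this sketch, not Caseflow's actual model or method:

```ruby
ERROR_LIMIT = 3 # batched sync attempts before a record is declared stuck

# Illustrative stand-in for a PEPSQ row.
PepsqRecord = Struct.new(:status, :error_messages)

# On each failure, append the batch ID and error message; once the record
# has accumulated ERROR_LIMIT error messages, mark it STUCK and file a row
# in the stuck-records table.
def record_sync_failure!(record, batch_id, error, stuck_records)
  record.status = "ERROR"
  record.error_messages << "#{batch_id}: #{error}"
  return if record.error_messages.size < ERROR_LIMIT

  record.status = "STUCK"
  stuck_records << {
    stuck_record_type: "PriorityEndProductSyncQueue",
    error_messages: record.error_messages.dup,
    determined_stuck_at: Time.now.utc,
    remediated: false
  }
end
```

Because each failure appends one error message, the length of error_messages doubles as the attempt counter, which is how the real process decides when to call declare_record_stuck!.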

🔴 Adding Priority EPE Records to the PEPSQ Table 🔴

add_pepsq_populate_trigger_to_vbms_ext_claim.rb

(Screenshot: PEPSQ population trigger flow diagram)

Logic

  1. The 🔴 trigger script 🔴 runs when a vbms_ext_claim record is 🔴 INSERTED or UPDATED 🔴
  2. A PriorityEndProductSyncQueue record will be created for the correlated EPE if the following criteria are met:
    1. A vbms_ext_claim record is inserted or updated
    2. Does the record have an EP_CODE that begins with 04, 03, 93, or 68?
    3. Does the record also have a LEVEL_STATUS_CODE of "CLR" or "CAN"?
      1. Cast the record's CLAIM_ID as a string to compare it to the EPE's reference_id column
    4. Find the correlated EPE's id:
      1. Search the EPE table for a record whose reference_id matches the vbms_ext_claim's CLAIM_ID AND check that the EPE's synced_status is either NULL or doesn't match the vbms_ext_claim's LEVEL_STATUS_CODE
    5. Check the PriorityEndProductSyncQueue table to see if there is already a record for the EPE:
      1. Does the EPE's id already exist in the end_product_establishment_id column?
    6. If the EPE's id wasn't found in that column, create a new PriorityEndProductSyncQueue record containing end_product_establishment_id that matches the EPE's id
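The trigger itself is SQL, but its selection criteria can be expressed as a Ruby predicate. This is an illustrative sketch — the function and parameter names are invented for the example, and the real checks run inside Postgres:

```ruby
QUALIFYING_EP_CODE_PREFIXES = %w[04 03 93 68].freeze
QUALIFYING_STATUSES = %w[CLR CAN].freeze

# Returns true when a vbms_ext_claim row should produce a PEPSQ record,
# mirroring the trigger's checks: qualifying EP_CODE prefix, CLR/CAN
# status, a matching EPE whose synced_status is stale, and no existing
# PEPSQ entry for that EPE.
def should_enqueue?(ext_claim, epes_by_reference_id, queued_epe_ids)
  return false unless QUALIFYING_EP_CODE_PREFIXES.any? { |p| ext_claim[:ep_code].to_s.start_with?(p) }
  return false unless QUALIFYING_STATUSES.include?(ext_claim[:level_status_code])

  # CLAIM_ID is cast to a string to compare with the EPE's reference_id.
  epe = epes_by_reference_id[ext_claim[:claim_id].to_s]
  return false if epe.nil?
  return false unless epe[:synced_status].nil? || epe[:synced_status] != ext_claim[:level_status_code]

  !queued_epe_ids.include?(epe[:id])
end
```

Note the last two checks: an EPE whose synced_status already matches LEVEL_STATUS_CODE is in sync and skipped, and an EPE already present in the queue is not enqueued twice.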

Creating Batches & Processing PEPSQ Records

priority_ep_sync_batch_process_job.rb

(Screenshot: batch creation and processing flow diagram)

Logic

  1. The PriorityEpSyncBatchProcessJob kicks off
    1. job_expected_end_time is set to 1 hour in the future (Constant: JOB_DURATION)
    2. Starts looping
      1. Breaks out of the job if any of these conditions are true:
        1. Has the job been running for an hour or longer?
        2. Did the last iteration not find any PEPSQ records available to batch?
        3. Was a StandardError raised the last iteration?
      2. Begins with a transaction block that ensures PEPSQ records are only assigned to a single batch
        1. PriorityEpSyncBatchProcess.find_records is called
        2. Records are pulled from PriorityEndProductSyncQueue table and filtered by:
          1. The batch_id of the PEPSQ records must be null OR the associated batch must have a state of COMPLETED
          2. The PEPSQ records do not have a status of SYNCED or STUCK
          3. The PEPSQ records last_batched_at column must be either null OR more than 3 hours old (Constant: ERROR_DELAY)
          4. The number of records will not exceed 100 (Constant: BATCH_LIMIT)
      3. If there are unprocessed PEPSQ records found:
        1. Create a new record in the batch_processes table
        2. Update the new batch_process record's columns:
          1. state: PROCESSING
          2. started_at: the current date/time
          3. type: "PriorityEpSync"
          4. records_attempted: the number of records in the batch (<= 100)
        3. The PEPSQ records collected with the find_records method are updated with the new batch's batch_id
        4. process_batch! is called on the batch
          1. Loop through each PEPSQ record in the batch and load the connected EPE record into memory
            1. Call the sync! method on the EPE record to start syncing the EP and its decision issues
            2. Update the PEPSQ record to reflect sync success or failure
              1. If the sync is successful
                1. Update the PEPSQ record's status to SYNCED
              2. If the sync fails
                1. Update the status of the PEPSQ record to ERROR
                2. Add the error message and batch ID to the PEPSQ record's error_messages column
                3. Check how many error messages each PEPSQ record in the batch has
                  1. If the record has 3 error messages in its array
                    1. Add a record to the caseflow_stuck_records table with determined_stuck_at reflecting the current time/date and all error messages & batch IDs added to the error_messages column
                    2. Send an alert to the appropriate slack channels to support the remediation of the new stuck record
        5. Check that the full batch has been processed
          1. If each record in the batch has a SYNCED or ERROR status AND no longer has a PROCESSING status
            1. Update the batch record's state to COMPLETED
            2. Update the batch record's records_completed column with how many records were successfully processed and records_failed with how many failed to sync
        6. Delete each PEPSQ record containing status: "SYNCED" to ensure PopulateEndProductSyncQueueJob is performant
        7. 5 second sleep (Constant: SLEEP_DURATION)
        8. Loop back through starting at step 2 until 1 hour mark is hit, no unprocessed PEPSQ records are found, or a StandardError is raised
      4. If there aren't any unprocessed PEPSQ records found:
        1. Set variable should_stop_job to true
        2. Log message stating no records were found with job ID and current time
        3. 5 second sleep (Constant: SLEEP_DURATION)
        4. Loop back through step 2
        5. should_stop_job returns true, break out of job
      5. If a StandardError is raised:
        1. Log message with the error that occurred, job ID, and current time
        2. Capture the exception with the error that occurred, job ID, and current time
        3. Set variable should_stop_job to true
        4. 5 second sleep (Constant: SLEEP_DURATION)
        5. Loop back through starting at step 2
        6. should_stop_job returns true, break out of job
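The job's outer loop and its three stop conditions can be sketched as a simplified in-memory model. The method and parameter names are invented for this sketch; the constants match those documented above (SLEEP_DURATION is 5 seconds in Caseflow but defaults to 0 here so the example runs instantly):

```ruby
JOB_DURATION = 60 * 60 # seconds the job may keep looping
BATCH_LIMIT  = 100     # max PEPSQ records per batch

# Simplified outer loop: batch up to BATCH_LIMIT records per iteration
# and stop when the deadline passes, no unprocessed records remain, or
# an error is raised while processing a batch.
def run_priority_ep_sync(queue, process:, duration: JOB_DURATION, sleep_duration: 0)
  deadline = Time.now + duration
  processed_batches = []
  loop do
    break if Time.now >= deadline          # 1-hour mark hit

    batch = queue.shift(BATCH_LIMIT)       # stand-in for find_records + create_batch!
    break if batch.empty?                  # no unprocessed PEPSQ records found

    begin
      process.call(batch)                  # stand-in for process_batch!
      processed_batches << batch
    rescue StandardError
      break                                # log/capture, then stop the job
    end
    sleep(sleep_duration)                  # SLEEP_DURATION between iterations
  end
  processed_batches
end
```

For example, a queue of 250 records yields batches of 100, 100, and 50 before the empty-queue condition ends the job.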

Batch Reprocessing

batch_process_rescue_job.rb

  1. BatchProcessRescueJob kicks off
    1. Gathers any Batch Process records that need to be reprocessed
    2. If there are batches found:
      1. Loops through all batches found
        1. process_batch! is called on the batch (see PriorityEpSyncBatchProcess)
        2. If a standard error is raised:
          1. Log message with the error, job ID, and current date/time
          2. Capture the exception with the error, job ID, and current date/time
          3. Move on to next batch record
    3. If there aren't any batches found:
      1. Log message stating no unfinished batches found with the current date/time
    4. Job ends
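The rescue job's behavior can be sketched as follows. This is an illustrative model, not the real batch_process_rescue_job.rb — the key detail is that a failure in one batch is logged and skipped rather than aborting the run:

```ruby
# Simplified model of the rescue job: retry each unfinished batch,
# logging and skipping any batch whose reprocessing raises an error.
def rescue_unfinished_batches(batches, log = [])
  if batches.empty?
    log << "No unfinished batches found at #{Time.now.utc}"
    return log
  end

  batches.each do |batch|
    batch.call # stand-in for process_batch!
  rescue StandardError => e
    log << "Batch reprocessing error: #{e.message}" # capture, then move on
  end
  log
end
```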

Priority End Product Sync Queue Record State

(Screenshots: Priority End Product Sync Queue record state diagrams)

Batch Process Record State

(Screenshots: Batch Process record state diagrams)

Metabase Views:

Priority EP Sync Metrics Dashboard - (Tier 4/EP Sync/Priority EP Sync Metrics Dashboard)

  • Priority EP Sync Batch Process Duration in Seconds
SELECT to_char(bp.started_at, 'YYYY-MM-DD HH24:MI:SS') as started_at, to_char(bp.ended_at, 'YYYY-MM-DD HH24:MI:SS') as ended_at, EXTRACT(EPOCH FROM (ended_at - started_at)) AS duration_in_seconds, *
FROM batch_processes bp
WHERE batch_type = 'PriorityEpSyncBatchProcess'
ORDER BY duration_in_seconds desc
  • All Time Average Priority EP Sync Batch Process Duration in Seconds
SELECT AVG(EXTRACT(EPOCH FROM (bp.ended_at - bp.started_at))) AS avg_duration_in_seconds
FROM batch_processes bp
WHERE batch_type = 'PriorityEpSyncBatchProcess'
  • Last Week's Average Priority EP Sync Batch Process Duration in Seconds
SELECT AVG(EXTRACT(EPOCH FROM (bp.ended_at - bp.started_at))) AS last_week_avg_duration_in_seconds
FROM batch_processes bp
WHERE batch_type = 'PriorityEpSyncBatchProcess'
AND created_at >= CURRENT_DATE - 7
  • Two Weeks Ago's Average Priority EP Sync Batch Process Duration in Seconds
SELECT AVG(EXTRACT(EPOCH FROM (bp.ended_at - bp.started_at))) AS two_weeks_ago_avg_duration_in_seconds
FROM batch_processes bp
WHERE created_at >= CURRENT_DATE - 14
AND created_at <= CURRENT_DATE - 7
AND batch_type = 'PriorityEpSyncBatchProcess'
  • Last 24 Hour's Average Priority EP Sync Batch Process Duration in Seconds
SELECT AVG(EXTRACT(EPOCH FROM(bp.ended_at - bp.started_at))) AS one_day_avg_duration_in_seconds
FROM batch_processes bp
WHERE batch_type = 'PriorityEpSyncBatchProcess'
AND records_attempted = records_completed
AND created_at >= CURRENT_DATE - 1
  • Total Number of PEPSQ Caseflow Stuck Records needing to be remediated
SELECT COUNT(*) as records_needing_remediation_count
FROM caseflow_stuck_records csr
WHERE remediated = false
AND stuck_record_type = 'PriorityEndProductSyncQueue'
  • Last 24 hour's Number of PEPSQ Caseflow Stuck Records needing remediation
SELECT COUNT(*)
FROM caseflow_stuck_records csr
WHERE stuck_record_type = 'PriorityEndProductSyncQueue'
AND remediated = false
AND determined_stuck_at >= CURRENT_DATE - 1
  • Priority End Product Sync Queue Audit EPE Search
SELECT *
FROM caseflow_audit.priority_end_product_sync_queue_audit
WHERE end_product_establishment_id = CAST({{end_product_establishment_id}} AS bigint)
  • Priority EP Sync Batch Processes containing Failed Records
SELECT bp.batch_id, bp.created_at, bp.records_attempted, bp.records_failed
FROM batch_processes bp
WHERE bp.batch_type = 'PriorityEpSyncBatchProcess'
AND records_failed > 0
  • Priority End Product Sync Queue Records that Failed to Sync Batch ID Search
SELECT * FROM priority_end_product_sync_queue
WHERE batch_id = {{batch_id}}::uuid
AND array_length(error_messages, 1) > 0

Tier 4 Metrics Dashboard - (Tier 4/EP Sync/Tier 4 Metrics Dashboard)

  • Total Number of Caseflow Stuck Records needing remediation by Stuck Record Type
SELECT csr.stuck_record_type, COUNT(*) as records_needing_remediation_count
FROM caseflow_stuck_records csr
WHERE remediated = false
GROUP BY csr.stuck_record_type
  • Priority End Product Sync Queue Audit EPE Search
SELECT *
FROM caseflow_audit.priority_end_product_sync_queue_audit
WHERE end_product_establishment_id = CAST({{end_product_establishment_id}} AS bigint)
  • Last 24 Hour's Number of Caseflow Stuck Records needing Remediation by Stuck Record Type
SELECT csr.stuck_record_type, COUNT(*)
FROM caseflow_stuck_records csr
WHERE remediated = false
AND determined_stuck_at >= CURRENT_DATE - 1
GROUP BY csr.stuck_record_type
  • Last Week's Number of Caseflow Stuck Records needing remediation by Stuck Record Type
SELECT csr.stuck_record_type, COUNT(*)
FROM caseflow_stuck_records csr
WHERE remediated = false
AND determined_stuck_at >= CURRENT_DATE - 7
GROUP BY csr.stuck_record_type
  • Two Weeks Ago's Number of Caseflow Stuck Records needing remediation by Stuck Record Type
SELECT csr.stuck_record_type, COUNT(*)
FROM caseflow_stuck_records csr
WHERE remediated = false
AND determined_stuck_at >= CURRENT_DATE - 14
AND determined_stuck_at <= CURRENT_DATE - 7
GROUP BY csr.stuck_record_type
  • Total Number of Remediated Caseflow Stuck Records by Stuck Record Type
SELECT csr.stuck_record_type, COUNT(*)
FROM caseflow_stuck_records csr
WHERE remediated = true
GROUP BY csr.stuck_record_type

Kibana Dashboard - (EP Sync Dashboard)

Currently, 4 metrics for the new EP Sync Process are tracked in the Kibana production-global tenant. These include:

  • PriorityEpSyncBatchProcessJob Count
    • Single metric on the number of PriorityEpSyncBatchProcessJob runs
  • BatchProcessRescueJob Count
    • Single metric on the number of BatchProcessRescueJob runs

The default time span is "the last 3 weeks".

Relevant Files:

Models

Jobs

Scripts

Creating and removing VbmsExtClaim table locally

Audit table used to track insertions, updates, and deletions on the PriorityEndProductSyncQueue table

Removing seeds used to locally test this epic

Trigger to populate the PriorityEndProductSyncQueue upon vbms_ext_claim record insertions and updates