How To Implement CDC Load for Business Satellite - OptimalBI/optimal-data-engine-mssql GitHub Wiki

ODE supports two types of data load: Full and Delta. A Delta load captures the changes made to a data set since the previous load. A Full load can be executed at any time, whereas Delta loads make sense on a regular schedule so that the target keeps up with incremental changes.

If raw data in the Data Vault is updated on a regular basis, then in simple cases the downstream ETL loads can be switched to partial loads. This applies only when the business satellite uses a subset of the raw satellite data and its ETL reads a single raw satellite to calculate its rules. A partial load handles any inserts, updates and deletes that have happened since the last load execution. ETL that reads from multiple source tables needs more complex logic than is shown here. We also don't recommend this approach for loading data into Links.
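
As a rough illustration of what "changes since the last load" means, the sketch below filters a made-up change log between two high water marks. The table variable, its columns, the action codes and the boundary handling (exclusive lower bound, inclusive upper bound) are all assumptions made for illustration; they are not ODE's actual implementation.

```sql
-- Hypothetical change log for one raw satellite; names and action codes are illustrative only.
DECLARE @raw_changes TABLE (
    CUSTID      int,
    cdc_action  char(1),         -- assumed codes: 'I' insert, 'U' update, 'D' delete
    change_date datetimeoffset
);

INSERT INTO @raw_changes (CUSTID, cdc_action, change_date)
VALUES (1, 'I', '2017-01-09T08:00:00+00:00'),
       (2, 'I', '2017-01-09T08:00:00+00:00'),
       (1, 'U', '2017-01-09T10:30:00+00:00'),
       (2, 'D', '2017-01-09T11:15:00+00:00');

-- Last change already loaded into the target, and latest change available in the source.
DECLARE @local_hw  datetimeoffset = '2017-01-09T09:00:00+00:00';
DECLARE @source_hw datetimeoffset = '2017-01-09T12:00:00+00:00';

-- A partial load only needs the rows that fall between the two marks.
SELECT CUSTID, cdc_action, change_date
FROM @raw_changes
WHERE change_date >  @local_hw
  AND change_date <= @source_hw
ORDER BY change_date;
```

With these sample values the query returns the update to customer 1 and the delete of customer 2; both initial inserts fall before the local mark and are skipped.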

A single source table can support both types of load through a load type input parameter on its source stored procedure.

Before you start, create access functions for the raw satellite that is the source for your business satellite (more details on it here). The following code is a sample ETL stored procedure for a Change Data Capture (ODE CDC) load. The logic at the top of the procedure looks up the last successful CDC load for both the raw (source) and business (target) satellites in the ODE metadata tables. Don't change it; just provide your satellite names.

CREATE PROCEDURE [stage].[usp_Customer]
-- The procedure takes a parameter only because it switches between Delta and Full loads
(@Load_Type varchar(50))
AS
BEGIN
IF EXISTS
	(SELECT * FROM sys.objects WHERE object_id = object_id(N'[Stage].[Customer]') AND type IN (N'U'))
DROP TABLE [Stage].[Customer];
-------------------------------------------------------
--Don't change this part
-- Declare local variables.
DECLARE 
	@low_date datetime,
	@source_unique_name varchar(256),
	@satellite_name varchar(256),
	@source_version_key int,
	@source_hw_date datetimeoffset,
	@local_hw_datetime datetimeoffset

-- Obtain the default value for the low date from our defaults - this could be used later to replace any NULL values.
SELECT @low_date = CAST([ODE_Config].[dbo].[fn_get_default_value] ('LowDate','Global') AS datetime);

-------------------------------------------------------
-- Provide your satellite names
-- Specify the source (raw vault) satellite
SET @source_unique_name = 'DBCUST'
-- Specify the satellite name, this should match the stage table name
SET @satellite_name = 'Customer'

-------------------------------------------------------

-- Determine the source_version_key, it will be required as part of the output payload for the staging table.
SELECT @source_version_key = sv.source_version_key
FROM [ODE_Config].[dbo].[dv_source_table] st
INNER JOIN [ODE_Config].[dbo].[dv_source_version] sv 
	ON sv.source_table_key = st.source_table_key 
	AND sv.is_current = 1
WHERE st.source_unique_name = @satellite_name;

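-- Source High Water Mark
-- source_hw_date is the latest high_water_date recorded in dv_task_state for the raw satellite source table.
-- The database names used in this sample ([ODE_Vehicle_Vault], [ODE_Vault]) are examples; use the names from your environment.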
SELECT TOP 1 @source_hw_date = [high_water_date]
FROM [ODE_Vehicle_Vault].dbo.[dv_task_state]
WHERE source_unique_name = @source_unique_name
	AND [object_type] = 'sat'
ORDER BY [task_end_datetime] DESC


-- Local High Water Mark
-- local_hw_datetime is the latest source_high_water_date record in dv_task_state for the target sat table.  
SELECT TOP 1 @local_hw_datetime = source_high_water_date
FROM [ODE_Vehicle_Vault].dbo.[dv_task_state]
WHERE source_unique_name = @satellite_name
	AND [object_type] = 'sat'
ORDER BY [task_end_datetime] DESC

-- If no local high water mark was found, fall back to the global low date so that
-- the whole range of data is extracted.
IF @local_hw_datetime IS NULL
	SET @local_hw_datetime = @low_date


IF @Load_Type = 'Delta' 
BEGIN 
-------------------------------------------------------
--Provide your partial load logic
WITH Cust AS (SELECT * FROM [ODE_Vault].[Access].[get_DBCUST_all](@local_hw_datetime, @source_hw_date))

SELECT
--Some maintenance fields for enabling CDC load
ROW_NUMBER() OVER (ORDER BY CUSTID) AS _stgCustomer_key -- Arbitrary surrogate key, not important for load, but it has to be here 
, SYSDATETIMEOFFSET() AS dv_stage_datetime 
, @source_version_key AS dv_source_version 
, dv_cdc_action 
, @source_hw_date AS dv_cdc_high_water_date 
, dv_cdc_start_date
--Actual table fields 
, [CUSTID] 
, [ADDR_FLG] 
, [CUST_TYP] 
, [NAME1] + [NAME2] AS FullName
INTO [stage].[Customer] 
FROM Cust 
END 

ELSE 
BEGIN 
--Provide your Full load logic 
WITH Cust AS (SELECT * FROM [ODE_Vault].[Access].[get_DBCUST_pit](@source_hw_date)) 

SELECT ROW_NUMBER() OVER (ORDER BY CUSTID) AS _stgCustomer_key 
, SYSDATETIMEOFFSET() AS dv_stage_datetime 
, @source_version_key AS dv_source_version 
, dv_cdc_action 
, @source_hw_date AS dv_cdc_high_water_date 
, dv_cdc_start_date 
, [CUSTID] 
, [ADDR_FLG] 
, [CUST_TYP] 
, [NAME1] + [NAME2] AS FullName
INTO [stage].[Customer] 
FROM Cust 

END 
END
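
Once the procedure is created, it can be exercised by hand to check both branches before it is handed over to the scheduler. This is a minimal sketch, assuming the procedure and stage table names from the sample above and that you are allowed to execute the procedure directly; in normal operation ODE calls it for you.

```sql
-- Run in the database that holds the [stage] schema.
-- Any value other than 'Delta' falls through to the Full load branch of the sample procedure.
EXEC [stage].[usp_Customer] @Load_Type = 'Full';

-- Inspect the staged rows by CDC action.
SELECT dv_cdc_action, COUNT(*) AS row_count
FROM [stage].[Customer]
GROUP BY dv_cdc_action;

-- Repeat in Delta mode; the procedure drops and recreates [stage].[Customer] on every run.
EXEC [stage].[usp_Customer] @Load_Type = 'Delta';

SELECT dv_cdc_action, COUNT(*) AS row_count
FROM [stage].[Customer]
GROUP BY dv_cdc_action;
```

Running the procedure this way only fills the stage table; it does not load the satellite or move the high water marks.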

Create the hub and satellite as usual, but set the parameter StageLoadType to 'ODEcdc' and the parameter StagePassLoadTypeToProc to 1.

The source table can now be scheduled in two different loads. Suppose two schedules have been created as follows:

USE [ODE_Config]
GO
EXECUTE [dv_scheduler].[dv_schedule_insert] 
   @schedule_name          = 'Full_Load'
  ,@schedule_description   = 'Runs all Loads which only need to be refreshed daily'
  ,@schedule_frequency     = 'Daily'
  ,@release_number         = 2017010901
GO
EXECUTE [dv_scheduler].[dv_schedule_insert] 
   @schedule_name          = 'Delta_Load'
  ,@schedule_description   = 'Runs all Loads which need to be refreshed hourly'
  ,@schedule_frequency     = 'Hourly'
  ,@release_number         = 2017010901
GO 

Add the source table to both schedules:

EXEC	[dv_scheduler].[dv_schedule_source_table_insert]
@schedule_name = 'Full_Load'
,@source_unique_name = 'Customer'
,@source_table_load_type = 'Full'
,@priority = 'Low'
,@queue = 'Agent001'
,@release_number = 2017010901
GO

EXEC	[dv_scheduler].[dv_schedule_source_table_insert]
@schedule_name = 'Delta_Load'
,@source_unique_name = 'Customer'
,@source_table_load_type = 'Delta'
,@priority = 'Low'
,@queue = 'Agent001'
,@release_number = 2017010901
GO

Remember to execute a Full load before the first Delta load; otherwise there will be no CDC start date and the Delta load will fail.
