DDL for Debezium JDBC Sink Connector (MSSQL, PostgreSQL, MySql) - anakinwon/KafkaConnect GitHub Wiki

<์›๋ณธ ์Šคํ‚ค๋งˆ(Source Schema)>

use oc;

DROP TABLE oc.dbo.customers ;
DROP TABLE oc.dbo.products ;
DROP TABLE oc.dbo.orders ;
DROP TABLE oc.dbo.order_items ;

CREATE TABLE dbo.customers (
	customer_id int NOT NULL PRIMARY KEY,
	email_address varchar(255) NOT NULL,
	full_name varchar(255) NOT NULL
);

CREATE TABLE dbo.products (
	product_id int NOT NULL PRIMARY KEY,
	product_name varchar(100) NULL,
	product_category varchar(200) NULL,
	unit_price numeric NULL
);

CREATE TABLE dbo.orders (
	order_id int NOT NULL PRIMARY KEY,
	order_datetime datetime NOT NULL,
	customer_id int NOT NULL,
	order_status varchar(10) NOT NULL,
	store_id int NOT NULL
) ;

CREATE TABLE dbo.order_items (
	order_id int NOT NULL,
	line_item_id int NOT NULL,
	product_id int NOT NULL,
	unit_price numeric(10, 2) NOT NULL,
	quantity int NOT NULL,
	primary key (order_id, line_item_id)
);

[์›๋ณธ๋ฐ์ดํ„ฐ ์ƒ์„ฑ ํ”„๋กœ์‹œ์ €]

CREATE procedure CONNECT_DML_TEST_01(  
    @max_id INT 
  , @repeat_cnt INT
  , @upd_mod INT
)
AS 
BEGIN
  DECLARE @customer_idx INT;
  DECLARE @product_idx INT;
  DECLARE @order_idx INT;
 
  DECLARE @iter_idx INT;
  
  SET @iter_idx = 1; 

  WHILE @iter_idx     <=  @repeat_cnt 
  BEGIN
    SET @customer_idx  = @max_id + @iter_idx;
    SET @order_idx     = @max_id + @iter_idx;
    SET @product_idx   = @max_id + @iter_idx;
    
    insert into customers values (@customer_idx, concat('testuser_', @customer_idx) , concat('testuser_', @customer_idx));
    insert into products values (@product_idx, concat('testproduct_', @product_idx) , concat('testcat_', @product_idx), 100* @iter_idx/@upd_mod);
    insert into orders values (@order_idx, getdate(), @customer_idx, 'delivered', 1);                   
    insert into order_items values (@order_idx, (@iter_idx%@upd_mod)+1,  (@iter_idx%@upd_mod)+1, 100* @iter_idx/@upd_mod, 1); 

   /*
    if ((@upd_mod > 0) and ((@iter_idx%@upd_mod) = 0)) 
    begin
       update customers set full_name = concat('updateduser_', @customer_idx) where customer_id = @customer_idx;
       update products set product_name = concat('updproduct_', @product_idx) where product_id  = @product_idx;
       update orders set  order_status = 'updated' where order_id = @order_idx;
       update order_items set quantity = 2 where order_id = @order_idx;
       
       delete from customers where customer_id = @customer_idx -1;
       delete from products where product_id = @product_idx - 1;
       delete from orders where order_id = @order_idx - 1;
       delete from order_items where order_id = @order_idx - 1;
 
    end;
    */

    SET @iter_idx = @iter_idx + 1;
  END;
END;

[ํ”„๋กœ์‹œ์ € ์‹คํ–‰ - 10๋งŒ ๊ฑด ์ƒ์„ฑ]

EXEC CONNECT_DML_TEST_01 1,10000,100 ;

<ํƒ€๊ฒŸ ์Šคํ‚ค๋งˆ(Sink Schema)>

[CREATE TABLE for MSSQL]

use oc_sink;

DROP TABLE oc_sink.dbo.customers_sink ;
DROP TABLE oc_sink.dbo.products_sink ;
DROP TABLE oc_sink.dbo.orders_sink ;
DROP TABLE oc_sink.dbo.order_items_sink ;

CREATE TABLE dbo.customers_sink (
	customer_id int NOT NULL PRIMARY KEY,
	email_address varchar(255) NOT NULL,
	full_name varchar(255) NOT NULL,
	cre_dtm DATETIME default getdate()
);

CREATE TABLE dbo.products_sink (
	product_id int NOT NULL PRIMARY KEY,
	product_name varchar(100) NULL,
	product_category varchar(200) NULL,
	unit_price numeric NULL,
	cre_dtm DATETIME default getdate()
);

CREATE TABLE dbo.orders_sink (
	order_id int NOT NULL PRIMARY KEY,
	order_datetime datetime NOT NULL,
	customer_id int NOT NULL,
	order_status varchar(10) NOT NULL,
	store_id int NOT NULL,
	cre_dtm DATETIME default getdate()
) ;

CREATE TABLE dbo.order_items_sink (
	order_id int NOT NULL,
	line_item_id int NOT NULL,
	product_id int NOT NULL,
	unit_price numeric(10, 2) NOT NULL,
	quantity int NOT NULL,
	cre_dtm DATETIME default getdate(),
	primary key (order_id, line_item_id)
);

select 'mssql-customer' as tname   , count(*) from oc.dbo.customers  UNION ALL
select 'mssql-products' as tname   , count(*) from oc.dbo.products  UNION ALL
select 'mssql-orders' as tname     , count(*) from oc.dbo.orders  UNION ALL
select 'mssql-order_items' as tname, count(*) from oc.dbo.order_items ;


select 'mssql-customer_sink' as tname   , count(*) from oc_sink.dbo.customers_sink  UNION ALL
select 'mssql-products_sink' as tname   , count(*) from oc_sink.dbo.products_sink  UNION ALL
select 'mssql-orders_sink' as tname     , count(*) from oc_sink.dbo.orders_sink  UNION ALL
select 'mssql-order_items_sink' as tname, count(*) from oc_sink.dbo.order_items_sink;


select * from customers_sink;
select * from products_sink;
select * from orders_sink;
select * from order_items_sink;

SELECT max(cre_dtm) - min(cre_dtm) from customers_sink;

select 'mssql-customer' as tname   , count(*) from oc.dbo.customers  UNION ALL
select 'mssql-products' as tname   , count(*) from oc.dbo.products  UNION ALL
select 'mssql-orders' as tname     , count(*) from oc.dbo.orders  UNION ALL
select 'mssql-order_items' as tname, count(*) from oc.dbo.order_items;

[CREATE TABLE for PostgreSQL]


DROP TABLE public.customers_sink ;
DROP TABLE public.products_sink ;
DROP TABLE public.orders_sink ;
DROP TABLE public.order_items_sink ;


CREATE TABLE public.customers_sink (
	customer_id int NOT NULL PRIMARY KEY,
	email_address varchar(255) NOT NULL,
	full_name varchar(255) NOT null,
	cre_dtm date default CURRENT_DATE
);

CREATE TABLE public.products_sink (
	product_id int NOT NULL PRIMARY KEY,
	product_name varchar(100) NULL,
	product_category varchar(200) NULL,
	unit_price numeric null,
	cre_dtm date default CURRENT_DATE
);

CREATE TABLE public.orders_sink (
	order_id int NOT NULL PRIMARY KEY,
	order_datetime timestamp NOT NULL,
	customer_id int NOT NULL,
	order_status varchar(10) NOT NULL,
	store_id int NOT null,
	cre_dtm date default CURRENT_DATE
) ;

CREATE TABLE public.order_items_sink (
	order_id int NOT NULL,
	line_item_id int NOT NULL,
	product_id int NOT NULL,
	unit_price numeric(10, 2) NOT NULL,
	quantity int NOT NULL,
	cre_dtm date default CURRENT_DATE,
	primary key (order_id, line_item_id)
);


select 'postgresql-customers_sink;  ' as tname, count(*) as cnt from customers_sink UNION ALL
select 'postgresql-products_sink;   ' as tname, count(*) as cnt from products_sink UNION ALL
select 'postgresql-orders_sink;     ' as tname, count(*) as cnt from orders_sink UNION ALL
select 'postgresql-order_items_sink;' as tname, count(*) as cnt from order_items_sink;

SELECT max(cre_dtm) - min(cre_dtm) from customers_sink;


select * from customers_sink;
select * from products_sink;
select * from orders_sink;
select * from order_items_sink;

[CREATE TABLE for MySql]

use oc_sink;

drop table if exists customers_sink;
drop table if exists products_sink;
drop table if exists orders_sink;
drop table if exists order_items_sink;

-- ์•„๋ž˜ Create Table ์Šคํฌ๋ฆฝํŠธ์ˆ˜ํ–‰.

CREATE TABLE customers_sink (
	customer_id int NOT NULL PRIMARY KEY,
	email_address varchar(255) NOT NULL,
	full_name varchar(255) NOT NULL,
	cre_dtm DATETIME default now()
) ENGINE=InnoDB; 

CREATE TABLE products_sink (
	product_id int NOT NULL PRIMARY KEY,
	product_name varchar(100) NULL,
	product_category varchar(200) NULL,
	unit_price numeric NULL,
	cre_dtm DATETIME default now()
) ENGINE=InnoDB;

CREATE TABLE orders_sink (
	order_id int NOT NULL PRIMARY KEY,
	order_datetime datetime NOT NULL,
	customer_id int NOT NULL,
	order_status varchar(10) NOT NULL,
	store_id int NOT NULL,
	cre_dtm DATETIME default now()
) ENGINE=InnoDB;

CREATE TABLE order_items_sink (
	order_id int NOT NULL,
	line_item_id int NOT NULL,
	product_id int NOT NULL,
	unit_price numeric(10, 2) NOT NULL,
	quantity int NOT NULL,
	cre_dtm DATETIME default now(),
	primary key (order_id, line_item_id)
) ENGINE=InnoDB;



select 'mysql-customer_sink' as tname   , count(*) from customers_sink  UNION ALL
select 'mysql-products_sink' as tname   , count(*) from products_sink  UNION ALL
select 'mysql-orders_sink' as tname     , count(*) from orders_sink  UNION ALL
select 'mysql-order_items_sink' as tname, count(*) from order_items_sink;


SELECT max(cre_dtm) - min(cre_dtm) from customers_sink;



select * from customers_sink;
select * from products_sink;
select * from orders_sink;
select * from order_items_sink;