CDC MySQL: Kafka Source JDBC Setup
The need of the hour here is to do a POC around the CDC (change data capture) concept: when a new record is inserted into a table, it is captured as a change at the source. Let's walk through this with the steps below.
1) Connect to the RDBMS (MySQL): mysql -u root
2) Switch to the database (pocTest): use pocTest
3) Create a small table schema:
CREATE TABLE Persons ( Id int NOT NULL AUTO_INCREMENT, LastName varchar(255) NOT NULL, FirstName varchar(255), ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, PRIMARY KEY (Id) );
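A quick way to confirm the table and its ts column were created is to describe it from the shell (a sketch, assuming MySQL is local and root has no password):
# Hypothetical verification step: show the Persons schema without entering the mysql prompt
mysql -u root -e "DESCRIBE pocTest.Persons;"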
4) Configure the source properties file /opt/confluent-4.1.0/etc/kafka-connect-jdbc/source-mysql-jdbc.properties with the content below:
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/pocTest?user=root
table.whitelist=accounts,Persons
#query=select * from accounts
#mode=incrementing
mode=timestamp
timestamp.column.name=ts
incrementing.column.name=id
topic.prefix=test-
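With mode=timestamp the connector polls each whitelisted table for rows whose ts value is newer than the last one it saw. Conceptually each poll behaves like the query below (an illustrative sketch, not the connector's exact SQL; the timestamp literal is just a placeholder for the last stored offset):
# Roughly what the connector does on each poll cycle (illustrative only)
mysql -u root -e "SELECT * FROM pocTest.Persons WHERE ts > '2018-05-02 00:00:00' ORDER BY ts ASC;"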
5) Start the Schema Registry (if not already running):
./schema-registry-start -daemon ../etc/schema-registry/schema-registry.properties
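To verify the Schema Registry is up before moving on, you can query its REST interface (assuming the default port 8081):
# Should return a JSON list of registered subjects (an empty [] on a fresh install)
curl http://localhost:8081/subjects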
6) Start the connector in a new PuTTY session:
cd /opt/confluent-4.1.0/bin
export CLASSPATH=$CLASSPATH:.:/opt/confluent-4.1.0/share/java/kafka-connect-jdbc/mysql-connector-java.jar:/opt/confluent-4.1.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0.jar
./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/source-mysql-jdbc.properties
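Once the standalone worker is running, its REST interface can confirm that the connector loaded (assuming the default Connect port 8083):
# Should list the connector name from the properties file, e.g. test-source-mysql-jdbc-autoincrement
curl http://localhost:8083/connectors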
7) Go back to the terminal where you created the table (steps 1-3) and insert some data into it:
insert into Persons(LastName,FirstName) values('dasgupta','mahua');
8) Open another PuTTY terminal, cd /opt/confluent-4.1.0/bin, and start the Avro console consumer:
./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-Persons --from-beginning
It should start showing records like the one below:
{"Id":1,"LastName":"mogili","FirstName":{"string":pratyusha"},"ts":1525288606000}
9) Repeat step 7 a couple of times and watch the consumer from step 8 update automatically (a scripted version of these inserts is sketched after the sample output below):
{"Id":1,"LastName":"das","FirstName":{"string":"mahua"},"ts":1525288606000}
{"Id":2,"LastName":"dasgupta","FirstName":{"string":"khounish"},"ts":1525289068000}