KAFKA REST CDC - mahuadasgupta/kafkasetup GitHub Wiki

 

Reference: https://docs.confluent.io/current/kafka-rest/docs/intro.html
		     su - 
[root@prathyusha ~]# service mysqld restart
[root@prathyusha ~]# service sshd restart

confluent kafka setup:( linux )
[root@prathyusha ~]# wget http://packages.confluent.io/archive/4.1/confluent-oss-4.1.0-2.11.tar.gz
[root@prathyusha ~]# tar -zxvf confluent-oss-4.1.0-2.11.tar.gz


from putty
1) start  zookeeper server:
$cd /opt/confluent-4.1.0/bin
$./zookeeper-server-start ../etc/kafka/zookeeper.properties
(if port 0.0.0.0/0.0.0.0:2181 is seen in output press ctrl-c and run below)
$./zookeeper-server-start  -daemon ../etc/kafka/zookeeper.properties
#run netstat command to ensure zookeeper running in 2181
$netstat -anp|grep  2181

tcp6       0      0 :::2181                 :::*                    LISTEN      15418/java

2)start kafka server:
$cd /opt/confluent-4.1.0/bin
$./kafka-server-start ../etc/kafka/server.properties
(if socket connections on 0.0.0.0:9092. is seen in output press ctrl-c and run below)
$./kafka-server-start -daemon ../etc/kafka/server.properties
run netstat command to ensure kafka running in 9092
$netstat -anp|grep  9092
tcp6       0      0 :::9092                 :::*                    LISTEN      15656/java

Create topic
$cd /opt/confluent-4.1.0/bin
$./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkarestpoc1

List Topics: (in case the topic is already created)

$curl "http://localhost:8082/topics"

start kafka rest :
cd /opt/confluent-4.1.0/bin
./kafka-rest-start ../etc/kafka-rest/kafka-rest.properties
(if its up and running press ctrl-c and run below)
./kafka-rest-start -daemon ../etc/kafka-rest/kafka-rest.properties
netstat -anp|grep  8082


non rest consumer :
cd /opt/confluent-4.1.0/bin
./kafka-console-consumer --bootstrap-server localhost:9092 --topic kafkarestpoc1 --from-beginning

Kafka rest producer

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json"  -H "Accept: application/vnd.kafka.v2+json"  --data 

'{"records":[{"value":{"foo":"bar"}}]}' "http://localhost:8082/topics/kafkarestpoc1" 
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json"  -H "Accept: application/vnd.kafka.v2+json"  --data 

'{"records":[{"value":{"pratz":"bunny"}}]}' "http://localhost:8082/topics/kafkarestpoc1"


kafka rest consumer 
# Create a consumer for JSON data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.

#creating the consumer skeleton:
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_consumer_instance", "format": "json", 

"auto.offset.reset": "earliest"}' http://localhost:8082/consumers/my_json_consumer  
#output :
#

{"instance_id":"my_consumer_instance","base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consu

mer_instance"}


#subscribe to the topic
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["kafkarestpoc1"]}' 

http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# No content in response

#read the data from the topic
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" 

http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
#output:
#[{"topic":"kafkarestpoc1","key":null,"value":{"foo":"bar"},"partition":0,"offset":0},{"topic":"kafkarestpoc1","key":null,"value":

{"pratz":"bunny"},"partition":0,"offset":1}]

$ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" 

http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
  # No content in response
_______________________________________________________________________________________________________________

____________________________________________________________________
CDC REST part:
login to  a terminal


start mysqld restart (ignore if already running)
sudo bash 
mysql -u root
use pocTest 
show tables;
select * from person;

login to  another terminal 
start schema registry :
cd /storage/confluent-4.1.0/bin
./schema-registry-start -daemon ../etc/schema-registry/schema-registry.properties
netstat -anp|grep 8081

Create a file at /opt/confluent-4.1.0/etc/kafka-connect-jdbc/source-mysql-jdbc.properties with the below content

name=REST-CDC-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/pocTest?user=root
table.whitelist=bunnyPersons
#query=select * from accounts
#mode=incrementing
mode=timestamp
timestamp.column.name=ts
incrementing.column.name=id
topic.prefix=cdc-poc-

(continue in the previous terminal if running in daemon mode; otherwise you need to set the class path — see below)
./connect-standalone  ../etc/schema-registry/connect-avro-standalone.properties  ../etc/kafka-connect-jdbc/source-mysql-

jdbc.properties

if in new terminal - open a putty terminal 
must execute : export CLASSPATH=$CLASSPATH:.:/opt/confluent-4.1.0//share/java/kafka-connect-jdbc/mysql-connector-java.jar:/opt/confluent-4.1.0//share/java/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0.jar

cd /storage/confluent-4.1.0/bin 
./connect-standalone  ../etc/schema-registry/connect-avro-standalone.properties  ../etc/kafka-connect-jdbc/source-mysql-

jdbc.properties
( this ensures that consumer is running and making connectivity to mysql on database pocTest. it must  hang.)

insert some data in bunnyPersons
insert into bunnyPersons(LastName,FirstName) values('testp','testq');

test with non rest consumer from a terminal 
cd /opt/confluent-4.1.0/bin
./kafka-console-consumer --bootstrap-server localhost:9092 --topic cdc-poc-bunnyPersons --from-beginning

test with rest consumer cdc rest consumer-

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_person_consumer", "format": "json", 

"auto.offset.reset": "earliest"}' http://localhost:8082/consumers/my_person_consumer  
#output :
{"instance_id":"my_person_consumer","base_uri":"http://localhost:8082/consumers/my_person_consumer/instances/my_pers

on_consumer"}

cdc-poc-bunnyPersons topic:
#subscribe to the topic
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["cdc-poc-bunnyPersons"]}' 

http://localhost:8082/consumers/my_person_consumer/instances/my_person_consumer/subscription
# No content in response

#read the data from the topic
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" 

http://localhost:8082/consumers/my_person_consumer/instances/my_person_consumer/records


Error:

{"error_code":50002,"message":"Kafka error: java.io.CharConversionException: Invalid UTF-32 character 0x2a010a72 (above 

0x0010ffff) at char #1, byte #7)"


kafka-rest-cdc-Avro(success)

curl -X POST  -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "my_consumer_instance", "format": "avro"}' 

http://localhost:8082/consumers/my_avro_consumer


$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["cdc-poc-bunnyPersons"]}' 

http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
 # No content in response

$ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" 

http://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records


⚠️ **GitHub.com Fallback** ⚠️