KAFKA KSQL MySQL SINK

Use cases:

  1) How to stream messages from one Kafka topic to another topic as-is
  2) How to stream an input delimited-formatted message from one topic to another topic in JSON format
  3) How to merge the data of 2 topics and send it to a master topic
  4) How to merge the data of 2 topics, send it to a master topic, and pass it through to a JDBC sink


  KSQL makes this possible: it can read a Kafka topic as a stream, run SQL-like queries and joins over it, and write the result to a new stream, which in turn is backed by a new Kafka topic.
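For example, the merge use cases (3 and 4 above) reduce to a KSQL join. A minimal sketch, assuming the pageviews stream defined later on this page and a table built over the users quickstart topic (the table name users_table and the selected columns are illustrative, not part of the original setup):

CREATE TABLE users_table (registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR) WITH (kafka_topic='users', value_format='JSON', key='userid');
CREATE STREAM pageviews_enriched AS SELECT pageviews.userid, pageid, regionid FROM pageviews LEFT JOIN users_table ON pageviews.userid = users_table.userid;

The output stream pageviews_enriched is backed by a topic of the same name (PAGEVIEWS_ENRICHED), which can then be fed to a JDBC sink exactly like the single-topic case below.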

Below are the steps, run from a PuTTY/SSH session on the Confluent host.
1) Start the ZooKeeper server:
$ cd /opt/confluent-4.1.0/bin
$ ./zookeeper-server-start ../etc/kafka/zookeeper.properties
(once the output shows it binding to 0.0.0.0/0.0.0.0:2181, press Ctrl-C and restart it in daemon mode:)
$ ./zookeeper-server-start -daemon ../etc/kafka/zookeeper.properties
#run netstat to confirm ZooKeeper is listening on 2181
$ netstat -anp|grep 2181

tcp6       0      0 :::2181                 :::*                    LISTEN      15418/java
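
Optionally, ZooKeeper itself can be probed with its four-letter-word commands (assuming nc is available); a healthy server answers ruok with imok:

$ echo ruok | nc localhost 2181
imok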

2) Start the Kafka server:
$ cd /opt/confluent-4.1.0/bin
$ ./kafka-server-start ../etc/kafka/server.properties
(once the output shows the socket server listening on 0.0.0.0:9092, press Ctrl-C and restart it in daemon mode:)
$ ./kafka-server-start -daemon ../etc/kafka/server.properties
#run netstat to confirm Kafka is listening on 9092
$ netstat -anp|grep 9092
tcp6       0      0 :::9092                 :::*                    LISTEN      15656/java
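
Beyond netstat, the broker can be sanity-checked by listing topics against ZooKeeper (the list may be empty on a fresh install):

$ cd /opt/confluent-4.1.0/bin
$ ./kafka-topics --list --zookeeper localhost:2181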


3) Start the KSQL server:
$ cd /opt/confluent-4.1.0/bin

$ ./ksql-server-start ../etc/ksql/ksql-server.properties

(once the output shows the server listening on 0.0.0.0:8088, press Ctrl-C and restart it in daemon mode:)
$ ./ksql-server-start -daemon ../etc/ksql/ksql-server.properties

#run netstat to confirm the KSQL server is listening on 8088
$ netstat -anp|grep 8088
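
The KSQL server also exposes a REST API; assuming the default listener on port 8088 and that this version serves the /info endpoint, a quick curl is another way to confirm it is up:

$ curl http://localhost:8088/info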


4) Start the Schema Registry:
$ cd /opt/confluent-4.1.0/bin

$ ./schema-registry-start ../etc/schema-registry/schema-registry.properties
In case of an error (for example the port is already in use), find and kill the process holding it and retry:
$ ps -ef|grep schema-registry.properties
$ kill -9 26355        (use the PID reported by ps)
(daemon mode)
$ cd /opt/confluent-4.1.0/bin
$ ./schema-registry-start -daemon ../etc/schema-registry/schema-registry.properties
#run netstat to confirm the Schema Registry is listening on 8081
$ netstat -anp|grep 8081
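
The Schema Registry REST API can be used as a further check (the subject list stays empty, [], until the first Avro messages are registered):

$ curl http://localhost:8081/subjects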


5) Consolidating messages:

Every time a new terminal is opened, CONFLUENT_HOME has to be exported first:
export CONFLUENT_HOME=/opt/confluent-4.1.0/

#in one terminal
#this command generates pageview data and also creates the pageviews topic if it does not already exist

export CONFLUENT_HOME=/opt/confluent-4.1.0/
cd $CONFLUENT_HOME/bin
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=5000

#in another terminal
export CONFLUENT_HOME=/opt/confluent-4.1.0/
cd $CONFLUENT_HOME/bin
./ksql-datagen quickstart=users format=json topic=users maxInterval=5000

#in another terminal, start the KSQL CLI
export CONFLUENT_HOME=/opt/confluent-4.1.0/
cd $CONFLUENT_HOME/bin
./ksql
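
Once inside the KSQL CLI, the topics created by the datagen commands and the data flowing through them can be inspected before any stream is defined, for example:

ksql> SHOW TOPICS;
ksql> PRINT 'pageviews';

(PRINT keeps tailing the topic; press Ctrl-C to stop it.)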


Single topic to master topic (pageviews -> PAGEVIEWS_MASTER): use case description

1) Using ksql-datagen, generate delimited pageview data on the topic pageviews:
export CONFLUENT_HOME=/opt/confluent-4.1.0/
cd $CONFLUENT_HOME/bin
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=5000
2) Create KSQL streams so that the data ends up on a derived topic (PAGEVIEWS_MASTER) in Avro format:
export CONFLUENT_HOME=/opt/confluent-4.1.0/
cd $CONFLUENT_HOME/bin
./ksql
(create a KSQL stream over the topic where the data is flowing (pageviews); the datagen above writes delimited data, so the stream is declared as DELIMITED)
CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) WITH (kafka_topic='pageviews', value_format='DELIMITED');
(create another KSQL stream from the one above; its output topic takes the stream name, i.e. stream pageviews_master -> topic PAGEVIEWS_MASTER, and VALUE_FORMAT='AVRO' makes that topic readable by the Avro-based JDBC sink in step 3)
CREATE STREAM pageviews_master WITH (value_format='AVRO') AS SELECT * FROM pageviews;
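
Before wiring up the sink, the derived stream can be checked from the same KSQL session; the LIMIT clause stops the query after a few rows:

ksql> DESCRIBE pageviews_master;
ksql> SELECT * FROM pageviews_master LIMIT 3;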


3) Using the Kafka Connect JDBC sink, consume the Avro messages and write them into MySQL as a table named PAGEVIEWS_MASTER:
export CONFLUENT_HOME=/opt/confluent-4.1.0/
cd $CONFLUENT_HOME/bin
export CLASSPATH=$CLASSPATH:.:/opt/confluent-4.1.0/share/java/kafka-connect-jdbc/mysql-connector-java.jar:/opt/confluent-4.1.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0.jar

Edit /opt/confluent-4.1.0/etc/kafka-connect-jdbc/sink-mysql-jdbc.properties:

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=PAGEVIEWS_MASTER
connection.url=jdbc:mysql://127.0.0.1:3306/pocTest?user=root
auto.create=true
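
The connector itself does not choose the deserializer; that comes from the worker config passed as the first argument below (../etc/schema-registry/connect-avro-standalone.properties). In a stock Confluent install the relevant entries look roughly like this, pointing the Avro converters at the Schema Registry started in step 4:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081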

Run:
$ ./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/sink-mysql-jdbc.properties
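
Once the connector has been running for a short while, the auto-created table can be checked from the MySQL side (assuming the mysql client is installed and using the pocTest database from the connection URL above):

$ mysql -u root pocTest -e "SELECT * FROM PAGEVIEWS_MASTER LIMIT 5;"

If instead the connector dies immediately with a DataException like the one below ("Unknown magic byte!"), the messages on the topic are not Avro-encoded; see the notes after the stack trace.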



[2018-06-14 20:41:24,634] ERROR WorkerSinkTask{id=test-sink1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: PAGEVIEWS_MASTER
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:467)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-06-14 20:41:24,655] ERROR WorkerSinkTask{id=test-sink1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-06-14 20:41:24,657] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:100)


If the sink fails with the "Unknown magic byte!" error, the AvroConverter is being asked to deserialize messages that are not Avro-encoded. To prove whether a topic really carries Avro data, consume it with the Avro console consumer; on a topic with genuine Avro records the output looks like this:

$ ./kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic PAGEVIEWS_MASTERNEW
{"VIEWTIME":{"long":1529035095648},"USERID":{"string":"User_3"},"PAGEID":{"string":"Page_21"}}
{"VIEWTIME":{"long":1529035097443},"USERID":{"string":"User_8"},"PAGEID":{"string":"Page_17"}}
{"VIEWTIME":{"long":1529035101352},"USERID":{"string":"User_1"},"PAGEID":{"string":"Page_25"}}
{"VIEWTIME":{"long":1529035104463},"USERID":{"string":"User_7"},"PAGEID":{"string":"Page_76"}}

