Kafka Setup: Cleint server model over a network - mahuadasgupta/kafkasetup GitHub Wiki
Setup a client-server communication model in which kafka server and topic will be in kafka server,kafka producer will will publish message from a kafka client . on the same clinet the consumer will subscribe and update the mysql db.
kafka-server : ip-172-31-90-244
kafkaclient : ip-172-31-87-211
confluent home:/storage/confluent-4.1.0
mysql installed in cleint machine
so step 1) we need to install java8 in both client and server
step2) we need to download confluent in both . both te server has goos space on /storage so we will download it there
wget http://packages.confluent.io/archive/4.1/confluent-oss-4.1.0-2.11.tar.gz
3) extract it tar -zxvf confluent-oss-4.1.0-2.11.tar.gz
lets do server now.
kafkaserver setup on
sudo bash
kafka-server :ec2-user@ip-172-31-90-244
1) cd /storage/confluent-4.1.0/bin
ps -ef|grep zookeeper.properties ( incase zookeper is already running we have to kill the pervious files )
kill -9 3831
2) ./zookeeper-server-start ../etc/kafka/zookeeper.properties
ps -ef|grep server.properties
kill -9 3977
3)./kafka-server-start ../etc/kafka/server.properties
4) create topic
sudo bash
cd /storage/confluent-4.1.0/bin
./kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic poctopic1
5) start the producer only here:
cd /storage/confluent-4.1.0/bin
./kafka-console-producer --broker-list localhost:9092 --topic poctopic1
kafka client setup:
kafkaclient : root@ip-172-31-87-211
1) just run consumer here.
cd /storage/confluent-4.1.0/bin
2)change localhost to kafkamainserver ip or hostname
export JAVA_HOME=/usr
./kafka-console-consumer --bootstrap-server ip-172-31-90-244:9092 --topic poctopic1 --from-beginning
start mysql
service mysqld restart
mysql -u root
( confluent-4.1.0//share/java/kafka-connect-jdbc/(location) - store the mysql-connector-java.jar )
kafkaconnect-sink on aws
1) export CLASSPATH=$CLASSPATH:.:/storage/confluent-4.1.0//share/java/kafka-connect-jdbc/mysql-connector-java.jar:/storage/confluent-4.1.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0.jar
2) change zookeeper in kafkastore.connection.url=ip-172-31-90-244:2181 in /storage/confluent-4.1.0/etc/schema-registry/schema-registry.properties
./schema-registry-start -daemon ../etc/schema-registry/schema-registry.properties
3) open /storage/confluent-4.1.0//etc/schema-registry/connect-avro-standalone.properties and change rest port to
rest.port=18083 ,
4) create a file at /storage/confluent-4.1.0//etc/kafka-connect-jdbc/sink-mysql-jdbc.properties with bel content
./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/sink-mysql-jdbc.properties
if in new terminal - open a putty terminal
mustexecute : export CLASSPATH=$CLASSPATH:.:/storage/confluent-4.1.0//share/java/kafka-connect-jdbc/mysql-connector-java.jar:/storage/confluent-4.1.0//share/java/kafka-connect-jdbc/kafka-connect-jdbc-4.1.0.jar
cd /storage/confluent-4.1.0/bin
./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties ../etc/kafka-connect-jdbc/sink-mysql-jdbc.properties
( this ensures that consumer is runnign and making connectivity to mysql on database pocTest. it must hang.)
step 6)open a putty terminal ,
cd /storage/confluent-4.1.0/bin
./kafka-avro-console-producer \--broker-list ip-172-31-90-244:9092 --topic person \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price","type": "float"}]}'
./kafka-avro-console-producer \--broker-list ip-172-31-90-244:9092 --topic premises \--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id1","type":"int"},{"name":"product1", "type": "string"}, {"name":"quantity1", "type": "int"}, {"name":"price1","type": "float"}]}'
this will hang to get an input
{"id": 999, "product": "foo", "quantity": 100, "price": 50} ( press enter)
{"id1": 999, "product1": "foo", "quantity1": 100, "price1": 100}
runn mysql and check : all records should be updated.
mysql -u root
use pocTest
select * from persons
select * from Premises.