Kafka - cchantra/bigdata.github.io GitHub Wiki

Apache Kafka is an open-source stream-processing software platform developed by the Apache Software Foundation, written in Scala and Java. It is highly valuable for enterprise infrastructures that need to process streaming data. Kafka is a scalable, fault-tolerant, publish-subscribe messaging system that enables you to build distributed applications.

Kafka use cases:

  1. Monitoring

  2. Messaging

  3. Database

  4. Log aggregation

  5. ETL

Elements:

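Topic: a named stream of messages, split into one or more partitions

Producer: publishes messages to a topic

Consumer: reads (consumes) messages from a topic

Broker: the Kafka server that stores messages and exchanges them between producers and consumers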
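To make the four elements above concrete, here is a toy in-memory model in Python. It is only a sketch: `ToyBroker`, `publish` and `fetch` are made-up names, partitions are ignored, and nothing here talks to a real Kafka server. It only shows that a topic behaves like an append-only log in which every message gets an offset, and that a consumer keeps track of the offset it has read up to.

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical stand-in for a broker: one append-only list per topic."""

    def __init__(self):
        self.topics = defaultdict(list)

    def publish(self, topic, message):
        """Producer side: append a message and return its offset."""
        log = self.topics[topic]
        log.append(message)
        return len(log) - 1

    def fetch(self, topic, offset):
        """Consumer side: return every message from the given offset onward."""
        return self.topics[topic][offset:]


broker = ToyBroker()
broker.publish("test", "Hello")
broker.publish("test", "World")

position = 0                      # the consumer remembers where it stopped
batch = broker.fetch("test", position)
position += len(batch)
print(batch, position)
```

Messages are not removed when they are read, so a second consumer starting from offset 0 would see the same two messages.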

The partitions of a topic are shared among the consumers of a consumer group. How they are assigned depends on the number of consumers:

If the number of consumers is less than the number of topic partitions, multiple partitions are assigned to some of the consumers in the group.

If the number of consumers is the same as the number of topic partitions, each consumer is assigned exactly one partition.

If the number of consumers is higher than the number of topic partitions, the extra consumers get no partition and stay idle (Consumer 5 in the diagram from the source below). This is not effective.

Source: https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions
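The three cases above can be sketched with a few lines of Python. This is a simplified round-robin illustration, not Kafka's actual assignor code; `assign_partitions` and the consumer names are made up for the example.

```python
def assign_partitions(num_partitions, consumers):
    """Spread partition ids over consumers round-robin (simplified sketch)."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# A topic with 4 partitions, read by groups of 2, 4 and 5 consumers.
for group_size in (2, 4, 5):
    group = [f"consumer-{i}" for i in range(1, group_size + 1)]
    print(group_size, assign_partitions(4, group))
```

With 5 consumers and 4 partitions, the list for `consumer-5` stays empty, which is the idle consumer from the third case.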

To run Kafka, start ZooKeeper first, then start the Kafka server.

Installation

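(Full download page: https://kafka.apache.org/downloads)

The latest version is 4.0. I haven't tried it yet, though, so the steps below use 2.4.1. Note that Kafka 4.0 no longer uses ZooKeeper, so the ZooKeeper steps on this page apply to the older release only.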

wget  https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz

tar xvf kafka_2.11-2.4.1.tgz 

mv   kafka_2.11-2.4.1  kafka

Go to the Kafka install directory

cd kafka

Edit config/zookeeper.properties

vi config/zookeeper.properties 

Change dataDir to:

dataDir=./zookeeper

Save the file and quit.

Start zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Then start Kafka in another console:

bin/kafka-server-start.sh config/server.properties

Note: If you get a "no such method" error from ZooKeeper, it is likely that a different ZooKeeper library is being loaded instead of the one in the kafka/lib folder. You may have a ZooKeeper jar from a previous installation (either hive/lib or hadoop/common/lib). Check your classpath variable and make sure there is no other ZooKeeper jar in it.

Test installation

  1. Create a topic
bin/kafka-topics.sh   --bootstrap-server  localhost:9092 --replication-factor 1 --partitions 1  --create  --topic test


Created topic "test"

To list all available topics:

bin/kafka-topics.sh --list   --bootstrap-server  localhost:9092

test
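If you want to script these two steps, one option is to call the same command-line tool from Python. The sketch below only builds the argument lists shown above and hands them to `subprocess`. The helper names are made up, and it assumes you run it from the Kafka install directory with the broker already started.

```python
import subprocess

KAFKA_TOPICS = "bin/kafka-topics.sh"  # relative to the Kafka install directory


def create_topic_cmd(topic, bootstrap="localhost:9092", partitions=1, replication=1):
    """Build the same 'create topic' command line as above."""
    return [
        KAFKA_TOPICS,
        "--bootstrap-server", bootstrap,
        "--replication-factor", str(replication),
        "--partitions", str(partitions),
        "--create", "--topic", topic,
    ]


def list_topics_cmd(bootstrap="localhost:9092"):
    """Build the same 'list topics' command line as above."""
    return [KAFKA_TOPICS, "--list", "--bootstrap-server", bootstrap]


def run(cmd):
    """Run a command and return its standard output (raises if it fails)."""
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout


# Example (needs a running broker):
#   print(run(create_topic_cmd("test")))
#   print(run(list_topics_cmd()))
```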
  2. Send messages from a producer

In another terminal (for Kafka 2.7+):

bin/kafka-console-producer.sh  --bootstrap-server  localhost:9092 --topic test

>Hello

>World

(for Kafka 2.4 and earlier, including the 2.4.1 release installed above)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

--Messages are stored locally on your disk. The path is the value of log.dirs in the config/server.properties file. By default it is set to /tmp/kafka-logs/

--List the contents of log.dirs (note: your log location may differ):
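If you prefer to look up log.dirs with a script, here is a small Python sketch. It assumes the simple `key=value` lines found in the shipped server.properties and does not handle every feature of the Java properties format. The function names are made up, and the fallback to /tmp/kafka-logs when the key is missing is an assumption based on the default mentioned above.

```python
from pathlib import Path


def read_properties(path):
    """Read simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def log_dirs(path="config/server.properties"):
    """Return the log directories as a list (log.dirs may be comma separated)."""
    value = read_properties(path).get("log.dirs", "/tmp/kafka-logs")
    return [d.strip() for d in value.split(",") if d.strip()]


# Example, run from the Kafka install directory:
#   print(log_dirs())
```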

ls /tmp/kafka-logs/test-0/

00000000000000000000.index 00000000000000000000.timeindex leader-epoch-checkpoint

00000000000000000000.log 00000000000000000001.snapshot

--View the content of 00000000000000000000.log (the file is binary, so only the message payloads are readable):


vi   /tmp/kafka-logs/test-0/00000000000000000000.log

--Kafka provides a utility that lets you examine each message stored in a log segment.

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /tmp/kafka-logs/test-0/00000000000000000000.log

Dumping /tmp/kafka-logs/test-0/00000000000000000000.log

Starting offset: 0

offset: 0 position: 0 CreateTime: 1528595323503 isvalid: true keysize: -1 valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: Hello

offset: 1 position: 73 CreateTime: 1528595324799 isvalid: true keysize: -1 valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: World
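The dump output above is plain `name: value` text, so it is easy to pick apart in Python. The sketch below is only an illustration written against the two sample lines shown here. The function names are made up and other Kafka versions may print different fields. CreateTime is in milliseconds since the Unix epoch, so it is divided by 1000 before conversion.

```python
import re
from datetime import datetime, timezone

FIELD = re.compile(r"(\w+): (\S+)")
INT_FIELDS = ("offset", "position", "CreateTime", "keysize", "valuesize")


def parse_dump_line(line):
    """Turn one 'offset: ... payload: ...' line into a dict."""
    head, _, payload = line.partition(" payload: ")
    record = dict(FIELD.findall(head))
    record["payload"] = payload          # keep the payload as-is, spaces included
    for key in INT_FIELDS:
        if key in record:
            record[key] = int(record[key])
    return record


def create_time_utc(record):
    """Convert the CreateTime field (milliseconds) to a UTC datetime."""
    return datetime.fromtimestamp(record["CreateTime"] / 1000, tz=timezone.utc)


line = (
    "offset: 0 position: 0 CreateTime: 1528595323503 isvalid: true keysize: -1 "
    "valuesize: 5 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 "
    "sequence: -1 isTransactional: false headerKeys: [] payload: Hello"
)
record = parse_dump_line(line)
print(record["offset"], record["payload"], create_time_utc(record).isoformat())
```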
  3. Consume the messages (consumer side)

Open another terminal.

cd  kafka

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  4. Python example: recipe alert system

Note:

The length of a Kafka topic name must not exceed 249 characters.
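A quick way to check a topic name before using it is a small Python helper like the one below. Besides the 249-character limit from the note, it also applies the other naming rules as I understand them: only letters, digits, `.`, `_` and `-` are allowed, and the names `.` and `..` are rejected. The helper name is made up for this sketch.

```python
import re

MAX_TOPIC_LENGTH = 249
LEGAL_CHARS = re.compile(r"[a-zA-Z0-9._-]+")


def is_valid_topic_name(name):
    """Return True if the name passes the length and character checks."""
    if not name or name in (".", ".."):
        return False
    if len(name) > MAX_TOPIC_LENGTH:
        return False
    return LEGAL_CHARS.fullmatch(name) is not None


for candidate in ("test", "recipe alerts", "a" * 250):
    print(len(candidate), is_valid_topic_name(candidate))
```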

Install kafka-python, lxml and bs4:

pip3 install -U kafka-python

pip3 install lxml --user

pip3 install bs4 --user
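Once kafka-python is installed, a minimal producer and consumer could look like the sketch below. This is not the code from the notebooks, only an illustration that assumes the broker from the steps above is running on localhost:9092 and the topic `test` exists. The function names and the 5-second timeout are made up. Messages are sent as plain UTF-8 text, so they are compatible with what the console producer wrote.

```python
def encode(text):
    """Serialize a string to bytes for Kafka."""
    return text.encode("utf-8")


def decode(raw):
    """Deserialize bytes received from Kafka back to a string."""
    return raw.decode("utf-8")


def produce(topic, messages, bootstrap="localhost:9092"):
    """Send each message to the topic, then wait until all are delivered."""
    from kafka import KafkaProducer  # imported here so the helpers above work without a broker

    producer = KafkaProducer(bootstrap_servers=bootstrap, value_serializer=encode)
    for message in messages:
        producer.send(topic, message)
    producer.flush()
    producer.close()


def consume(topic, bootstrap="localhost:9092", timeout_ms=5000):
    """Read the topic from the beginning; stop after timeout_ms without new messages."""
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap,
        auto_offset_reset="earliest",    # like --from-beginning on the console consumer
        consumer_timeout_ms=timeout_ms,
        value_deserializer=decode,
    )
    try:
        return [(record.offset, record.value) for record in consumer]
    finally:
        consumer.close()


# Example (needs a running broker):
#   produce("test", ["Hello", "World"])
#   print(consume("test"))
```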

You can download the Python notebook files:

wget  https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/kafka/test_kafka_prod_consumer_new2.ipynb

wget  https://raw.githubusercontent.com/cchantra/bigdata.github.io/master/kafka/test_kafka_producer_new2.ipynb

Open in Jupyter Notebook

Start Jupyter Notebook, then open test_kafka_producer_new2.ipynb and test_kafka_prod_consumer_new2.ipynb in your web browser.

Optional: you can try a monitoring tool for Kafka, e.g.:

http://www.kafkatool.com/download.html

References:

https://gist.github.com/rmoff/fb033086b285655ffe7f9ff0582dedbf

https://www.conduktor.io/kafka/kafka-topics-cli-tutorial/

https://kafka.apache.org/quickstart