Kafka & MirrorMaker2

Introduction

Kafka is a high-throughput event streaming platform. This guide walks through the steps to set up two Kafka clusters and test message replication with MirrorMaker2. This setup runs only one ZooKeeper instance per cluster and is therefore not suitable for production environments.

Assumptions

  • The 2 Kafka clusters need to perform active-active replication for events (topics)
  • The 2 Kafka clusters need to perform active-passive replication for commands (queues)
  • You have 2 Ubuntu nodes, each with Docker and Docker-Compose installed
  • You have 1 Ubuntu node to act as the producer to test Kafka
  • You have 2 Ubuntu nodes to act as the consumers to test Kafka
  • Node1 kafka_a 10.0.1.41 will host one Kafka cluster of 1 ZooKeeper and 3 Kafka instances.
  • Node2 kafka_b 10.0.1.42 will host one Kafka cluster of 1 ZooKeeper and 3 Kafka instances.
  • Node3 test_producer 10.0.1.43.
  • Node4 test_consumer1 10.0.1.44.
  • Node5 test_consumer2 10.0.1.45.
  • All test cases in this guide involve producers sending messages to kafka_a. The cases for sending messages to kafka_b are assumed to have similar results.

Install Docker and Docker-Compose

Install Docker on kafka_a and kafka_b nodes

sudo apt-get update
sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg-agent \
    software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository \
   "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
   $(lsb_release -cs) \
   stable"
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io
sudo docker run hello-world

Install Docker-Compose on kafka_a and kafka_b nodes

sudo curl -L "https://github.com/docker/compose/releases/download/1.26.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version

Set up Kafka Clusters

Create directories on kafka_a and kafka_b nodes. The Bitnami Kafka containers run as UID 1001, so the data directories are chowned accordingly.

mkdir -p /opt/docker/kafka
mkdir -p /opt/docker/kafka/data/kafka1_data
mkdir -p /opt/docker/kafka/data/kafka2_data
mkdir -p /opt/docker/kafka/data/kafka3_data
chown 1001:root /opt/docker/kafka/data/*

Create file docker-compose.yml on kafka_a and kafka_b nodes. Remember to change 10.0.1.41 to 10.0.1.42 for kafka_b.

nano /opt/docker/kafka/docker-compose.yml
version: '2'

services:
  zookeeper:
    image: 'docker.io/bitnami/zookeeper:3-debian-10'
    ports:
     - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka1:
    image: 'docker.io/bitnami/kafka:2.5.0'
    ports:
      - '32780:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.1.41:32780
      - KAFKA_HEAP_OPTS=-Xmx1024m -Xms1024m
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
      - KAFKA_CFG_STATUS_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_CONFIG_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_OFFSET_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_NUM_PARTITIONS=2
    volumes:
      - './data/kafka1_data:/bitnami'
    depends_on:
      - zookeeper

  kafka2:
    image: 'docker.io/bitnami/kafka:2.5.0'
    ports:
      - '32781:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.1.41:32781
      - KAFKA_HEAP_OPTS=-Xmx1024m -Xms1024m
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
      - KAFKA_CFG_STATUS_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_CONFIG_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_OFFSET_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_NUM_PARTITIONS=2
    volumes:
      - './data/kafka2_data:/bitnami'
    depends_on:
      - zookeeper

  kafka3:
    image: 'docker.io/bitnami/kafka:2.5.0'
    ports:
      - '32782:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.1.41:32782
      - KAFKA_HEAP_OPTS=-Xmx1024m -Xms1024m
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
      - KAFKA_CFG_STATUS_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_CONFIG_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_OFFSET_STORAGE_REPLICATION_FACTOR=2
      - KAFKA_CFG_NUM_PARTITIONS=2
    volumes:
      - './data/kafka3_data:/bitnami'
    depends_on:
      - zookeeper
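
Note that KAFKA_ADVERTISED_LISTENERS must be the address that clients use to reach each broker from outside Docker, which is why it is set to the host IP and the published port rather than the container-internal address.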

Start the containers

cd /opt/docker/kafka
docker-compose up -d
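
To confirm that all four containers are up, you can check their status and tail a broker's logs (service names as defined in the compose file above):

docker-compose ps
docker-compose logs --tail=20 kafka1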

Test the Kafka Instances

Install Java dependency on test_producer, test_consumer1, and test_consumer2

apt install openjdk-8-jre-headless

Download Kafka binaries into test_producer, test_consumer1, and test_consumer2

cd ~
wget https://apachemirror.sg.wuchna.com/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -zxvf kafka_2.13-2.5.0.tgz
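
If the mirror above no longer serves this release, the Apache archive keeps all past Kafka releases:

wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.13-2.5.0.tgz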

Event (Topic)

On kafka_a, run docker ps and get the exposed ports that are mapped to the Kafka containers. For this example, ports 32780, 32781, and 32782 are used. On any Kafka binary node, create a topic on kafka_a

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.41:2181 --replication-factor 2 --partitions 3 --topic test-event
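
To verify the topic's partition and replica assignment, you can describe it:

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --describe --zookeeper 10.0.1.41:2181 --topic test-event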

On test_consumer1 node, start a consumer subscription to kafka_a at port 32780

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32780 --topic test-event

On test_consumer2 node, start a consumer subscription to kafka_a at port 32781

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32781 --topic test-event

On test_producer node, start a producer connection to kafka_a and test with any message. The subscribed consumers should receive the message.

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 10.0.1.41:32780 --topic test-event

You can repeat this test for any combination of ports, then for kafka_b as well.

Command (Queue)

Run docker ps to get the exposed ports that are mapped to the Kafka containers. For this example, ports 32780, 32781, and 32782 are used. On any Kafka binary node, create a topic on kafka_a

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.41:2181 --replication-factor 2 --partitions 3 --topic test-cmd

On test_consumer1 node, start a consumer subscription to kafka_a at port 32780 with group id consumer-grp1

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32780 --topic test-cmd --group consumer-grp1

On test_consumer2 node, start a consumer subscription to kafka_a at port 32781 with group id consumer-grp1

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32781 --topic test-cmd --group consumer-grp1

On test_producer node, start a producer connection to kafka_a and test with any message. Only one subscribed consumer should receive each message.

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 10.0.1.41:32780 --topic test-cmd
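
To see how the topic's partitions are divided between the two consumers in the group, you can describe the group:

~/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.1.41:32780 --describe --group consumer-grp1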

You can repeat this test for any combination of ports, then for kafka_b as well.

Other helpful commands

Get Topic Offset Information for Cluster

~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.41:32780 --topic test-event

List All Topics of a Cluster

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --list --zookeeper 10.0.1.41:2181

Get Topic Information for Cluster

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --describe --zookeeper 10.0.1.41:2181 --topic test-event

Get Group Offset Information for Cluster

~/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.1.41:32780 --describe --group consumer-grp1


Set up Kafka MirrorMaker2

For this guide we will set up active-passive replication from the kafka_a cluster (alias c1) to kafka_b (alias c2). Consequently, the Kafka MirrorMaker2 container will reside on kafka_b (the passive site). MirrorMaker2 prefixes replicated topics with the source cluster alias, so a topic test-event on c1 appears on c2 as c1.test-event.

On kafka_b node, build a custom Kafka MirrorMaker2 container image. Create a new directory

mkdir -p /opt/docker/kafka-mirrormaker2

Create Dockerfile

nano /opt/docker/kafka-mirrormaker2/Dockerfile
FROM docker.io/bitnami/kafka:2.5.0

# Default MM2 config file path
ENV MM2_CONFIG_PATH /opt/bitnami/kafka/config/mm2.properties

COPY run.sh /run.sh

ENTRYPOINT ["/bin/bash", "-c"]
CMD ["/run.sh"]
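
The MM2_CONFIG_PATH environment variable lets you point the container at a different config file at run time without rebuilding the image.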

Create script run.sh

nano /opt/docker/kafka-mirrormaker2/run.sh
#!/bin/bash
# exec so that connect-mirror-maker receives container stop signals directly
exec /opt/bitnami/kafka/bin/connect-mirror-maker.sh "${MM2_CONFIG_PATH}"

Configure script ownership and permission

chown 1001:root /opt/docker/kafka-mirrormaker2/run.sh
chmod 770 /opt/docker/kafka-mirrormaker2/run.sh

Build the image

cd /opt/docker/kafka-mirrormaker2
docker build -t custom-kafka-mm2 .
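
Confirm that the image was built:

docker images custom-kafka-mm2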

Create your MirrorMaker2 config file

cd /opt/docker/kafka-mirrormaker2
nano mm2.properties
# Kafka clusters; the cluster aliases determine the topic prefixes for replicated topics
clusters = c1, c2
c1.bootstrap.servers = 10.0.1.41:32780, 10.0.1.41:32781, 10.0.1.41:32782
c2.bootstrap.servers = 10.0.1.42:32780, 10.0.1.42:32781, 10.0.1.42:32782

# Cluster configurations
c1.config.storage.replication.factor = 2
c2.config.storage.replication.factor = 2
c1.offset.storage.replication.factor = 2
c2.offset.storage.replication.factor = 2
c1.status.storage.replication.factor = 2
c2.status.storage.replication.factor = 2
c1->c2.enabled = true
c2->c1.enabled = false

# Mirror maker configurations
offset-syncs.topic.replication.factor = 2
heartbeats.topic.replication.factor = 2
checkpoints.topic.replication.factor = 2
topics = .*
groups = .*
tasks.max = 1
replication.factor = 2
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 30
topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints
c1->c2.emit.heartbeats.enabled = true
c1->c2.emit.checkpoints.enabled = true
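
Create a docker-compose.yml for the MirrorMaker2 container. This is a minimal sketch, assuming the custom-kafka-mm2 image built above and the default MM2_CONFIG_PATH from the Dockerfile; the service name mirrormaker2 is arbitrary.

nano /opt/docker/kafka-mirrormaker2/docker-compose.yml
version: '2'

services:
  mirrormaker2:
    image: 'custom-kafka-mm2'
    restart: unless-stopped
    volumes:
      - './mm2.properties:/opt/bitnami/kafka/config/mm2.properties'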

Start the container

cd /opt/docker/kafka-mirrormaker2
docker-compose up -d
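
You can tail the MirrorMaker2 logs to confirm that its connectors start up (assuming the mirrormaker2 service name from the compose sketch above):

docker-compose logs -f mirrormaker2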

Test Kafka MirrorMaker2 Setup

Install Java dependency on test_producer, test_consumer1, and test_consumer2 if you have not done so

apt install openjdk-8-jre-headless

Download Kafka binaries into test_producer, test_consumer1, and test_consumer2 if you have not done so

cd ~
wget https://apachemirror.sg.wuchna.com/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -zxvf kafka_2.13-2.5.0.tgz

Active-Active Event (Topic)

Run docker ps to get the exposed ports that are mapped to the Kafka containers. For this example, ports 32780, 32781, and 32782 are used. On any Kafka binary node, create a topic on kafka_a

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.41:2181 --replication-factor 2 --partitions 3 --topic test-mm-event

On test_consumer1 node, start a consumer subscription to kafka_a

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32781 --topic test-mm-event

On test_consumer2 node, start a consumer subscription to kafka_b

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.42:32781 --topic c1.test-mm-event

On test_producer node, start a producer connection to kafka_a and test with any message. Both consumers should receive the message: test_consumer1 on the original topic and test_consumer2 on the replicated topic c1.test-mm-event.

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 10.0.1.41:32780 --topic test-mm-event
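
You can also list the topics on kafka_b to confirm that the replicated topic c1.test-mm-event was created:

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --list --zookeeper 10.0.1.42:2181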

Active-Passive Command (Queue)

Run docker ps to get the exposed ports that are mapped to the Kafka containers. For this example, ports 32780, 32781, and 32782 are used. On any Kafka binary node, create a topic on kafka_a

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.41:2181 --replication-factor 2 --partitions 3 --topic test-mm-cmd

On test_consumer1 node, start a consumer subscription to kafka_a with group id consumer-grp1

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32781 --topic test-mm-cmd --group consumer-grp1

On test_consumer2 node, start a consumer subscription to kafka_b with group id consumer-grp1

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.42:32781 --topic c1.test-mm-cmd --group consumer-grp1

On test_producer node, start a producer connection to kafka_a and test with any message. Both consumers receive the message, so the queue semantics break: the message is consumed more than once because each cluster only tracks its own consumer groups.

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 10.0.1.41:32780 --topic test-mm-cmd
  • Clusters are only aware of their own consumer groups. For an active-passive command (queue), our consumer needs to have a custom logic/controller to switch over to the passive cluster when the active cluster fails.
  • Also take note that the timestamps reported for the original topic and the replicated topic are the same. This means we can use the following command to get the last offset that comes before a specific timestamp (in milliseconds).
    ~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.41:32780 --topic test-mm-cmd --time 1594097084243
    ~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.42:32780 --topic c1.test-mm-cmd --time 1594097084243
    
    We can reset the offsets of a topic for a specific consumer group based on our last known timestamp. The timestamp string needs to be in the format 'YYYY-MM-DDTHH:mm:SS.sss', and the reset positions the group at the first message with a timestamp at or after the specified one. You will need to unsubscribe all active consumers of this group from this topic before you can proceed. Note that the date command below takes its input in seconds.
    ~/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.1.42:32781 --reset-offsets --topic c1.test-mm-cmd --group consumer-grp1 --execute --to-datetime $(date -d @1594097084.243 +'%Y-%m-%dT%H:%M:%S.%3N')
    
    Get a consumer to subscribe to this topic with the same group and it will receive all messages that came in at/after your specified timestamp.
    ~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.42:32781 --topic c1.test-mm-cmd --group consumer-grp1
    
    You can test that other consumer groups are not affected.

Offset Replication

On any Kafka binary node, check the offset information of the original topic and the replicated topic; they should match

~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.41:32780 --topic test-mm-event
~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.42:32780 --topic c1.test-mm-event

~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.41:32780 --topic test-mm-cmd
~/kafka_2.13-2.5.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.1.42:32780 --topic c1.test-mm-cmd

Take note that MirrorMaker2 does not guarantee that replicated offsets stay in sync with the source topic.