Kafka Stretched Cluster with Photon OS - CloudCommandos/JohnChan GitHub Wiki

Introduction

For this guide we will set up a cross-node Kafka cluster on 3 Photon OS VMs, each running a single-node Docker Swarm. Each VM will host one ZooKeeper container and one Kafka broker container. This setup is for development purposes only, so TLS is not configured.

Assumptions

  • Photon3.0 VM1 192.168.3.151
  • Photon3.0 VM2 192.168.3.152
  • Photon3.0 VM3 192.168.3.153

Prepare your VMs

tdnf update
# so that you can copy and paste into files
tdnf install nano
# for debugging zookeeper
tdnf install netcat

Enable Docker

Photon OS minimal already comes with Docker installed. Enable and start the Docker service:

systemctl enable docker
systemctl start docker

Optional: If you are using your own private image registry, edit daemon.json accordingly. insecure-registries allows Docker to connect to your private registry without a trusted TLS certificate. allow-nondistributable-artifacts allows Docker to push/pull nondistributable image layers (such as official Microsoft base images) to/from your private registry.

nano /etc/docker/daemon.json
{
  "insecure-registries": ["yourprivateregistry.com:5443"],
  "allow-nondistributable-artifacts": ["yourprivateregistry.com:5443"]
}

Restart the Docker service to apply the changes in daemon.json

systemctl restart docker
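To confirm the daemon actually picked up the registry settings, you can filter the human-readable `docker info` output. This is a sketch that assumes `docker` is on PATH and the default `docker info` layout (a two-space-indented list under the "Insecure Registries:" heading):

```shell
# show_insecure_registries: prints the insecure registries the running
# daemon accepts, by filtering `docker info` output.
show_insecure_registries() {
  docker info 2>/dev/null | awk '
    /Insecure Registries:/ { show = 1; next }  # start of the section
    show && /^  /          { print $1; next }  # indented entries belong to it
    show                   { exit }            # next section reached, stop
  '
}

# usage: show_insecure_registries
# after the restart, yourprivateregistry.com:5443 should appear in the list
```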

Create your project folder

mkdir -p /opt/dockerswarm/kafka
cd /opt/dockerswarm/kafka

Create the Docker stack file on each corresponding VM. Adjust the configuration or increase the number of brokers as needed.

VM1:

nano docker-stack.yml
version: '3.8'

services:
  zookeeper:
    image: 'yourprivateregistry.com:5443/cp-zookeeper:5.5.1'
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SASL_ENABLED=FALSE
      - ZOOKEEPER_MAX_SESSION_TIMEOUT=1200000
      - ZOOKEEPER_SERVERS=0.0.0.0:2888:3888;192.168.3.152:2888:3888;192.168.3.153:2888:3888
      - KAFKA_OPTS=-Dzookeeper.4lw.commands.whitelist=*
    deploy:
      replicas: 1
      restart_policy:
        condition: any
    volumes:
      - 'zookeeper-data:/var/lib/zookeeper/data'
      - 'zookeeper-log:/var/lib/zookeeper/log'

  kafka-broker:
    image: 'yourprivateregistry.com:5443/cp-kafka:5.5.1'
    ports:
      - '9091:9091'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.151:9091
      - KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_NUM_PARTITIONS=3
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3
      - KAFKA_MIN_INSYNC_REPLICAS=2
      - KAFKA_DELETE_TOPIC_ENABLE=true
      - KAFKA_BROKER_RACK=rack1
    deploy:
      replicas: 1
      restart_policy:
        condition: any
    volumes:
      - 'kafka-broker-data:/var/lib/kafka/data'
    depends_on:
      - zookeeper

volumes:
  zookeeper-data:
  zookeeper-log:
  kafka-broker-data:

VM2:

nano docker-stack.yml
version: '3.8'

services:
  zookeeper:
    image: 'yourprivateregistry.com:5443/cp-zookeeper:5.5.1'
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
    environment:
      - ZOOKEEPER_SERVER_ID=2
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SASL_ENABLED=FALSE
      - ZOOKEEPER_MAX_SESSION_TIMEOUT=1200000
      - ZOOKEEPER_SERVERS=192.168.3.151:2888:3888;0.0.0.0:2888:3888;192.168.3.153:2888:3888
      - KAFKA_OPTS=-Dzookeeper.4lw.commands.whitelist=*
    deploy:
      replicas: 1
      restart_policy:
        condition: any
    volumes:
      - 'zookeeper-data:/var/lib/zookeeper/data'
      - 'zookeeper-log:/var/lib/zookeeper/log'

  kafka-broker:
    image: 'yourprivateregistry.com:5443/cp-kafka:5.5.1'
    ports:
      - '9091:9091'
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.152:9091
      - KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_NUM_PARTITIONS=3
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3
      - KAFKA_MIN_INSYNC_REPLICAS=2
      - KAFKA_DELETE_TOPIC_ENABLE=true
      - KAFKA_BROKER_RACK=rack2
    deploy:
      replicas: 1
      restart_policy:
        condition: any
    volumes:
      - 'kafka-broker-data:/var/lib/kafka/data'
    depends_on:
      - zookeeper

volumes:
  zookeeper-data:
  zookeeper-log:
  kafka-broker-data:

VM3:

nano docker-stack.yml
version: '3.8'

services:
  zookeeper:
    image: 'yourprivateregistry.com:5443/cp-zookeeper:5.5.1'
    ports:
      - '2181:2181'
      - '2888:2888'
      - '3888:3888'
    environment:
      - ZOOKEEPER_SERVER_ID=3
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SASL_ENABLED=FALSE
      - ZOOKEEPER_MAX_SESSION_TIMEOUT=1200000
      - ZOOKEEPER_SERVERS=192.168.3.151:2888:3888;192.168.3.152:2888:3888;0.0.0.0:2888:3888
      - KAFKA_OPTS=-Dzookeeper.4lw.commands.whitelist=*
    deploy:
      replicas: 1
      restart_policy:
        condition: any
    volumes:
      - 'zookeeper-data:/var/lib/zookeeper/data'
      - 'zookeeper-log:/var/lib/zookeeper/log'

  kafka-broker:
    image: 'yourprivateregistry.com:5443/cp-kafka:5.5.1'
    ports:
      - '9091:9091'
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.153:9091
      - KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_NUM_PARTITIONS=3
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3
      - KAFKA_MIN_INSYNC_REPLICAS=2
      - KAFKA_DELETE_TOPIC_ENABLE=true
      - KAFKA_BROKER_RACK=rack3
    deploy:
      replicas: 1
      restart_policy:
        condition: any
    volumes:
      - 'kafka-broker-data:/var/lib/kafka/data'
    depends_on:
      - zookeeper

volumes:
  zookeeper-data:
  zookeeper-log:
  kafka-broker-data:

On each VM, initialize a single-node Docker Swarm

# change the IP to match the VM you are on
docker swarm init --advertise-addr 192.168.3.151
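You can verify the init succeeded by asking the daemon for its swarm state. A minimal helper, assuming `docker` is on PATH:

```shell
# check_swarm_state: prints the local node's swarm state
# ("active" on an initialized manager, "inactive" otherwise).
check_swarm_state() {
  docker info --format '{{.Swarm.LocalNodeState}}'
}

# usage: check_swarm_state   # should print "active" after `docker swarm init`
```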

On each VM, deploy the Kafka Docker stack

cd /opt/dockerswarm/kafka
docker stack deploy --compose-file docker-stack.yml kafka

Check the containers

docker ps
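Beyond `docker ps`, you can check that every service in the stack has reached its desired replica count. A small sketch, assuming `docker` is on PATH and the stack name `kafka` used above:

```shell
# count_ready_services STACK: counts services in a Docker stack whose
# replica column reads "1/1" (i.e. the single desired task is running).
count_ready_services() {
  docker stack services "$1" --format '{{.Name}} {{.Replicas}}' \
    | awk '$2 == "1/1" { n++ } END { print n + 0 }'
}

# usage: count_ready_services kafka
# with this stack it should eventually print 2 (zookeeper + kafka-broker)
```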

You can check the status of each ZooKeeper node and verify that there is 1 leader and 2 followers. Refer to the four-letter commands section of https://zookeeper.apache.org/doc/r3.4.8/zookeeperAdmin.html for other commands you can use to inspect your ZooKeeper ensemble.

echo srvr | nc 192.168.3.151 2181
echo srvr | nc 192.168.3.152 2181
echo srvr | nc 192.168.3.153 2181
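To check only the role of each node, you can filter the `srvr` output down to its Mode line. A sketch, assuming `nc` is installed (via `tdnf install netcat`, as above):

```shell
# zk_mode HOST PORT: sends the 'srvr' four-letter command to a ZooKeeper
# server with netcat and prints only its role line (e.g. "Mode: leader").
zk_mode() {
  echo srvr | nc "$1" "$2" | grep '^Mode:'
}

# usage: for ip in 192.168.3.151 192.168.3.152 192.168.3.153; do
#          zk_mode "$ip" 2181
#        done
# across the three nodes you should see exactly one "Mode: leader"
# and two "Mode: follower" lines
```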

Test Kafka Brokers

Install dependencies

# tar is needed to extract the Kafka binaries archive
tdnf install tar
# the Kafka binaries need Java in order to run
tdnf install openjre8

Download the Kafka binaries onto each of your VMs (or other servers/VMs that you have).

cd ~
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -zxvf kafka_2.13-2.5.0.tgz

For the following tests, it does not matter which VM you consume from or produce to; you just need to be able to reach the VMs over the network.

Event (Topic)

Optional: Create a topic manually

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.3.151:2181 --replication-factor 2 --partitions 3 --topic test-event
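To confirm the topic was created with the expected settings, you can parse the replication factor out of `kafka-topics.sh --describe`. A sketch; `KAFKA_BIN` defaults to the path from the wget step above (override it if you unpacked elsewhere), and `topic_replication` is a hypothetical helper name:

```shell
# topic_replication TOPIC BOOTSTRAP: prints a topic's replication factor
# as reported by kafka-topics.sh --describe (tab-separated summary line).
KAFKA_BIN=${KAFKA_BIN:-~/kafka_2.13-2.5.0/bin}
topic_replication() {
  "$KAFKA_BIN/kafka-topics.sh" --describe --topic "$1" --bootstrap-server "$2" \
    | awk -F'ReplicationFactor: ' 'NF > 1 { split($2, a, "\t"); print a[1]; exit }'
}

# usage: topic_replication test-event 192.168.3.151:9091
# should print 2 for the manually created topic above
```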

On VM1, start a consumer subscription to topic test-event

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 192.168.3.151:9091 --topic test-event

On VM2, start a consumer subscription to topic test-event

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 192.168.3.152:9091 --topic test-event

On VM3, start a producer connection and test with any message. The subscribed consumers should receive the messages.

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.3.153:9091 --topic test-event

Command (Queue)

Optional: Create a topic manually

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 192.168.3.151:2181 --replication-factor 2 --partitions 3 --topic test-cmd

On VM1, start a consumer subscription to topic test-cmd with group id consumer-grp1

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 192.168.3.151:9091 --topic test-cmd --group consumer-grp1

On VM2, start a consumer subscription to topic test-cmd with group id consumer-grp1

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 192.168.3.152:9091 --topic test-cmd --group consumer-grp1

On VM3, start a producer connection and test with any message. Only one subscribed consumer should receive each message.

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --broker-list 192.168.3.153:9091 --topic test-cmd
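To confirm the group really consumed every message, you can sum the LAG column of `kafka-consumer-groups.sh --describe`. A sketch; `KAFKA_BIN` defaults to the wget path above, and `total_lag` is a hypothetical helper name:

```shell
# total_lag GROUP BOOTSTRAP: sums the LAG column from
# kafka-consumer-groups.sh --describe, i.e. how many produced
# messages the group has not consumed yet.
KAFKA_BIN=${KAFKA_BIN:-~/kafka_2.13-2.5.0/bin}
total_lag() {
  "$KAFKA_BIN/kafka-consumer-groups.sh" \
      --bootstrap-server "$2" --group "$1" --describe \
    | awk 'NR > 1 && $6 ~ /^[0-9]+$/ { lag += $6 } END { print lag + 0 }'
}

# usage: total_lag consumer-grp1 192.168.3.151:9091
# a lag of 0 means every produced message was delivered to the group
```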

You can also exec into the Kafka broker containers and run the built-in Kafka binaries for testing

sudo docker exec -it <your_kafka_container_id> sh

# To create a topic
kafka-topics --create --topic test-topic --partitions 3 --replication-factor 2 --if-not-exists --bootstrap-server localhost:9091

# To list all topics
kafka-topics --list --bootstrap-server localhost:9091

# To show topic partition and in-sync-replica arrangement
kafka-topics --describe --topic test-topic --bootstrap-server localhost:9091

# To produce messages
kafka-console-producer --broker-list localhost:9091 --topic test-topic

# To consume messages
kafka-console-consumer --topic test-topic --group test-group --from-beginning --bootstrap-server localhost:9091

# To show the partition offsets and lag of a specific consumer group
kafka-consumer-groups --bootstrap-server localhost:9091 --group consumer-grp1 --describe