Custom Kafka Forwarder with Fluentd

Introduction

Confluent Replicator and Kafka MirrorMaker2 are available tools that can replicate data across Kafka clusters. However, active-active Kafka clusters currently do not support sharing a consumer group across clusters: even with the same consumer group id, the consumers on each Kafka cluster will consume their own copy of the messages.

One solution is to form a single Kafka cluster. For multi-datacentre setups, this means a Kafka cluster that spans multiple sites. If latency is an issue, another solution is a custom-built Kafka forwarder (let's call it a "Custom Forwarder"). Each Kafka cluster should have one Custom Forwarder. Each Custom Forwarder subscribes to its own Kafka cluster's brokers as a consumer, joining the consumer group in charge of consuming command messages (Queue). Each Custom Forwarder also publishes the messages it receives to another Kafka cluster, and should only forward each message to one cluster. Depending on the number of clusters, a Custom Forwarder may have a secondary forward target in case its primary target is unavailable. All primary forward targets should ideally form a closed loop, because messages need to be passed around the clusters until they expire or are consumed by a non-Custom-Forwarder consumer.
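
For concreteness, the two-cluster topology configured later in this guide works out as follows (the fwd.c1.cmd and fwd.c2.cmd topic names are just this guide's convention, not a Kafka requirement):

# Cluster1 (Node1) Custom Forwarder
#   consumes : cmd, fwd.c2.cmd           from Cluster1, as group cmd-consumer
#   forwards : everything it consumes -> topic fwd.c1.cmd on Cluster2
#
# Cluster2 (Node2) Custom Forwarder
#   consumes : cmd, fwd.c1.cmd           from Cluster2, as group cmd-consumer
#   forwards : everything it consumes -> topic fwd.c2.cmd on Cluster1
#
# The two primary forward targets form the closed loop described above.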

Depending on the number of non-Custom-Forwarder consumers, a portion of messages will still be consumed by a cluster's Custom Forwarder even when non-Custom-Forwarder consumers are present. To further improve the effectiveness of this solution, you can write your own custom partition.assignment.strategy for your Kafka consumers. The partition assignment strategy should only assign partitions to a Custom Forwarder when there are no non-Custom-Forwarder consumers in the group. This ensures that under normal operation all command messages (Queue) are consumed at the first Kafka cluster they arrive at. That part of the solution is not covered in this guide.
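
For reference, partition.assignment.strategy is a consumer-side setting. The sketch below shows where such a strategy is configured; the class shown is Kafka's stock RoundRobinAssignor (also used in the test section of this guide), and a custom assignor would simply replace that class name and sit on the consumer's classpath:

# consumer.properties (client side), not a broker setting
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor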

Assumptions

  • You have 2 active Kafka clusters
  • Node1 Ubuntu 18.04, 10.0.1.41
  • Node2 Ubuntu 18.04, 10.0.1.42
  • Both nodes already have Docker installed
  • Each node already has a Kafka cluster running with MirrorMaker2
  • (Optional) Test-Node Ubuntu 18.04, 10.0.1.43

Build your custom fluentd-kafka container image on Node1

Create your work directory

mkdir -p /opt/docker/fluentd-kafka
cd /opt/docker/fluentd-kafka

Create Dockerfile

nano Dockerfile
FROM fluent/fluentd:v1.11-1

# Use root account to use apk
USER root

RUN apk add --no-cache --update --virtual .build-deps \
        sudo build-base ruby-dev \
 && sudo gem install fluent-plugin-elasticsearch \
 && sudo gem install ruby-kafka -v 1.0.0 \
 && sudo gem install fluent-plugin-kafka -v 0.13.0 \
 && sudo gem sources --clear-all \
 && apk del .build-deps \
 && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem

COPY fluent.conf /fluentd/etc/
COPY entrypoint.sh /bin/

USER fluent

Create script entrypoint.sh

nano entrypoint.sh
#!/bin/sh

#source vars if file exists
DEFAULT=/etc/default/fluentd

if [ -r $DEFAULT ]; then
    set -o allexport
    . $DEFAULT
    set +o allexport
fi

# If the user has supplied only arguments append them to `fluentd` command
if [ "${1#-}" != "$1" ]; then
    set -- fluentd "$@"
fi

# If user does not supply config file or plugins, use the default
if [ "$1" = "fluentd" ]; then
    if ! echo $@ | grep ' \-c' ; then
       set -- "$@" -c /fluentd/etc/${FLUENTD_CONF}
    fi

    if ! echo $@ | grep ' \-p' ; then
       set -- "$@" -p /fluentd/plugins
    fi
fi

exec "$@"

Make entrypoint.sh executable

chmod +x entrypoint.sh

Create a default fluent.conf for the container image

nano fluent.conf
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<filter **>
  @type record_modifier

  <record>
    hostname "#{Socket.gethostname}"
  </record>
</filter>

<match **>
  @type copy
  <store>
    @type stdout
    output_type json
  </store>
  <store>
    @type kafka_buffered
    brokers 127.0.0.1:9092
    default_topic fluentd
    compression_codec gzip
    output_include_tag true
    output_include_time true
  </store>
</match>

Create myfluent.conf, which will replace the default fluent.conf at container start-up so that the container serves as the Custom Forwarder on Node1

nano myfluent.conf
<source>
  @type kafka_group

  brokers 10.0.1.41:32780,10.0.1.41:32781,10.0.1.41:32782
  consumer_group cmd-consumer
  topics cmd,fwd.c2.cmd
  format text
  add_prefix kafkacmd
  retry_emit_limit 10
  time_source kafka
  start_from_beginning false
</source>

<match kafkacmd.**>
  @type kafka2

  # list of seed brokers
  brokers 10.0.1.42:32780,10.0.1.42:32781,10.0.1.42:32782
  use_event_time true

  # buffer settings
  <buffer topic>
    @type file
    path /fluentd/log/td
    flush_interval 1s
  </buffer>

  # data type settings
  <format>
    @type single_value
  </format>

  # topic settings
  topic_key topic
  default_topic fwd.c1.cmd

  # producer settings
  required_acks 1
  compression_codec gzip
</match>

Build the container image (you can name and tag it as you like)

docker build -t fluentd-kafka:v1 .
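
If you want to confirm that the Kafka gems were baked into the image, a quick check against the image built above is:

docker run --rm fluentd-kafka:v1 gem list kafka

This should list ruby-kafka and fluent-plugin-kafka among the installed gems.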

Start the container

docker run --rm -d -v $PWD/myfluent.conf:/fluentd/etc/fluent.conf fluentd-kafka:v1
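
The container above is started without a name, so one way to tail its logs and confirm that Fluentd started and joined the consumer group is the following (the filter simply matches containers created from this image):

docker logs -f $(docker ps -q --filter ancestor=fluentd-kafka:v1)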

Start your custom container on Node2

Save your container image on Node1 first, then pass it to Node2

docker save -o fluentd-kafka.tar fluentd-kafka:v1
rsync --progress fluentd-kafka.tar 10.0.1.42:

On Node2, load the container image

docker load -i fluentd-kafka.tar
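
You can confirm that the image is now available on Node2 before proceeding:

docker images fluentd-kafka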

Create your work directory

mkdir -p /opt/docker/fluentd-kafka
cd /opt/docker/fluentd-kafka

Create myfluent2.conf, which will replace the default fluent.conf at container start-up so that the container serves as the Custom Forwarder on Node2

nano myfluent2.conf
<source>
  @type kafka_group

  brokers 10.0.1.42:32780,10.0.1.42:32781,10.0.1.42:32782
  consumer_group cmd-consumer
  topics cmd,fwd.c1.cmd
  format text
  add_prefix kafkacmd
  retry_emit_limit 10
  time_source kafka
  start_from_beginning false
</source>

<match kafkacmd.**>
  @type kafka2

  # list of seed brokers
  brokers 10.0.1.41:32780,10.0.1.41:32781,10.0.1.41:32782
  use_event_time true

  # buffer settings
  <buffer topic>
    @type file
    path /fluentd/log/td
    flush_interval 1s
  </buffer>

  # data type settings
  <format>
    @type single_value
  </format>

  # topic settings
  topic_key topic
  default_topic fwd.c2.cmd

  # producer settings
  required_acks 1
  compression_codec gzip
</match>

Start the container

docker run --rm -d -v $PWD/myfluent2.conf:/fluentd/etc/fluent.conf fluentd-kafka:v1

Test the Setup

Make sure that your MirrorMaker2 has blacklisted the topics cmd, fwd.c2.cmd, and fwd.c1.cmd

...
topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets, cmd, fwd.*
...
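
Where exactly this is set depends on how you run MirrorMaker2. If you drive it with a standalone mm2.properties file, the exclusion can be applied globally or per replication flow, roughly as sketched below (the cluster aliases c1 and c2 are placeholders; use your own MM2 cluster aliases):

# mm2.properties (c1 and c2 are placeholder cluster aliases)
topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets, cmd, fwd.*
# or scoped to a single replication flow:
c1->c2.topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets, cmd, fwd.*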

Install Java dependency on Test-Node if you have not done so

apt install openjdk-8-jre-headless

Download Kafka binaries into Test-Node if you have not done so

cd ~
wget https://apachemirror.sg.wuchna.com/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -zxvf kafka_2.13-2.5.0.tgz

Create all necessary topics

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.41:2181 --replication-factor 2 --partitions 3 --topic cmd
~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.41:2181 --replication-factor 2 --partitions 3 --topic fwd.c2.cmd
~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.42:2181 --replication-factor 2 --partitions 3 --topic cmd
~/kafka_2.13-2.5.0/bin/kafka-topics.sh --create --zookeeper 10.0.1.42:2181 --replication-factor 2 --partitions 3 --topic fwd.c1.cmd
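
To confirm that the topics were created on both clusters, you can list them against the same ZooKeeper endpoints:

~/kafka_2.13-2.5.0/bin/kafka-topics.sh --list --zookeeper 10.0.1.41:2181
~/kafka_2.13-2.5.0/bin/kafka-topics.sh --list --zookeeper 10.0.1.42:2181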

Open multiple SSH terminals for the following. In one session, subscribe to topics cmd and fwd.c2.cmd of cluster1 with group id cmd-consumer

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.41:32781 --whitelist 'cmd|fwd.c2.cmd' --group cmd-consumer --consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

In another session, subscribe to topics cmd and fwd.c1.cmd of cluster2 with group id cmd-consumer

~/kafka_2.13-2.5.0/bin/kafka-console-consumer.sh --property print.timestamp=true --bootstrap-server 10.0.1.42:32781 --whitelist 'cmd|fwd.c1.cmd' --group cmd-consumer --consumer-property partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

In another session, publish to topic cmd of either cluster

~/kafka_2.13-2.5.0/bin/kafka-console-producer.sh --bootstrap-server 10.0.1.41:32780 --topic cmd

You should observe that each published message is consumed only once, by one of the two consumers.
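
To double-check that the console consumers and the Custom Forwarders really share the cmd-consumer group, you can also describe the group on either cluster and inspect the partition assignments and lag:

~/kafka_2.13-2.5.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.1.41:32781 --describe --group cmd-consumer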