Kafka Client for JVM - munichbughunter/SevenFacette GitHub Wiki

Introduction

This first part of the reference documentation gives a high-level overview of 7Facette for Apache Kafka, its underlying concepts, and some code snippets that help you get up and running as quickly as possible. The 7Facette Kafka implementation provides one class each for reading from and writing to Kafka topics.

Quick Tour - Configuration and Initialization

This is the five-minute tour to get started with 7Facette for Kafka.

Prerequisites: You need a running Apache Kafka installation and the 7Facette core module. Take a look at Getting Started.

Compatibility

This quick tour works with the following versions:

  • Apache Kafka Clients 2.0.0
  • Minimum Java version: 8

Configuration

Configuration via YML

The configuration in the YML file looks like the following for a consumer.

sevenFacette:
  kafka:
    bootstrapServer: localhost-kafka.de:9192
    consumer:
      topic1:
        useSASLAuthentication: true # default is false
        autoOffset: latest
        readIsolationLevel: READ_COMMITTED
        saslUsername: username
        saslPassword: '[[SASL_PASSWORD]]'

You can then create the consumer based on its name in the configuration, with the auto-start flag set to true:

KConsumer myConsumer = KFactory.createKConsumer("topic1", true);

Configuration via Code

The KafkaTopicConfig holds the Kafka configuration properties.

  • useSASLAuthentication: true or false - Default value is false
  • saslMechanism: SCRAM-SHA-256
  • saslUsername: kafkaUser
  • saslPassword: password or token
  • autoOffset: earliest or latest
  • maxConsumingTime: 5L - The consumer polls for new messages for at most five seconds
  • kafkaProtocol: SASL_SSL or PLAIN_TEXT
  • bootstrapServer: localhost:9092
  • groupID: groupId - Default value is empty
  • readIsolationLevel: READ_COMMITTED or READ_UNCOMMITTED - Default is set to READ_UNCOMMITTED
  • autoCommit: true or false - Default is set to false
  • autoCommitInterval: Default value is 0
  • topicName: Name of the topic

The constructor arguments follow the order of the list above:

KafkaTopicConfig kafkaConfig = new KafkaTopicConfig(
        false,           // useSASLAuthentication
        "",              // saslMechanism
        "",              // saslUsername
        "",              // saslPassword
        "latest",        // autoOffset
        10,              // maxConsumingTime
        "",              // kafkaProtocol
        "kafka:1234",    // bootstrapServer
        "",              // groupID
        READ_COMMITTED,  // readIsolationLevel
        false,           // autoCommit
        0,               // autoCommitInterval
        "topicName");    // topicName


Consuming and Producing

KConsumer

The KConsumer can be created via the KFactory. Based on the maxConsumingTime the consumer will poll for records on the defined topic.

KConsumer consumer = KFactory.createKConsumer(kafkaTopicConfig, true);

If the autoStart flag is set to false, you have to start the KConsumer manually.

KConsumer consumer = KFactory.createKConsumer(kafkaTopicConfig, false);
consumer.consume();

Get consumed records

The consumer returns KRecords. Each KRecord holds the consumed data:

  • key of the message
  • value of the message
  • offset
  • partition

The method getKRecords() returns all consumed records as a ConcurrentLinkedQueue.

consumer.getKRecords()
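Because the records come back as a plain ConcurrentLinkedQueue, they can be inspected with ordinary JDK collection operations. A minimal sketch using a stand-in record class (the real KRecord accessors may differ; this class and its sample data are illustrative only):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class KRecordsDemo {
    // Stand-in for the library's KRecord: key, value, offset, partition.
    static final class KRecord {
        final String key; final String value; final long offset; final int partition;
        KRecord(String key, String value, long offset, int partition) {
            this.key = key; this.value = value; this.offset = offset; this.partition = partition;
        }
    }

    public static void main(String[] args) {
        // In a test this queue would come from consumer.getKRecords().
        ConcurrentLinkedQueue<KRecord> records = new ConcurrentLinkedQueue<>();
        records.add(new KRecord("order-1", "{\"state\":\"NEW\"}", 0L, 0));
        records.add(new KRecord("order-2", "{\"state\":\"PAID\"}", 1L, 0));

        // The queue can be iterated while a consumer thread keeps appending.
        for (KRecord r : records) {
            System.out.println(r.key + " @ offset " + r.offset);
        }
        System.out.println("count=" + records.size());
    }
}
```

ConcurrentLinkedQueue is a good fit here because its iterators are weakly consistent: iterating the records never throws, even while the consumer coroutine is still adding to the queue.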

Get message count

Returns the count of the consumed messages.

consumer.getKRecordsCount()

Get last message

Returns the last consumed message as a String.

consumer.getLastKRecord()

Wait for message

The consumer runs in a coroutine, which means your test code is not blocked. When you reach the point where you need your messages, you can wait for them: the consumer checks whether messages have arrived and returns true or false. In the following call the consumer waits at most 5 seconds for messages; after that it stops consuming.

consumer.waitForKRecords(5000)
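Conceptually, such a wait boils down to polling the record queue against a deadline. A plain-Java sketch of the pattern (illustrative only, not the library's actual implementation; the method and variable names are assumptions):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class WaitForRecordsDemo {

    /** Polls the queue until it is non-empty or the timeout elapses. */
    static boolean waitForRecords(ConcurrentLinkedQueue<String> records, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (!records.isEmpty()) {
                return true; // at least one message arrived in time
            }
            Thread.sleep(50); // back off briefly between checks
        }
        return false; // timed out without any message
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> records = new ConcurrentLinkedQueue<>();

        // Simulate a consumer delivering a message after 200 ms.
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            records.add("My cool test message");
        }).start();

        System.out.println(waitForRecords(records, 5000)); // true: message arrived in time
    }
}
```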

Wait for KRecords count

You can wait until the consumer has consumed the expected number of messages within a given time.

consumer.waitForKRecordsCount(5, 1000);

Get records filtered by key

consumer.filterByKey("pattern", 2000);

Get records filtered by value

consumer.filterByValue("pattern", 2000);
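A value filter of this kind can be pictured as a regex match over the consumed queue. A hedged plain-Java sketch (the method name and signature here are illustrative, not the library's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Pattern;

public class FilterRecordsDemo {

    /** Returns all values in the queue that contain a match for the pattern. */
    static List<String> filterByValue(ConcurrentLinkedQueue<String> records, String pattern) {
        Pattern p = Pattern.compile(pattern);
        List<String> hits = new ArrayList<>();
        for (String value : records) {
            if (p.matcher(value).find()) {
                hits.add(value);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> records = new ConcurrentLinkedQueue<>();
        records.add("{\"orderId\":42,\"state\":\"NEW\"}");
        records.add("{\"orderId\":43,\"state\":\"PAID\"}");

        // Only the record whose value matches the pattern is returned.
        System.out.println(filterByValue(records, "\"state\":\"PAID\"").size()); // 1
    }
}
```

Filtering by key works the same way, just matching the pattern against each record's key instead of its value.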

KProducer

The KProducer provides a single method for sending new messages to a previously configured Kafka topic. It can also be created via the KFactory.

KProducer myProducer = KFactory.createKProducer(producerTableConfig, true); 

If the autoSend flag is set to false, you have to flush the KProducer manually.

KProducer manualFlushProducer = KFactory.createKProducer(producerTableConfig, false); 
manualFlushProducer.send("My cool test message");
manualFlushProducer.flush();

or with autoSend set to true:

KProducer autoFlushProducer = KFactory.createKProducer(producerTableConfig, true);
autoFlushProducer.send("My cool test message");
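The difference between the two modes is only when buffered messages are pushed out. A plain-Java sketch of the send/flush contract (illustrative, not the library's implementation; the class and its fields are stand-ins):

```java
import java.util.ArrayList;
import java.util.List;

public class FlushDemo {

    /** Minimal producer stand-in: buffers messages, flushes on demand or on every send. */
    static final class BufferingProducer {
        private final boolean autoSend;
        private final List<String> buffer = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        BufferingProducer(boolean autoSend) { this.autoSend = autoSend; }

        void send(String message) {
            buffer.add(message);
            if (autoSend) {
                flush(); // autoSend=true: every message goes out immediately
            }
        }

        void flush() {
            delivered.addAll(buffer); // push buffered messages to the broker
            buffer.clear();
        }

        List<String> delivered() { return delivered; }
    }

    public static void main(String[] args) {
        BufferingProducer manual = new BufferingProducer(false);
        manual.send("My cool test message");
        System.out.println(manual.delivered().size()); // 0: nothing sent until flush()
        manual.flush();
        System.out.println(manual.delivered().size()); // 1
    }
}
```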