Kafka Client for JS - munichbughunter/SevenFacette GitHub Wiki

Configuration

Configuration via JSON

The configuration in the JSON file looks like the following for a consumer and producer.

{
    "sevenFacette": {
        "kafka": {
            "bootstrapServer": "localhost:9092",
            "consumer": {
                "testConsumer": {
                    "useSASLAuthentication": false,
                    "autoOffset": "earliest",
                    "topicName": "test", 
                    "maxConsumingTime": 60000
                },
                "topic2": {
                    "autoOffset": "earliest",
                    "bootstrapServer": "localhost:9191"
                }
            },
            "producer": {
                "testProducer": {
                    "useSASLAuthentication": false,
                    "autoOffset": "latest",
                    "topicName": "test"
                }
            }
        }
    }
}  

// And then you can create the consumer based on the name in the configuration: 
var kafka = new sf.kafka.createKConsumer("testConsumer")

Consuming and Producing

KConsumer

The KConsumer can be created via factory method. Based on the maxConsumingTime the consumer will poll for records on the defined topic.

var kafka = new sf.kafka.createKConsumer("testConsumer")

Consume each message

The eachMessage handler provides a convenient and easy to use API, feeding your function one message at a time. It is implemented on top of eachBatch, and it will automatically commit your offsets and heartbeat at the configured interval for you.

  const sfConsumer = kafka.getConsumer()
  
  await sfConsumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      if (message.key == null) {
        kafka.addKRecord("", message.value.toString(), message.offset, partition)  
      } else {
        kafka.addKRecord(message.key.toString(), message.value.toString(), message.offset, partition)
      }
    },
  })

Disconnect from Kafka

Before you can make your validation you have to disconnect your consumer or producer instance with the shutdown() command.

kafka.shutdown();

Get consumed records

The consumer returns KRecords. KRecords holds the consumed data.

  • key of the message
  • value of the message
  • offset
  • partition

The method getMessages() returns all consumed records as Array.

kafka.getMessages()

Get message count

Returns the count of the consumed messages.

kafka.getKRecordsCount()

Get last message

Returns the last consumed message as String.

kafka.getLastKRecord()

Filter for records

You can filter for records.

Get records filtered by key

kafka.filterByKey('KeyOfTheRecord')

Get records filtered by value

kafka.filterByValue('Pattern')

KProducer

The KProducer provides a single method for sending new messages to a previously configured Kafka topic. It can also be created via factory method.

var producer = new sf.kafka.createKProducer("testProducer")

Send message with producer

await producer.sendKeyMessage("Testmessage", "Hello here is my fantastic message")
  setTimeout(() => {
    producer.disconnect()
  }, 5000)
⚠️ **GitHub.com Fallback** ⚠️