Sending FlatBuffers through Kafka in Java

This guide assumes you are familiar with setting up Kafka Producers and Consumers to exchange simple messages. It covers the particular settings and gotchas involved in sending FlatBuffers through Kafka.

Serializers & Deserializers

FlatBuffers are sent as byte arrays, so it's necessary to use the matching (de)serializers. These are:

org.apache.kafka.common.serialization.ByteArraySerializer

for the producer, and

org.apache.kafka.common.serialization.ByteArrayDeserializer

for the consumer. These should be used for the keys and/or values depending on your needs.
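As a minimal sketch of how these class names are typically wired in, the helper below sets them for both keys and values using only java.util.Properties. The config keys (bootstrap.servers, group.id, key.serializer, value.serializer, key.deserializer, value.deserializer) are standard Kafka client settings; the class name FlatBufferKafkaProps and whatever broker address and group id you pass in are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical helper class: the name and method signatures are illustrative only.
final class FlatBufferKafkaProps {
    static final String BYTE_ARRAY_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";
    static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Producer settings: keys and values are both written as raw byte arrays.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", BYTE_ARRAY_SERIALIZER);
        props.setProperty("value.serializer", BYTE_ARRAY_SERIALIZER);
        return props;
    }

    // Consumer settings: keys and values are both read back as raw byte arrays.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.setProperty("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        return props;
    }
}
```

The returned objects can be passed to the KafkaProducer and KafkaConsumer constructors, for example FlatBufferKafkaProps.consumerProps("localhost:9092", "flatbuffers-demo") for a local broker.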

Kafka Producer

A FlatBuffer has to be built with the FlatBuffer builder before the producer can send it. To create a FlatBuffer, you should follow the guides and documentation for creating a schema and generating its code, which can be found on the FlatBuffers site.

First, initialise the FlatBuffer builder, and give it an initial size.

FlatBufferBuilder builder = new FlatBufferBuilder(1024);

Then, you need to create your object. In this example, ours will be called a Message. Remember to make the code generated from your schema available to the project.
Our Message will only contain a string as its contents. As FlatBuffers store data in binary form, we must first write the string into the buffer using the builder, which returns an integer offset to it:

int contents = builder.createString([STRING]);

[STRING] is replaced with whatever the contents of the message are intended to be. Then, we need to create the Message object and add this string offset to it. Ending the Message returns another integer, this time an offset to the whole object. For our Message object we'll do the following:

Message.startMessage(builder);
Message.addContents(builder, contents);
int messageBinary = Message.endMessage(builder);

Finally, call finish on the builder with the new int we made to complete the buffer.

builder.finish(messageBinary);

To send this FlatBuffer through Kafka, we can create a ProducerRecord from the topic name and the byte array returned by

builder.sizedByteArray()
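The reason for sending sizedByteArray() rather than the builder's whole backing array is that the FlatBuffer builder fills its buffer from the back towards the front, so the finished message occupies only the tail of the allocated space. The sketch below imitates that layout with a plain java.nio.ByteBuffer to show what an exact-size copy looks like. It is an analogy using only the standard library, not the FlatBuffers implementation, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration class: mimics a buffer that is filled from the back.
final class SizedCopySketch {
    // Places the payload at the end of a buffer of the given capacity and
    // leaves the position at the payload's first byte.
    // Throws IllegalArgumentException if the payload is larger than the capacity.
    static ByteBuffer writeAtTail(byte[] payload, int capacity) {
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        int start = capacity - payload.length;
        buffer.position(start);
        buffer.put(payload);
        buffer.position(start);
        return buffer;
    }

    // Copies only the bytes between position and limit into a new array.
    // A duplicate is read so the caller's buffer position is left unchanged.
    static byte[] sizedCopy(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return out;
    }
}
```

In this sketch, sending buffer.array() would put the full capacity on the topic, unused leading bytes included, whereas the sized copy carries only the payload.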

Kafka Consumer

Setting up the consumer is slightly more involved. First of all, the consumer needs to be declared with byte arrays as its key and value types, since that is the form the FlatBuffers will arrive in. (props is the name of the properties that we've set. Remember the byte array deserializers.)

KafkaConsumer<byte[],byte[]> consumer = new KafkaConsumer<>(props);

Using byte[] is also necessary in the ConsumerRecords and ConsumerRecord objects when initialising them. For this guide, we will call each message received by the Consumer a record. The contents of the FlatBuffer can now be accessed like so:

byte[] bytes = record.value();

This will give us the byte array of the message. We can then access the original message in its string form by wrapping this array in a ByteBuffer and using the getRootAsMessage function provided by the generated Message class.

java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(bytes);
Message message = Message.getRootAsMessage(buf);

And as the field in our Message is called contents, we can access its string with

message.contents()
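One thing to keep in mind at this step is that ByteBuffer.wrap does not copy the array, and the generated accessors read fields directly from the buffer each time they are called rather than parsing it up front. The sketch below, using only the standard library, shows the sharing behaviour and one way to take a defensive copy. It only matters if your own code reuses or modifies the byte array after wrapping it, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration class: contrasts a shared wrap with a copied wrap.
final class WrapSketch {
    // Wraps without copying: the buffer and the array share the same storage.
    static ByteBuffer wrapShared(byte[] bytes) {
        return ByteBuffer.wrap(bytes);
    }

    // Wraps a private copy, so later changes to the array are not visible.
    static ByteBuffer wrapCopy(byte[] bytes) {
        return ByteBuffer.wrap(Arrays.copyOf(bytes, bytes.length));
    }
}
```

If the array is modified after wrapping, a buffer from wrapShared reflects the change while one from wrapCopy keeps the original bytes.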