Working with Kafka - michaelpidde/datastreaming GitHub Wiki
Connectors
Kafka exposes a REST API that can be used for configuration. It also provides a number of shell scripts on the host which can be used for configuration as well as operating against the Kafka service itself (viewing events in a topic, for example).
The most expedient way to add a connector is to utilize the REST API. This adds a connector using details stored in a JSON file:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @register-mssql.json
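The contents of register-mssql.json are not shown in this wiki; the following is an illustrative sketch of what a Debezium SQL Server source connector registration could look like, not the actual file. It assumes Debezium 2.x property names and invents the hostname and credentials; the `topic.prefix`, database, and table values are chosen to match the `sqlserver.application.dbo.customer` topic name used later on this page.

```shell
# Illustrative contents for register-mssql.json (an assumption, not the real file).
# Property names follow Debezium 2.x; older versions use database.server.name
# instead of topic.prefix and database.dbname instead of database.names.
cat > register-mssql.json <<'EOF'
{
  "name": "sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "mssql",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.names": "application",
    "topic.prefix": "sqlserver",
    "table.include.list": "dbo.customer",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.application"
  }
}
EOF
```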
You can likewise delete a connector (though rather than deleting and recreating, you can update an existing connector's configuration in place with a PUT):
curl -X DELETE http://localhost:8083/connectors/sqlserver-connector
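Updating in place via PUT would look something like this sketch, assuming the connector is named sqlserver-connector and that update-mssql.json is a hypothetical file containing only the config map:

```shell
# Update an existing connector's configuration in place.
# Note: the PUT body to /connectors/{name}/config is the bare config object,
# unlike the POST body, which wraps it as {"name": ..., "config": {...}}.
curl -X PUT http://localhost:8083/connectors/sqlserver-connector/config \
  -H "Content-Type: application/json" \
  -d @update-mssql.json
```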
To view the status of a connector, use the status endpoint:
curl http://localhost:8083/connectors/sqlserver-connector/status
Topics
Topics are the "buckets" that events flow into. In this application, Debezium is expecting a topic per table. For example, application.dbo.customer table events go into a topic called sqlserver.application.dbo.customer.
You can log into the Kafka container and use the shell scripts to view topics or modify their configuration:
docker-compose exec kafka bash
To list all topics:
kafka-topics.sh --bootstrap-server localhost:9092 --list
To view the configuration of a topic:
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic sqlserver.application.dbo.customer
You can update the topic configuration (for example, setting a different retention period):
kafka-configs.sh --bootstrap-server kafka:9092 --entity-type topics \
--entity-name sqlserver.application.dbo.customer \
--alter --add-config retention.ms=86400000 # 1 day
Scaling
By default, a topic in Kafka has 1 partition. Messages without a key are distributed round-robin across partitions, while keyed messages are hashed so that the same key always lands in the same partition. Consumer groups subscribe to a topic, and each consumer within the group is assigned a subset of its partitions. As an example, imagine a topic with 1,000 messages in 1 partition, and one consumer in the group. This may work perfectly fine for a small application with minimal traffic, or for events that are infrequent.
In order to scale, you can configure the topic to use more partitions. Say the message count grows to 10,000 and you increase to 10 partitions: each partition will now hold roughly 1,000 messages (the exact spread depends on the message keys). To scale horizontally, you would also increase your consumers. With 5 consumers, consumer 1 is assigned partitions 1 and 2, consumer 2 partitions 3 and 4, and so on. You can have at most as many active consumers as partitions - in this case up to 10. If you have more consumers than partitions, the excess consumers will sit idle.
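As a sketch, partitions can be added to an existing topic with kafka-topics.sh. Two caveats: Kafka only allows the partition count to grow, and adding partitions changes which partition a given key hashes to, which matters if per-key ordering is important.

```shell
# Increase the topic to 10 partitions.
# Partition counts can only grow; Kafka rejects a smaller number.
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic sqlserver.application.dbo.customer \
  --partitions 10
```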
Message Processing
Kafka maintains an offset per consumer group per partition to track which messages have been consumed. Going back to the prior example of 1 topic with 1,000 messages and 1 consumer, the offset for the consumer+topic will be 0:1000, indicating that there are 1,000 messages and 0 of them have been consumed.
To view the consumer offsets, you can use a command like this:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group customer-consumer --describe
For my current container, the result looks like this:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
customer-consumer sqlserver.application.dbo.customer 0 1016 1016 0 - - -
The dashes indicate that there is currently no active consumer in this group connected to Kafka. However, the group does have a committed offset, so when a consumer reconnects to the topic it will sit idle until a new event arrives, because the current-offset matches the log-end-offset (i.e. all messages have been consumed by this consumer group).
If you wanted to reprocess the events before they get purged at the end of the retention period, you could reset the current offset to 0, or you could start up a consumer under a different group name (such as customer-consumer-reprocess).
The latter option also illustrates how you can have different subscribers (consumer groups) reading the same topic but tracking their offsets separately. This allows the same messages to be processed in different ways - for example, checking the before/after state of different database fields and triggering different actions based on them. This is an effective way to remove database triggers and convert them to streamed events.
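Both reprocessing options can be sketched with the stock shell scripts, assuming the topic and group names above. Note that the offset reset is only accepted while the group has no active members.

```shell
# Option 1: rewind the existing group to the beginning of the topic.
# The group must have no running consumers for the reset to be accepted.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group customer-consumer \
  --topic sqlserver.application.dbo.customer \
  --reset-offsets --to-earliest --execute

# Option 2: read the same topic under a fresh group name, leaving the
# original group's offsets untouched.
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic sqlserver.application.dbo.customer \
  --group customer-consumer-reprocess \
  --from-beginning
```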
Error Handling
There are three primary error-handling strategies; which one to choose depends on your use case.
- "Fail fast" - allow the consumer to exit on error. This prevents the offset from being committed; once the issue causing the exception is fixed, the consumer resumes processing where the offset left off, removing the need to reset the offset to an earlier position.
- Retry the message - this mechanism is useful where a transient failure could prevent the completion of message processing, for example a timed-out request to a third-party REST API due to network latency. Retries are typically bounded so that a permanently bad message cannot block the partition forever.
- Send to a Dead Letter Queue (failure queue) - in this case, the message is written to another topic and the consumer moves on to subsequent messages. This method is useful when processing failures are specific to individual messages rather than systemic. The dead-lettered messages can be manually assessed and reprocessed when ready.
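As a sketch of the dead letter queue option, the failure topic can be created up front with kafka-topics.sh. The `.dlq` suffix here is a hypothetical naming convention, not something this application already uses.

```shell
# Create a dead letter topic alongside the source topic.
# A single partition is usually enough, since DLQ volume should be low.
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic sqlserver.application.dbo.customer.dlq \
  --partitions 1 --replication-factor 1
```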