FAQ - celonis/kafka-ems-connector GitHub Wiki

I have data in the message key. How can I make sure that data is uploaded?

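Kafka Connect offers a set of plugins called Single Message Transforms (SMTs), which let you change the shape of a record by adding or removing fields, or by moving them between the key and the value. Use an SMT to move the data you need from the key into the value before the record reaches the connector.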

Information and examples can be found here:

I want to replay my data

At times you need to reprocess data from the Kafka topic(s). The connector reads through a Kafka consumer group, so replaying data means resetting that group's offsets to the required position. To do so, follow these steps:

  1. Stop the connector.
  2. Work out the consumer group name. The connector instance name is used as part of the consumer group name. For example, if the connector instance name is my_ems_sink, then the consumer group name is connect-my_ems_sink.
  3. Reset the consumer group offsets, using one of the commands below.
# reset the offset on topic foo and partition 0 to the value 100
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --topic foo:0 --to-offset 100 --execute

# reset topic foo to the earliest offset (repeat for each topic the connector reads)
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --to-earliest --topic foo --execute

# reset to a specific point in time (format: YYYY-MM-DDTHH:mm:SS.sss)
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --to-datetime 2022-06-01T00:00:00.000Z --topic foo --execute

# move the current offset back by 10
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group connect-my_ems_sink --reset-offsets --shift-by -10 --topic foo --execute
  4. Restart the connector.
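
Before applying a reset, it can help to preview what it would do. The sketch below derives the consumer group name from the connector instance name and prints a reset command that uses --dry-run (the kafka-consumer-groups.sh option that reports the planned offsets without changing them) instead of --execute. The helper names consumer_group_for and preview_reset_cmd are hypothetical and exist only for this illustration. The host, connector name, and topic are the example values used above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: the consumer group of a sink connector is
# "connect-<connector instance name>".
consumer_group_for() {
  printf 'connect-%s\n' "$1"
}

# Hypothetical helper: print (do not run) a reset command that previews the
# outcome with --dry-run instead of applying it with --execute.
preview_reset_cmd() {
  local bootstrap="$1" connector="$2" topic="$3"
  printf 'kafka-consumer-groups.sh --bootstrap-server %s --group %s --reset-offsets --to-earliest --topic %s --dry-run\n' \
    "$bootstrap" "$(consumer_group_for "$connector")" "$topic"
}

# Example values from this page; adjust them to your environment.
preview_reset_cmd kafka-host:9092 my_ems_sink foo
```

Run the printed command while the connector is stopped, check the offsets it reports, and then replace --dry-run with --execute to apply the change.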

I want to sink more than one topic to EMS

At the moment, one connector instance writes to only one EMS target table (see the connect.ems.target.table configuration). To sink several topics, create one connector instance per topic.

# first connector instance configuration
name=ems-first_topic
topics=first_topic
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=first_topic
# ...

# second connector instance configuration
name=ems-second_topic
topics=second_topic
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items
connect.ems.target.table=second_topic
# ...
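
When many topics are involved, the per-topic configurations above could be generated rather than written by hand. The following is a minimal sketch, assuming every instance shares the same endpoint and that the target table is named after the topic, as in the two examples. The function name render_ems_sink_config and the EMS_ENDPOINT variable are hypothetical, and the masked endpoint is kept exactly as shown above. The remaining properties (the "# ..." part) still have to be added.

```shell
#!/usr/bin/env bash
# Placeholder endpoint, masked exactly as in the examples above; replace with your own.
EMS_ENDPOINT='https://***.***.celonis.cloud/continuous-batch-processing/api/v1/***/items'

# Hypothetical helper: print the connector properties for one topic.
# Assumes the target table carries the same name as the topic.
render_ems_sink_config() {
  local topic="$1"
  cat <<EOF
name=ems-${topic}
topics=${topic}
connector.class=com.celonis.kafka.connect.ems.sink.EmsSinkConnector
connect.ems.endpoint=${EMS_ENDPOINT}
connect.ems.target.table=${topic}
# ...
EOF
}

# Print one configuration per topic, separated by a blank line.
for topic in first_topic second_topic; do
  render_ems_sink_config "$topic"
  echo
done
```

Redirecting each call to its own file, for example render_ems_sink_config first_topic > ems-first_topic.properties, gives one properties file per connector instance.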