Developer Notes - JeffersonLab/epics2kafka GitHub Wiki



Schema

Internally the connector transforms the EPICS CA time Database Record (DBR_TIME) event data into Kafka Connect Schema structures of the form:

{
  "error":string optional,
  "status":int8 optional,
  "severity":int8 optional,
  "doubleValues":float64[] optional,
  "floatValues":float32[] optional,
  "stringValues":string[] optional,
  "intValues":int32[] optional,
  "shortValues":int16[] optional,
  "byteValues":int8[] optional
}


Note: At most one of the values arrays will be non-null. Kafka Connect Schema has no union type, so the "one of" choice is expressed with optional (nullable) fields.
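The union-as-nullable-fields convention can be illustrated with a plain Java sketch. The class name `CaValue` and its factory methods are hypothetical and not part of epics2kafka. A real connector would populate a Kafka Connect `Struct` instead. This model only shows the "zero or one populated array" rule made explicit.

```java
import java.util.Objects;
import java.util.stream.Stream;

/**
 * Hypothetical plain-Java model of the connector's value structure (not the
 * epics2kafka API). Kafka Connect Schema has no union type, so every values
 * array is an optional field; this sketch enforces "at most one" explicitly.
 */
final class CaValue {
    final String error;          // optional error message
    final Byte status;           // optional int8 alarm status
    final Byte severity;         // optional int8 alarm severity
    final double[] doubleValues; // optional float64[]
    final float[] floatValues;   // optional float32[]
    final String[] stringValues; // optional string[]
    final int[] intValues;       // optional int32[]
    final short[] shortValues;   // optional int16[]
    final byte[] byteValues;     // optional int8[]

    CaValue(String error, Byte status, Byte severity,
            double[] doubleValues, float[] floatValues, String[] stringValues,
            int[] intValues, short[] shortValues, byte[] byteValues) {
        this.error = error;
        this.status = status;
        this.severity = severity;
        this.doubleValues = doubleValues;
        this.floatValues = floatValues;
        this.stringValues = stringValues;
        this.intValues = intValues;
        this.shortValues = shortValues;
        this.byteValues = byteValues;
        if (populatedArrays() > 1) {
            throw new IllegalArgumentException("at most one values array may be non-null");
        }
    }

    /** Number of non-null values arrays: 0 (for example an error) or 1. */
    int populatedArrays() {
        return (int) Stream.<Object>of(doubleValues, floatValues, stringValues,
                                       intValues, shortValues, byteValues)
                .filter(Objects::nonNull)
                .count();
    }

    /** Hypothetical convenience factory for a float64 update. */
    static CaValue ofDoubles(byte status, byte severity, double... values) {
        return new CaValue(null, status, severity, values, null, null, null, null, null);
    }

    /** Hypothetical convenience factory for an error with no values. */
    static CaValue ofError(String message) {
        return new CaValue(message, null, null, null, null, null, null, null, null);
    }

    public static void main(String[] args) {
        CaValue reading = ofDoubles((byte) 0, (byte) 0, 1.5, 2.5);
        CaValue failure = ofError("channel disconnected"); // illustrative text only
        System.out.println(reading.populatedArrays() + " " + failure.populatedArrays());
    }
}
```

Consumers would then check which array is non-null instead of relying on a type tag.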

Note: Channel Access supports various extended DBR types, but DBR_TIME types are the only ones that provide a timestamp, which is often useful for monitoring. Every DBR_TIME type also carries alarm status and severity, so those fields are always included. If they are not needed they can be ignored, or a Kafka Connect Transform can be used to drop them. Other extended types such as DBR_CTRL and DBR_GR are not supported by this connector, as it rarely makes sense to monitor these complex structures as a whole. However, individual fields can be monitored explicitly by overriding the default .VAL field. For example, precision, units, and limits can be monitored individually using PV suffixes such as .PREC, .EGU, .HOPR, and .LOPR.
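Overriding the default field comes down to the `record.FIELD` naming convention of Channel Access. The sketch below splits a channel name into record and field, falling back to VAL when no suffix is given. It then builds sibling channel names for fields such as PREC and the units field (EGU in standard EPICS records). `ChannelName` and the record name `demo:ai1` are hypothetical. The sketch assumes the record name contains no `.` and ignores channel-name extensions such as `$` suffixes or filters.

```java
/**
 * Hypothetical helper (not part of epics2kafka) that splits an EPICS channel
 * name into record and field. Assumes the record name contains no '.', so the
 * first '.' starts the field; with no suffix, CA addresses the VAL field.
 */
final class ChannelName {
    final String record;
    final String field;

    private ChannelName(String record, String field) {
        this.record = record;
        this.field = field;
    }

    static ChannelName parse(String channel) {
        int dot = channel.indexOf('.');
        if (dot < 0) {
            return new ChannelName(channel, "VAL"); // implicit default field
        }
        String record = channel.substring(0, dot);
        String field = channel.substring(dot + 1);
        if (record.isEmpty() || field.isEmpty()) {
            throw new IllegalArgumentException("malformed channel name: " + channel);
        }
        return new ChannelName(record, field);
    }

    /** Channel name addressing another field of the same record. */
    String withField(String otherField) {
        return record + "." + otherField;
    }

    public static void main(String[] args) {
        ChannelName pv = parse("demo:ai1"); // hypothetical record name
        for (String f : new String[] {"PREC", "EGU", "HOPR", "LOPR"}) {
            System.out.println(pv.withField(f));
        }
    }
}
```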

Timestamp

The timestamp provided by the Channel Access DBR_TIME record is inserted into the Kafka record metadata, so it is unnecessary to include it in the record value. The Kafka topic option message.timestamp.type defaults to CreateTime, which preserves the timestamp as provided by CA. The option message.timestamp.difference.max.ms defaults to 9223372036854775807 (the maximum long), which effectively disables difference checking. If a smaller value is configured and an EPICS CA IOC provides a timestamp that differs from the broker's clock by more than that value, the broker rejects the message. Set message.timestamp.type to LogAppendTime to ignore the CA timestamp, which may be unreliable, and use the broker time instead (better ordering guarantees). If both timestamps are desired, a Connect transform could copy the timestamp metadata into a field in the message value, and the message could then be routed to a new topic with the LogAppendTime type. These options are set per topic (with broker defaults).
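Two details behind this paragraph can be sketched in plain Java. First, EPICS time stamps count seconds and nanoseconds from the EPICS epoch, 1990-01-01T00:00:00Z, which is 631152000 seconds after the POSIX epoch. Kafka record timestamps are milliseconds since 1970. Second, the CreateTime difference check compares the record timestamp against the broker's clock. `CaTimestamps` and its methods are hypothetical names. The sketch assumes raw `(secPastEpoch, nsec)` values, whereas the connector may rely on a CA library's own conversion. The rejection rule is a simplified model of the broker check, not broker code.

```java
import java.time.Instant;

/**
 * Hypothetical sketch (not epics2kafka code) of converting an EPICS time stamp
 * to a Kafka record timestamp, plus a simplified model of the broker's
 * CreateTime difference check.
 */
final class CaTimestamps {
    /** Seconds between the POSIX epoch (1970) and the EPICS epoch (1990). */
    static final long EPICS_EPOCH_OFFSET_SECONDS = 631_152_000L;

    /** EPICS (secPastEpoch, nsec) to milliseconds since 1970; sub-ms precision is truncated. */
    static long toKafkaTimestampMs(long secPastEpoch, long nsec) {
        return (secPastEpoch + EPICS_EPOCH_OFFSET_SECONDS) * 1_000L + nsec / 1_000_000L;
    }

    /**
     * Simplified model: with CreateTime, a record is rejected when its timestamp
     * differs from the broker's clock by more than message.timestamp.difference.max.ms.
     */
    static boolean wouldBeRejected(long recordTimestampMs, long brokerNowMs, long maxDifferenceMs) {
        return Math.abs(recordTimestampMs - brokerNowMs) > maxDifferenceMs;
    }

    public static void main(String[] args) {
        long epicsEpochMs = toKafkaTimestampMs(0L, 0L);
        System.out.println(Instant.ofEpochMilli(epicsEpochMs));
        long now = System.currentTimeMillis();
        // With the default (max long) nothing is rejected; with 5 s, a stale IOC clock is.
        System.out.println(wouldBeRejected(epicsEpochMs, now, Long.MAX_VALUE));
        System.out.println(wouldBeRejected(epicsEpochMs, now, 5_000L));
    }
}
```

This illustrates why a badly set IOC clock only matters once the difference limit is lowered from its default.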

Tests

Unit Tests

Unit tests use JUnit and are run by the Gradle build unless the "-x test" argument is used. The tests use an embedded EPICS CA IOC (Java). An HTML test report is generated at the path:

build/reports/tests/test/index.html

Integration Tests

Integration tests use Docker containers and are separate from the unit tests. They can be run with:

gradlew integrationTest

An HTML test report is generated at the path:

build/reports/tests/integrationTest/index.html

Note: It's best to run the integration tests from the test runner container defined in Dockerfile-test and launched with the compose file test.yml. The softioc relies on UDP broadcasts for EPICS CA searches, which don't work reliably when tests run on a local workstation and must communicate with a separate Docker subnet. See: Docker Integration Test Strategy

Logging

Kafka uses the SLF4J logger with Log4j, so epics2kafka uses them as well. Log levels can be controlled by including a log4j.properties file on the classpath. All epics2kafka logger names begin with org.jlab.kafka.connect. The unit tests have a separate configuration file in the test/resources directory, with the epics2kafka logger set to TRACE. The integration tests configure minimal formatting in integration/resources/log4j.properties, since container messages are streamed to standard out and already include columns such as timestamp, level, and class. The Dockerfile includes both a logging.properties and a log4j.properties from examples/logging to quiet noisy messages from various dependencies.
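Setting a level on the org.jlab.kafka.connect prefix covers every connector class because Log4j loggers form a dot-separated hierarchy. A logger inherits the level of its nearest configured ancestor. In Log4j 1.x properties syntax, such an entry looks like `log4j.logger.org.jlab.kafka.connect=TRACE`. The sketch below is a simplified model of that lookup, not Log4j itself. `LoggerLevels` and the class name `org.jlab.kafka.connect.ExampleTask` are hypothetical.

```java
import java.util.Map;

/**
 * Simplified model (not Log4j) of effective-level resolution: walk up the
 * dot-separated logger name until a configured ancestor is found, otherwise
 * fall back to the root logger's level.
 */
final class LoggerLevels {
    static String effectiveLevel(String loggerName, Map<String, String> configured, String rootLevel) {
        String name = loggerName;
        while (true) {
            String level = configured.get(name);
            if (level != null) {
                return level; // nearest configured ancestor wins
            }
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                return rootLevel; // no configured ancestor
            }
            name = name.substring(0, dot); // move up one level
        }
    }

    public static void main(String[] args) {
        // Roughly what log4j.logger.org.jlab.kafka.connect=TRACE expresses.
        Map<String, String> configured = Map.of("org.jlab.kafka.connect", "TRACE");
        // Hypothetical connector class vs. a Kafka client class.
        System.out.println(effectiveLevel("org.jlab.kafka.connect.ExampleTask", configured, "INFO"));
        System.out.println(effectiveLevel("org.apache.kafka.clients.NetworkClient", configured, "INFO"));
    }
}
```

Note that matching happens on whole name segments, so a sibling package such as org.jlab.kafka.connectx would not inherit TRACE.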

Docker

docker pull jeffersonlab/epics2kafka
docker image tag jeffersonlab/epics2kafka epics2kafka

Image hosted on DockerHub

Development UDP port hang-ups

During development a Java process sometimes holds onto EPICS UDP port 5065. You then won't be able to start the Docker Compose environment, and you'll see the error:

Error response from daemon: Ports are not available: exposing port UDP 0.0.0.0:5065 -> 0.0.0.0:0: listen udp 0.0.0.0:5065: bind: Only one usage of each socket address (protocol/network address/port) is normally permitted.

To fix this, find the rogue process holding the UDP port. On Windows, execute:

netstat -ano -p udp | find "5065"

Take the PID reported there and use Task Manager to kill the process.
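Before starting Docker Compose, a quick cross-platform way to tell whether the port is still held is to try binding it. The sketch below does that with the standard `java.net.DatagramSocket`. `UdpPortCheck` is a hypothetical name. The result is only a hint: it reports "in use" whenever the bind fails, and the bind can fail for reasons other than a rogue process. It does not identify the owning PID; use the netstat command above for that.

```java
import java.net.DatagramSocket;
import java.net.SocketException;

/** Hypothetical diagnostic: can this process bind the given UDP port on all interfaces? */
final class UdpPortCheck {
    static boolean isUdpPortFree(int port) {
        // Binding succeeds only if no other socket currently holds the port
        // (this probe does not request address reuse).
        try (DatagramSocket probe = new DatagramSocket(port)) {
            return true;
        } catch (SocketException e) { // BindException is a subclass
            return false;
        }
    }

    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 5065;
        System.out.println("UDP " + port + (isUdpPortFree(port) ? " is free" : " is in use"));
    }
}
```

Running it with no arguments checks port 5065. It releases the port immediately, so it won't itself block Docker.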

Container Base: Confluent vs Debezium

NOTE: This comparison is out-of-date (old versions are compared).   

It is usually a good idea to base your own container on an existing one. The top two I could find that integrate nicely with a Kafka environment and include the Confluent Schema Registry are the containers from Confluent and Debezium.

The Confluent container seems like the obvious choice given we're using the Confluent Schema Registry, but it suffers from many issues. Its licensing terms are hard to navigate (License Barriers), and the documentation for using it is limited and difficult to find. The container also uses a very Confluent-specific layout and set of packages. This differs from the bundle you get if you simply Download Kafka from Apache.org, and it includes a large number of extras whether you want them or not, bloating container size and start-up time.

The Debezium container is much easier to use, lightweight, and well documented. The biggest drawback is that it doesn't track bleeding-edge changes to the Schema Registry, so if you encounter an issue that is fixed in the latest release, like this one, you have a complication.

Comparison Table

| Factor | Confluent 6.1.0 | Debezium 1.4 |
|---|---|---|
| Docs | The README doesn't say much, instructions are not linked and are buried on the Confluent site, and some required environment variables are not overtly documented. | The README provides enough info to configure and run the container. |
| Ease of Use | Confluent requires 11 environment variables: CONNECT_BOOTSTRAP_SERVERS, CONNECT_REST_ADVERTISED_HOST_NAME, CONNECT_GROUP_ID, CONNECT_CONFIG_STORAGE_TOPIC, CONNECT_OFFSET_STORAGE_TOPIC, CONNECT_STATUS_STORAGE_TOPIC, CONNECT_KEY_CONVERTER, CONNECT_VALUE_CONVERTER, CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR, CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR, CONNECT_STATUS_STORAGE_REPLICATION_FACTOR. | Debezium requires only 4 environment variables (BOOTSTRAP_SERVERS, CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, STATUS_STORAGE_TOPIC) to get started and uses sensible defaults for the rest. |
| License | There is a lot of information to stumble upon (Image Reference, FAQ), none of which is linked from the DockerHub README. The license changed around 2018, so there is historic info to dig through, and the community edition license is custom, so you have to read it. | The GitHub repo README says MIT for the Dockerfile, plus Apache 2.0 for Debezium code (dependencies can be different; additional info). |
| Organization | Organization can be subjective, but I found spreading the various pieces around in /etc/, /usr/bin/, and /usr/share/java unhelpful. In production we use the Apache.org install of Kafka and simply untar the distribution in /opt/ (symbolic links to other locations can be added if you like spreading things around). More practically, our scripts assume a KAFKA_HOME environment variable is set, but that doesn't work in the Confluent container. The Confluent Dockerfile layering is more complicated, and the GitHub repo is under renovation, which makes it harder to understand. | The Debezium Dockerfile simply downloads Kafka from Apache.org and untars the distribution into /kafka: simple, familiar, and easy. |
| Size/Speed | DockerHub reports an image size of 771MB for cp-kafka-connect-base tag 6.1.0; on run, the container loads many extra plugins you may not use. | DockerHub reports an image size of 353MB for connect-base tag 1.4.1.Final; it starts quickly because it is a true base image without extra plugins. |