Kafka

Supported versions: Kafka 0.10, 0.11, 1.0+, 2.0+, 3.0+

Source config

  • (advanced) Kafka version - Allowed values: 0.10, 0.11, 2.0+. Default - 2.0+
  • Kafka broker connection string - comma-separated list of broker URIs
  • (advanced) Kafka Configuration - additional Kafka properties to pass to the underlying Kafka consumer. Format - key1:value1,key2:value2,key3:value3. If you want to use SASL/SSL, specify it here (see the sketch after this list)
  • Topic list - list of Kafka topics to consume from
  • Number of threads - number of threads used to consume data. Default - 1
  • Initial offset - values: EARLIEST, LATEST, TIMESTAMP
  • Offset timestamp (unix timestamp in milliseconds) - if the initial offset is TIMESTAMP then specify it here
  • (advanced) Data format. Allowed values: JSON, DELIMITED, AVRO. Default - JSON. Data formats config examples
  • (advanced) Change field names for DELIMITED data. Format - key:val,key2:val2,key3:val3
  • (advanced) Schema file path for AVRO format - specify the path to a file with the JSON schema
  • (advanced) Max Batch Size (records) - how many records to send to further pipeline stages. Default - 1000 records
  • (advanced) Batch Wait Time (ms) - how long to wait for the batch to reach its size. Default - 1000 ms
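
For example, to enable SASL you could fill the Kafka Configuration field with something like the sketch below. The values are illustrative - the exact properties depend on your cluster setup; the same properties are set via conf.kafkaOptions in the file-based example further down.

    security.protocol:SASL_PLAINTEXT,sasl.mechanism:PLAIN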

Source file config

Property Type Description
type String Specify source type. Value - kafka
name String Unique source name - also the config file name
config Object Source configuration

All properties are required

config object properties:

Property Type Required Description
conf.brokerURI String Yes List of Kafka brokers, separated with commas
conf.topicList Array Yes Topic list
conf.numberOfThreads Integer No Number of threads
conf.kafkaAutoOffsetReset String No Where to start consuming from. Valid values are: LATEST - from now, EARLIEST - from the beginning
conf.maxBatchSize Integer No Number of records to send to further pipeline stages
conf.maxWaitTime Integer No Time to wait (ms) for the batch to reach its size
conf.kafkaOptions Array of objects No Array of configs
version String No Kafka version. Allowed values: 0.10, 0.11, 2.0+. Default - 2.0+
conf.dataFormat String No Allowed values: JSON, DELIMITED, AVRO. Default - JSON
csv_mapping Object No Names of columns for delimited data
conf.dataFormatConfig.avroSchemaSource String No Allowed values: SOURCE (schema is present in the data itself), INLINE (specify the schema in the conf.dataFormatConfig.avroSchema parameter), REGISTRY (Confluent schema registry)
conf.dataFormatConfig.avroSchema Object No Avro schema (JSON object)
conf.dataFormatConfig.schemaRegistryUrls Array No Schema registry URLs
conf.dataFormatConfig.schemaLookupMode String No How to look up a schema in the registry. Allowed values: SUBJECT, ID, AUTO
conf.dataFormatConfig.subject String No Schema subject (specify if schemaLookupMode is SUBJECT)
conf.dataFormatConfig.schemaId String No Schema id (specify if schemaLookupMode is ID)
conf.keyDeserializer String No Key deserializer. Allowed values: STRING, CONFLUENT
conf.valueDeserializer String No Value deserializer. Allowed values: DEFAULT, CONFLUENT

conf.kafkaOptions object:

Property Type Description
key String Config key, examples: security.protocol, sasl.mechanism
value String Config value

All properties are required

Example

{
  "type": "kafka",
  "name": "kafka_source",
  "config": {
    "conf.brokerURI": "kafka:9092,kafka2:9092",
    "conf.numberOfThreads": 2,
    "conf.topicList": ["test"],
    "conf.kafkaAutoOffsetReset": "LATEST",
    "conf.maxBatchSize": 1000,
    "conf.maxWaitTime": 1000,
    "conf.kafkaOptions": [{"key": "security.protocol", "value": "SASL_PLAINTEXT"}, {"key": "sasl.mechanism", "value": "PLAIN"}, {"key": "sasl.jaas.config", "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='pass123'"}],
    "version": "0.10",
    "conf.dataFormat": "DELIMITED",
    "csv_mapping": {"0": "time", "1": "dim1", "2": "val"}
  }
}
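
Below is a minimal sketch of a source that reads AVRO data with the schema taken from a Confluent schema registry. The registry URL, subject and topic names are placeholders; the property names come from the table above.

{
  "type": "kafka",
  "name": "kafka_avro_source",
  "config": {
    "conf.brokerURI": "kafka:9092",
    "conf.topicList": ["avro_topic"],
    "conf.dataFormat": "AVRO",
    "conf.dataFormatConfig.avroSchemaSource": "REGISTRY",
    "conf.dataFormatConfig.schemaRegistryUrls": ["http://schema-registry:8081"],
    "conf.dataFormatConfig.schemaLookupMode": "SUBJECT",
    "conf.dataFormatConfig.subject": "avro_topic-value",
    "conf.valueDeserializer": "CONFLUENT"
  }
}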

Pipeline config

  • Pipeline ID - unique pipeline identifier (use a human-readable name so you could easily use it further)
  • Count records? - to include the number of records as a separate metric, default No
  • Measurement name - the what property value used for counting records
  • (Advanced) Is 'what' property static? - if yes, the what property is constant (the Measurement name); if no, the value of the what property is taken from the data
  • (Advanced) Values array path - Values array path
  • Value columns with target types - key-value pairs separated with spaces, format column_name:target_type column2_name:target_type. A target type represents how samples of the same metric are aggregated in Anodot. Valid values are: gauge (average aggregation), counter (sum aggregation), running_counter. If the what property is not static, the specified target type will be used for all metrics, or specify a property name where the target type is stored. See the sketch after this list
  • Measurement properties names - key-value pairs separated with spaces, format column_name:what column2_name:what2. If the what property is not static, instead of an actual what value specify a property name where it is stored.
  • Timestamp config
    • Use Kafka timestamp or take it from data
    • Timestamp property name
    • Timestamp property type. Possible values:
      • string (must specify format)
      • unix_ms (unix timestamp in milliseconds)
      • unix (unix timestamp in seconds)
    • Timestamp format string - if the timestamp property type is string, specify the format according to this spec
    • Timezone - if the timestamp property type is string you can specify a timezone for it (e.g. Europe/London), UTC by default.
  • Required dimensions - Names of properties delimited with spaces. If these fields are missing in a record, it goes to the error stage. Format - dimension1 dimension2
  • Optional dimensions - Names of properties delimited with spaces. These fields may be missing in a record. Format - dimension1 dimension2
  • Consumer group name - Kafka consumer group that will be used to retrieve data
  • (Advanced) Static dimensions - dimensions with static values to pass to Anodot. Format - key1:value1 key2:value2 key3:value3
  • (Advanced) Tags - tags. Format - key1:value1 key2:value2 key3:value3
  • (Advanced) Filter Condition - Additional filtering to be applied on the data retrieved from Kafka. See Kafka Filtering page for details.
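
For illustration, with value columns amount and clicks (hypothetical names), the two fields described above could be filled like this:

    Value columns with target types:   amount:gauge clicks:counter
    Measurement properties names:      amount:revenue clicks:click_count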

If your JSON data has a nested structure you can specify a path to the required field with the / sign.

JSON example


    {
      "amount": 100,
      "transaction": {
        "type": "SALE",
        "status": "SUCCESS"
      }
    }

To specify these fields in pipeline config you can type transaction/type and transaction/status
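
In a pipeline file config (described in the next section) the same paths can be used wherever a property name is expected, for example in the dimensions object. A sketch based on the JSON above:

    "dimensions": {
      "required": ["transaction/type"],
      "optional": ["transaction/status"]
    }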

Pipeline File config

Properties list

Property Required Value type in config file Description
source yes String Source config name
pipeline_id yes String Unique pipeline identifier (use a human-readable name so you could easily use it further)
static_what no bool If measurement_name is static. Default true
count_records no bool To include the number of records as a separate metric. Default false
values_array_path no Values array path
values_array_filter_metrics no Values array filter metrics
count_records_measurement_name no String The what property value used for counting records
values yes Object Key-value pairs (value column: target type). A target type represents how samples of the same metric are aggregated in Anodot. Valid values are gauge (average aggregation), counter (sum aggregation), running_counter. If the what property is not static, instead of an actual target type specify a property name where the target type is stored
units no Object Key-value pairs (value column: unit). The key must be one of the values columns; the unit can be any string.
measurement_names yes Object Key-value pairs (property:name). If what property is not static - instead of an actual what value specify a property name where it is stored.
dimensions yes Object Dimensions object
timestamp yes Object Timestamp object
properties no Object with key-value pairs Dimensions with static values to pass to Anodot.
tags no Object with key-value pairs Tags
filter no Object Filtering config
override_source no Object Values from here will override the same source config values for the pipeline source
timezone no String A timezone of a timestamp field if its type is string, e.g. Europe/London, default UTC
header_attributes no Array A list of dimensions that should be extracted from Kafka message headers
periodic_watermark no Object Configuration for agent to send watermarks to Anodot periodically
partitions no Array Take records only from a specific partition. conf.numberOfThreads in the source config should be equal to the number of partitions.
tag_configurations no Object Configure tags with dynamic values. Each value in these configurations is a field. To learn about fields visit the fields wiki page
notifications no Object See notifications page
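
A minimal pipeline file containing only the required properties might look like the sketch below (the source name and field names are illustrative; the timestamp and dimensions objects are described next, and a fuller example follows):

[
  {
    "source": "kafka_source",
    "pipeline_id": "minimal_pipeline",
    "values": {"amount": "gauge"},
    "measurement_names": {"amount": "revenue"},
    "dimensions": {"required": ["Country"], "optional": []},
    "timestamp": {"type": "unix", "name": "ts"}
  }
]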

timestamp object properties:

Property Type Description
type String string, unix or unix_ms
name String Property name
format String Specify format if timestamp type is string

Required properties are type and name

dimensions object properties:

Property Type Description
required List of strings These properties are always present in a record
optional List of strings These properties may be missing in a record

All properties are required

filter object properties:

Property Type Description
condition String filtering condition. Example "Country" = "USA"
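
In a pipeline file the filter object could look like the sketch below, reusing the condition above (the exact expression syntax is covered on the Kafka Filtering page):

    "filter": {
      "condition": "\"Country\" = \"USA\""
    }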

override_source object properties:

Property Type Description
conf.consumerGroup String Kafka consumer group that is used to retrieve data

periodic_watermark object properties:

Property Type Description
bucket_size String The size of the bucket you want to close after sending data. Possible values: 1m, 5m, 1h, 1d. For example, if data is loaded into Kafka once per day and you set bucket_size to 1d, then after the data has been sent by the pipeline, today's bucket will be closed and Anodot will expect the next data point to arrive tomorrow.
delay Integer Time to wait in seconds after the bucket_size was reached. Example: with bucket_size 5m and a delay of 90 seconds, the watermark for the 11:05 bucket, which holds data from 11:05:00 - 11:09:59, will be sent at 11:11:30, 90 seconds after the bucket close time.
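
For example, the configuration from the delay example above would look like this in a pipeline file:

    "periodic_watermark": {
      "bucket_size": "5m",
      "delay": 90
    }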

Example

[
  {
    "source": "kafka_source",
    "pipeline_id": "test",
    "count_records_measurement_name": "clicks",
    "count_records": true,
    "values": {"amount": "gauge"},
    "units": {"amount": "USD"},
    "measurement_names": {"amount": "name"},
    "dimensions": {
      "required": ["ver", "Country"],
      "optional": ["Exchange", "optional_dim"]
    },
    "timestamp": {
      "type": "string",
      "name": "timestamp_string",
      "format": "M/d/yyyy H:mm:ss"
    },
    "properties": {"key1": "value1", "key2": "value2", "key3": "value3"},
    "tags": {"key1": ["value1"], "key2": ["value2"], "key3": ["value3"]},
    "override_source": {
        "conf.consumerGroup": "my_group"
    },
    "header_attributes": ["topic"],
    "partitions": [0, 2, 15],
    "tag_configurations": {
      "Tag_name1": {
        "value_path": "property1"
      },
      "Tag_name2": {
        "value_path": "property2"
      }
    }
  }
]

Using metrics 3.0 and Anodot schema

  • If you set the option static_what: false, measurement names are generated dynamically. Because the measures are not known in advance, the agent cannot create a schema and use the metrics 3.0 protocol

Periodic watermarks

When using metrics 3.0 and the Anodot schema, Kafka pipelines do not send watermarks on data arrival. You should configure pipelines to send watermarks periodically. This configuration can be added only via file, not via the CLI. To do it, add the periodic_watermark object to the pipeline configuration; its properties are described above.

The Agent application contains a script that runs every minute and checks if a watermark needs to be sent for any pipeline. Based on the bucket_size from the periodic_watermark configuration, the script calculates the next bucket start and sends it to Anodot; the current bucket is then closed and no new metrics can be sent with a timestamp less than the next bucket start.