# Elasticsearch

## Source file config

| Property | Type | Description |
| --- | --- | --- |
| type | String | Specify source type. Value: `elastic` |
| name | String | Unique source name, also the config filename |
| config | Object | Source configuration |

All properties are required

config object properties:

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| conf.httpUris | Array | Yes | Elasticsearch cluster URLs |
| conf.index | String | No | Optional index to define the scope of the query. Enter an index name or a pattern |
| query_interval_sec | Integer | No | Amount of time that the origin waits between queries, in seconds |
| conf.offsetField | String | No | Offset field |
| conf.initialOffset | String | No | Offset value to use when the pipeline starts. Enter a constant or an Elasticsearch date math expression |

## Example

```json
[
  {
    "type": "elastic",
    "name": "test_elastic",
    "config": {
      "conf.httpUris": ["es:9200"],
      "conf.index": "test*",
      "query_interval_sec": 5,
      "conf.offsetField": "timestamp",
      "conf.initialOffset": "now-3d/d"
    }
  }
]
```

## Using basic authentication

If you need to use basic authentication for the Elasticsearch source, your agent version must be 3.37.0 or higher. To enable basic authentication, add these keys to the source configuration: `conf.useSecurity`, `conf.securityConfig.securityUser`, `conf.securityConfig.securityPassword`. Example:

```json
[
  {
    "type": "elastic",
    "name": "test_elastic",
    "config": {
      "conf.httpUris": [
        "http://es:9200"
      ],
      "conf.isIncrementalMode": true,
      "conf.index": "test",
      "conf.offsetField": "timestamp_unix_ms",
      "conf.initialOffset": "now-1595d",
      "query_interval_sec": 1,
      "conf.queryInterval": "${1 * SECONDS}",
      "conf.useSecurity": true,
      "conf.securityConfig.securityUser": "username",
      "conf.securityConfig.securityPassword": "password"
    }
  }
]
```
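To verify the credentials before putting them into the agent config, you can query the cluster directly. A quick sanity check, assuming the host and credentials from the example above:

```bash
# Placeholders: replace the URL, user, and password with your own.
curl -u username:password http://es:9200/_cluster/health?pretty
```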

Alternatively, the `username` and `password` keys can be used for basic authentication, but only if the pipeline has a schema enabled.
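A minimal sketch of that variant, assuming a schema-enabled pipeline (the source name, host, and credentials here are placeholders):

```json
[
  {
    "type": "elastic",
    "name": "test_elastic_schema",
    "config": {
      "conf.httpUris": ["http://es:9200"],
      "conf.index": "test",
      "conf.offsetField": "timestamp",
      "conf.initialOffset": "now-3d/d",
      "username": "username",
      "password": "password"
    }
  }
]
```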

## Using self-signed SSL certificates

There is no option to disable SSL verification, so if your certificate is self-signed you need to import it into the truststore. To do so, follow these steps:

1. Download the certificate:

   ```bash
   echo -n | openssl s_client -connect $HOST:$PORTNUMBER | openssl x509 > /tmp/$SERVERNAME.cert
   ```

2. Copy the certificate to the StreamSets pods/containers.

   Plain docker/docker-compose installation:

   ```bash
   docker cp /tmp/$SERVERNAME.cert anodot-sdc:/data/
   ```

   Kubernetes installation:

   ```bash
   kubectl cp /tmp/$SERVERNAME.cert streamsets-agent-0:/data/
   ```

3. Import the certificate using keytool.

   Plain docker/docker-compose installation:

   ```bash
   docker exec anodot-sdc keytool -import -alias ca -file /data/$SERVERNAME.cert -keystore /data/cacerts -storepass changeit -noprompt
   ```

   Kubernetes installation:

   ```bash
   kubectl exec streamsets-agent-0 -- keytool -import -alias ca -file /data/$SERVERNAME.cert -keystore /data/cacerts -storepass changeit -noprompt
   ```

If you have multiple StreamSets instances running, repeat steps 2-3 for each of them.
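To confirm the import succeeded, you can list the certificate back out of the truststore (the alias `ca` matches the import command above; adjust the container name for your installation):

```bash
docker exec anodot-sdc keytool -list -alias ca -keystore /data/cacerts -storepass changeit
```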

After importing the certificate, specify the truststore location in your source config, for example:

```json
[
  {
    "type": "elastic",
    "name": "test_elastic",
    "config": {
      "conf.httpUris": [
        "http://es:9200"
      ],
      "conf.isIncrementalMode": true,
      "conf.index": "test",
      "conf.offsetField": "timestamp_unix_ms",
      "conf.initialOffset": "now-1595d",
      "query_interval_sec": 1,
      "conf.queryInterval": "${1 * SECONDS}",
      "conf.useSecurity": true,
      "conf.securityConfig.securityUser": "username",
      "conf.securityConfig.securityPassword": "password",
      "conf.securityConfig.sslTrustStorePath": "/data/cacerts",
      "conf.securityConfig.sslTrustStorePassword": "changeit"
    }
  }
]
```

## New pipelines (version >=3.49.1)

### Source file config

| Property | Type | Description |
| --- | --- | --- |
| type | String | Specify source type. Value: `elastic` |
| name | String | Unique source name, also the config file name |
| config | Object | Source configuration |

All properties are required

config object properties:

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| conf.httpUris | String | Yes | URL to fetch data from, e.g. http://es:9200 |
| conf.index | String | Yes | Elasticsearch index |
| username | String | No | Username for basic authentication |
| password | String | No | Password for basic authentication |
| verify_ssl | Boolean | No | Whether to verify the SSL certificate when connecting via HTTPS. Default: true |
| query_timeout | Integer | No | Query timeout in seconds. Default: 15 |
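A minimal sketch of a source config using these properties (the source name, host, index, and credentials are placeholders):

```json
[
  {
    "type": "elastic",
    "name": "test_elastic_new",
    "config": {
      "conf.httpUris": "http://es:9200",
      "conf.index": "test",
      "username": "username",
      "password": "password",
      "verify_ssl": true,
      "query_timeout": 15
    }
  }
]
```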

## Elasticsearch Query

Define the query that the origin uses to return data from Elasticsearch. You can define any valid Elasticsearch query. Important note: aggregations are not supported.

You must include both the offset field and the offset value in the Elasticsearch query. Use ${OFFSET} to represent the offset value.

For example, say you configure the timestamp field as the offset field. You include the offset field and offset value in the query to determine where to start reading data, and you use the Elasticsearch date math expression now-3d/d to set the initial offset value to three days before the current time.

Example query:

```json
{
  "sort": [{"timestamp": {"order": "asc"}}],
  "query": {"range": {"timestamp": {"gt": ${OFFSET}}}}
}
```

### New pipelines (version >=3.49.1)

You need to add a timestamp condition to your query; it should look like this:

```json
{
  "sort": [{"timestamp": {"order": "asc"}}],
  "query": {"range": {"timestamp": {"gt": "$OFFSET", "lte": "$OFFSET+$INTERVAL"}}}
}
```

When you run the pipeline, the query returns all documents that have a timestamp field with a value greater than three days before the current time. The query sorts the results by timestamp.
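If you want to dry-run a query outside the agent, you can substitute the offset by hand and POST the body to the Elasticsearch _search API. A sketch, assuming the index `test` and an epoch-milliseconds offset (both placeholders):

```bash
# Hypothetical index name and offset value - substitute your own.
curl -s -X POST 'http://es:9200/test/_search?pretty' \
  -H 'Content-Type: application/json' \
  -d '{
    "sort": [{"timestamp": {"order": "asc"}}],
    "query": {"range": {"timestamp": {"gt": 1700000000000}}}
  }'
```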

## Pipeline File config

Properties list

| Property | Required | Property name in config file | Value type in config file | Description |
| --- | --- | --- | --- | --- |
| Source | Yes | source | String | Source config name |
| Pipeline ID | Yes | pipeline_id | String | Unique pipeline identifier (use a human-readable name so you can easily refer to it later) |
| Query file path | Yes | query_file | String | Path to the file with the search query |
| Static what | No | static_what | Boolean | Whether measurement_name is static |
| Count records? | No | count_records | Boolean | Whether to include the number of records as a separate metric. Default: false |
| Measurement name | No | count_records_measurement_name | String | what property for the record-count metric |
| Value columns with target types | Yes | values | Object | Key-value pairs (property: target_type). A target type represents how samples of the same metric are aggregated in Anodot. Valid values: gauge (average aggregation), counter (sum aggregation). If the what property is not static, specify the name of the property where the target type is stored instead of an actual target type |
| Measurement properties names | Yes | measurement_names | Object | Key-value pairs (property: name). If the what property is not static, specify the name of the property where the what value is stored instead of an actual value |
| Dimensions | Yes | dimensions | Array | Dimension field paths. Format: _source/<dimension-name> |
| Timestamp | Yes | timestamp | Object | Timestamp object |
| Additional properties | No | properties | Object | Key-value pairs of additional properties with static values to pass to Anodot as dimensions |
| Timezone | No | timezone | String | Timezone of the timestamp field if its type is string, e.g. Europe/London. Default: UTC |
| Days to backfill | Yes | days_to_backfill | Integer | Collect data starting N days ago |
| Interval | Yes | interval | Integer | Query data every N seconds |
| Delay | No | delay | Integer | Collect data with the specified delay: the pipeline retrieves data up to now minus the delay rather than fetching the latest data immediately. Unit: minutes |

Timestamp object properties:

| Property | Type | Description |
| --- | --- | --- |
| type | String | string, unix or unix_ms |
| name | String | Property name |
| format | String | Timestamp format, if the type is string |

Required properties are type and name
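The example below uses a string timestamp; for a numeric epoch-milliseconds field the object reduces to `type` and `name` (an illustrative fragment):

```json
{"type": "unix_ms", "name": "timestamp_unix_ms"}
```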

## Example

```json
{
    "source": "test_elastic",
    "query_file": "/path/to/file",
    "pipeline_id": "test",
    "count_records_measurement_name": "clicks",
    "count_records": true,
    "days_to_backfill": 10,
    "interval": 300,
    "delay": 1,
    "values": {
        "amount": "gauge",
        "some/path/to/amount": "gauge"
    },
    "measurement_names": {"amount": "name"},
    "dimensions": [
        "_source/Exchange",
        "_source/optional_dim"
    ],
    "timestamp": {
        "type": "string",
        "name": "timestamp_string",
        "format": "M/d/yyyy H:mm:ss"
    },
    "properties": {"key1": "value1", "key2": "value2", "key3": "value3"},
    "tags": {"key1": "value1", "key2": "value2", "key3": "value3"}
}
```

WARNING: Elasticsearch usually returns JSON data with nested fields, so you need to specify the path to the required field using the / sign. For example:


```json
{
  "amount": 100,
  "transaction": {
    "type": "SALE",
    "status": "SUCCESS"
  }
}
```

To specify these fields in the pipeline config, use transaction/type and transaction/status.
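For instance, to use those nested fields as dimensions, combine the / paths with the _source/ prefix from the dimensions format above (an illustrative fragment, not a complete pipeline config):

```json
{
  "dimensions": [
    "_source/transaction/type",
    "_source/transaction/status"
  ]
}
```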