FlumeHarvestTechnology: Flume Harvest Component
The Flume Harvester enables users to configure the high-performance, mature, and widely-used Flume infrastructure to bring data into Aleph2, either via its standard batch (HDFS) and streaming (Kafka) end-points, or (more as a debug capability) directly into any of its data stores.
Currently the Flume harvester is not safe for non-admin users on secure clusters. It will shortly be integrated with RBAC, but in the meantime access to this harvester should be restricted to admin users (by restricting the read rights of the uploaded JAR).
The Flume harvester currently has no Aleph2 logging.
Because of its flexibility, Flume has a reasonably complicated per-bucket configuration model. At the top level it looks like this:
```json
{
    // (INPUT)
    "input": {
        "spool_dirs": [
            { /* see below */ }
        ]
    },
    // (BASIC PROCESSING)
    "flume_config": { /* see below */ },
    "flume_config_str": string,
    "flume_config_test_overrides": { /* see below */ },
    "substitution_prefix": string,
    // (ADVANCED PROCESSING)
    "morphlines_config": { /* see below */ },
    "morphlines_config_str": string,
    // (OUTPUT)
    "output": {
        "add_time_with_name": string,
        // one of:
        "json": { /* see below */ },
        "csv": { /* see below */ },
        "batch_schema": { /* see below */ },
        // (more for debugging)
        "direct_output": [ string ]
    }
}
```
Aleph2 provides a simple input format schema (currently: spooldir type only), or it is possible to use the full Flume configuration as described below in "Basic Processing".
Any number of the following JSON objects can be inserted into the input.spool_dirs array:
```json
{
    "enabled": boolean,
    "path": string,
    "test_src_path": string,
    "ignore_pattern": string,
    "delete_on_ingest": boolean,
    "append_basename_field": string,
    "append_path_field": string
}
```
Where (with reference to the Flume documentation):
- `enabled`: if present and set to `false`, then this input is ignored as if it wasn't present
- `path`: maps to the `spoolDir` parameter in Flume. Note that files in this directory (over which `tomcat` must have write permission) are either renamed or deleted after processing (see `delete_on_ingest`).
- `test_src_path`: if this is specified, then when the "Test" function is invoked on the bucket, the data is copied from here (ie not deleted/renamed) into a transient spooling directory, and the `path` directory is not used.
- `ignore_pattern`: maps to the `ignorePattern` parameter in Flume.
- `delete_on_ingest`: if `true` then the file is deleted when processed, if `false` then it has the string `".COMPLETED"` appended
  - (note that this only makes sense if the `ignore_pattern` regex pattern will match on files ending `".COMPLETED"`)
- `append_basename_field`: if set then the basename (ie filename minus the path) will be added to the Flume event header map with the specified field name.
- `append_path_field`: if set then the full absolute path will be added to the Flume event header map with the specified field name.
(The following Flume parameters are fixed: type=spoolDir, channels=mem (see below), trackerDir is set to a unique sub-directory of the spoolDir; there's a 'mem' channel with capacity=1000, transactionCapacity=100, type=memory)
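For example, a hedged sketch of a single spool directory input (the directory paths and field names are hypothetical) that keeps processed files behind with a `".COMPLETED"` suffix and records where each record came from:

```json
"input": {
    "spool_dirs": [
        {
            "enabled": true,
            // hypothetical directories
            "path": "/data/landing/csv",
            "test_src_path": "/data/landing/csv_test",
            // needed because delete_on_ingest is false, so processed files remain with a ".COMPLETED" suffix
            "ignore_pattern": "^.*\\.COMPLETED$",
            "delete_on_ingest": false,
            // add the source filename and its full path to each Flume event's header map
            "append_basename_field": "source_file",
            "append_path_field": "source_path"
        }
    ]
}
```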
Note: In many cases (eg reading CSV or JSON from file into Aleph2 with no additional enrichment) the basic and advanced processing steps described in this and the next section are not required.
The Flume configuration file is generated by the following steps:

- First the string in `flume_config_str` is prepended (and cannot be overridden)
- Then, if present, the `input` object is converted into a set of Flume property/value lines
  - (the `spooldir` inputs are called `file_in_<index>`, where `<index>` is the position in the array, counting from 1)
- Then each key/value pair from `flume_config` is appended in the format `<key>=<value>`
  - Where the keys are whatever you want to configure in Flume, except with `.`s replaced with `:`s, eg `{ "sources:file_in:type": "netcat" }` would be mapped to the line `test_ext_c1651d4c69ed_1.sources.file_in.type=netcat`
- (And then in test mode only, the key/value pairs from `flume_config` are overwritten one-by-one by the key/value pairs from `flume_config_test_overrides`)
- You can also use the following substitutions (the `substitution_prefix` field can replace the `$$$$` below):
  - `$$$$hostname` - the hostname on which this Flume agent is being launched
  - `$$$$signature` - a unique and deterministic UUID generated from the bucket signature
So for example the following object:
```json
{
    "flume_config_str": "test.field=test",
    "flume_config": {
        "sources": "file_in",
        "channels": "mem",
        "sinks": "aleph2_sink",
        "sources:file_in:type": "netcat"
    }
}
```
would generate the following lines:
```
test.field=test
test_ext_c1651d4c69ed_2.sources=file_in
test_ext_c1651d4c69ed_2.channels=mem
test_ext_c1651d4c69ed_2.sinks=aleph2_sink
test_ext_c1651d4c69ed_2.sources.file_in.type=netcat
```
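The substitutions listed above can also be used inside `flume_config` values. For example, in this hypothetical sketch (the sink properties are illustrative only, not real Flume sink parameters), `substitution_prefix` replaces the default `$$$$` token:

```json
{
    "substitution_prefix": "%%",
    "flume_config": {
        // "%%hostname" is replaced with the host on which this Flume agent is launched
        "sinks:some_sink:hostname": "%%hostname",
        // "%%signature" is replaced with a deterministic UUID derived from the bucket signature
        "sinks:some_sink:client_id": "%%signature"
    }
}
```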
The Flume agent can be configured to run the morphlines processing pipeline by using `morphlines_config` and `morphlines_config_str` in the same way as `flume_config` and `flume_config_str` are used above.
Note: Morphlines have not been tested beyond checking that the two parameters above are inserted correctly into the Flume configuration file (eg it is not known which libraries, if any, are supported by the HDP install).
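As a purely illustrative sketch (morphlines support is untested, per the note above, and the keys and values below are hypothetical placeholders), the two fields are populated just like their `flume_config` counterparts - the string is prepended verbatim and the map keys have `:` mapped to `.`:

```json
{
    // hypothetical placeholder - this string is prepended verbatim to the generated configuration
    "morphlines_config_str": "# raw morphlines configuration text",
    // hypothetical key/value pair, shown only to illustrate the ':' to '.' key mapping
    "morphlines_config": { "some:morphlines:key": "some_value" }
}
```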
The output block is mandatory unless basic processing is wired in and provides one or more sinks.
The global output configuration is:
"output": {
"add_time_with_name" string,
// one of:
"json": { /* see below */ },
"csv": { /* see below */ }
"batch_schema": { /* see below */ },
// (more for debugging)
"direct_output": [ string ]
}
Where:

- If `add_time_with_name` is populated then the current time is written into the output JSON with the specified field name.
  - If it isn't set but the temporal schema is populated and its `time_field` is set, then that field name is used as a backup.
- One of `json` and `csv` must be set. Their formats are covered below.
- `batch_schema`: if writing into a bucket's batch queue (ie HDFS) then this schema, which is in the same format as the generic writer configuration from the data schema, is applied.
- `direct_output`: this is a set of strings. By default Flume will either write into the batch queue (HDFS) for buckets with `"master_enrichment_type": "batch"`, or into the streaming queue (Kafka) for buckets with `"master_enrichment_type": "stream"`. Instead, one or more of the following values can be specified (see the example after this list):
  - `"batch"`: writes into the batch queue as per the default (but regardless of `master_enrichment_type`)
  - `"stream"`: writes into the streaming queue as per the default (but regardless of `master_enrichment_type`)
  - `"search_index_service"`: bypasses any enrichment and writes directly into the Search Index data service (eg Elasticsearch)
  - `"storage_service"`: bypasses any enrichment and writes directly into the Storage service (eg HDFS)
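For example, the following hedged sketch (the field name is hypothetical) timestamps each record and writes it both to the streaming queue and directly into the search index, regardless of the bucket's `master_enrichment_type`:

```json
"output": {
    // hypothetical field name for the ingest timestamp
    "add_time_with_name": "received_at",
    "json": { "json_policy": "body" },
    // overrides the master_enrichment_type-based default routing
    "direct_output": [ "stream", "search_index_service" ]
}
```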
If the data received from the input is already in some sort of JSON format, then the json output block should be used:
```json
{
    "enabled": boolean,
    "json_policy": "body"|"body_plus_headers"|"event"|"event_no_body",
    "include_body_with_name": boolean
}
```
Where:
- `enabled`: if present and `false`, then this block is ignored as if it weren't present at all
- `json_policy`: a Flume Event consists of a text block, together with a map of "headers" (strings only, no complex objects). This field determines how the JSON object is constructed (see the example below):
  - `"body"`: the body is serialized into a JSON object (ie has to be in `{ "field": value, ... }` format); headers are discarded
  - `"body_plus_headers"`: as above, but each header field is inserted into the serialized body object
  - `"event"`: the headers and their values form the JSON object. The body of the event is inserted with the fieldname given by `include_body_with_name` (or `"message"` by default).
  - `"event_no_body"`: as above, but the body is discarded.
(Note that line-separated JSON can be natively ingested by Aleph2 in batch mode, though this does require a harvester or external process to place it in the import/ready directory)
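For example, a hedged sketch (the event contents are hypothetical) using the `"body_plus_headers"` policy:

```json
"output": {
    "json": {
        "enabled": true,
        "json_policy": "body_plus_headers"
    }
}
// e.g. a Flume event with body {"message":"hello"} and header map {"host":"node1"}
// would produce the output object {"message":"hello","host":"node1"}
```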
If the data is in some line-oriented token-separated format, such as CSV or TSV, then the csv output block should be used:
```json
{
    "enabled": true|false,
    "separator": string,
    "quote_char": string,
    "escape_char": string,
    "header_fields": [ string ],
    "ignore_regex": string,
    "non_string_types": { string: string },
    "non_string_type_map": { string: [ string ] },
    "append_event_fields": [ string ]
}
```
Where:
- `enabled`: if present and `false`, then this block is ignored as if it weren't present at all
- `separator`: must be a single character, defaults to `","` - the separator token
- `quote_char`: must be a single character, defaults to `"\""` (`"`) - can be used to quote fields that contain the separator token
- `escape_char`: must be a single character, defaults to `"\\"` (`\`) - will escape special characters like quotes and separators (see above)
- `header_fields`: a list of columns/field names - the output JSON object is built up by using these fields as keys, with the value at the same index in the token-separated line.
  - Any value not corresponding to a header field (eg at a higher index) is discarded.
- `ignore_regex`: a regex (must match the entire string) that, if it matches, will result in that line being dropped.
  - For example, if there are header/column list lines that start with `#` then `"ignore_regex": "^#.*"` could be used.
- `non_string_types`: an optional map of strings that maps the header fields to non-string types
  - The following types are supported: `"int"`, `"long"`, `"boolean"`, `"double"`, `"hex"`, `"date"`
  - So for example, `"non_string_types": { "test_field": "long" }` would store the field `"test_field"` as a long in the result JSON object, eg `100` instead of `"100"`.
- `non_string_type_map`: an alternative way of expressing `non_string_types`. In this case, the key is the type, and the value is a list of field names
  - Eg `"non_string_type_map": { "long": [ "test_field" ] }` would be equivalent to the previous example
- `append_event_fields`: an optional list of fields from the Flume event header - these fields, if they exist, are copied into the JSON after the CSV processing (see the example after this list).
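For example, the following sketch (the field names, sample line, and event header are all hypothetical) turns a comma-separated log line into a typed JSON object:

```json
"output": {
    "csv": {
        "enabled": true,
        "separator": ",",
        "header_fields": [ "timestamp", "host", "bytes" ],
        // drop comment/header lines starting with '#'
        "ignore_regex": "^#.*",
        // store "bytes" as a long rather than a string
        "non_string_types": { "bytes": "long" },
        // copy this field from the Flume event header if present (eg populated via append_basename_field)
        "append_event_fields": [ "source_file" ]
    }
}
// e.g. the line: 2016-01-01T00:00:00,server1,100
// would produce: { "timestamp": "2016-01-01T00:00:00", "host": "server1", "bytes": 100, "source_file": "..." }
```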
Note that there is no Flume connector for XML, but XML can be efficiently ingested directly into the batch enrichment queue as described here. This will require that an external process or harvester place it in the 'import/ready' queue.
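Putting the pieces together, a minimal hedged sketch of a complete per-bucket configuration (all paths and field names are hypothetical) that spools CSV files into the bucket's default batch or streaming queue:

```json
{
    "input": {
        "spool_dirs": [
            { "path": "/data/landing/csv", "delete_on_ingest": true }
        ]
    },
    "output": {
        "csv": {
            "header_fields": [ "timestamp", "host", "bytes" ],
            "non_string_types": { "bytes": "long" }
        }
    }
}
```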
In most cases it will not be necessary to apply global configuration, but the following fields are configurable per shared library:
```json
{
    "flume_config_path": string, // defaults to "/etc/flume/conf"
    "flume_service_path": string // defaults to "/usr/hdp/current/flume-server/bin/flume-ng"
}
```
Where:
- `flume_config_path`: the path on the system where the Flume default configuration is stored
- `flume_service_path`: the path on the system where the Flume binary (`flume-ng`) resides