Batch Enrichment - IKANOW/Aleph2 GitHub Wiki
One of the most important components in an ETL flow is enriching and transforming the data received from the harvesters. In some cases (eg Logstash) these harvesters can perform their own transforms; in other cases (eg a script harvester performing file copies) they can't; and in some cases they can, but not easily or intuitively (eg Flume/morphlines). In most cases harvesters will not support more advanced transforms/enrichments such as Natural Language Processing or "target lookups".
In addition, it is often the case that analytic workflows are very similar to enrichment workflows, and therefore it is desirable to be able to perform enrichment activities as part of analytic threads.
The Aleph2 enrichment framework allows users to upload technology-independent functionality (or use built-in system functions such as a Javascript scripting module) and then apply/reuse them composably across a variety of different ETL and analytics workflows.
There are two types of enrichment:
- Batch - data accumulates in the storage service (eg a distributed filesystem like HDFS) and then all available data is processed in a batch at user-configurable intervals.
- Stream - data is received record-by-record in a real-time queue and immediately processed.
The technologies used for batch and streaming enrichment are configurable, but currently there are only 2 implemented: Hadoop for batch enrichment and Storm for streaming enrichment.
This section describes how batch enrichment is configured.
Aleph2 also has the concept of 2 different types of enrichment workflow:
- A topology is a "standalone" workflow, in the sense that it only interacts with the Aleph2 system at the endpoints. There can be complicated processing (eg multiple modules) inside the "topology", implemented by the topology developers using whatever libraries they want and transparently to Aleph2.
- A pipeline is comprised of a set of small modules with the dependencies between them specified as part of the Aleph2 config. It is more restrictive but more modular and easier to re-use standalone subsets of functionality across many jobs.
Currently, streaming enrichment (via Storm) only supports the topology type, and batch enrichment (via Hadoop) only supports the pipeline type. (See the Spark analytic technology, which will eventually be promoted to support enrichment as well, for an example of a batch topology).
The general analytic capability in Aleph2 is quite powerful and the configuration can be quite complex. It is described here.
A simpler subset of the functionality can be more easily applied using the following root level bucket fields:
{
//...
"poll_frequency": string,
"master_enrichment_type": "batch",
"batch_enrichment_configs": [ {...} ]
}
With this configuration, the batch input directory (/app/aleph2/data/<bucket path>/managed_bucket/import/ready) is checked periodically, based on the human readable string in poll_frequency (eg "5 minutes").
Whenever it contains data (either binary/text, in which case each file is treated like a separate record; or line-separated JSON, in which case each file efficiently generates one record per JSON object), Aleph2 executes the processing workflow defined by batch_enrichment_configs (see below) on that data and then removes it.
Note that the above bucket can either have a harvester defined (via the usual harvest_technology_name_or_id/harvest_enrichment_configs, which will typically take data from some external source and inject it into the import/ready directory); or can have data copied in there manually; or can have data injected by calls to the "external emit" function by other analytic jobs/enrichment; or any combination of these three.
Each of the batch enrichment configuration elements inserted into the above batch_enrichment_configs array has the following format:
{
"name": string,
"enabled": boolean,
"dependencies": [ string ],
"grouping_fields": [ string ],
"module_name_or_id": string,
"library_names_or_ids": [ string ],
"entry_point": string,
"config": { ... },
"technology_override": { ... }
}
Where:
- "name": A unique name within the pipeline; has to consist of alphanumeric characters or underscores.
- "enabled": (Optional) Defaults to true; if false then this element is ignored.
- "dependencies": See below, under dependencies.
- "grouping_fields": See below, under grouping.
- The "usual" set of ways of specifying what module is desired:
  - "module_name_or_id": The shared library containing the user code (if any). The JAR inside this shared library is guaranteed to be in the system classpath. Either module_name_or_id or entry_point needs to be set.
  - "library_names_or_ids": More shared libraries needed by the job. The only difference compared to module_name_or_id is that the JVM API makes it easier to access the JSON within the shared library.
  - "entry_point": (Optional) Normally can be left blank because it's part of the shared library specification, but if the analytic technology specified above has a number of different types of analytic technology, then the entry_point can point to the IEnrichmentBatchModule class path, which is guaranteed to be on the system classpath.
- "config": An arbitrary JSON object that is passed into the enrichment module (in fact the entire root object is passed in).
- "technology_override": An arbitrary JSON object that can be interpreted by the enrichment technology.
  - For example, the available Hadoop technology_override fields are described here.
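As an illustration of the fields above, a single pipeline element might look like the following sketch. Every value is hypothetical: the element name, the shared library name, and the contents of config are invented for the example. The // comments are annotations only (in the style of the snippet near the top of this page) and would need to be removed to make this valid JSON.

```json
{
  // Hypothetical element: all values below are invented for illustration
  "name": "normalize_fields",
  "enabled": true,
  // Hypothetical shared library name; substitute one that exists in your system
  "module_name_or_id": "example_enrichment_module",
  "library_names_or_ids": [],
  "config": {
    // Arbitrary JSON, meaningful only to the (hypothetical) module
    "lowercase_keys": true
  }
}
```

In this sketch dependencies and grouping_fields are left out, so the element uses the default dependency and requests no grouping; entry_point is also left out, on the assumption that it is already part of the shared library specification.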
The idea behind the dependencies field is to provide 2 features:
- To enable the user to create an ordered pipeline of jobs, with each job running on the output of the jobs it depends on
- Where order doesn't matter, to enable the enrichment technology to run jobs in parallel
Each dependencies field is a list of name fields from the rest of the pipeline. A given pipeline element will not run until all of these named pipeline elements have also run (which in turn won't run until their dependencies have run, and so on). A pipeline element with an empty dependencies array (ie []; not present is different, see below) will run on the input.
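To make these semantics concrete, the following sketch shows a hypothetical four-element pipeline in which two elements could run in parallel. The element names are invented, and the module fields (module_name_or_id etc, one of which is required in a real element) are omitted for brevity. It illustrates the general format only; whether a given enrichment technology accepts such a graph depends on its implementation.

```json
[
  // "parse" has an empty dependencies array, so it runs on the input
  { "name": "parse", "dependencies": [] },
  // "geo" and "nlp" each wait only for "parse", so their relative order doesn't matter
  { "name": "geo", "dependencies": [ "parse" ] },
  { "name": "nlp", "dependencies": [ "parse" ] },
  // "merge" will not run until both "geo" and "nlp" have run
  { "name": "merge", "dependencies": [ "geo", "nlp" ] }
]
```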
If not specified, then the dependencies array defaults to "$previous", meaning the previous element in the array. Therefore the "default" of just adding elements to the batch_enrichment_configs array in order provides a single pipeline processing each batch of data in order.
Currently the only batch enrichment pipeline technology (Hadoop) provides an incomplete implementation of this:
- All dependencies must either be not set or set to [ "$previous" ], ie forming a single pipeline.
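Under that restriction, a valid pipeline is simply a list of elements in order. In the sketch below (element names are hypothetical, and the required module fields are omitted for brevity), the second element leaves dependencies unset and the third sets it explicitly; both forms mean "run after the previous element in the array".

```json
"batch_enrichment_configs": [
  { "name": "parse" },
  // dependencies not set: defaults to "$previous", ie "parse"
  { "name": "enrich" },
  // explicit form of the same default: runs after "enrich"
  { "name": "summarize", "dependencies": [ "$previous" ] }
]
```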
A very common operation across most distributed processing frameworks is the "reduce" or "grouping" operation, and this is therefore baked into the batch_enrichment_configs format.
If a pipeline element has a non-empty grouping_fields array, then it is requesting that all the records which it processes are grouped by the specified fields (dot notation is supported for nested fields, and if "?" is used then the key is specified on a record-by-record basis by the user processing). That is, instead of a simple batch of records of a designated size, a stream of unknown size is provided to each batch call, guaranteed to have the same values for all the grouping key fields.
Note that Hadoop (currently the only supported enrichment engine) can only handle one grouping operation per pipeline (though it can have arbitrarily many elements to the left/right of the element that has the grouping_fields specified, corresponding to a Map-Map-..-Map-Combine-Reduce-Map-Map-... type pipeline).
Note that Hadoop (currently the only supported enrichment engine) cannot have a grouping field on the first element in the pipeline (the Passthrough service can be used to work around that limitation).
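A minimal sketch of a pipeline that respects both Hadoop limitations above: only one element requests grouping, and it is not the first element. The element names and the user.id field are hypothetical, and the required module fields are omitted for brevity.

```json
"batch_enrichment_configs": [
  // First element: no grouping_fields, as Hadoop requires
  { "name": "parse" },
  // The single grouping element: records are grouped by the nested field "user.id" (dot notation)
  { "name": "per_user_summary", "grouping_fields": [ "user.id" ] },
  // After the grouping: no grouping_fields, so this element sees ordinary batches
  { "name": "lookup" }
]
```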
One important thing to understand is how the "batch" methods (eg onObjectBatch in Java or handle_batch_java in JS) are called in grouping scenarios:
- The module that has the grouping_fields set is called once per unique keyset, with a stream of all records with that matching key, and no "batching-by-size"
- Modules after the grouping are called once per batch, with keyset ignored (ie a batch can have >1 keyset, and the records with a single keyset can be split across multiple batches)
Therefore any activities that require the keys to be grouped should be performed in the grouped module, not in subsequent ones. Conversely, activities that perform better in batches (eg remote lookups) should be performed in subsequent modules (eg if there are 1-2 records per keyset, then you'll pay a much heavier lookup overhead penalty than if you can batch them into 100s/1000s). This is discussed further under Passthrough service and Deduplication service.