General Analytics Overview
All buckets (*) can contain an analytic thread, which can do one of three things:
- provide a more flexible and powerful enrichment capability for data received from the harvester
- take selected data from existing buckets (or external data sources) and combine/transform/analyze them in various ways, persisting the output(s) in this bucket's storage
- (a combination of the above two)
(*) Note that this isn't quite true; there are currently some technical limitations: buckets cannot currently have both enrichment and analytic threads, analytic jobs must all have the same `lock_to_nodes` (`true`/`false`) attribute (see below), and if harvesters are present then analytic jobs must have the default (`false`) `lock_to_nodes` attribute.
This is the top-level "analytic bucket" JSON:
```
{
  "analytic_thread": {
    "enabled": true|false,
    "trigger_config": { ... },
    "jobs": [
      { ... }
    ]
  },
  // other standard bucket fields
}
```
Each analytic thread consists of two things:
- A set of "analytic jobs" that typically represent a single distributed job or process, with fully or partially persistent inputs and outputs (see below)
- (note that in-memory processing pipelines will often be nested within jobs, eg batch enrichment (todo link) is one example; spark or storm topologies another example todo somewhere describe 3 levels of nesting)
- A set of triggers that determines when an analytic thread activates, as described below
Analytic buckets (ie buckets containing an analytic thread) can be in one of three states:
- Suspended: when the containing bucket is suspended, no jobs will be active (and running jobs will be stopped as soon as possible)
- Published: Batch jobs are ready to run but have not yet been triggered so are inactive. Note that streaming jobs are always active once the bucket is published
- Activated: The bucket triggers have fired, so one or more of the batch jobs might be active. The actual job-level active/inactive status is a function of each job's dependencies. Once no jobs remain active, the bucket status moves back to "Published".
The remainder of this page describes batch jobs, streaming jobs, and triggers.
Triggers are important because they control when jobs run. They can be very simple or very complicated. This section focuses on simple triggers; the more complex cases are covered in a later section.
The simplest trigger schema is described below:
```
{
  "analytic_thread": {
    "trigger_config": {
      "enabled": true|false,
      "auto_calculate": true|false,
      "schedule": string
    },
    // other analytic thread fields ("jobs" and "enabled")
  },
  // other standard bucket fields
}
```
The above `trigger_config` schema contains the following fields:
- `enabled`: this object is ignored if set to `false`. If there is no `trigger_config` object (or it is disabled) then the bucket is activated once published, but will not then re-activate unless the bucket is re-published.
- `auto_calculate`: if this is set to `true` (the default) then the bucket will activate when there is data in all of the external inputs (see below for information on how inputs are specified). For more complex input criteria there is the `trigger` attribute, which is described below. If `false` then the bucket will always activate every `schedule` period (see below).
  - (Note that the triggering for internal bucket inputs (eg `search_index_service`) is not currently wired up; create an issue if this is needed, it is only a few lines of code. Until then such triggers will never fire.)
- `schedule`: A human readable time or schedule (eg `"10 min"`, every 10 minutes; `"Wed 3pm"`, every Wednesday at 3pm; `"2018-01-01 12:00"`, a specific one-off date) when the trigger will be checked ... or, if there is no trigger specified (ie `auto_calculate` missing or `false` and no `trigger` attribute), when the bucket will be activated. Note that if `schedule` is not specified but the root-level bucket `poll_frequency` is specified then the `poll_frequency` will be used instead.
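As a concrete sketch, the following trigger configuration (jobs elided) simply re-activates the bucket every 10 minutes, regardless of whether any new input data has arrived, because `auto_calculate` is `false`:

```
{
  "analytic_thread": {
    "enabled": true,
    "trigger_config": {
      "enabled": true,
      "auto_calculate": false,
      "schedule": "10 min"
    },
    "jobs": [ { ... } ]
  }
  // other standard bucket fields
}
```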
It is difficult to get a neat overview of analytic jobs because what they can do is so varied and flexible.
Examples of batch analytic jobs:
- Running a single (distributed) Hadoop MR job
- Running a (distributed) Spark (eg Machine Learning) topology that will iteratively run complicated algorithms until some completion criterion is satisfied
- Running a standalone process or script to do anything a user wants (based on their access privileges), eg running the Gephi command line application to lay out a huge (100K+ node) graph network.
The point is that the core system acts as an orchestration engine for these varied analytic technologies.
The analytic job JSON looks like:
```
{
  "name": string,
  "enabled": true|false,
  "analytic_type": "batch",
  // Which technology to run
  "analytic_technology_name_or_id": string,
  "entry_point": string,
  // What functionality is provided within that technology
  "module_name_or_id": string,
  "library_names_or_ids": [ string ],
  // Configuration that drives the analytic technology behavior for this bucket
  "config": { ... },
  // Inputs:
  "dependencies": [ string ],
  "global_input_config": { ... },
  "inputs": [ { ... } ],
  // Output:
  "output": { ... },
  // Distribution control
  "node_list_rules": [ string ],
  "multi_node_enabled": true|false,
  "lock_to_nodes": true|false
}
```
The above schema has the following fields:
- `name`: A unique name (within the bucket) for the analytic job (alphanumeric/`_`/`.` only), used by the Java API, by dependencies, and by other buckets'/jobs' inputs (to retrieve intermediate output).
- `enabled` (optional): If `false` then the job is ignored as if it weren't present.
- `analytic_type`: `"batch"` for batch jobs, `"streaming"` for streaming jobs.
- The next 2 fields define what the "umbrella" technology is (eg Spark/Hadoop/user process etc):
  - `analytic_technology_name_or_id`: either the path of the shared library (TODO link), or `"BatchEnrichmentService"` (see below, under "using the enrichment subsystem").
  - `entry_point` (optional): can normally be left blank because it is part of the shared library specification, but if the analytic technology specified above provides a number of different types of analytic technology, then the `entry_point` can point to the `IAnalyticTechnologyModule` class path (eg `"com.ikanow.aleph2.analytics.SparkTechnologyService"`), which is guaranteed to be on the system classpath.
- The next 2 fields allow user code to be executed:
  - `module_name_or_id` (optional): The shared library containing the user code (if any). The JAR inside this shared library is guaranteed to be on the system classpath.
    - (Note that the equivalent of the `entry_point` field needs to be plumbed into the `config` field.)
  - `library_names_or_ids` (optional): More shared libraries needed by the job. The only difference compared to `module_name_or_id` is that the JVM API makes it easier to access the JSON within the shared library.
- These are the most critical fields:
  - `config` (optional): An arbitrary JSON object that is passed into the technology and is used to control the behavior of the technology and any user components. It is not interpreted by the core at all, ie its format is defined by the combination of `analytic_technology_name_or_id` and optionally `module_name_or_id`.
  - `dependencies`: A list of strings corresponding to the `name`s of other jobs in the same bucket; this job will only run once all of the dependent jobs are complete. If a job does not have any `dependencies` then it starts as soon as the bucket is activated.
  - `inputs` (optional): defines the inputs that the analytic technology will have access to. Its flexible format is described below.
  - `global_input_config` (optional): sets default parameters applied to all `input` objects unless the `input` has that field set. Format described below.
  - `output`: defines what happens to data emitted from the analytic job (via the Java API). Its format is described below.
- Finally, there are some less commonly used fields that define how jobs are distributed. They are less commonly needed because the underlying technology is typically distributed already (eg a YARN job like Hadoop):
  - `multi_node_enabled`: (like the bucket (harvest) parameter; currently not supported)
  - `lock_to_nodes`: (like the bucket (harvest) parameter) if `true` (defaults to `false`), then each bucket is associated permanently with the first node to run it. This is necessary if running a persistent external process. Note that all jobs in the same bucket must have the same `lock_to_nodes` value, and it must be `false` if the bucket also has a harvester.
  - `node_list_rules`: (like the bucket (harvest) parameter) This job will only be run on the nodes that match these rules (XXX LINK). Jobs with the same analytic technology can have different `node_list_rules`, but those jobs will only run on the intersection.
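To make this concrete, here is a minimal sketch of a single batch job. The shared library paths and the contents of `config` are hypothetical; their real values depend on whichever analytic technology and user module are actually installed:

```
{
  "name": "job1",
  "enabled": true,
  "analytic_type": "batch",
  "analytic_technology_name_or_id": "/app/aleph2/library/example_technology.jar", // hypothetical path
  "module_name_or_id": "/app/aleph2/library/example_user_module.jar", // hypothetical path
  "config": { "example_param": "value" }, // format defined by the technology/module, illustrative only
  "dependencies": [],
  "inputs": [
    { "enabled": true, "name": "in", "resource_name_or_id": "", "data_service": "batch" }
  ],
  "output": { "is_transient": false, "preserve_existing_data": false }
}
```

Because `dependencies` is empty, this job starts as soon as the bucket activates; it reads the bucket's own batch input and writes its output into the bucket's normal data services.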
Batch analytic inputs can cover the following:
- Access the bucket's batch input in the storage service (ie files in the bucket's HDFS "import" directory)
- Access one of the data services (`search_index_service`, `storage_service`, `document_service`; or non-default services, eg `document_service.v1` is the V1 document bridge) for any bucket for which the user has read permission
- Access an internal or external job's intermediate output (see under the "output" section below)
The remainder of this section describes the different schemas for the above cases.
Accessing the bucket's own batch input:
```
{
  "enabled": true|false,
  "name": string,
  "resource_name_or_id": "",
  "data_service": "batch"
}
```
With:
- `enabled`: if `false` then it is as if the input does not exist
- `name`: a unique name (within the job), used for easy access to the input in the Java API
- `resource_name_or_id`: can either be `""` or the bucket path
Accessing a bucket's data service:
```
{
  "enabled": true|false,
  "name": string,
  "resource_name_or_id": string,
  "data_service": string,
  "filter": { ... },
  "config": {
    "new_data_only": true|false,
    "high_granularity_filter": true|false,
    "time_min": string,
    "time_max": string,
    "test_record_limit_request": integer
  }
}
```
Where:
- `enabled`: if `false` then it is as if the input does not exist
- `name`: a unique name (within the job), used for easy access to the input in the Java API
- `resource_name_or_id`: now points to the bucket path of the desired bucket. Currently globbing is not supported, though it will be in the future.
  - For `"data_service": "storage_service"` (see below), `resource_name_or_id` can have one of the following three suffixes:
    - `":raw"`: the raw data (eg binary/CSV etc), if available, before it has been converted to JSON
    - `":json"`: the raw JSON, before any enrichment (if any) has occurred
    - `":processed"`: (the default) if available, the same JSON that is stored in the document db/search index, ie after passing through that bucket's enrichment pipeline
  - For `"data_service": "document_service.V1DocumentService"` (see below), `resource_name_or_id` is in the format `"/aleph2_external/<community-ids-separated-by-_>"`
    - (The prefix `"/aleph2_external/"` is reserved for non-default data services like the one above; it means the user's access permissions are not checked internally but are delegated to the data service.)
- `data_service`: the following are always supported: `"search_index_service"`, `"document_service"`, `"storage_service"` (others will come later); other non-default services are supported with the format `"<data_service>.<service_name>"`, eg `"document_service.V1DocumentService"` is a bridge to access V1 documents in MongoDB.
  - `"storage_service"` - reads from the bucket's HDFS "archive" (if enabled)
    - (As noted above, you can access any of the raw/json/processed versions of the data that are available.)
  - `"search_index_service"` - a Lucene-searchable JSON version of the data objects, if enabled.
  - `"document_service"` - a key/value version of the data objects, if enabled.
- `filter`: a JSON object whose format depends on the data service being used:
  - For `"storage_service"`, it is not used.
  - For `"search_index_service"`: (longer term there will be a generic JSON search format based on the `ICrudService` Java API; in the meantime...)
    - For Elasticsearch implementations (the default): use `{ "technology_override": string }` or `{ "technology_override": { ... } }`, where:
      - if it is a string, then it is treated like an Elasticsearch URL parameter, eg `"q=text"`, `"q=field:text"` etc.
      - if it is a JSON object, then it is treated like an Elasticsearch query DSL.
  - For `"document_service"`, it is not currently used.
  - For `"document_service.V1DocumentService"`, the V1 schema described here is used, except:
    - only the following extensions (fields starting with `$`) are supported: `"$fields"`, `"$splits"`, `"$docsPerSplit"`, `"$srctags"`, `"$tmin"`, `"$tmax"`
    - instead of starting with `$`, the fields start with `:`, eg `:tmin`
- `config` has the following schema:
  - `new_data_only`: (not currently supported; on the near-term roadmap) will only process data that has not already been processed, based on various timestamps declared in the data schema.
  - `high_granularity_filter`: if `true` then the input will attempt to enforce the `filter` (and potentially `config`) exactly, and will error on job creation if that isn't possible. If `false` then the follow-on analytics should not assume that the records match the `filter`/`config` (trade-offs may be made between performance and false positives). If not present then either may be applied, so follow-on analytics should assume it behaves like `false`.
  - `time_min`: A human readable string (eg "1 day") expressing the oldest data that will be processed. Note that the granularity of this is (currently) the `grouping_period` in the storage schema (for `storage_service`), or in the temporal schema (for other data services).
  - `time_max`: A human readable string (eg "1 day") expressing the newest data that will be processed; the same remarks apply as for `time_min`.
  - `test_record_limit_request`: If running a test version of the bucket, then this is a requested limit on the amount of data to be returned. This request might only be partially satisfied (eg that many records per input split), but it can help reduce the amount of data processed during smaller tests.
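For example, an input that reads the last week of English-language records from another bucket's search index might look like the sketch below (the bucket path and query value are hypothetical):

```
{
  "enabled": true,
  "name": "external_docs",
  "resource_name_or_id": "/other/bucket/path", // hypothetical bucket (read permission required)
  "data_service": "search_index_service",
  "filter": { "technology_override": "q=language:en" }, // Elasticsearch URL-parameter style filter
  "config": {
    "high_granularity_filter": false,
    "time_min": "7 days"
  }
}
```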
Accessing semi-transient internal stages of internal and external jobs:
```
{
  "enabled": true|false,
  "name": string,
  "resource_name_or_id": string,
  "data_service": "batch",
  "config": {
    "new_data_only": true|false,
    "time_min": string,
    "time_max": string,
    "test_record_limit_request": integer
  }
}
```
Where:
- `enabled`: if `false` then it is as if the input does not exist
- `name`: a unique name (within the job), used for easy access to the input in the Java API
- `resource_name_or_id` is one of the following:
  - `"<job name>"` - retrieves the data from the transient output of the specified job (see below, under output) of this bucket.
  - `"<bucket path>:<job name>"` - retrieves the data from the transient output of the specified job (see below, under output) of the specified bucket, if the owner of this bucket has read permission.
- `config` has the following schema:
  - `new_data_only`: (not currently supported; on the near-term roadmap) will only process data that has not already been processed, based on various timestamps declared in the data schema.
  - `time_min`: A human readable string (eg "1 day") expressing the oldest data that will be processed. Note that the granularity of this is (currently) the `grouping_period` in the storage schema (for `storage_service`), or in the temporal schema (for other data services).
  - `time_max`: A human readable string (eg "1 day") expressing the newest data that will be processed; the same remarks apply as for `time_min`.
  - `test_record_limit_request`: If running a test version of the bucket, then this is a requested limit on the amount of data to be returned. This request might only be partially satisfied (eg that many records per input split), but it can help reduce the amount of data processed during smaller tests.
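For example, to consume the transient output of a job named `stage1` in another bucket (both the bucket path and the job name here are hypothetical):

```
{
  "enabled": true,
  "name": "stage1_results",
  "resource_name_or_id": "/other/analytic/bucket:stage1", // hypothetical "<bucket path>:<job name>"
  "data_service": "batch"
}
```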
The output format is relatively simple:
```
{
  "preserve_existing_data": true|false,
  "is_transient": true|false,
  "transient_type": string,
  "sub_bucket_path": string
}
```
Where:
- `preserve_existing_data`: If this is set to `true`, then new data will be appended to the bucket. If it is set to `false`, the data will be (atomically) overwritten each time the analytic job runs.
- `is_transient`: If this is set to `false`, then the output of this job is written into the bucket's data services according to however the data schemas are set (eg into Elasticsearch and HDFS in a typical implementation with `search_index_service` and `storage_service` enabled).
  - If instead it is set to `true`, then the data is written into a "temporary" HDFS store, which other jobs can then read (as described above under "Accessing semi-transient internal stages of internal and external jobs").
- `transient_type`: Only used if `is_transient` is `true`. Currently only `"batch"` is supported (once implemented, batch jobs will also be able to send their output into the real-time queues of streaming jobs).
- `sub_bucket_path`: Not currently supported. Once implemented it will allow jobs in one bucket to write directly into the output of that bucket's sub-bucket (when `"is_transient": false`).
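Putting inputs, outputs, and dependencies together, the sketch below shows what the `jobs` array of a single bucket's analytic thread might look like for two chained jobs (the technology path is hypothetical): the first writes transient output, and the second only starts once the first completes, reading that transient output as its input.

```
"jobs": [
  {
    "name": "extract",
    "analytic_type": "batch",
    "analytic_technology_name_or_id": "/app/aleph2/library/example_technology.jar", // hypothetical
    "inputs": [ { "name": "in", "resource_name_or_id": "", "data_service": "batch" } ],
    "output": { "is_transient": true, "transient_type": "batch", "preserve_existing_data": false }
  },
  {
    "name": "aggregate",
    "analytic_type": "batch",
    "analytic_technology_name_or_id": "/app/aleph2/library/example_technology.jar", // hypothetical
    "dependencies": [ "extract" ],
    "inputs": [ { "name": "in", "resource_name_or_id": "extract", "data_service": "batch" } ],
    "output": { "is_transient": false, "preserve_existing_data": false }
  }
]
```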
The output of batch transient jobs goes into the (HDFS) folder `/app/aleph2/data/<bucket path>/managed_bucket/import/transient/<job name>/`, where it can be accessed by other jobs via the input configurations described above, or by scripts and other external engines. (To get to the local file system, prepend `/opt/hadoop-fileshare` (v2.5-) or `/opt/hadoop-nfs` (v2.6+).)
Note that in addition to writing data out via these `output` blocks, the enrichment module can call an "external emit" to any buckets that are in the root `external_emit_paths` list of paths/globs. Objects are written in batches to (HDFS) `/app/aleph2/data/<bucket path>/managed_bucket/import/ready`.
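For example, a bucket whose enrichment module is allowed to emit externally into buckets under a hypothetical `/reports/` tree might declare something like:

```
{
  "external_emit_paths": [ "/reports/*" ] // hypothetical glob of target bucket paths
  // other standard bucket fields, including "analytic_thread"
}
```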
TODO link to batch enrichment section
TODO format
TODO passthrough