JsScriptEngineService: Javascript Prototyping Engine - IKANOW/Aleph2-examples GitHub Wiki
This module enables users to write or upload Javascript snippets to perform enrichment, analytics, and business logic as part of batch enrichment or analytics jobs.
Currently it is only possible to access the data using the (performance optimized but occasionally ungainly) hybrid Java/JS API (See below). In the future there will also be a pure JS interface (which will be slightly slower but will enable more standard-looking/portable code).
There is currently no bucket validation performed at launch time.
Currently the JS prototyping engine is not safe for non-admin users on secure clusters. Full JVM based sandboxing and RBAC is on the roadmap, watch this space. In the meantime access to this enrichment engine should be restricted to admin users (by restricting the read rights of the uploaded JAR).
The following system logging subsystems are used (see also below under "Logging"):

- subsystem: `JsScriptEngineService`, command: `<name>.onStageInitialize`, level: `ERROR` - errors that occur during initialization
- subsystem: `JsScriptEngineService.<name>`, command: `<name>.onObject`, level: `ERROR` - the first error to occur during runtime
- subsystem: `JsScriptEngineService.<name>`, command: `<name>.onObject`, level: `DEBUG` - every error to occur during runtime
The batch_enrichment_configs (or pipeline_element in analytic_threads.jobs.config) should have the following format:
{
"entry_point": "com.ikanow.aleph2.enrichment.utils.services.JsScriptEngineService",
"config": { /* object, see below */ },
"module_name_or_id": "/app/aleph2/library/enrichment_utils.jar",
"library_names_or_ids": [ /*strings, see below */ ]
}
The config parameter has the following format:
{
"script": /* string, see below */
"imports": [ /* strings, see below */ ],
"config": { /* object, see below */ },
"exit_on_error": true|false
}
Where:

- The `script` field is a Javascript code snippet that is evaluated when the module is loaded (together with the imports, see below).
  - Note that if one of the imports defines the callback function with a custom name, eg `handle_batch_java_DO_SOMETHING`, then the script can be as simple as `var handle_batch_java = handle_batch_java_DO_SOMETHING;`
- The `imports` field is a list of JS files that should be on the classpath, eg in a JAR specified in one of the parent `library_names_or_ids` fields. Each JS file is evaluated (before the `script` snippet) when the module is loaded.
- The `config` object is an arbitrary JSON object that is available to the script via the `_a2.config` variable (see below).
- If `exit_on_error` is `true` (defaults to `false`) then any unhandled runtime exception in the batch processing causes the job to fail and exit (depending on the processing framework it might be retried on a different node); otherwise it is logged and ignored.
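For concreteness, a minimal `config` object might look like the following sketch (the imported filename, the `threshold` setting, and the `handle_batch_java_filter` function assumed to be defined by the import are all hypothetical):

```
{
  "script": "var handle_batch_java = handle_batch_java_filter;",
  "imports": [ "my_filters.js" ],
  "config": { "threshold": 10 },
  "exit_on_error": false
}
```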
- `function handle_batch_java(json_stream, len, grouping_key)`
  - This script runs under the Nashorn JS engine (developer guide): all of the logic is pure Javascript, but objects can be either Java or JS - each parameter below lists which it is, and Java object methods can be called from within the JS.
  - This method must be implemented by the user (either in the script or one of the imports), and is called for each batch of objects, with the following parameters:
    - `json_stream` - a (Java) stream of (Java) `JsonNode`s (in fact `ObjectNode`s) - use `.toArray()` or `.toIterator()` to convert to an array or (JSON) iterator
      - (An expensive but easy way to convert to a stream of JS objects would be `json_stream.map(function(m) JSON.parse(m.toString()))`)
    - `len` - may be null (if the size of the stream is unknown - this is typically the case when `grouping_key` is non-null), else is the size of `json_stream`
    - `grouping_key` - if this module comes immediately after a grouping stage, this is a JSON object (represented by a (Java) `JsonNode`) that defines which group this is (eg the reduce key in a simple map/reduce architecture)
- (Alternatively you can specify `handle_batch_java_record(json_stream, len, grouping_key)`, for which the only difference is that `json_stream` is the raw `Tuple2<Long, IBatchRecord>`, which lets you access the `injected` boolean and also get the content (`IBatchRecord`).)
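Putting the above together, a minimal `handle_batch_java` sketch might look like this. Note this is illustrative, not the engine's API surface: the `status`/`processed` fields are invented, and `_a2` plus the input stream are replaced with plain JS look-alikes so the snippet runs standalone (in the real engine both are supplied by Aleph2, and the stream is a Java stream of `ObjectNode`s):

```javascript
// Stub of the engine-provided _a2 object, just enough to run standalone
var _a2 = { emitted: [], emit: function (o) { this.emitted.push(o); } };

function handle_batch_java(json_stream, len, grouping_key) {
    // Convert each (Java) JsonNode to a native JS object via its JSON string
    // (the "expensive but easy" route described above), then filter and emit
    json_stream.toArray().forEach(function (node) {
        var obj = JSON.parse(node.toString());
        if (obj.status !== "ignore") {  // hypothetical business rule
            obj.processed = true;       // mutate before promoting
            _a2.emit(obj);              // promote to the next pipeline stage
        }
    });
}

// Stand-in for the Java stream: exposes .toArray() of elements that
// stringify to JSON, as JsonNodes do
var fake_stream = {
    toArray: function () {
        return [
            { toString: function () { return '{"status":"ok"}'; } },
            { toString: function () { return '{"status":"ignore"}'; } }
        ];
    }
};
handle_batch_java(fake_stream, 2, null);
```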
- To promote any object (whether received/mutated/created) to the next stage of the enrichment pipeline (or output it if this is the final stage), simply call:
  - `_a2.emit(json_to_promote)`
  - `_a2.emit(json_to_promote, grouping_field)`
  - Where `json_to_promote` can be any of:
    - a JS or Java string, in which case it is parsed as JSON (will error if not valid JSON)
    - a Java `ObjectNode`, in which case it is sent as-is (this is the fastest)
    - a JS object, in which case it is converted into a Java `ObjectNode` (this is the slowest)
  - The return value is a Java `Validation<BasicMessageBean, JsonNode>`, meaning that on failure (`return_value.isFail()`) the error message can be accessed via `return_value.fail().message()`.
  - And `grouping_field` is an object of the same types as above, and can be used in "manual grouping" situations to set the grouping key for Hadoop reduces, Spark groups, etc.
- Similarly, the method `_a2.externalEmit(bucket_full_name, json_to_promote)` will emit the specified JSON object into the input of the specified bucket (defined by its `full_name` field).
  - Note that the `bucket_full_name` referenced above should be in the bucket's root `external_emit_paths` list of paths/globs.
  - The return value is a Java `Validation<BasicMessageBean, JsonNode>`, meaning that on failure (`return_value.isFail()`) the error message can be accessed via `return_value.fail().message()`.
  - Don't forget to set the top-level `allowed_external_paths` when externally emitting data objects.
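Since both `_a2.emit` and `_a2.externalEmit` return a `Validation`, it is worth checking `isFail()` rather than discarding the result. A sketch of that pattern, with `_a2` and the returned `Validation` stubbed in plain JS so it runs standalone (the real return value is a Java `Validation<BasicMessageBean, JsonNode>`):

```javascript
// Stub _a2 with a minimal Validation look-alike (standalone illustration only)
var _a2 = {
    errors: [],
    emit: function (json_to_promote) {
        var ok = true;
        if (typeof json_to_promote === "string") {
            // strings are parsed as JSON by the engine, so invalid JSON fails
            try { JSON.parse(json_to_promote); } catch (e) { ok = false; }
        }
        return {
            isFail: function () { return !ok; },
            fail: function () {
                return { message: function () { return "string was not valid JSON"; } };
            }
        };
    },
    log_error: function (msg) { this.errors.push(msg); }
};

var ret = _a2.emit("{ not valid json");   // malformed string, so emit fails
if (ret.isFail()) {
    // log the failure rather than silently dropping the object
    _a2.log_error(ret.fail().message());
}
```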
A number of other variables are accessible from the `_a2` object:

- `_a2.context`: the (Java) `IEnrichmentModuleContext` interface that enables all interaction with the Aleph2 system
- `_a2.config`: a native JSON object that is the job-specific configuration passed to the system
- `_a2.bucket`: a (Java) `DataBucketBean` object that is the entire bucket being processed
- `_a2.logger`: a (Java) `IBucketLogger` object that provides full access to the powerful logging infrastructure (see below for details, under "Logging")
- `_a2_global_mapper`: a (Java) `ObjectMapper` that allows (Java) JSON objects to be created, eg `_a2_global_mapper.createObjectNode()`
In addition to `_a2.emit` and `_a2.externalEmit` there are a few other utility functions:

- `_a2.to_json(java_jsonnode)`: converts a (Java) `JsonNode` into a native JSON object (by serializing/deserializing - ie this is not especially efficient)
- `_a2.list_to_js(java_list)`: converts a Java list into a JS list (using `Java.from` - note this didn't work in all cases)
  - To create an array of native JSON objects we did the following (not very efficient but simple - note this places all the objects in memory, so it should only be done for streams of known size; otherwise the `.toArray()` can be removed to leave a (Java) stream of native JSON objects): `json_stream.map(function(m) { return JSON.parse(m.toString()); }).toArray()`
- `_a2_bucket_log(level, msg)`: a simple API for (relatively inefficient) logging - see below for details, under "Logging"
It is possible to write centralized logs into the ES logging service. This can be done in two ways:

- There's a simple API call, `_a2_bucket_log(level, msg)` - `level` should be the Log4j2 level (don't forget to either `import` the package (`org.apache.logging.log4j.Level`) or specify the fully qualified name, eg `org.apache.logging.log4j.Level.ERROR`). The `msg` field is just a standard Java `String`.
  - (Note: no `_a2.` at the start of the command)
  - Alternatively, the following pre-built versions are available: `_a2.log_trace(msg)`, `_a2.log_debug(msg)`, `_a2.log_info(msg)`, `_a2.log_warn(msg)`, `_a2.log_error(msg)`
  - (Recall that the level of the captured logs can be configured from the `management_schema.logging_schema` top-level construct, see TODO)
- For a more complex but more powerful logging API, the full Java `IBucketLogger` (javadocs, design docs) can be accessed via `_a2.logger`
  - The Java API makes heavy use of "supplier" lambdas, eg `() -> "my message"`. These can be generated in the Nashorn JS engine as `function() "my message"` (note: no `{}`s and no `return`)
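The point of the supplier style is laziness: the (possibly expensive) message is only constructed if the log level is actually enabled. A stub illustration of the pattern in portable JS (the real `IBucketLogger` is a Java interface with its own method names; `logger` here is invented):

```javascript
// Count how many times a message actually gets built
var builds = 0;

// Stub logger illustrating the supplier pattern: the callback is only
// invoked when the level is enabled
var logger = {
    enabled: false,
    log: function (message_supplier) {
        if (this.enabled) { return message_supplier(); }
    }
};

// Disabled: the expensive string construction never happens
logger.log(function () { builds++; return "state dump: " + JSON.stringify({ big: "object" }); });

// Enabled: now the supplier is invoked and the message is built
logger.enabled = true;
logger.log(function () { builds++; return "cheap message"; });
```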
The JS API user logging comes out with subsystem set to `JsScriptEngineService.<enrichment_element_name>` and command set to `<enrichment_element_name>.handle_batch_java`, with `<enrichment_element_name>` set to `no_name` if not specified. If the full Java API is used, then the subsystem and command fields are at the discretion of the user (though it is recommended to use the same schema for consistency).
In addition, system logging is generated as described above under "Overview".
In order to make it easier to debug JS scripts, write production-level unit-tested code, etc., there is a (currently very basic) Java-based test harness available.
You will need the following projects imported into Eclipse:

- `aleph2_data_model` (from the Aleph2 repo)
- `aleph2_enrichment_utils` (from the Aleph2-examples repo)
  - Create a directory (which is git-ignored) under `aleph2_enrichment_utils`, called `example_scripts`
- Create a run configuration for the class `JsScriptEngineTestService` (under the package `com.ikanow.aleph2.enrichment.utils.service`, `src` folder)
  - eg just right click and "Run As > Java Application", which will error due to missing command line arguments, but the configuration can then be edited from right click and "Run Configurations..."
- Add the following command line arguments:
  - eg `example_scripts/test_script.js` - the script containing the `handle_batch_java` method under test
  - eg `./example_scripts/test_in.json` - a line-separated JSON file containing the stream of JSON objects that will be handled by the `handle_batch_java` method under test
  - eg `./example_scripts/test_out_` - the prefix (including path) for the following output files:
    - `<PREFIX>emit.json` - the JSON objects directly emitted with no grouping key
    - `<PREFIX>group.json` - the JSON objects directly emitted with a grouping key
    - `<PREFIX>external_emit.json` - the JSON objects externally emitted, with the field `__a2_bucket` added containing the path of the bucket to which the JSON was emitted
  - So for example: `./example_scripts/test_script.js ./example_scripts/test_in.json ./example_scripts/test_out_ "{\"len\": 4, \"group\": { \"key\": \"test\"} }"`
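As an illustration of what the script under test might contain, here is a hypothetical `example_scripts/test_script.js` that emits every input object grouped by an invented `type` field. In the harness, `_a2` and the invocation of `handle_batch_java` are provided for you; the stub and driver below exist only so the sketch runs standalone:

```javascript
// Stub of the harness-provided _a2 (standalone illustration only)
var _a2 = { grouped: [], emit: function (o, g) { this.grouped.push({ obj: o, group: g }); } };

function handle_batch_java(json_stream, len, grouping_key) {
    json_stream.toArray().forEach(function (node) {
        var obj = JSON.parse(node.toString());
        // second argument sets the grouping field (written to <PREFIX>group.json)
        _a2.emit(obj, { key: obj.type });
    });
}

// Standalone driver: mimics a stream whose elements stringify to JSON
handle_batch_java({
    toArray: function () {
        return [{ toString: function () { return '{"type":"a","value":1}'; } }];
    }
}, 1, null);
```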
Note that not all of the `_a2` API is currently plumbed in (contact me to get things added):

- `_a2.bucket` and `_a2.config` are not filled in
- the `_a2` global context is not filled in