Analytics and enrichment technology overrides (Spark) - IKANOW/Aleph2 GitHub Wiki
The Spark analytic engine is currently something of a hybrid component. Unlike the Hadoop and Storm engines, it does not currently implement the full batch or streaming enrichment interfaces.
It is, however, a modular engine that can run user-supplied executable Java/Scala code, including enrichment modules; this is described below, under "Spark - Processing".
It can also run in either streaming or batch mode (the streaming mode is not yet finished/tested).
Spark re-uses the Hadoop Input formats documented here, including all of the same technology overrides.
Modules can also programmatically use `SparkTechnologyUtils.buildBatchSparkSqlInputs` to access the JSON as SQL via the `DataFrame` interface. An example of this is here.
There are currently no technology overrides available for SQL inputs.
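For orientation only, a minimal sketch of the SQL route is shown below. The exact `buildBatchSparkSqlInputs` signature is an assumption here (modelled on `buildBatchSparkInputs`, described under "Spark - Processing" below), so treat the linked example as authoritative:

```java
// Sketch only: the buildBatchSparkSqlInputs signature is assumed to mirror
// buildBatchSparkInputs (one entry per named input) - check the linked example for the real one.
final Tuple2<IAnalyticsContext, Optional<ProcessingTestSpecBean>> aleph2_tuple =
        SparkTechnologyUtils.initializeAleph2(args);

try (final JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("SQL_EXAMPLE"))) {
    final SQLContext sql_context = new SQLContext(jsc.sc());

    // (assumed) one DataFrame per input name:
    final Multimap<String, DataFrame> sql_inputs = SparkTechnologyUtils.buildBatchSparkSqlInputs(
            aleph2_tuple._1(), aleph2_tuple._2(), sql_context, Collections.emptySet());

    // register each input so it can be queried by name, then run ordinary Spark SQL over the JSON:
    sql_inputs.entries().forEach(kv -> kv.getValue().registerTempTable(kv.getKey()));
    final DataFrame result = sql_context.sql("SELECT * FROM ..."); // placeholder query
    result.show();
}
```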
The Spark integration is quite flexible and the configuration is therefore somewhat complex. This documentation provides the full schema for reference and then breaks it up into smaller, related chunks of configuration.
The full analytic thread configuration for Spark is (where an actual value is given, it is mandatory and the only value supported):
{
"analytic_technology_name_or_id": "/app/aleph2/library/spark_technology.jar",
"analytic_type": string, // ("batch" or "stream")
"lock_to_nodes": true,
"config": {
"cluster_mode": "yarn-cluster", //("local", "local[<N>", "local[*]", "yarn-client" not supported yet)
"entry_point": string,
"enrich_pipeline": [ {...} ],
"language": string, // ("jvm" (default), "python"; "js", "r" not supported yet)
"script": string,
"job_config": { ... },
"include_job_config_in_spark_config": true|false,
"spark_config": { ... },
"system_config": { ... },
"external_files": [ string ],
"external_lang_files": [ string ],
"external_jars": [ string ],
"uploaded_files": [ string ],
"uploaded_lang_files": [ string ]
},
//(usual params:)
"name": string,
"module_name_or_id": string,
"library_names_or_ids": [ string ],
"inputs": [ { ... } ],
"output": { ... }
}
There's a lot to take in here; this section provides a general outline, and the parameters are described in more detail under the module settings:
- `analytic_type`: if `"stream"` then the module will be immediately executed and will run until the module exits (see below for further details)
  - (currently the Kafka input type needed to make Spark a full streaming enrichment capability has not been wired in)
- `config.entry_point`: in `"jvm"` mode, specifies the name of the module to be run (see below); note that unless the module is built in (again, see below), the JAR containing it should be specified in `module_name_or_id` (with additional dependencies in `library_names_or_ids`)
  - (the `"python"` case is described below, under "Spark - Built in processing module - Python")
- `config.enrich_pipeline`: specifies an enrichment pipeline. By default this is not executed, but the functionality can be accessed via the `EnrichmentPipelineService` - examples of this can be seen in the currently incomplete `BatchEnrichmentPipelineTopology`
  - The JARs of any non-built-in enrichment modules should be added to `library_names_or_ids`
- `config.language`: defaults to `"jvm"`, in which case it runs the `"main"` of the specified `entry_point`. Other options:
  - `"python"`: this is described under "Spark - Built in processing module - Python"
  - `"js"`: not currently supported, but note it can be approximated by including `JsScriptEngineService` enrichment modules and then calling the pipeline via `BatchEnrichmentPipelineTopology` (when complete) or currently via `SparkScalaInterpreterTopology`
  - `"r"`: not currently supported
- `config.script`: an arbitrary string passed into the modules (eg the Python to run in `"python"` mode, the Scala to run with `SparkScalaInterpreterTopology`, etc)
- Various configuration objects (an illustrative fragment is shown after this list):
  - `config.job_config`: an arbitrary JSON object that is accessible to the modules either via the usual `IAnalyticsContext` route or (if `config.include_job_config_in_spark_config` is set) via `SparkConf`/`SparkContext`, as the JSON string with the name `spark.aleph2_job_config`
  - `config.spark_config`: key/value pairs made available in `SparkConf`/`SparkContext` - dot notation is supported in the keys but with `.` replaced by `:` (note: keys must start with `spark:`)
    - (these parameters are inserted as `--conf <key>=<value>` into the Spark job command line)
  - `config.system_config`: for advanced use, these are inserted into the Spark job command line in the format `<key1> <value1> ... <keyN> <valueN>`
- Various internal and external assets; the files specified here are all available on the classpath, though with slight differences depending on the `language`:
  - `config.uploaded_files`: adds files in the Aleph2 shared library to the classpath
  - `config.uploaded_lang_files`: in Python/R mode this allows `.egg`, `.py`, `.r` etc files to be included in the Python/R executable
  - `config.external_files`: like `uploaded_files` except these are on the local file system where the job is launched (ie they must either be present on all nodes in the Aleph2 cluster, or `node_list_rules` must be specified to restrict the nodes, or they must be in a shared file system accessible to all nodes, etc)
  - `config.external_jars`: like `library_names_or_ids` except these are on the local file system where the job is launched (with the same node requirements as `external_files`)
  - `config.external_lang_files`: like `uploaded_lang_files` except these are on the local file system where the job is launched (with the same node requirements as `external_files`)
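To make the configuration objects concrete, here is an illustrative fragment (the values are examples only). The `spark_config` keys are ordinary Spark property names with `.` replaced by `:`, and it is assumed the engine maps them back to dotted form when it builds the `--conf` arguments:

```json
"config": {
    "cluster_mode": "yarn-cluster",
    "job_config": { "max_docs": 1000, "mode": "full" },   // arbitrary module-defined JSON (illustrative keys)
    "include_job_config_in_spark_config": true,           // also exposes the JSON string as "spark.aleph2_job_config"
    "spark_config": {
        "spark:executor:memory": "2g",                    // assumed to become --conf spark.executor.memory=2g
        "spark:executor:cores": "2"
    },
    "system_config": { "--driver-memory": "1g" }          // illustrative; inserted verbatim as "<key> <value>"
}
```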
Spark has a `pyspark` mode that allows Python developers to write code that runs in Spark. Note that the performance is significantly worse than running in the JVM.
The following configuration executes the specified Python script inside Spark:
{
"analytic_technology_name_or_id": "/app/aleph2/library/spark_technology.jar",
"analytic_type": string, // ("batch" or "stream")
"lock_to_nodes": true,
"config": {
"cluster_mode": "yarn-cluster",
"entry_point": string,
"language": "python",
"script": string,
"job_config": { ... },
"include_job_config_in_spark_config": true,
"spark_config": { ... },
"system_config": { ... },
"external_files": [ string ],
"external_lang_files": [ string ],
"uploaded_files": [ string ],
"uploaded_lang_files": [ string ]
},
"name": string,
"inputs": [ { ... } ],
"output": { ... }
}
Where the parameters are as above, except:
- There are three options to specify the script to execute:
  - If `config.entry_point` is set, then it must point to an external Python file on the local file system where the job is launched (ie it must either be present on all nodes in the Aleph2 cluster, or `node_list_rules` must be specified to restrict the nodes, or it must be in a shared file system accessible to all nodes, etc).
  - If `config.script` ends in `.py` then it is assumed that a `.zip` or `.egg` file attached via `config.uploaded_lang_files` or `config.external_lang_files` contains the specified path, and that file is executed as a script.
  - Else `config.script` is treated as a Python script to be executed.
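For example, the second option might be configured as follows (the package, script path and egg names are purely illustrative):

```json
"config": {
    "cluster_mode": "yarn-cluster",
    "language": "python",
    "script": "my_package/my_job.py",   // assumed to exist inside the attached .egg
    "uploaded_lang_files": [ "/app/aleph2/library/my_python_package.egg" ]
}
```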
Regardless of how the script is specified, it should have the following format:
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from aleph2_driver import Aleph2Driver
if __name__ == "__main__":
    sc = SparkContext(appName="JOB_NAME")
    aleph2 = Aleph2Driver(sc, sys.argv[1])
    # code, see API below, eg:
    # aleph2.getAllRddInputs(sc).map(lambda j: aleph2.externalEmitObject('/other/bucket', j)).count()
Aleph2 provides the following API (a short usage sketch follows the list):
- `Aleph2Driver(sc, signature)` - builds an Aleph2 driver
  - `sc` is the Python Spark context
  - `signature` is the analytics context signature, which is passed into the Python script as `sys.argv[1]`
- `Aleph2Driver(sc, signature, test_signature)` - builds an Aleph2 driver for a test run of an analytic
  - (arguments are the same as above, except)
  - `test_signature` is a signature that builds the `TestSpecBean` object that defines test-specific parameters, which is passed into the Python script as `sys.argv[2]` (for test jobs only)
- `getRddInputNames(self)` - returns an array of the input names specified in the `inputs` field of the analytic job
- `getRddInput(self, sc, name)` - returns the Python RDD corresponding to the input `name`
  - (each record is a Python dictionary)
- `getAllRddInputs(self, sc)` - returns the union of all the specified inputs as a Python RDD
- `emitRdd(self, rdd)` - sends all objects in the input `rdd` to the Aleph2 data store
- `externalEmitRdd(self, path, rdd)` - sends all objects in the input `rdd` to the specified other bucket (specified by `path`)
  - Note that the `path` referenced above should be in the bucket's root `external_emit_paths` list of paths/globs
- `emitObject(self, obj)` - a per-object call that can be put into closures/lambdas and that stores the specified Python dictionary (`obj`) in the Aleph2 data store
- `externalEmitObject(self, path, obj)` - a per-object call that can be put into closures/lambdas and that sends the specified Python dictionary (`obj`) to the specified other bucket (specified by `path`)
  - Note that the `path` referenced above should be in the bucket's root `external_emit_paths` list of paths/globs
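Putting a few of these calls together, a minimal usage sketch might look like the following (the `value`/`doubled` fields and the `/other/bucket` path are purely illustrative):

```python
import sys
from pyspark import SparkContext
from aleph2_driver import Aleph2Driver

if __name__ == "__main__":
    sc = SparkContext(appName="enrich_and_emit")
    aleph2 = Aleph2Driver(sc, sys.argv[1])

    # union of all configured inputs, as an RDD of Python dictionaries
    all_inputs = aleph2.getAllRddInputs(sc)

    # simple per-record transform ("value"/"doubled" are hypothetical field names)
    enriched = all_inputs.map(lambda d: dict(d, doubled=d.get("value", 0) * 2))

    # store the transformed records in this bucket's data store
    aleph2.emitRdd(enriched)

    # alternatively, emit per record from inside a closure, eg to another bucket
    # (the target path must be listed in this bucket's external_emit_paths):
    # all_inputs.foreach(lambda d: aleph2.externalEmitObject('/other/bucket', d))
```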
Note that Python currently has no built-in Aleph2 logging, and no logging method in the Python driver (one should be added here; it is an easy change).
This is mostly an illustrative module to show developers how to use the Spark SQL capability in their own modules.
The configuration looks like:
{
"analytic_technology_name_or_id": "/app/aleph2/library/spark_technology.jar",
"analytic_type": "batch",
"lock_to_nodes": true,
"config": {
"cluster_mode": "yarn-cluster",
"entry_point": "com.ikanow.aleph2.analytics.spark.assets.SparkSqlTopology",
"script": string,
"spark_config": { ... },
"system_config": { ... }
},
"name": string,
"inputs": [ { ... } ],
"output": { ... }
}
Where:
- `config.script` is the SQL command to execute - the resulting data frame is converted to JSON and output
- The following Aleph2-specific `config.spark_config` settings are supported (see the illustrative fragment below):
  - `spark.aleph2_subsample` - a floating point number between 0 and 1, defines what fraction of records are output
  - `spark.aleph2_subsample_test_override` - as above, but only applied in test jobs
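An illustrative fragment is shown below. It makes two assumptions that should be checked against the `SparkSqlTopology` source and the linked example: that each input is registered as a temp table under its input name, and that the subsample key is written in the `:` form described earlier (surfacing as `spark.aleph2_subsample`):

```json
"config": {
    "cluster_mode": "yarn-cluster",
    "entry_point": "com.ikanow.aleph2.analytics.spark.assets.SparkSqlTopology",
    "script": "SELECT * FROM raw_data WHERE status = 'active'",  // "raw_data" is an assumed input/table name
    "spark_config": {
        "spark:aleph2_subsample": "0.1"   // assumed ':' key form; outputs roughly 10% of the matching records
    }
}
```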
Note that the SQL module currently has no built-in Aleph2 logging.
This is mostly an illustrative module to show developers how the outline Spark capability works. It can be used in the usual passthrough scenarios though.
The configuration looks like:
{
"analytic_technology_name_or_id": "/app/aleph2/library/spark_technology.jar",
"analytic_type": "batch",
"lock_to_nodes": true,
"config": {
"cluster_mode": "yarn-cluster",
"entry_point": "com.ikanow.aleph2.analytics.spark.assets.SparkPassthroughTopology",
"spark_config": { ... },
"system_config": { ... }
},
"name": string,
"inputs": [ { ... } ],
"output": { ... }
}
Where:
- The following Aleph2-specific `config.spark_config` settings are supported:
  - `spark.aleph2_subsample` - a floating point number between 0 and 1, defines what fraction of records are output
  - `spark.aleph2_subsample_test_override` - as above, but only applied in test jobs
Note that the Passthrough module currently has no built-in Aleph2 logging.
This module enables an enrichment pipeline topology to be executed within Spark. Unlike in Hadoop, the topology can run different processing against different inputs, multiple grouping stages, etc.
The configuration is as follows:
{
"analytic_technology_name_or_id": "/app/aleph2/library/spark_technology.jar",
"analytic_type": "batch",
"lock_to_nodes": true,
"config": {
"cluster_mode": "yarn-cluster",
"entry_point": "com.ikanow.aleph2.analytics.spark.assets.BatchEnrichmentPipelineTopology",
"spark_config": { ... },
"system_config": { ... },
"enrich_pipeline": [ {...} ]
},
"library_names_or_ids": [ string ],
"name": string,
"inputs": [ { ... } ],
"output": { ... }
}
Where:
- `enrich_pipeline` - a list of enrichment modules. The `dependencies` of each module can point to another module's `name`, or `$previous`, or an input's `name` (see the illustrative pipeline below)
- `library_names_or_ids` - should include any JARs whose enrichment modules are specified in `enrich_pipeline`
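As a rough illustration of the dependency wiring (the module class names are hypothetical, and the per-module fields other than `name` and `dependencies` are assumptions that should be checked against the enrichment module documentation):

```json
"config": {
    "enrich_pipeline": [
        {
            "name": "parse_stage",
            "dependencies": [ "raw_input" ],            // an input's name (hypothetical)
            "entry_point": "com.example.ParseModule",   // hypothetical module class (assumed field)
            "config": { }
        },
        {
            "name": "enrich_stage",
            "dependencies": [ "$previous" ],            // ie depends on "parse_stage"
            "entry_point": "com.example.EnrichModule",  // hypothetical module class (assumed field)
            "config": { }
        }
    ]
},
"library_names_or_ids": [ "/app/aleph2/library/example_enrichment.jar" ]
```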
The implementation of this capability is not currently complete. The capability can be manually reproduced for now using the `SparkScalaInterpreterTopology` module, with scripting gluing the specified modules together.
As for batch enrichment, the implementation of this capability is not currently complete.
Configuration/usage as above, except:
- `analytic_type` should be `"streaming"`
- the `onObjectBatch` method of the `IEnrichmentBatchModule` is called every N seconds with whatever data is available (including none)
  - (N cannot currently be changed from the default - see the note above about the implementation not being complete)
- the `onStageComplete` method of the `IEnrichmentBatchModule` is never called
- `SparkScalaInterpreterTopology` - write Scala scripts that are executed against the input; scripts can run any included batch enrichment pipeline elements
This section gives a brief outline of the JVM API used to build Aleph2 Spark modules; for more details, check the source code of the linked modules above.
The basic idea of developing a JVM module is as follows (a complete sketch putting these steps together is shown after the list):
- Write a class with a static `main(String[] args)`
  - (The main is invoked by the Spark client process with `args[0]` the analytics context signature, and `args[1]` the test signature if the job is running in test mode)
- The analytics context needs to be built, as follows:
  `final Tuple2<IAnalyticsContext, Optional<ProcessingTestSpecBean>> aleph2_tuple = SparkTechnologyUtils.initializeAleph2(args);`
- Optionally for test jobs, to ensure the Spark job exits after the designated period:
  `SparkTechnologyUtils.registerTestTimeout(aleph2_tuple._2(), () -> System.exit(0));`
- The Spark context is created as per usual:
  `SparkConf spark_context = new SparkConf().setAppName("MODULE_NAME");`
  `try (final JavaSparkContext jsc = new JavaSparkContext(spark_context)) {`
- Inside the `try`/`finally` block:
  - Build a map of input RDDs:
    `final Multimap<String, JavaPairRDD<Object, Tuple2<Long, IBatchRecord>>> inputs = SparkTechnologyUtils.buildBatchSparkInputs(context, test_spec, jsc, Collections.emptySet());`
    - (the set at the end is a collection of input names to exclude from the input building)
  - Or `SparkTechnologyUtils.buildBatchSparkSqlInputs`, which builds DataFrames instead
  - Or `SparkTechnologyUtils.buildStreamingSparkInputs`, which connects to Kafka queues
    - (Not yet completed/tested)
  - Then perform whatever Spark logic is desired on the RDDs. Note that the `IAnalyticsContext` is serializable so it can be used in closures.
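Putting these steps together, a minimal module might look like the sketch below. The class name and the processing logic are illustrative only, the Aleph2 import paths are omitted (check the linked module sources), and output would normally be written back via the `IAnalyticsContext` emit calls, which are not shown here:

```java
import java.util.Collections;
import java.util.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.google.common.collect.Multimap;

import scala.Tuple2;

// (plus the Aleph2 imports: IAnalyticsContext, ProcessingTestSpecBean, IBatchRecord,
//  SparkTechnologyUtils - package paths omitted here, see the linked module sources)

public class MyModule { // hypothetical module name - reference its JAR in module_name_or_id
    public static void main(String[] args) {
        // args[0] = analytics context signature, args[1] = test signature (test mode only)
        final Tuple2<IAnalyticsContext, Optional<ProcessingTestSpecBean>> aleph2_tuple =
                SparkTechnologyUtils.initializeAleph2(args);
        final IAnalyticsContext context = aleph2_tuple._1();
        final Optional<ProcessingTestSpecBean> test_spec = aleph2_tuple._2();

        // for test jobs, ensure the job exits once the designated test period has elapsed
        SparkTechnologyUtils.registerTestTimeout(test_spec, () -> System.exit(0));

        final SparkConf spark_conf = new SparkConf().setAppName("MyModule");
        try (final JavaSparkContext jsc = new JavaSparkContext(spark_conf)) {

            // one (name -> RDD) entry per configured input; the empty set means "exclude nothing"
            final Multimap<String, JavaPairRDD<Object, Tuple2<Long, IBatchRecord>>> inputs =
                    SparkTechnologyUtils.buildBatchSparkInputs(context, test_spec, jsc, Collections.emptySet());

            // illustrative logic only: count the records in each input
            inputs.asMap().forEach((name, rdds) ->
                    rdds.forEach(rdd -> System.out.println(name + ": " + rdd.count())));
        }
    }
}
```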
Modules have their own specific Aleph2 logging; in addition, the following common log messages are generated:
- `subsystem`: `SparkTechnologyService.<name>`, `command`: `startAnalyticJobOrTest`, `level`: `INFO` - Issued when a Spark client is successfully launched.
- `subsystem`: `SparkTechnologyService.<name>`, `command`: `startAnalyticJobOrTest`, `level`: `ERROR` - Issued when a Spark client is launched but errors.
- `subsystem`: `SparkTechnologyService.<name>`, `command`: `startAnalyticJobOrTest`, `level`: `ERROR` - Issued when a Spark module fails (ie an error occurs in the Aleph2 system before it tries to launch the module).
This configuration can be set as part of the shared library JSON schema as described here.
{
"spark_home": string,
"config": { ... }
}
Where:
- `spark_home` is the location of the Spark scripts and libraries (defaults to `"/usr/hdp/current/spark-client/"`)
- `config` is a set of Spark parameters that are always applied to every job (see the example below)
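For example (illustrative values only; it is assumed the keys under `config` follow the same `:` convention as the per-job `spark_config` described above):

```json
{
    "spark_home": "/usr/hdp/current/spark-client/",
    "config": {
        "spark:yarn:queue": "default"   // illustrative - applied to every Spark job launched by this technology
    }
}
```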