Analytics and enrichment technology overrides (Hadoop and Storm)

Hadoop

Hadoop - Inputs

Hadoop - Inputs - HDFS

It is not currently possible to override any of the input settings when using the enrichment syntax, and only JSON can be imported. However, when using the analytic thread syntax, input settings can be overridden and XML is supported.

The technology_override object format is as follows (this goes under filter in analytic_thread.jobs.inputs; a worked example is given after the field list below):

{
   "parsers": [
   {
      "xml": {
         "file_pattern": string,

         "root_fields": [ string ],
         "attribute_prefix": string,
         "ignore_fields": [ string ],
         "preserve_case": boolean,

         "primary_key_prefix": string
         "primary_key_field": string

         "set_id_from_content": boolean,
         "id_field": string,
         "xml_text_field": string
      }
   }
   ]
}

Where:

  • For XML (xml):
    • file_pattern: a regular expression (defaults to "^.*.xml$") - this parser is only applied if the path name matches this regex.
    • root_fields: If specified, then a list of XML fields which will trigger a record being extracted from the XML stream. If not specified, then it is assumed the format is <?> <??>RECORD<??>...<??>RECORD<??> </?>
    • attribute_prefix: A string that is used to prefix attribute names as they are converted to JSON fields (if not specified, then attributes are ignored).
    • ignore_fields: If specified, then the listed fields are removed (and all their children are promoted one level). This is useful because XML often uses the <record><elements><element>...</element></elements></record> format
      • For example in the above format, "ignore_fields": ["elements"] would simplify the incoming stream to <record><element>...</element></record>, which would then map to the JSON '{ "element": [ ... ] }'
    • preserve_case: If set to false (default true) then all element names are converted to lower case.
    • primary_key_prefix: If this and primary_key_field are both specified then the "_id" field (or id_field, see below) in the generated JSON is set to primary_key_prefix + RECORD[primary_key_field]
    • primary_key_field: See above.
    • set_id_from_content: If set to true (default false) then the id field ("_id"/id_field) is set to the UUID hash of the XML content (allows for deduplication based on content where there is no good unique field-based id in the XML itself)
      • WARNING: this is not very high performance
    • id_field: Defaults to "_id"; can be overridden, in which case the above set_id_from_content or primary_key_prefix/primary_key_field write into this field instead.
    • xml_text_field: If specified then the entire XML string is inserted into this field (allows for subsequent XML-based parsing/display etc)
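
As an illustration only (the structure follows the schema above, but the file pattern, field names and prefix values are hypothetical), a filter.technology_override for an analytic_thread.jobs.inputs entry that pulls <transaction> records out of matching XML files might look like:

{
   "parsers": [
   {
      "xml": {
         "file_pattern": "^.*transactions.xml$",
         "root_fields": [ "transaction" ],
         "attribute_prefix": "@",
         "ignore_fields": [ "items" ],
         "preserve_case": false,
         "primary_key_prefix": "tx_",
         "primary_key_field": "transaction_id",
         "xml_text_field": "raw_xml"
      }
   }
   ]
}

With these (hypothetical) settings, each <transaction> element in a matching file would be emitted as a lower-cased JSON record with attributes prefixed by "@", any <items> wrapper elements flattened away, "_id" set to "tx_" + the transaction_id value, and the original XML string stored in raw_xml.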

Note from the above format that it is possible to specify multiple parsers of different types (eg different XML formats) differentiated by the file_pattern regex. If a file does not match any of the specified parsers (including the implicit ".json" parser, which assumes newline-separated JSON objects), then it is treated as binary/text, ie one record per file.

If multiple parsers match a file, only the first match is used.
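
For instance, two different (hypothetical) XML layouts arriving in the same bucket could each get their own parser entry, distinguished only by file_pattern; files matching neither pattern (and not ending in ".json") would fall back to the one-record-per-file handling described above:

{
   "parsers": [
   { "xml": { "file_pattern": "^.*type_a.xml$", "root_fields": [ "recordA" ] } },
   { "xml": { "file_pattern": "^.*type_b.xml$", "root_fields": [ "recordB" ], "preserve_case": false } }
   ]
}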

Hadoop - Inputs - Elasticsearch (search_index_service/document_service)

Described under general analytics.

Hadoop - Inputs - MongoDB (document_service.v1)

Described under general analytics.

Hadoop - Processing

In the enrichment_meta objects (Enrichment) or the analytic_thread.jobs.config.enrich_pipeline objects (Analytics), the technology_override field can be used as follows (an example is given after the field list below):

{
   "num_reducers": integer,
   "use_combiner": boolean,
   "requested_batch_size": integer,
   "config": { ... }
}

Where:

  • "num_reducers": the number of reducers to run, if not specified then the cluster default is used.
    • (should only be specified once per pipeline - which setting is used if multiple are set is not defined")
  • "use_combiner": if grouping then whether to apply a combiner and reducer (true), or just a reducer (false, default)
    • (as above, can only be specified once per pipeline)
  • "requested_batch_size": requests the optimum batch size (ie number of records to be passed to each batch request) - note this is not guaranteed to be granted
  • "config": Hadoop parameters (eg "mapred.task.timeout") to override where allowed (by user and system permissions).
    • Note the "."s are replaced by ":"s to workaround certain DBs' key issues, eg would actually use "mapred:task:timeout" in the above example)
    • (Note it is not possible to override parameters set as "final" in the XML/Ambari/etc configuration - contact your system administrator for details; in addition, certain parameters might require elevated privileges)
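
For example (the values are purely illustrative, not recommendations), a grouping stage in an enrichment pipeline might carry an override such as:

{
   "num_reducers": 4,
   "use_combiner": true,
   "requested_batch_size": 500,
   "config": {
      "mapred:task:timeout": "1200000"
   }
}

Note that the timeout parameter uses the ":" form of "mapred.task.timeout" described above, and whether it is actually applied depends on the user's/system's permissions and on the parameter not being marked "final".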

Hadoop - Logging

The Hadoop batch enrichment module performs the following Aleph2 logging:

  • subsystem: "BeJobLauncher", command: "startAnalyticJobOrTest", level: INFO: "Adding storage paths ..." - this message is generated for HDFS file-based inputs (internal or external) and lists the paths whose files will be processed
  • subsystem: "BeJobLauncher", command: "startAnalyticJobOrTest", level: WARN: "Tried but failed to get ..." - this message is generated if Aleph2 returns no data for the requested inputs
  • subsystem: "BeJobLauncher", command: "startAnalyticJobOrTest", level: INFO: "Adding data service path ..." - this message is generated for data stored based inputs (eg Elasticsearch vs search_index_service) and returns technology-specific metadata such as (eg) the indexes being searched, the query, etc
  • subsystem: "BeJobLauncher", command: "startAnalyticJobOrTest", level: WARN: "No data in specified inputs ..." - the specified inputs were valid but contained no data
  • subsystem: "BeJobLauncher", command: "startAnalyticJobOrTest", level: ERROR: "Error submitting ..." - there was an error submitting the job to Hadoop - lists the configuration and error
  • subsystem: "BeJobLauncher", command: "startAnalyticJobOrTest", level: INFO: "Hadoop level overrides ..." - lists the any Hadoop-specific configuration overrides specified by the user
  • subsystem: "BatchEnrichmentJob", command: "<name>.onStageInitialize", level: INFO: a module in the pipeline has been successfully initialized
  • subsystem: "BatchEnrichmentJob", command: "<name>.onStageInitialize", level: ERROR: initialization of a module in the pipeline has failed
  • subsystem: "BatchEnrichmentJob.<name>", command: "<name>.onObjectBatch", level: TRACE: generated after each batch processing completes for a given module (name or "no_name"), reports the number of data objects input and output.
  • subsystem: "BatchEnrichmentJob.<name>", command: "<name>.completeBatchFinalStage", level: INFO: a Hadoop mapper has completed running, reports the number of data objects input and output.

Storm

Currently there are no technology overrides available for Storm.

Currently Storm has no Aleph2 logging.
