(Detailed Design) Aleph2: Distributing harvest and analytics technologies across multiple nodes - IKANOW/Aleph2 GitHub Wiki

Overview

In many cases, a harvest or analytics technology module need only run on a single node.

For example, to launch a Storm or Hadoop task you submit it to the cluster, and the cluster takes care of distributing the work.

In these cases, the multi_node_enabled flag of DataBucketBean or AnalyticThreadBean objects can be left unset or set to false.

Conversely (but with the same end result), the process might be inherently single-threaded, for example running a Gephi command line pipeline to partition a graph.

(Even in these simple cases however, there can be complications in large clusters when some nodes have the relevant technology installed and others don't - this is covered below.)

In other cases, the import (or analytics) process itself is not inherently distributed, and the Aleph2 framework tries to make it easy for the technology developer to add basic distribution:

  • Set the multi_node_enabled flag of the DataBucketBean or AnalyticThreadBean object to true
  • Then, each call to any IHarvestTechnologyModule or IAnalyticsTechnologyModule method is made on each "available" node (see below for the meaning of "available")
  • It is the responsibility of the technology developer to perform any more fine-grained synchronization between nodes (eg using IHarvestContext.getBucketObjectStore or IGeneralDistributedServices) in the edge cases where this is necessary

Node availability

Aleph2 provides a number of different methods of deciding which nodes should handle a given bucket/thread (and associated harvester/analytics technology), checked in this order:

  • The local data import manager configuration (TODO) can limit which harvest/analytic technology modules will run on each node
  • The DataBucketBean and AnalyticThreadBean classes have node_list_rules controlling which nodes they will run on (by hostname)
  • The IHarvestTechnologyModule and IAnalyticsTechnologyModule interfaces have canRunOnThisNode functions that can prevent the technology from running on a given node by returning false.

For "distributed" buckets and threads (ie multi_node_enabled: true), each bucket/thread command is run on all "available" nodes given the above 3 rules.

For "single node" buckets and threads (ie multi_node_enabled: false or unset), an "available" node is randomly selected. Once this has happened, the node_affinity is set in the DataBucketStatusBean/AnalyticThreadStatusBean and subsequent calls are routed there immediately.
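The routing behaviour described above can be sketched roughly as follows. This is an illustrative model only, not Aleph2 code: the class NodeDispatchSketch, its targets method, and the in-memory map standing in for the node_affinity field are all hypothetical, and the list of "available" nodes is assumed to have already been filtered by the three rules.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch: none of these names are part of the Aleph2 API.
class NodeDispatchSketch {
    // Stands in for node_affinity in DataBucketStatusBean/AnalyticThreadStatusBean,
    // keyed here by bucket (or thread) name.
    private final Map<String, String> nodeAffinity = new HashMap<>();
    private final Random random;

    NodeDispatchSketch(Random random) {
        this.random = random;
    }

    /**
     * Returns the hostnames that should receive a command for the given bucket.
     * availableNodes is assumed to be the result of applying the three availability rules.
     */
    List<String> targets(String bucket, boolean multiNodeEnabled, List<String> availableNodes) {
        if (multiNodeEnabled) {
            // "Distributed" bucket: every available node gets the command.
            return new ArrayList<>(availableNodes);
        }
        // "Single node" bucket: reuse the stored affinity if there is one.
        String pinned = nodeAffinity.get(bucket);
        if (pinned == null) {
            if (availableNodes.isEmpty()) {
                return new ArrayList<>(); // nowhere to run
            }
            // First command: pick an available node at random and remember it.
            pinned = availableNodes.get(random.nextInt(availableNodes.size()));
            nodeAffinity.put(bucket, pinned);
        }
        List<String> result = new ArrayList<>();
        result.add(pinned);
        return result;
    }

    public static void main(String[] args) {
        NodeDispatchSketch dispatcher = new NodeDispatchSketch(new Random());
        List<String> available = new ArrayList<>();
        available.add("node-a");
        available.add("node-b");
        available.add("node-c");
        System.out.println("distributed: " + dispatcher.targets("bucket1", true, available));
        System.out.println("single node, first call: " + dispatcher.targets("bucket2", false, available));
        System.out.println("single node, later call: " + dispatcher.targets("bucket2", false, available));
    }
}
```

In this sketch the two "single node" calls always print the same hostname, because the second call is served from the stored affinity rather than a fresh random choice.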

(TODO: longer term, the Aleph2 platform will attempt to "load balance" "single node" buckets and threads across "available" nodes)

Examples

Distributed Flume

A cluster of 20 nodes includes 4 "import nodes" that will run only Flume.

These 4 nodes have local configuration overrides for the data import manager, specifying that only "flume" harvest technologies are available.

A user creates a new bucket for Flume imports, associating the bucket with the "flume" harvest technology library. They elect not to set the node_list_rules.

The Flume harvest technology developer has overridden canRunOnThisNode (eg):

public boolean canRunOnThisNode(@NonNull DataBucketBean bucket) { return new File("/opt/flume/bin/flume-ng").exists(); }

When the new bucket is created, it is submitted to all 20 nodes:

  • On 16 of the nodes, the above function returns false
  • On the 4 "data import" nodes, it returns true, therefore 4 identical Flume instances are launched, all reading data from a given port and writing it out to the same HDFS path (and/or Kafka queue). The incoming log data can then be load balanced across the 4 nodes.

Suppose a different user creates a "webcrawl" bucket that is also distributed. The "data import" nodes ignore the bucket because their local configuration overrides allow only "flume" harvest technologies.
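To make the example concrete, the following sketch models the 20-node cluster and applies the three availability rules in order. Everything in it is a hypothetical illustration rather than Aleph2 code: the Node class, the hostnames, the representation of node_list_rules as a set of hostnames (empty meaning "not set"), and the assumption that the "webcrawl" technology can run on any node that its local configuration permits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the example cluster; these types are not Aleph2 classes.
class FlumeClusterExample {

    static final class Node {
        final String hostname;
        final Set<String> allowedTechnologies; // rule 1: empty means no local restriction
        final boolean flumeInstalled;          // what the Flume canRunOnThisNode check would find

        Node(String hostname, Set<String> allowedTechnologies, boolean flumeInstalled) {
            this.hostname = hostname;
            this.allowedTechnologies = allowedTechnologies;
            this.flumeInstalled = flumeInstalled;
        }
    }

    // 4 "data import" nodes restricted to flume, plus 16 general nodes without Flume installed.
    static List<Node> buildCluster() {
        List<Node> cluster = new ArrayList<>();
        Set<String> flumeOnly = new HashSet<>(Collections.singletonList("flume"));
        for (int i = 1; i <= 4; i++) {
            cluster.add(new Node("import-" + i, flumeOnly, true));
        }
        for (int i = 1; i <= 16; i++) {
            cluster.add(new Node("general-" + i, Collections.<String>emptySet(), false));
        }
        return cluster;
    }

    // Stand-in for the technology's canRunOnThisNode (assumption: only flume has a real check).
    static boolean canRunOnThisNode(Node node, String technology) {
        return !"flume".equals(technology) || node.flumeInstalled;
    }

    // Applies the three rules in the documented order and returns the "available" hostnames.
    static List<String> availableNodes(List<Node> cluster, String technology, Set<String> nodeListRules) {
        List<String> available = new ArrayList<>();
        for (Node node : cluster) {
            // Rule 1: local data import manager configuration
            if (!node.allowedTechnologies.isEmpty() && !node.allowedTechnologies.contains(technology)) continue;
            // Rule 2: node_list_rules on the bucket (modelled as a hostname whitelist)
            if (!nodeListRules.isEmpty() && !nodeListRules.contains(node.hostname)) continue;
            // Rule 3: the technology module's own check
            if (!canRunOnThisNode(node, technology)) continue;
            available.add(node.hostname);
        }
        return available;
    }

    public static void main(String[] args) {
        List<Node> cluster = buildCluster();
        Set<String> noRules = new HashSet<>();
        System.out.println("flume nodes: " + availableNodes(cluster, "flume", noRules).size());
        System.out.println("webcrawl nodes: " + availableNodes(cluster, "webcrawl", noRules).size());
    }
}
```

Under these assumptions the Flume bucket ends up on the 4 "data import" nodes only, while the "webcrawl" bucket ends up on the other 16 nodes and never on a "data import" node.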

Single-node Gephi analytic thread

TODO