Concurrency handling - noflo/noflo-assembly GitHub Wiki

There are several techniques to make concurrent processing with NoFlo Assembly faster and safer.

Scoping

Long-running graphs that serve multiple requests/jobs are faster than one-off graphs started for each job, because they have no restart time. However, if no measures are taken, concurrent jobs may interfere with each other within the same graph and corrupt the results.

NoFlo provides an IP scoping mechanism to avoid such collisions. All IPs with the same value of the scope property are isolated from IPs with a different scope value. The only exception is Initial IPs (IIPs), which are visible in all scopes at the same time because they usually represent initial configuration/startup data.
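The isolation rule above can be pictured with a small plain-JavaScript model. This is only a conceptual sketch and not how NoFlo is implemented internally: the ScopedBuffer class, its method names, and the convention that a null scope marks an IIP are all assumptions made for illustration.

```javascript
// Conceptual model only: NOT NoFlo's internal implementation.
// Packets are plain objects; scope === null stands in for an IIP here.
class ScopedBuffer {
  constructor() {
    this.scoped = new Map(); // scope -> array of payloads
    this.initial = []; // IIP-like payloads, visible in every scope
  }

  push(packet) {
    if (packet.scope === null) {
      this.initial.push(packet.data);
      return;
    }
    if (!this.scoped.has(packet.scope)) {
      this.scoped.set(packet.scope, []);
    }
    this.scoped.get(packet.scope).push(packet.data);
  }

  // Everything a job running in `scope` is allowed to see
  visibleIn(scope) {
    return [...this.initial, ...(this.scoped.get(scope) || [])];
  }
}

const buffer = new ScopedBuffer();
buffer.push({ data: { timeout: 500 }, scope: null }); // configuration, like an IIP
buffer.push({ data: 'request A', scope: 'a' });
buffer.push({ data: 'request B', scope: 'b' });

console.log(buffer.visibleIn('a'));
console.log(buffer.visibleIn('b'));
```

In this model, each scope sees the shared configuration plus its own request, and never the request that belongs to the other scope.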

Creating a scope

The most common way to use scopes is to create scoped IPs when the request/job enters the application. For example, if a source component at the beginning of an assembly line receives incoming HTTP requests, it could create an isolated request like this:

import { IP } from 'noflo';
import { v4 as uuidv4 } from 'uuid'; // assumes the `uuid` npm package

// ...
app.get('/foo', (req, res) => {
  const id = uuidv4(); // generate a unique id to use as the scope
  const msg = {
    errors: [],
    id,
    req,
  };
  const ip = new IP('data', msg, { scope: id });
  output.sendDone(ip);
});

Getting scoped data

After you have assigned a scope to an IP (or multiple IPs), the NoFlo Process API takes care of isolating scoped IPs from IPs with a non-matching scope. Most components down the line therefore don't need to care about scope handling; they just read the data normally, e.g.:

const msg = input.getData('in'); // getting assembly message
const config = input.getData('config'); // getting config from IIP
// Do whatever is needed to msg
output.sendDone(msg);

Accessing an IP scope

If you need to read or change the scope of an IP, use input.get() to get the IP object itself rather than just its data:

const ip = input.get('in');
// let's add prefix to the scope
ip.scope = `foo_${ip.scope}`;
// then send it
output.sendDone(ip);

Cross-scope processing

By default, IPs from different scopes are not visible to the same process function instance. This scope isolation can be worked around with the following technique:

(input, output) => {
  const port = input.port.name; // The port the IP has just arrived at
  const ip = input.get(port);
  // Given ip, port and ip.scope, we can now do generic buffering and/or processing
}
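As one illustration of what such generic buffering might look like, the sketch below collects payloads from all scopes into shared batches. The createBatcher helper, its batchSize parameter, and the fakeInput/fakeOutput stand-ins are hypothetical names introduced here so the example can run outside a NoFlo graph. Calling output.done() after buffering is an assumption about what the component would need.

```javascript
// Sketch: collect payloads from every scope into one shared batch.
// `createBatcher` and `batchSize` are hypothetical, for illustration only.
function createBatcher(batchSize, onBatch) {
  const pending = []; // deliberately shared across all scopes

  // Same shape as the process function above: (input, output)
  return (input, output) => {
    const port = input.port.name;
    const ip = input.get(port);
    pending.push({ port, scope: ip.scope, data: ip.data });
    if (pending.length >= batchSize) {
      onBatch(pending.splice(0, batchSize));
    }
    output.done();
  };
}

// Minimal stand-ins for the objects NoFlo would pass in (demo only)
const fakeInput = (name, ip) => ({ port: { name }, get: () => ip });
const fakeOutput = { done() {} };

const batches = [];
const processFn = createBatcher(2, (batch) => batches.push(batch));
processFn(fakeInput('in', { scope: 'a', data: 1 }), fakeOutput);
processFn(fakeInput('in', { scope: 'b', data: 2 }), fakeOutput);
console.log(batches);
```

Because each buffered entry keeps its original scope, results can later be sent back out under the scope they arrived with.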

Parallel branches

Parallel branches in the graph may reduce the response time per request. While making parallel branches is easy with fan-out and fan-in nodes, there are several details to be aware of.

References vs. clones

In Classical FBP (e.g. JavaFBP) the default policy for fan-out nodes (with same data being sent to multiple connections) is to send different clones of the same IP to each of the connections. This kind of behavior guarantees that no races occur when parallel branches attempt to read or modify the same IP.

In NoFlo the default policy for fan-out nodes is to send a reference to the same IP object to each of the connections. This can save memory and processing time, and it makes changes visible in other branches (which is used in our Error handling pattern). However, it leaves IPs unprotected against data races under concurrent access.

NoFlo provides the IP.clonable property to control this behavior. If a component sends an IP with the clonable: true property, the fan-out behaves as in Classical FBP and sends clones of this IP rather than references to the same object:

const ip = new IP('data', msg, { clonable: true });
output.sendDone(ip);
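The difference between the two policies can be seen in plain JavaScript, without NoFlo. This is only an illustration of reference sharing versus cloning: the object shape is made up, and the JSON round trip is just one simple way to deep-copy plain data, not a statement about how NoFlo clones IPs.

```javascript
// Plain-JavaScript illustration; no NoFlo involved.
const original = { id: 'job-1', errors: [] };

// Fan-out by reference: both branches hold the very same object
const branchA = original;
const branchB = original;
branchA.status = 'validated';
console.log(branchB.status); // the change made in branch A is visible in branch B

// Fan-out by cloning: each branch gets an independent deep copy
const cloneA = JSON.parse(JSON.stringify(original));
const cloneB = JSON.parse(JSON.stringify(original));
cloneA.errors.push('failed in A');
console.log(cloneB.errors.length); // branch B's copy is unaffected
console.log(original.errors.length); // and so is the original
```

Sharing is cheap and keeps every branch in sync, while cloning costs a copy per branch but rules out accidental interference.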

Forks

In Assembly Line the approach is to take the best of both worlds: keep the lightness and visibility of references, and protect data from races by cloning.

By default assembly messages are passed by reference. However, when an assembly line is split into multiple lines, a decision on how to split the message has to be made:

  • if you know for sure that parallel branches will never try to modify the same message properties, the same message can be passed to all outputs;
  • otherwise, it makes sense to use fork() at the split and merge() when joining the parallel lines back into one.

Forked messages are distinct objects, but their properties may still reference the same values. For instance, if m1 and m2 are forks of m0, then m0.errors, m1.errors and m2.errors all point to the same array. As mentioned before, this is critical for Error handling in parallel branches. When forking, you can specify message properties to be omitted or cloned rather than copied by reference.
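The sharing semantics described above can be sketched with a small stand-alone helper. Note that forkSketch below is hypothetical and is not the noflo-assembly fork() implementation. Its options object with omit and clone lists is an assumption made for illustration, so check the API reference for the real signature.

```javascript
// Hypothetical helper, NOT the noflo-assembly fork() implementation.
function forkSketch(msg, { omit = [], clone = [] } = {}) {
  const forked = {};
  Object.keys(msg)
    .filter((key) => !omit.includes(key)) // drop omitted properties
    .forEach((key) => {
      forked[key] = clone.includes(key)
        ? JSON.parse(JSON.stringify(msg[key])) // independent copy
        : msg[key]; // shared reference
    });
  return forked;
}

const m0 = { id: 'job-1', errors: [], cart: { items: ['book'] }, req: {} };
const m1 = forkSketch(m0, { omit: ['req'], clone: ['cart'] });
const m2 = forkSketch(m0, { omit: ['req'], clone: ['cart'] });

m1.errors.push('payment failed'); // visible through m0 and m2 as well
m1.cart.items.push('pen'); // private to m1, because cart was cloned

console.log(m2.errors, m2.cart.items, 'req' in m1);
```

Here the errors array stays shared so that a failure in one branch is seen by the others, while the cloned cart can be modified in each branch without a race.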

See the fork/merge API reference for details.