Javascript User Manual - kuujo/vertigo-js GitHub Wiki
- Concepts
- A Simple Network
- The Vertigo API
- Creating Components
- Creating networks
- Network deployment
- Events
- Advanced features
Concepts
Networks
A network is the representation of a collection of components (special Vert.x verticle implementations) and the connections between them. Put together, a network processes streams of data in real time. Vertigo places no limitations on network structure: each component may be connected to zero or more other components, and circular relationships may be created as well. Networks are defined using the networks API and deployed using local or Via clusters.
This is an example of a complex network. Given a set of Vert.x verticles or modules, Vertigo uses a code-based representation of the network structure to define connections between network components, start component verticles or modules, monitor components, and manage communication between them in a fast, reliable manner.
Components
A component represents a single vertex in a Vertigo network graph. Each network may contain any number of components, and each component may have any number of instances running within the network (each of which may be assigned to different machines around a cluster). Within the context of Vert.x a component can be defined as a verticle that may listen to the output of zero or more verticles and emit output to zero or more listeners. What happens within the verticle depends entirely on where it appears in the network graph and how the component is implemented. Vertigo provides several component implementations.
A simple network
In order to get a better understanding of the concepts introduced in Vertigo, let's take a look at a simple network example.
Defining the network
Vertigo networks are defined using the networks API.
var vertigo = require('vertigo');
var network = vertigo.createNetwork('word_count');
network.addFeeder('word_count.word_feeder', 'word_feeder.js');
network.addWorker('word_count.word_counter', 'word_counter.js', 2)
.addInput('word_count.word_feeder').fieldsGrouping('word');
This code defines a simple network that consists of only two verticles, a feeder and a worker. First, we define a new network named word_count. This is the address that will be used by the network coordinator once the network is deployed.
Next, we add the word_feeder component to the network. Components may be either verticles or modules. Vertigo will automatically detect whether the component is a verticle or module based on Vert.x module naming conventions. The first argument to any addFeeder, addWorker, or addExecutor method is the component address. Note that this is actually the event bus address to which other components may connect to receive messages from the component, so it's important that this name does not conflict with other addresses on the event bus.
Next, we add the word_counter verticle, which will count words. After the word counter is added we indicate that it should receive messages from the component at word_count.word_feeder by adding an input from that address. This means that once the network is started, each word_counter instance will notify all instances of the word_feeder component that it is interested in receiving messages emitted by the word_feeder.
Finally, we group the word_counter component using a FieldsGrouping. Because this particular component counts the number of words arriving to it, the same word must always go to the same component instance, and the FieldsGrouping is the element of the definition that guarantees this will happen. Groupings essentially define how messages are distributed among multiple instances of a component.
Creating the feeder
Now let's look at how the feeder emits messages. Previously, users needed to construct feeders and other components within the feeder implementation. That is no longer the case. To use a feeder within the feeder verticle instance you need only access the vertigo.feeder variable. Vertigo automatically constructs the component instance based on the verticle context. So, if the current verticle was deployed as a feeder, Vertigo will construct a feeder and make it available via the feeder property.
var vertigo = require('vertigo');
vertigo.feeder.startHandler(function(error, feeder) {
if (!error) {
// Feeder was started successfully.
}
});
Note that we set a start handler on the feeder. This helps us ensure that the feeder is started before we start sending messages. Once the feeder has started, we can begin feeding data.
feeder.feedHandler(function() {
feeder.emit({'word': 'Hello'});
});
Here we set a feed handler that will be called whenever the feeder is prepared to accept new messages (i.e. the feed queue is not full). But what if the data fails to be processed? How can we be notified? The Vertigo feeder API provides for additional arguments that allow the feeder to be notified once a message is successfully processed. See the section on determining whether a message has been successfully processed for what that means.
feeder.feedHandler(function() {
feeder.emit({word: 'Hello'}, function(error) {
if (error) {
// Processing failed. Do something about it.
}
});
});
By providing an additional handler to the emit() method, the feeder will now be notified once the message is acked or failed.
Creating the worker
Now that we have a feeder to feed messages to the network, we need to implement a worker. Workers are the primary units of processing in Vertigo. In this case, we're creating a worker that counts the occurrences of words in the word field of the message body.
Creating and starting workers is done in the same way as with the feeder.
var vertigo = require('vertigo');
var worker = vertigo.worker;
Once we have created a worker, we need to add a handler for incoming messages. To do this we call the messageHandler method.
var counts = {};
worker.messageHandler(function(message) {
var word = message.body['word'];
if (counts[word] === undefined) {
counts[word] = 0;
}
counts[word]++;
});
Once we're done processing the message, we may want to emit the new count to any other components that may be listening. To do so, we call the emit() method on the Worker instance.
worker.emit({word: message.body['word'], count: counts[word]}, message);
Once a message has been fully processed, it is essential that the message be acked. This notifies the network that the message has been fully processed, and once all messages in a message tree have been processed the original data source (the feeder above) will be notified.
worker.ack(message);
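Putting the three snippets above together, the whole worker fits in a single message handler. The following is a minimal sketch rather than the canonical implementation: attachWordCounter and createStubWorker are names invented here so the handler can be run outside of Vert.x, and the worker is passed in as an argument. In a real verticle you would pass vertigo.worker. Only the messageHandler, emit, and ack calls shown above are used.

```javascript
// Assumption: attachWordCounter is a helper name invented for this sketch.
// `worker` must provide messageHandler(), emit() and ack() as described
// above; in a real verticle it would be vertigo.worker.
function attachWordCounter(worker) {
  // A prototype-less object, so words such as 'constructor' count correctly.
  var counts = Object.create(null);
  worker.messageHandler(function(message) {
    var word = message.body['word'];
    if (counts[word] === undefined) {
      counts[word] = 0;
    }
    counts[word]++;
    // Emit the new count as a child of the message we just received.
    worker.emit({word: word, count: counts[word]}, message);
    // Tell the network that this message has been fully processed.
    worker.ack(message);
  });
  return counts;
}

// A stand-in worker used only to run the sketch outside of Vert.x.
function createStubWorker() {
  var stub = {handler: null, emitted: [], acked: []};
  stub.messageHandler = function(handler) { stub.handler = handler; };
  stub.emit = function(body, parent) { stub.emitted.push(body); };
  stub.ack = function(message) { stub.acked.push(message); };
  return stub;
}
```

Note that the child message is emitted before the parent is acked, which matches the order used in the snippets above.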
Deploying the network
Now that we've created the network and implemented each of its components, we need to deploy the network. Vertigo supports both local and clustered network deployments using the LocalCluster and RemoteCluster (see Via) interfaces. In this case, we'll just use the LocalCluster.
To deploy the network, we just pass our network definition to the vertigo module's deployLocalNetwork() method.
var vertigo = require('vertigo');
var network = vertigo.createNetwork('word_count');
network.addFeeder('word_count.word_feeder', 'word_feeder.js');
network.addWorker('word_count.word_counter', 'word_counter.js', 2)
.addInput('word_count.word_feeder').fieldsGrouping('word');
vertigo.deployLocalNetwork(network, function(error, context) {
if (!error) {
vertigo.shutdownLocalNetwork(context);
}
});
The NetworkContext that is returned by the deployment contains valuable information about the network. See contexts for more info.
Executing the network as a remote procedure
But what if the feeder needs to receive feedback on the word count? In this case, we can replace the feeder with an executor. Executors work by capitalizing on circular connections within networks. Thus, first we need to re-define the network to create a circular connection between the data source and the worker.
var vertigo = require('vertigo');
var network = vertigo.createNetwork('word_count');
network.addExecutor('word_count.word_executor', 'word_executor.js')
.addInput('word_count.word_counter');
network.addWorker('word_count.word_counter', 'word_counter.js', 2)
.addInput('word_count.word_executor').fieldsGrouping('word');
Here we simply add the word counter's address as an input to the word executor, thus creating the circular connection.
Now that we have our circular connections, we can re-define the original feeder to expect a result using an executor.
var vertigo = require('vertigo');
// We don't have to set a start handler since the executor won't
// call the execute handler until it's started anyways.
vertigo.executor.executeHandler(function(executor) {
executor.execute({'word': 'Hello'}, function(failed, result) {
if (!failed) {
console.log('The current count for Hello is ' + result['count']);
}
});
});
Note that because the word_counter component always emits data, it does not have to be refactored to support remote procedure calls. For this reason, all component implementations should always emit data whether or not they are connected to any other component. If the component is not, in fact, connected to any other component, emitting data will have no effect.
The Vertigo API
The Vertigo Javascript API is designed to be available in a single module, the vertigo module. This is a dynamic module that changes behavior based on the context in which it is loaded. This means that when the current Vert.x verticle is a Vertigo component, the component object (feeder, worker, executor) will be made available in the vertigo module. This differs from the previous need to construct component instances on your own. Now, Vertigo handles it for you.
The vertigo module exposes the following variables and methods:
- context - the current component instance context (if this is a Vertigo component instance)
- feeder - the current feeder component (if this is a feeder component instance)
- worker - the current worker component (if this is a worker component instance)
- executor - the current executor component (if this is an executor component instance)
As mentioned, the behavior of the module changes depending on the context of the current Javascript verticle. So, in a worker verticle, registering a message handler is as simple as this:
var vertigo = require('vertigo');
vertigo.worker.messageHandler(function(message) {
vertigo.worker.emit({foo: 'bar'}, message);
});
The vertigo module also exposes the following methods for creating and deploying networks:
- createNetwork(address) - creates a new network object
- deployLocalNetwork(network, handler) - deploys a network to the local Vert.x instance
- shutdownLocalNetwork(context, handler) - shuts down a local network
- deployRemoteNetwork(address, network, handler) - deploys a remote network over the Vert.x event bus
- shutdownRemoteNetwork(address, context, handler) - shuts down a remote network over the Vert.x event bus
Including the Javascript API
To include the Javascript Vertigo API in your module, add the following to your module's mod.json file.
"includes": "net.kuujo~vertigo-js~0.6.0"
This will enable you to require() the vertigo module as demonstrated.
Creating components
Vertigo no longer requires users to create components within Vert.x verticles. Instead, a new component typing system has been devised which allows Vertigo to construct the appropriate component type for the current Vert.x verticle instance. Thus, if a verticle is deployed as a feeder, Vertigo will automatically make a single Feeder instance available via the feeder variable in the vertigo module. Similarly, in workers it will make a worker available, and in executors an executor. Thus, to load the current Vertigo component simply require() the vertigo module.
var vertigo = require('vertigo');
var worker = vertigo.worker;
worker.messageHandler(function(message) {
...
});
Contexts
The InstanceContext object contains information relevant to the current component instance, as well as its parent component definition and even information about the entire network layout, including unique addresses for each network component instance.
InstanceContext
The InstanceContext exposes the following interface:
- address - the unique worker event bus address
- component() - returns the parent component context
ComponentContext
The ComponentContext exposes the following interface:
- address - the component address - this is the basis for all component instance addresses
- type - the component type, either "module" or "verticle"
- isModule() - indicates whether the component is a module
- isVerticle() - indicates whether the component is a verticle
- config() - the component configuration
- numInstances() - the number of component instances
- inputs() - a list of component inputs
- instances() - a list of component instance contexts
- network() - the parent network context
NetworkContext
The NetworkContext exposes the following interface:
- address - the network address - this is the basis for all component addresses
- auditors() - returns an array of network auditor addresses; each auditor is assigned its own unique event bus address
- components() - an array of all network component contexts
Feeders
Feeders are components whose sole responsibility is to feed data to a network. Data generated by feeders may come from any source, and Vertigo provides a number of feeder implementations for integrating networks with a variety of Vert.x and other APIs.
Each feeder exposes the following configuration methods:
- feedQueueMaxSize(queueSize) - sets the maximum feed queue size
- feedQueueMaxSize() - gets the maximum feed queue size
- feedQueueFull() - indicates whether the feed queue is full
- autoRetry(retry) - sets whether to automatically retry sending failed messages
- autoRetry() - indicates whether auto retry is enabled
- autoRetryAttempts(attempts) - sets the number of retries to attempt before explicitly failing the feed. To set an infinite number of retry attempts, pass -1 as the attempts argument
- autoRetryAttempts() - indicates the number of automatic retry attempts
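The feed queue settings above amount to a form of back-pressure: a feeder stops accepting new messages while too many earlier ones are still waiting to be acked or failed. The following is a rough sketch of that idea only. createFeedQueue and its methods are hypothetical names chosen for illustration, and Vertigo's internal bookkeeping may well differ.

```javascript
// Assumption: createFeedQueue and its methods are illustrative names, not
// part of the Vertigo API. The queue tracks messages that were emitted but
// have not been acked or failed yet.
function createFeedQueue(maxSize) {
  var pending = Object.create(null);
  var size = 0;
  return {
    // Mirrors the idea behind feedQueueFull().
    full: function() { return size >= maxSize; },
    // Records an in-flight message; returns false when the queue is full
    // or the id is already being tracked.
    add: function(id) {
      if (size >= maxSize || pending[id]) { return false; }
      pending[id] = true;
      size++;
      return true;
    },
    // Frees a slot once the message was acked, failed or timed out.
    remove: function(id) {
      if (pending[id]) {
        delete pending[id];
        size--;
      }
    },
    size: function() { return size; }
  };
}
```

With a maximum size of 2, a third add() is rejected until one of the first two messages has been removed, which is the moment a feed handler would be called again.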
Emitting messages
Once a feeder has been started, you can feed messages using the emit() method:
emit(body)
emit(body, ackHandler)
emit(stream, body)
emit(stream, body, ackHandler)
var vertigo = require('vertigo');
vertigo.feeder.startHandler(function(error, feeder) {
if (!error) {
feeder.emit({foo: 'bar'});
}
});
Emitting messages to specific streams
Vertigo supports named input/output streams. When networks are defined, components may subscribe to specific output streams of another component. This allows different types of data to be emitted to different streams. Streams do not have to be declared by components. Simply emit data to a specific stream and Vertigo will ensure the message goes to any components that are subscribed to that stream.
feeder.emit('product', {id: '1234'});
Determining whether a message has been successfully processed
When a message is emitted from a feeder, it has the potential to result in thousands of messages being created based on it. The message may go through dozens of worker components, each of which may emit dozens of child messages. Vertigo tracks the creation and completion of each of these messages and once the entire message tree has been successfully processed it notifies the source (the feeder).
While Vertigo feeders do support automatically resending timed out messages (see the autoRetry method), feeders will often want to receive notification when a message fails to be processed. To do so, simply pass a handler to the feeder's emit method. Once the message is fully processed, failed, or times out this handler will be called.
feeder.emit('product', {id: '1234'}, function(error) {
if (error) {
// Processing failed.
}
});
If the message is successfully processed, the handler will be called with the MessageId of the message that was processed.
If the message or any of its descendants times out, the handler will be failed with a TimeoutException. Note that the MessageId instance will still be available in the result object. This is because Vertigo uses a special AsyncResult implementation to ensure that data relevant to the failure is provided.
If the message or any of its descendants is explicitly failed by a worker, the handler will be failed with a FailureException. As with the TimeoutException, the MessageId instance will still be available in the result object.
feeder.emit('product', {id: '1234'}, function(error, messageid) {
if (error) {
if (error.type == 'failure') {
console.log(messageid + ' was failed.');
} else if (error.type == 'timeout') {
console.log(messageid + ' timed out.');
}
}
});
When passing an ackHandler to an emit method, the handler will be invoked once an ack or fail message is received from a network auditor. If autoRetry is enabled, up to autoRetryAttempts retries will be made before the handler is invoked.
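The retry behaviour can be pictured as a small loop around a single delivery attempt. This is a sketch under stated assumptions, not Vertigo's implementation: emitWithRetry and send are hypothetical names, and the retry count is interpreted as "retries after the first attempt", following the description of autoRetryAttempts. A value of -1 retries indefinitely.

```javascript
// Assumption: emitWithRetry and `send` are hypothetical names. `send(done)`
// stands for one delivery attempt and calls done(error) once the message
// tree was acked (no error), failed or timed out.
function emitWithRetry(send, retryAttempts, ackHandler) {
  var retries = 0;
  function attempt() {
    send(function(error) {
      // retryAttempts === -1 means "retry forever", as described above.
      var mayRetry = retryAttempts === -1 || retries < retryAttempts;
      if (error && mayRetry) {
        retries++;
        attempt();
      } else {
        // Either success, or the retries are used up: notify the caller.
        ackHandler(error || null);
      }
    });
  }
  attempt();
}
```

With retryAttempts set to 2, a message that always fails is sent three times in total before the handler receives the error.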
To specify a stream to which to emit the message, pass a string as the first argument to emit().
Workers
Worker components are the primary units of processing in Vertigo. Workers are designed to both receive input and emit output (though workers can do one or the other). The processes that are performed in between depend entirely on the implementation.
Each worker must have a handler assigned to it for handling incoming messages. To attach a handler to a Worker, call the messageHandler() method.
var vertigo = require('vertigo');
vertigo.worker.messageHandler(function(message) {
vertigo.worker.emit({foo: 'bar', bar: 'baz'}, message);
});
Messages
When a worker receives data, it receives it in the form of a vertigo.message.Message instance. This special message wrapper provides an API that exposes more information about a message than just data. Vertigo messages can be organized into message trees with relationships between messages. The JsonMessage provides metadata about the message and its relationship to other messages with the following methods:
- id() - the unique message correlation identifier
- body() - the message body, a JSON object
- source() - the name of the component from which this message tree originated
- stream() - the stream on which the message was emitted
worker.messageHandler(function(message) {
var words = message.body['words'];
});
Emitting messages
Each worker component can both receive and emit messages. Of course, where a message goes once it is emitted from a worker instance is abstracted from the implementation, and the Worker interface behaves accordingly. The Worker exposes the following methods for emitting messages:
- emit(body) - emits a message body to the default stream
- emit(stream, body) - emits a message body to a specific output stream
- emit(body, parent) - emits a message body to the default stream as a child of the given Message instance
- emit(stream, body, parent) - emits a message to a specific output stream as a child of the given Message instance
worker.messageHandler(function(message) {
var words = message.body['words'];
worker.emit({count: words.length}, message);
});
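Because a stream is always given as a string in the first position, the four emit() forms listed above can be told apart by inspecting the arguments. The following sketch shows one way such overloads could be resolved. parseEmitArgs and DEFAULT_STREAM are names made up for illustration, and the value 'default' is an assumption, since this manual does not say how the default stream is identified.

```javascript
// Assumption: parseEmitArgs and DEFAULT_STREAM are names made up for this
// sketch; the real default stream identifier is not given in this manual.
var DEFAULT_STREAM = 'default';

function parseEmitArgs() {
  var args = Array.prototype.slice.call(arguments);
  var result = {stream: DEFAULT_STREAM, body: null, parent: null};
  // A leading string is treated as the stream name.
  if (typeof args[0] === 'string') {
    result.stream = args.shift();
  }
  result.body = args.shift();
  // Whatever follows the body is the parent message, if any.
  if (args.length > 0) {
    result.parent = args.shift();
  }
  return result;
}
```

For example, parseEmitArgs('product', {id: '1234'}, message) yields the stream 'product' with message as the parent, while parseEmitArgs({id: '1234'}) falls back to the default stream with no parent.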
Note that hierarchical message relationships are created by passing a parent message to the emit() method. When this type of relationship is created, the message tree's assigned auditor is notified of the new relationship. What this relationship means is that the message source will not be notified of a message's successful completion until all messages in the message tree have been completely acked. However, if a message is failed then the message source will be notified immediately.
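To make the rule concrete, here is a toy model of an auditor that tracks a message tree: the source is notified only when every message in the tree has been acked, and immediately when any message fails. This is a simplified sketch, not Vertigo's auditor. createAuditor and its method names are invented for illustration, timeouts are left out, and children are assumed to be registered before their parent is acked.

```javascript
// Assumption: createAuditor and its method names are illustrative only; this
// is not Vertigo's actual auditor implementation.
function createAuditor() {
  var trees = Object.create(null);   // root id -> {pending, done}
  var rootOf = Object.create(null);  // message id -> root id of its tree

  function finish(root, error) {
    var tree = trees[root];
    if (!tree) { return; }  // tree already completed or failed
    delete trees[root];
    tree.done(error);
  }

  return {
    // Called when a source emits a new root message.
    create: function(id, done) {
      trees[id] = {pending: 1, done: done};
      rootOf[id] = id;
    },
    // Called when a message is emitted as a child of `parentId`.
    fork: function(parentId, childId) {
      var root = rootOf[parentId];
      rootOf[childId] = root;
      if (trees[root]) { trees[root].pending++; }
    },
    // The source is notified only after every message has been acked.
    ack: function(id) {
      var root = rootOf[id];
      var tree = trees[root];
      if (tree && --tree.pending === 0) { finish(root, null); }
    },
    // A single failure notifies the source immediately.
    fail: function(id) {
      finish(rootOf[id], new Error('failed'));
    }
  };
}
```

In this model, acking a root message whose child is still outstanding does not notify the source; the notification arrives once the child is acked as well.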
Acking messages
Vertigo provides for reliable messaging between network components using acks and fails. Each message that is received by a worker component must be acked or failed, otherwise the message tree will eventually be failed via timeout. The Worker provides the following methods for acks/fails:
- ack(message) - indicates that a message has been successfully processed
- fail(message) - indicates that a message has failed processing. This can be used as a vehicle for notifying data sources of invalid data
worker.messageHandler(function(message) {
var words = message.body['words'];
if (words !== undefined) {
worker.emit({count: words.length}, message);
worker.ack(message);
}
else {
worker.fail(message);
}
});
For more information see how Vertigo guarantees message processing
Executors
Executors are components that execute part or all of a network essentially as a remote procedure invocation. Data emitted from executors is tagged with a unique ID, and new messages received by the executor are correlated with past emissions.
Each executor exposes the following execute() methods:
execute(args, resultHandler)
execute(stream, args, resultHandler)
Networks that use remote procedure invocations must be designed in a very specific manner. Remote procedure calls work by essentially creating circular connections between network components.
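The correlation step can be sketched as a lookup table keyed by the ID attached to each outgoing call. The code below only illustrates the idea: createCorrelator, its methods, and the 'exec-' ID format are hypothetical, and the (failed, result) handler signature is borrowed from the executor example earlier in this manual.

```javascript
// Assumption: createCorrelator is a hypothetical helper, not a Vertigo API.
function createCorrelator() {
  var nextId = 0;
  var pending = Object.create(null);  // call id -> result handler
  return {
    // Tags an outgoing call with a fresh ID and remembers its handler.
    execute: function(args, resultHandler) {
      var id = 'exec-' + (++nextId);
      pending[id] = resultHandler;
      return {id: id, body: args};
    },
    // Routes a message that came back around the circle to its handler.
    receive: function(id, result) {
      var handler = pending[id];
      if (!handler) { return false; }  // unknown or already answered
      delete pending[id];
      handler(null, result);
      return true;
    }
  };
}
```

Results may arrive in any order; each one reaches the handler of the call that produced it, and a second result for the same ID is ignored.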
Creating networks
Networks are defined in code using a Network instance.
Some examples demonstrate how the network definition API works.
To define a network, call the createNetwork(address) method:
var vertigo = require('vertigo');
var network = vertigo.createNetwork('test');
Each network must be given a unique name. Vertigo component addresses are generated in a predictable manner, and this name is used to prefix all component addresses and instance addresses.
The Network exposes the following configuration methods:
- address(address) - sets the network address; this is a unique event bus address at which the network coordinator will coordinate deployment and monitoring of network components
- enableAcking() - enables acking for the network
- disableAcking() - disables acking for the network
- isAckingEnabled() - indicates whether acking is enabled for the network
- numAuditors(numAuditors) - sets the number of network auditors (ackers)
- numAuditors() - indicates the number of network auditors
- ackExpire(expire) - sets the message ack expiration for the network
- ackExpire() - indicates the ack expiration for the network
Adding components
The Network class provides several methods for adding components to the network.
addComponent(component)
addFeeder(address)
addFeeder(address, moduleOrMain)
addFeeder(address, moduleOrMain, config)
addFeeder(address, moduleOrMain, instances)
addFeeder(address, moduleOrMain, config, instances)
addWorker(address)
addWorker(address, moduleOrMain)
addWorker(address, moduleOrMain, config)
addWorker(address, moduleOrMain, instances)
addWorker(address, moduleOrMain, config, instances)
addExecutor(address)
addExecutor(address, moduleOrMain)
addExecutor(address, moduleOrMain, config)
addExecutor(address, moduleOrMain, instances)
addExecutor(address, moduleOrMain, config, instances)
Note that Vertigo supports both verticles and modules as network components. Vertigo will automatically determine whether a component is a verticle or a module based on Vert.x module naming conventions.
The return value of each of these methods is a new Component instance on which you can set the following properties:
- main(main) - sets the component verticle main. If this is a valid module name then an error will occur
- module(module) - sets the component module name. If this is not a valid module name then an error will occur
- config(config) - sets the component configuration. This is made available as the normal Vert.x configuration within a component instance
- instances(instances) - sets the number of component instances
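Both the automatic verticle/module detection and the main()/module() checks above depend on whether a string looks like a Vert.x module name. Vert.x 2 module identifiers take the form owner~name~version, as in net.kuujo~vertigo-js~0.6.0. The check Vertigo actually performs is not shown in this manual, so the following is only a plausible sketch, and isModuleName is a hypothetical helper name.

```javascript
// Assumption: isModuleName is an illustrative helper; the exact check Vertigo
// performs is not shown in this manual. Vert.x 2 module identifiers have the
// form owner~name~version, e.g. net.kuujo~vertigo-js~0.6.0.
function isModuleName(moduleOrMain) {
  var parts = String(moduleOrMain).split('~');
  if (parts.length !== 3) { return false; }
  // Every one of the three parts must be non-empty.
  return parts.every(function(part) { return part.length > 0; });
}
```

Under this check, 'word_feeder.js' would be treated as a verticle main and 'net.kuujo~vertigo-js~0.6.0' as a module.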
Adding inputs
Connections between components are created by adding an input to a component definition. Inputs indicate which components a given component is interested in receiving messages from. Vertigo uses a publish/subscribe messaging system, so when a component is started, it will subscribe to messages from other components according to its input configurations.
Inputs may be added to any component with the addInput method:
addInput(input)
addInput(address)
addInput(address, grouping)
Each of these methods returns the added Input instance which exposes the following methods:
- groupBy(grouping) - sets the input grouping
- randomGrouping() - sets a random grouping on the input
- roundGrouping() - sets a round-robin grouping on the input
- fieldsGrouping(fields) - sets a fields grouping on the input
- allGrouping() - sets a fanout grouping on the input
- stream(stream) - sets the input stream
The input address should be the event bus address for a component to which the input will subscribe to receive new output messages. Note that this subscription pattern does not place any restrictions on the input address, so users can subscribe a component to output from any component in any network. See nested networks for an example.
Input Streams
Vertigo now supports emitting messages from components to specific streams to which components may choose to subscribe. This essentially means that components can subscribe to specific output from other components by specifying a stream. Streams are identified simply as strings. Note that not specifying a stream simply indicates a subscription to the default stream.
To subscribe to a non-default stream, pass the stream name as the second argument to addInput or call stream(stream) on the input object.
// Subscribe to the 'apples' stream of 'some_feeder'.
network.addWorker('worker', 'my_worker.js').addInput('some_feeder', 'apples');
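Conceptually, stream subscriptions work like topic-based publish/subscribe on a component's output: a message emitted to one stream reaches only the subscribers of that stream. The toy model below illustrates this. createOutput, subscribe, and emit are stand-ins written for this sketch rather than Vertigo calls, and the name 'default' for the default stream is an assumption.

```javascript
// Assumption: createOutput, subscribe and emit are stand-ins written for this
// sketch; 'default' as the default stream name is also an assumption.
function createOutput() {
  var subscribers = Object.create(null);  // stream name -> list of handlers
  return {
    // Omitting the stream subscribes to the default stream.
    subscribe: function(handler, stream) {
      var name = stream || 'default';
      (subscribers[name] = subscribers[name] || []).push(handler);
    },
    // Delivers the body only to subscribers of the given stream.
    emit: function(stream, body) {
      (subscribers[stream] || []).forEach(function(handler) {
        handler(body);
      });
    }
  };
}
```

A subscriber to 'apples' sees only what is emitted to 'apples'; a subscriber that named no stream sees only the default stream.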
Input Groupings
With each component instance maintaining its own unique event bus address, Vertigo needs a way to determine which component instance any given message should be dispatched to. For this, each input may indicate a grouping which determines how messages are distributed among multiple instances of the component on that input. For instance, one component may need to receive all messages emitted to it from another component, while another may need to receive messages in a round-robin fashion. Vertigo provides groupings for various scenarios, including consistent-hashing based groupings. Custom component groupings may also be provided.
Groupings are abstracted from component implementations, so they can be added when defining a network component rather than within component verticles themselves.
To set a component grouping, call the groupBy() method on a component input.
var network = vertigo.createNetwork('foo');
network.addFeeder('foo.bar', 'some_feeder.py');
network.addWorker('foo.baz', 'some_worker.js', 2).addInput('foo.bar').fieldsGrouping('type');
When messages are emitted to instances of the component, the related grouping selector will be used to determine to which component instance a given message is sent.
Vertigo provides several grouping types:
- RandomGrouping - component instances receive messages in random order
var network = vertigo.createNetwork('foo');
network.addFeeder('foo.bar', 'some_feeder.py');
network.addWorker('foo.baz', 'some_worker.js', 2).addInput('foo.bar').randomGrouping();
- RoundGrouping - component instances receive messages in round-robin fashion
var network = vertigo.createNetwork('foo');
network.addFeeder('foo.bar', 'some_feeder.py');
network.addWorker('foo.baz', 'some_worker.js', 2).addInput('foo.bar').roundGrouping();
- FieldsGrouping - component instances receive messages according to basic consistent hashing based on a given field
var network = vertigo.createNetwork('foo');
network.addFeeder('foo.bar', 'some_feeder.py');
network.addWorker('foo.baz', 'some_worker.js', 2).addInput('foo.bar').fieldsGrouping('type');
Consistent hashing supports multiple fields as well.
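The property a fields grouping has to provide is that equal field values always map to the same instance. The sketch below demonstrates that property with a plain hash-modulo selector, which is simpler than a consistent-hash ring and is not claimed to be what Vertigo does internally. fieldsSelector and hashString are hypothetical names.

```javascript
// Assumption: fieldsSelector and the simple string hash below are
// illustrative; Vertigo's real hashing scheme is not described here.
function hashString(text) {
  var hash = 0;
  for (var i = 0; i < text.length; i++) {
    // Keep the value within 32 bits on every step.
    hash = ((hash * 31) + text.charCodeAt(i)) | 0;
  }
  return Math.abs(hash);
}

function fieldsSelector(fields, numInstances) {
  return function(body) {
    // Join the selected field values into one key.
    var key = fields.map(function(field) {
      return JSON.stringify(body[field]);
    }).join('|');
    // The same key always yields the same instance index.
    return hashString(key) % numInstances;
  };
}
```

Calling fieldsSelector(['type', 'color'], 4) returns a function that maps two messages with the same type and color to the same index between 0 and 3, regardless of their other fields.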
- AllGrouping - all component instances receive a copy of each message
var network = vertigo.createNetwork('foo');
network.addFeeder('foo.bar', 'some_feeder.py');
network.addWorker('foo.baz', 'some_worker.js', 2).addInput('foo.bar').allGrouping();
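The random, round-robin, and all groupings differ only in how they choose target instances for each message. As a rough illustration, each can be written as a small selector that returns the indexes of the instances that should receive the next message. The factory names below are invented for this sketch and are not part of the Vertigo API.

```javascript
// Assumption: these selector factories are illustrative names. Each returns a
// function that yields the indexes of the instances that receive a message.
function roundSelector(numInstances) {
  var next = 0;
  return function() {
    var index = next;
    next = (next + 1) % numInstances;
    return [index];
  };
}

function randomSelector(numInstances) {
  return function() {
    return [Math.floor(Math.random() * numInstances)];
  };
}

function allSelector(numInstances) {
  return function() {
    var all = [];
    for (var i = 0; i < numInstances; i++) { all.push(i); }
    return all;
  };
}
```

With two instances, a round-robin selector alternates between index 0 and index 1, while an all selector returns both indexes for every message.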
Event bus hooks
The Vertigo Javascript API provides a custom hook implementation that allows users to hook into component events at the point of deployment. When requested, Javascript network components can publish various events over the event bus. To add a hook to a component call the addHook() method on the component definition. The first argument to the method is the hook type (listed below) and the second is a callback to call when the hook is triggered. The available events are as follows:
- start - called when a component instance is started
- receive - called when the component receives a message
- ack - called when the component acks a message
- fail - called when the component fails a message
- emit - called when the component emits a message
- acked - called when a feeder or executor receives notification of a fully processed message
- failed - called when a feeder or executor receives notification of a message failure
- timeout - called when a feeder or executor receives notification of a message timeout
- stop - called when a component instance is stopped
var network = vertigo.createNetwork('test');
network.addFeeder('feeder', 'my_feeder.js').addHook('emit', function(id) {
console.log('Message ' + id + ' was emitted from the feeder');
});
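One way to picture hooks is as a registry that maps each hook type to a list of callbacks and calls them when the corresponding event occurs. The sketch below is a local, in-memory model only. In Vertigo the events travel over the event bus, and createHookRegistry and trigger are names invented here. Only the hook types are taken from the list above.

```javascript
// Assumption: createHookRegistry and trigger are names invented for this
// sketch; only the hook types come from the list above.
var HOOK_TYPES = ['start', 'receive', 'ack', 'fail', 'emit',
                  'acked', 'failed', 'timeout', 'stop'];

function createHookRegistry() {
  var hooks = {};
  return {
    addHook: function(type, callback) {
      if (HOOK_TYPES.indexOf(type) === -1) {
        throw new Error('Unknown hook type: ' + type);
      }
      (hooks[type] = hooks[type] || []).push(callback);
      return this;  // allow chaining, as in the example above
    },
    // Calls every callback registered for the given hook type.
    trigger: function(type, value) {
      (hooks[type] || []).forEach(function(callback) { callback(value); });
    }
  };
}
```

Rejecting unknown hook types early is a design choice of this sketch; it surfaces typos such as 'emitted' at definition time instead of silently never firing.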
Network deployment
Once you have defined a network using the definition API, the network can be deployed via the Cluster API. Vertigo provides two types of deployment methods via the vertigo interface:
deployLocalNetwork(network, handler)
deployRemoteNetwork(address, network, handler)
When a network is deployed successfully, a NetworkContext instance is passed to the handler, if one was provided. The NetworkContext instance contains information about the network components, including component definitions, addresses, and connections.
Deploying networks locally
To deploy a network locally (within a single Vert.x instance) use the LocalCluster. The local cluster uses the core Vert.x Container API to deploy network verticles and modules locally.
var vertigo = require('vertigo');
vertigo.deployLocalNetwork(network, function(error, context) {
if (!error) {
vertigo.shutdownLocalNetwork(context);
}
});
Deploying networks across a cluster
Vertigo also supports deploying networks across a cluster of Vert.x instances. This is supported using the Via distributed deployment module. Via allows Vertigo to essentially deploy network verticles and modules by sending messages over the event bus to other Vert.x instances. This can result in a much more stable network since Via can reassign components to other Vert.x instances if a specific node dies.
Via clusters use the same interface as local clusters, but with an additional address argument: the address of the Via master through which the network's coordinator should deploy component modules and verticles.
var vertigo = require('vertigo');
vertigo.deployRemoteNetwork('via.master', network, function(error, context) {
if (!error) {
vertigo.shutdownRemoteNetwork('via.master', context);
}
});
Events
Vertigo emits event messages over the Vert.x event bus when certain special events occur. To listen for a Vertigo system event, simply register a handler on the Vert.x event bus at the specified event address. Currently, Vertigo events are limited, but more will be added in the future (and by request).
Network events
- vertigo.network.deploy - triggered when a network is deployed.
  - address - the network address
  - network - a JSON representation of the network
- vertigo.network.start - triggered when an entire network has been started.
  - address - the network address
  - context - a JSON representation of the network context
- vertigo.network.shutdown - triggered when a network has been shut down.
  - address - the network address
  - context - a JSON representation of the network context
Component events
- vertigo.component.deploy - triggered when a component instance is deployed.
  - address - the component address
  - context - a JSON representation of the component instance context
- vertigo.component.start - triggered when a component instance has been started.
  - address - the component address
  - context - a JSON representation of the component instance context
- vertigo.component.shutdown - triggered when a component instance has been shut down.
  - address - the component address
  - context - a JSON representation of the component instance context
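As an illustration of consuming these events, the sketch below keeps a list of running networks by listening on the vertigo.network.start and vertigo.network.shutdown addresses. To stay runnable outside of Vert.x it uses a small stand-in event bus: createStubEventBus and trackNetworks are hypothetical names, and the event is assumed to arrive as an object carrying the fields listed above. In a verticle you would register the same handlers on the real Vert.x event bus.

```javascript
// Assumption: createStubEventBus only imitates registerHandler() and
// publish() so the sketch runs outside of Vert.x. In a verticle, use the
// real Vert.x event bus instead.
function createStubEventBus() {
  var handlers = Object.create(null);
  return {
    registerHandler: function(address, handler) {
      (handlers[address] = handlers[address] || []).push(handler);
    },
    publish: function(address, message) {
      (handlers[address] || []).forEach(function(handler) {
        handler(message);
      });
    }
  };
}

// Collects the addresses of networks as they start and shut down.
function trackNetworks(eventBus) {
  var running = [];
  eventBus.registerHandler('vertigo.network.start', function(event) {
    running.push(event.address);
  });
  eventBus.registerHandler('vertigo.network.shutdown', function(event) {
    var index = running.indexOf(event.address);
    if (index !== -1) { running.splice(index, 1); }
  });
  return running;
}
```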
Advanced Features
The Vertigo communication system was intentionally designed so that no component needs to know too much about who it is receiving messages from or who it is sending messages to. This results in a flexible messaging system, allowing users to tap into specific portions of networks or join multiple networks together.
Wire taps
A Vertigo component's output is not strictly limited to components within its own network. Vertigo uses a publish-subscribe style messaging scheme, so components can send messages to anyone who's interested in listening. This means you can "tap in" to any component on any network from any Vert.x verticle (as long as you know the component's address).
For example, let's say we've deployed the following network:
var network = vertigo.createNetwork('tap');
network.addFeeder('tap.first', 'first.js', 2);
network.addWorker('tap.second', 'second.py', 2).addInput('tap.first');
network.addWorker('tap.third', 'Third.java', 4).addInput('tap.second');
We can tap into the output of any of the network's components using a Listener instance.
Listeners behave just as any other message receiving API in Vertigo - in fact, listeners underlie the InputCollector API. So, we simply create a new listener, assign a messageHandler to the listener, and start it.
var listener = new vertigo.input.Listener('tap.second');
listener.start(function(error) {
if (!error) {
listener.messageHandler(function(message) {
console.log('Received ' + JSON.stringify(message) + ' from tap.second.');
});
}
});
When the listener is started, it will begin publishing heartbeat messages to tap.second on the event bus. Heartbeat messages to tap.second will be received by both of the component instances at tap.second, each of which will add the listener (and its unique address) as an output channel. If this verticle is stopped or the stop() method is called on the listener, the component instances at tap.second will stop receiving heartbeats and will thus remove the listener as an output channel.
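The heartbeat mechanism can be modelled as a table of listener addresses with the time each was last heard from; a listener that stays silent for too long is dropped. The following is a sketch of that bookkeeping only. createOutputChannels, its method names, and the timeout value are hypothetical, since this manual does not state the heartbeat interval Vertigo uses. The current time is passed in explicitly to keep the example deterministic.

```javascript
// Assumption: createOutputChannels, its method names and the timeout value
// are hypothetical; the manual does not state Vertigo's heartbeat interval.
function createOutputChannels(timeoutMs) {
  var lastSeen = Object.create(null);  // listener address -> last heartbeat
  return {
    // Called whenever a heartbeat from a listener arrives.
    heartbeat: function(address, now) {
      lastSeen[address] = now;
    },
    // Drops listeners whose heartbeats stopped, then lists the rest.
    active: function(now) {
      Object.keys(lastSeen).forEach(function(address) {
        if (now - lastSeen[address] > timeoutMs) {
          delete lastSeen[address];
        }
      });
      return Object.keys(lastSeen).sort();
    }
  };
}
```

A listener that keeps sending heartbeats stays in the list; one that was stopped disappears once the timeout has elapsed.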
Note that the Listener constructor can also accept an Input instance as an argument, so multiple instances of the same verticle can be supported.
Nested networks
Just as any Vert.x verticle can request input from any network component, networks too can receive input from any other network's component. This can be accomplished simply by adding the component's address as an input to any network component.
Using the same network as before:
var network = vertigo.createNetwork('tap');
network.addFeeder('tap.first', 'first.js', 2);
network.addWorker('tap.second', 'second.py', 2).addInput('tap.first');
network.addWorker('tap.third', 'Third.java', 4).addInput('tap.second');
We can create another network that receives input from tap.second:
var network = vertigo.createNetwork('tapper');
network.addWorker('tapper.worker1', 'tapper_worker1.js', 2)
.addInput('tap.second').randomGrouping();
network.addWorker('tapper.worker2', 'tapper_worker2.js', 4)
.addInput('tapper.worker1').roundGrouping();
This results in a network that has no feeders, but instead essentially adds workers to the first network. In fact, since Vertigo's ack/fail system is similarly abstracted from network details - auditors are independent of networks, and each message tree is assigned a specific auditor - messages will behave as if the appended network is indeed a part of the original network, and the original message source will not receive ack/fail notification until the second network has completed processing of the received message. This makes this a reliable method of expanding upon existing running networks.