Creating your own app configuration generation

DAQ objects

All DAQ processes are built up, from smallest to largest, from the following entities:

  • DAQModule,
  • Application (or Apps),
  • System.

So each App has one or more DAQModules, and each System has one or more Apps.

To generate the configuration for your DAQ processes, you need to generate the configuration for each of the constituents.

DAQModule

To generate the configuration of a DAQModule, use the DAQModule Python class defined in appfwk, imported with the following statement:

from appfwk.daqmodule import DAQModule

DAQModule constructor

The DAQModule uses the following constructor:

daq_module = DAQModule(plugin, conf, extra_commands, connections, name)

Where:

  • plugin is the string of the name of the C++ plugin,
  • conf is the configuration that has been filled from a moo configuration schema,
  • extra_commands is currently unused,
  • connections is a dictionary of the connections to other DAQModules,
  • name is the name of the module by which other modules and commands will refer to it.

If you have written the C++ code, then the first two arguments should be relatively simple to figure out.

DAQModule connections

We make entries in the connections dictionary only for outgoing connections from the module. Each incoming connection to the module either comes from an outgoing connection of another module, or from another application, in which case it's defined by an incoming endpoint (see below).

Let's have a look at an example (this particular code is in trigger_gen.py):

heartbeatmaker = DAQModule(
  name = 'heartbeatmaker',
  plugin = 'FakeTPCreatorHeartbeatMaker',
  conf = heartbeater.Conf(heartbeat_interval=5_000_000),
  connections = {'tpset_sink': Connection(f"zip_region0.input")}
)

The dictionary key is the name of the queue in the plugin's C++ code; for example, for tpset_sink, one can see that FakeTPCreatorHeartbeatMaker.cxx has:

m_output_queue.reset(new sink_t(appfwk::queue_inst(iniobj, "tpset_sink")));

The value of the dictionary is a Connection object defined in appfwk and available after the following import:

from appfwk.conf_utils import Connection

Connection parameters are as follows:

connection = Connection(to, queue_kind, queue_capacity, queue_name, toposort)
  • to: the name of the DAQModule to which this queue goes and the name of the queue, separated by a dot. In the above example, zip_region0.input means that there is a DAQModule named zip_region0 in the same App, which initialises a queue as: appfwk::queue_inst(ini, "input"),
  • queue_kind: the kind of queue; by default it's FollyMPMCQueue, but it could be FollySPSCQueue or StdDeQueue,
  • queue_capacity: how big the queue is, by default 1,000,
  • queue_name: the name of the queue. By default this is formed automatically by the configuration generation code, but some modules rely explicitly on the queue "inst", so this option is sometimes needed,
  • toposort: whether this queue is going to be used in the calculation of module start order for the application. Modules are created from downstream to upstream, based on the flow of data in queues. But there may be cycles in this graph. If so, you should set toposort=False on the connection that should not be considered in the downstream-to-upstream calculation

Typically you will only need to specify the to argument.
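
As a sketch of the non-default options (the target module tpbuffer0 and its input queue are made-up names, and the keyword arguments simply follow the parameter list above), a connection overriding the queue defaults could look like this:

from appfwk.conf_utils import Connection

# Hypothetical connection overriding the queue defaults; the target module
# ("tpbuffer0") and queue ("input") are illustrative only.
conn = Connection("tpbuffer0.input",
                  queue_kind="FollySPSCQueue",  # single-producer, single-consumer queue
                  queue_capacity=10_000,        # allow a larger backlog than the default 1,000
                  toposort=False)               # exclude this edge from the start-order calculation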

ModuleGraph

Once you have created all your DAQModules, you need to "compose" them to form an App. To do that, you should create a ModuleGraph from them. The ModuleGraph class is defined in appfwk and available after the following import:

from appfwk.app import ModuleGraph

ModuleGraph constructor

ModuleGraph uses the following constructor:

ModuleGraph(modules)

Where:

  • modules is a list of the DAQModules you have created
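
For instance, reusing the heartbeatmaker module from the trigger example above, a minimal graph could be built as follows (a sketch, not taken from trigger_gen.py):

from appfwk.app import ModuleGraph

# Compose the App's module graph from the DAQModules defined earlier;
# here only the heartbeatmaker module is included.
module_graph = ModuleGraph([heartbeatmaker])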

Endpoints

Endpoints are defined in appfwk and are used to connect different Apps.

To add an Endpoint to a ModuleGraph one can do the following:

module_graph.add_endpoint(external_name, internal_name, direction, topic)
  • external_name is the name under which this particular application's queue is visible to all the Apps.
  • internal_name is the name of the module and name of the queue in the ModuleGraph's DAQModule list. The form of this string is module_name.queue_name (note this is akin to the Connection.to parameter).
  • direction is the direction in which this connection goes. In the trigger example below, the module_graph consumes tpsets, hence its direction is IN. For a module that creates tpsets to be consumed by other Applications, the direction would be OUT.
  • TODO PL: topics arguments? I think this has been deprecated.

For example, trigger_gen.py defines an external name tpsets_into_chain_ru0_link0, visible to all the Apps in the system, and ties it to heartbeatmaker_ru0_link0.tpset_source. This means that the module_graph's DAQModule list must contain a module named heartbeatmaker_ru0_link0, which initialises a queue as: appfwk::queue_inst(ini, "tpset_source") (in this example, that initialisation is in the plugin's C++ code).
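
A sketch of that call could look like the following, assuming the Direction enum can be imported from appfwk.conf_utils alongside Connection:

from appfwk.conf_utils import Direction

# Expose the tpset_source queue of the heartbeatmaker_ru0_link0 module to other
# Apps under the name tpsets_into_chain_ru0_link0; this App consumes TPSets,
# so the direction is IN.
module_graph.add_endpoint("tpsets_into_chain_ru0_link0",
                          "heartbeatmaker_ru0_link0.tpset_source",
                          Direction.IN)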

Important note: Sometimes a DAQModule uses the NetworkManager for a connection and therefore doesn't initialise the queue itself. In that case, the Endpoint doesn't need to specify the queue and module to which it connects, and None can be passed as the internal_name.

Fragment Producers

Some applications have DAQModules that produce fragments, in which case these need to be registered differently in the ModuleGraph. For that, one can use the function module_graph.add_fragment_producer:

module_graph.add_fragment_producer(system, region, element, requests_in, fragments_out)
  • system is a string that represents the system that generated the fragments (for example DataSelection, TPC, NDLArTPC or PDS),
  • region is the region's id,
  • element is the element's id,
  • requests_in is the module and queue that expect data requests. Again, this is specified as module_name.queue_name,
  • fragments_out is the module and queue that will respond to data requests and output the fragments. This is specified in the same way as requests_in (module_name.queue_name).
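
As an illustration only (the datahandler module and its queue names are hypothetical, and the keyword arguments follow the parameter list above), a readout-style producer could be registered like this:

# Register a fragment producer for TPC region 0, element 0; the module and
# queue names are illustrative, not taken from a real application.
module_graph.add_fragment_producer(system="TPC",
                                   region=0,
                                   element=0,
                                   requests_in="datahandler.data_requests_0",
                                   fragments_out="datahandler.fragment_queue")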

Application

Once you have a ModuleGraph, creating an App is very simple:

app = App(modulegraph, host)

The modulegraph is the one you have just created, and host is a string naming the host on which the application will run.
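
For example, assuming the App class lives in appfwk.app next to ModuleGraph, and using a made-up host name:

from appfwk.app import App

# Run this application on the (hypothetical) host "localhost".
app = App(module_graph, "localhost")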

System

To create a system, one can just use the constructor:

system = System(partition_name, first_port)

and add applications to the System.apps dictionary like this:

system.apps["an_app"] = some_app

Note that the System class is defined in appfwk and available after importing:

from appfwk.system import System
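
Putting the previous snippets together, a minimal sketch (with a made-up partition name and first port) could be:

from appfwk.system import System

# Hypothetical partition name and starting port number.
system = System("my_partition", 12345)
system.apps["trigger"] = app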

System network endpoints

This is a list of all the network manager's endpoints. They are either created automatically at a later stage or can be specified directly:

system.network_endpoints.append(nwmgr.Connection(name="partition.datareq_0", topics=[], address=f"tcp://{{host_ruemu0}}:999"))

Note that nwmgr.Connection is directly the moo schema object representing a NetworkManager connection.

Application connections

Connecting Apps is done by extending the System.app_connections dictionary. The keys of this dictionary are strings of the form upstream_app.endpoint_name. AppConnection objects are initialised with the constructor:

system.app_connections["app_name.endpoint_name"] = AppConnection(nwmgr_connection, receivers, topics, msg_type, msg_module_name, use_nwqa)
  • nwmgr_connection is the name of the network manager connection,
  • receivers is a list of receivers, of the form app_name.endpoint_name,
  • topics is a list of topics in case the connection should be pub/sub,
  • msg_type is the C++ class of the objects that are passed; it is needed when the connection uses QueueToNetwork/NetworkToQueue,
  • msg_module_name is the name of the C++ plugin that holds the QueueToNetwork/NetworkToQueue logic for this object type (if required),
  • use_nwqa is unused; whether to use QueueToNetwork/NetworkToQueue is determined by whether the application endpoints connect to an internal queue or directly into a module.
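
As a sketch only, assuming AppConnection can be imported from appfwk.conf_utils and using made-up endpoint, connection, and topic names, a pub/sub connection could be declared like this:

from appfwk.conf_utils import AppConnection

# Publish TPSets from the trigger App's tpsets_out endpoint to the dataflow
# App's tpsets_in endpoint; all names here are illustrative.
system.app_connections["trigger.tpsets_out"] = AppConnection(
    nwmgr_connection="my_partition.tpsets_0",
    receivers=["dataflow.tpsets_in"],
    topics=["TPSets"],    # non-empty topics make this a pub/sub connection
    msg_type=None,        # only needed when QueueToNetwork/NetworkToQueue is used
    msg_module_name=None,
    use_nwqa=False)       # unused (see above)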

Connecting fragment producers to dataflow App

The dataflow App is responsible for passing data requests to the various fragment producers and for writing out the resulting data. To connect the fragment producers to the dataflow App, simply use:

connect_all_fragment_producers(system)

which is defined in daqconf and available after the following import:

from daqconf.core.fragment_producers import connect_all_fragment_producers

Telling the MLT which fragment producers exist

The ModuleLevelTrigger (MLT) needs to know which fragment producers exist in order to create TriggerDecisions. For that, an additional step is needed after all the fragment producers have been created, to modify the MLT configuration. This is done with:

set_mlt_links(system, mlt_app_name)

where mlt_app_name is the name of the app that contains the MLT DAQModule. This function is available after the following import:

from daqconf.core.fragment_producers import set_mlt_links

Connecting the endpoints to the module

In some situations, an App's endpoints are not actually connected to any module (for example, if the connection uses an N2Q module); in that case the necessary connections are made automatically by add_network, defined in appfwk:

add_network(app_name, system)

where app_name is the name of the app for which the network connections need to be added. This function is available after an import of the form:

from appfwk.conf_utils import add_network

Finally, creating commands and writing json configuration files

This is done as follows:

from appfwk.conf_utils import make_app_command_data, make_system_command_datas, generate_boot, write_json_files
from daqconf.core.metadata import write_metadata_file  # assumed location; the module providing write_metadata_file may differ between releases

app_command_datas = {
  name : make_app_command_data(system, app)
  for name, app in system.apps.items()
}

system_command_datas = make_system_command_datas(system)

boot = generate_boot(system.apps,
                     partition_name=partition_name,
                     ers_settings=ers_settings,
                     info_svc_uri=info_svc_uri,
                     disable_trace=disable_trace,
                     use_kafka=use_kafka)

system_command_datas['boot'] = boot

write_json_files(app_command_datas, system_command_datas, json_dir)

write_metadata_file(json_dir, "daqconf_multiru_gen")