SO 5.8 InDepth Unique Subscribers Mbox - Stiffstream/sobjectizer GitHub Wiki

The Problem

Sometimes different mutable messages have to be sent to several agents. For example, there could be a data-processing pipeline with several stages (every stage is represented as an agent) and a management agent that coordinates the processing by using mutable messages: the manager sends a message to the first stage agent, receives the result, then sends a message to the second stage agent, gets the result, and so on.

Because mutable messages are necessary, only MPSC mboxes can be used. This means the manager has to know the MPSC mbox of every processing stage, which might not be very convenient. Sometimes it's much easier to have just one mbox for all outgoing mutable messages (all message types are different, so there is no problem with a separate subscriber for every message type). But before v.5.8.0 there was no such kind of mbox in SObjectizer: the standard MPMC mbox allows subscriptions from different agents but prohibits the exchange of mutable messages.

The Solution

Since v.5.8.0, there is a unique-subscribers mbox that acts as an MPSC mbox but allows subscriptions from several agents, provided those agents subscribe to different messages.

For example, agent A can subscribe to messages M1 and M2, agent B can subscribe to message M3, and agent C can subscribe to M4, M5, and M6. Agent D will send messages M1, M2, ..., M6 to just one mbox, but separate agents will receive those messages.
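The rule described above (one mbox, many agents, but at most one subscriber per message type) can be illustrated with a toy model. This is only a sketch under stated assumptions and not SObjectizer's implementation: `unique_subscribers_registry`, `subscribe`, `receiver_of`, and the empty `M1`..`M3` types are hypothetical names, and the way the real mbox reports a conflicting subscription is not shown here.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <typeindex>
#include <typeinfo>

// Hypothetical message types mirroring M1..M3 from the text.
struct M1 {};
struct M2 {};
struct M3 {};

// Toy registry (not a SObjectizer class): it stores at most one
// subscriber name per message type.
class unique_subscribers_registry
{
	std::map<std::type_index, std::string> m_subscribers;

public:
	// Registers `agent` as the only receiver of Msg.
	// Throws if a different agent already owns this message type.
	template<typename Msg>
	void subscribe(const std::string & agent)
	{
		assert(!agent.empty());
		const auto [it, inserted] =
			m_subscribers.emplace(std::type_index{typeid(Msg)}, agent);
		if(!inserted && it->second != agent)
			throw std::logic_error{"message type already has a subscriber"};
	}

	// Returns the name of the agent that would receive Msg,
	// or an empty string when nobody is subscribed.
	template<typename Msg>
	std::string receiver_of() const
	{
		const auto it = m_subscribers.find(std::type_index{typeid(Msg)});
		return it == m_subscribers.end() ? std::string{} : it->second;
	}
};
```

In this model, agent "A" may own M1 and M2 while agent "B" owns M3, but an attempt by "B" to subscribe to M1 is rejected because that type already has a receiver.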

NOTE. This type of mbox was first introduced in v.1.5.0 of the companion project so5extra and was then moved into SObjectizer-5.8.0.

Usage Example

A very simple usage of a unique-subscribers mbox can look like this:

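#include <so_5/all.hpp>

// Standard headers for std::cout and std::string used below.
#include <iostream>
#include <string>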

// Messages to be used in exchange.
struct msg_first { std::string msg_; };
struct msg_second { std::string msg_; };

// Type of one worker agent.
class first_worker final : public so_5::agent_t
{
public:
	first_worker(context_t ctx, const so_5::mbox_t & src)
		:	so_5::agent_t{std::move(ctx)}
	{
		// Subscribe worker to msg_first.
		so_subscribe(src).event([](mutable_mhood_t<msg_first> cmd) {
				std::cout << "first_worker: " << cmd->msg_ << std::endl;
			});
	}
};

// Type of another worker agent.
class second_worker final : public so_5::agent_t
{
public:
	second_worker(context_t ctx, const so_5::mbox_t & src)
		:	so_5::agent_t{std::move(ctx)}
	{
		// Subscribe worker to msg_second.
		so_subscribe(src).event([](mutable_mhood_t<msg_second> cmd) {
				std::cout << "second_worker: " << cmd->msg_ << std::endl;
			});
	}
};

// Type of management agent.
class manager final : public so_5::agent_t
{
	const so_5::mbox_t m_target;

public:
	manager(context_t ctx, so_5::mbox_t target)
		:	so_5::agent_t{std::move(ctx)}
		,	m_target{std::move(target)}
	{}

	void so_evt_start() override
	{
		using namespace std::string_literals;

		// Both messages are sent to the same MPSC mbox, but
		// will be received by separate subscribers.
		// NOTE: messages are sent as mutable.
		so_5::send< so_5::mutable_msg<msg_first> >(m_target, "One"s);
		so_5::send< so_5::mutable_msg<msg_second> >(m_target, "Two"s);

		so_deregister_agent_coop_normally();
	}
};

int main()
{
	so_5::launch([](so_5::environment_t & env) {
			// MPSC mbox to be used for message exchange.
			const auto target = so_5::make_unique_subscribers_mbox(env);

			env.introduce_coop([&](so_5::coop_t & coop) {
					coop.make_agent<first_worker>(target);
					coop.make_agent<second_worker>(target);
					coop.make_agent<manager>(target);
				});
		});

	return 0;
}

Unique-Subscribers Mbox and Delivery Filters

Delivery filters are supported for unique-subscribers mboxes.

Please note that so_5::agent_t::so_set_delivery_filter_for_mutable_msg has to be used for filtering mutable messages.

make_unique_subscribers_mbox Template Function

The make_unique_subscribers_mbox function, which is used to create a unique-subscribers mbox instance, is a template function with the following signature:

template< typename Lock_Type = std::mutex >
mbox_t
make_unique_subscribers_mbox( environment_t & env );

It has a single template parameter: Lock_Type. This parameter specifies the type of lock object that protects the mbox's internals in a multithreaded application. By default, this type is std::mutex, but it can be changed to something more appropriate for the user's environment.

For example, a custom spinlock implementation can be used as Lock_Type. Or it can be so_5::null_mutex_t for single-threaded environments.
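As an illustration of the custom spinlock option, here is a minimal sketch of a lock type. It assumes that Lock_Type only needs the `lock()`/`unlock()` pair that `std::lock_guard` requires; that requirement is an assumption here, not something stated above. The names `simple_spinlock` and `guarded_increment` are hypothetical, and `try_lock()` is added only to make the lock state observable.

```cpp
#include <atomic>
#include <mutex>
#include <thread>

// Hypothetical spinlock built on std::atomic_flag.
class simple_spinlock
{
	std::atomic_flag m_flag = ATOMIC_FLAG_INIT;

public:
	// Spins until the flag is acquired, yielding between attempts.
	void lock() noexcept
	{
		while(m_flag.test_and_set(std::memory_order_acquire))
			std::this_thread::yield();
	}

	// Returns true if the lock was free and is now held by the caller.
	bool try_lock() noexcept
	{
		return !m_flag.test_and_set(std::memory_order_acquire);
	}

	// Releases the lock.
	void unlock() noexcept
	{
		m_flag.clear(std::memory_order_release);
	}
};

// Increments `value` while holding `lock`, the same way generic code
// would use any lock type through std::lock_guard.
template<typename Lock>
int guarded_increment(Lock & lock, int & value)
{
	std::lock_guard<Lock> guard{lock};
	return ++value;
}

// With SObjectizer available, the type would be passed as Lock_Type:
//   auto mbox = so_5::make_unique_subscribers_mbox<simple_spinlock>(env);
```

A spinlock like this may only pay off when the protected sections are very short; otherwise the default std::mutex is probably the safer choice.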
