SO 5.6 InDepth Message Chains - Stiffstream/sobjectizer GitHub Wiki
The main purpose of message chain (or just mchain) mechanism is providing a way for interacting between SObjectizer- and non-SObjectizer-part of an application. The interaction in the opposite direction is very simple: usual message passing via mboxes is used. For example:
class worker : public so_5::agent_t {
public:
...
void so_define_agent() override {
// A mbox to be used to receive requests.
auto mbox = so_environment().create_mbox("requests");
// Subscription to messages from that mbox...
so_subscribe(mbox).event(&worker::request_handler1)
.event(&worker::request_handler2)
.event(&worker::request_handler3)
...;
}
...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
launch_agents(...); // Workers will be started here.
// Create a mbox for interaction with workers.
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
so_5::send<request1>(mbox, ...);
so_5::send<request2>(mbox, ...);
...
But how to receive some messages from an agent back to non-SObjectizer-part of the application?
Message chain is the answer.
Message chain looks almost like mbox for agents. An agent can send messages to mchain exactly the same way as for mbox. So mchain can be passed to agent and agent will use it as the destination for reply messages. On the other side of a mchain will be non-SObjectizer message handler. This handler will receive messages from the mchain by using special API functions and handle them appropriately.
class worker : public so_5::agent_t {
public:
worker(context_t ctx, so_5::mbox_t reply_to)
: so_5::agent_t{ctx}, m_reply_to{std::move(reply_to)}
{...}
...
void so_define_agent() override {
// A mbox to be used to receive requests.
auto mbox = so_environment().create_mbox("requests");
// Subscription to messages from that mbox...
so_subscribe(mbox).event(&worker::request_handler1)
.event(&worker::request_handler2)
.event(&worker::request_handler3)
...;
}
private:
// A mbox for sending responses.
const so_5::mbox_t m_reply_to;
...
// Request handler.
void request_handler1(const request1 & r) {
... // Some processing.
// Sending response back as a message.
so_5::send<reply1>(m_reply_to, ...);
}
...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
// Chain for receiving responses.
// Simplest mchain without any limitation will be created.
so_5::mchain_t reply_ch = create_mchain(sobj);
launch_agents(
// Passing mchain as a parameter.
// Cast it to mbox to look exactly like ordinary mbox.
reply_ch.as_mbox(), ...);
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
// And receive responses.
so_5::send<request1>(mbox, ...);
receive(from(reply_ch).handle_n(1), [](const reply1 &) {...});
so_5::send<request2>(mbox, ...);
receive(from(reply_ch).handle_n(1), [](const reply2 &) {...});
...
There are two types of mchains:
- Unlimited mchains. Those mchains have no limitation for the number of messages to be stored inside mchain. This number is limited only by the amount of available memory and by the common sense of a developer.
- Limited mchains. Those mchains have a strict limit for the number of messages. It is impossible to just push yet another message into full size-limited mchain.
Type of mchain is specified at the creation time. Once created the type of mchain cannot be changed.
There are two approaches for creation of mchain of diffrent types. The simplest approach uses create_mchain
helper function:
// Create size-unlimited mchain.
so_5::mchain_t m1 = create_mchain(env);
// Create size-limited mchain without waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m2 = create_mchain(env,
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest);
// Create size-limited mchain with waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m3 = create_mchain(env,
std::chrono::milliseconds(500),
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest);
More complex approach uses so_5::mchain_params_t
class. Type of mchain is specified by content of so_5::mchain_params_t
instance passed to so_5::environment_t::create_mchain
method. There are three helper functions which return properly initialized mchain_params_t
instances:
// Create size-unlimited mchain.
so_5::mchain_t m1 = env.create_mchain(so_5::make_unlimited_mchain_params());
// Create size-limited mchain without waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m2 = env.create_mchain(
so_5::make_limited_without_waiting_mchain_params(
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest));
// Create size-limited mchain with waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m3 = env.create_mchain(
so_5::make_limited_with_waiting_mchain_params(
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest,
std::chrono::milliseconds(500)));
The type so_5::mchain_t
is like the type so_5::mbox_t
-- it is an alias for smart intrusive pointer to so_5::abstract_message_chain_t
. It means that mchain created by create_mchain
will be destroyed automatically after the destruction of the last mchain_t object pointed to it.
Size-limited mchains have a serious difference from size-unlimited mchains: a size-limited mchain can't contain more message than the max capacity of the mchain. So there should be some reaction on an attempt to add another message to full mchain.
There could be size-limited mchains which will perform waiting for some time on an attempt of pushing new message to full mchain. If there is a free place in the mchain after that waiting then the new message will be stored into the mchain. An appropriate overload reaction will be performed otherwise.
There also could be size-limited mchains without any waiting on full mchain. If there is no free room in the mchain then an appropriate overload reaction will be performed immediately.
There are four overload reactions which can be selected for a mchain at the moment of mchain creation:
- Dropping the new message. It means that a new message will be simply ignored.
- Removing of the oldest message from the mchain. It means that the oldest message in the mchain will be removed and it will not be processed.
- Throwing an exception. An exception
so_5::exception_t
with error codeso_5::rc_msg_chain_overflow
will be raised as a result of the attempt of pushing new message to full mchain. - Aborting the whole application by calling
std::abort()
.
All those variants are described as items of enumeration so_5::mchain_props::overflow_reaction_t
.
There is yet another important property which must be specified for size-limited mchain at the creation time: the type of memory usage.
Memory for storing messages inside mchain can be used dynamically: it would be allocated when mchain grows and deallocated when mchain shrinks.
Or memory for mchain could be preallocated and there will be fixed-size buffer for messages. Size of that buffer will not change during growth and shrinking of the mchain.
Types of memory usage are described as items of enumeration so_5::mchain_props::memory_usage_t
.
All those traits of size-limited mchains are controlled by mchain_params_t
object passed to create_mchain
method. They also can be passed as arguments to helper create_mchain
functions:
so_5::environment_t & env = ...;
// Size-limited without waiting mchain.
so_5::mchain_t chain1 = create_mchain(env,
// No more than 200 messages in the chain.
200,
// Memory will be allocated dynamically.
so_5::mchain_props::memory_usage_t::dynamic,
// New messages will be ignored on chain's overflow.
so_5::mchain_props::overflow_reaction_t::drop_newest));
// Size-limited with waiting mchain.
so_5::mchain_t chain2 = create_mchain(env,
// Before dropping a new message there will be 500ms timeout
std::chrono::milliseconds(500),
// No more than 200 messages in the chain.
200,
// Memory will be preallocated.
so_5::mchain_props::memory_usage_t::preallocated,
// The oldest message will be removed on chain's overflow.
so_5::mchain_props::overflow_reaction_t::remove_oldest);
Since v.5.6.0 there is just one variant of so_5::receive
function that allows to receive and handle messages from a mchain. This version can receive and handle more than one message from mchain. It receives a so_5::mchain_receive_params_t
objects with a list of conditions and returns control if any of those conditions become true. For example:
// Handle 3 messages.
// If there are no 3 messages in the mchain then wait for them.
// The receive returns control when 3 messages will be processed or when the mchain
// will be closed explicitly.
receive( from(chain).handle_n( 3 ),
handlers... );
// Handle 3 messages but wait no more than 200ms on empty mchain.
// If mchain is empty for more than 200ms then receive returns control even
// if less than 3 messages were handled.
receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
handlers... );
// Handle all messages which are going to the mchain. But handling will be finished
// if a pause between arrivals of messages exceeds 500ms.
receive( from(chain).handle_all().empty_timeout( milliseconds(500) ),
handlers... );
// Handle all messages for 2s.
// The return from receive will be after 2s or if the mchain is closed explicitly.
receive( from(chain).handle_all().total_time( seconds(2) ),
handlers... );
// The same as previous but the return can occurs after extraction of 1000 messages.
receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
handlers... );
There is a difference between the number of extracted messages and the number of handled messages. A message is extracted from a mchain and count of extracted messages increases. But the count of handled messages is increased only if a handler for that message type is found and called. It means that the number of extracted messages can be greater than the number of handled messages.
The receive
function returns an object of so_5::mchain_receive_result_t
type. Methods of that object allow to get the number of extracted and handled messages, and also the status of receive
operation.
The usage of receive
function could look like:
so_5::send< a_supervisor::ask_status >( req_mbox );
auto r = receive(
from( chain ).handle_n(1).empty_timeout( milliseconds(200) ),
[]( a_supervisor::status_idle ) {
cout << "status: IDLE" << endl;
},
[]( a_supervisor::status_in_progress ) {
cout << "status: IN PROGRESS" << endl;
},
[]( a_supervisor::status_finished v ) {
cout << "status: finished in " << v.m_ms << "ms" << endl;
} );
if( !r.handled() )
cout << "--- no response from supervisor ---" << endl;
A mchain can be used in several receive
from different threads at the same time. It could be used for simple load balancing scheme, for example:
void worker_thread(so_5::mchain_t ch)
{
// Handle all messages until mchain will be closed.
receive(from(ch).handle_all(), handler1, handler2, handler3, ...);
}
...
// Message chain for passing messages to worker threads.
auto ch = create_mchain(env);
// Worker thread.
thread worker1{worker_thread, ch};
thread worker2{worker_thread, ch};
thread worker3{worker_thread, ch};
// Send messages to workers.
while(has_some_work())
{
so_5::send< some_message >(ch, ...);
...
}
// Close chain and finish worker threads.
close_retain_content(ch);
worker3.join();
worker2.join();
worker1.join();
Since v.5.6.0 there is just one variant of so_5::select
function that allows to receive and handle messages from several mchains. This version of select
can receive and handle more than one message from mchains. It receives a so_5::mchain_select_params_t
objects with list of conditions and returns control if any of those conditions becomes true. For example:
using namespace so_5;
mchain_t ch1 = env.create_mchain(...);
mchain_t ch2 = env.create_mchain(...);
// Receive and handle 3 messages.
// It could be 3 messages from ch1. Or 2 messages from ch1 and 1 message
// from ch2. Or 1 message from ch1 and 2 messages from ch2. And so on...
//
// If there is no 3 messages in mchains the select will wait infinitely.
// A return from select will be after handling of 3 messages or
// if all mchains are closed explicitely.
select( from_all().handle_n( 3 ),
case_( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
case_( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receive and handle 3 messages.
// If there is no 3 messages in chains the select will wait
// no more that 200ms.
// A return from select will be after handling of 3 messages or
// if all mchains are closed explicitely, or if there is no messages
// for more than 200ms.
select( from_all().handle_n( 3 ).empty_timeout( milliseconds(200) ),
case_( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
case_( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receive all messages from mchains.
// If there is no message in any of mchains then wait no more than 500ms.
// A return from select will be after explicit close of all mchains
// or if there is no messages for more than 500ms.
select( from_all().handle_all().empty_timeout( milliseconds(500) ),
case_( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
case_( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receve any number of messages from mchains but do waiting and
// handling for no more than 2s.
select( from_all().handle_all().total_time( seconds(2) ),
case_( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
case_( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receve 1000 messages from chains but do waiting and
// handling for no more than 2s.
select( from_all().extract_n( 1000 ).total_time( seconds(2) ),
case_( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
case_( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
The select
function returns an object of so_5::mchain_receive_result_t
type. Methods of that object allow to get the number of extracted and handled messages, and also the status of select
operation.
It is possible to use a mchain in several select
on different threads at the same time.
NOTE! A mchain can be used inside just one case_
statement in select
. If the same mchain is used in several case_
statements then the behavior of select
is undefined. Please note also that select
doesn't check uniqueness of mchains in case_
statements (for performance reasons).
Message handlers which are passed to receive
and select
functions must be lambda-functions or functional objects with format:
void handler(const message_type&);
void handler(message_type);
void handler(const so_5::mhood_t<message_type>&);
void handler(so_5::mhood_t<message_type>);
For example:
struct classical_message : public so_5::message_t { ... };
struct user_message { ... };
...
receive(from(chain).handle_all(),
[](const classical_message & m) {...},
[](const user_message & m) {...},
[](const int & m) {...},
[](long m) {...});
A handler for signal of type signal_type
should use mhood_t<signal_type>
as the type of the argument:
struct stopped : public so_5::signal_t {};
struct waiting : public so_5::signal_t {};
...
receive(chain, so_5::infinite_wait,
[](const classical_message & m) {...},
[](const user_message & m) {...},
[](so_5::mhood_t<stopped>) {...} ),
[](so_5::mhood_t<waiting>) {...} ));
If so_5::mhood_t<T>
message wrapper is used the type T can be type of message or signal:
struct check_status : public so_5::signal_t {};
struct reconfig { std::string config_file; };
...
receive(chain, so_5::infinite_wait,
[](const mhood_t<reconfig> & msg) {...},
[](mhood_t<check_status>) {...},
... );
NOTE! All message handlers must handle different message types. It is an error if some handlers are defined for the same message type.
All traditional send-functions like so_5::send
, so_5::send_delayed
and so_5::send_periodic
work with mchains the same way they work with mboxes. It allows to write code the traditional way:
so_5::mchain_t ch = env.create_mchain(...);
...
// Sending a message.
so_5::send<my_message>(ch, ... /* some args for my_message's constructor */);
// Sending a delayed message.
so_5::send_delayed<my_message>(ch, std::chrono::milliseconds(200),
... /* some args for my_message's constructor */);
// Sending a periodic message.
auto timer_id = so_5::send_periodic<my_message>(ch,
std::chrono::milliseconds(200),
std::chrono::milliseconds(250),
... /* some args for my_message's constructor */);
There is a method so_5::abstract_message_chain_t::as_mbox()
which can represent mchain as almost ordinary mbox. This method returns so_5::mbox_t
and this mbox can be used for sending messages and performing service request to the mchain.
Method as_mbox()
can be useful if there is a necessity to hide the fact of mchain existence. For example, an agent inside SObjectizer part of an application can receive mbox and think that there is another agent on the opposite side:
// SObjectizer part of an application.
class worker : so_5::agent_t {
public:
worker(context_t ctx, so_5::mbox_t request_mbox)
: so_5::agent_t{ctx}, m_request_mbox{std::move(request_mbox)}
{}
...
private:
const so_5::mbox_t m_request_mbox;
...
void some_event_handler() {
auto f = so_5::request_future<response, request>(m_request_mbox,
... /* some args for request's constructor */);
...
handle_response(f.get());
}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mbox(...);
env.introduce_coop([&](so_5::coop_t & coop) {
coop.make_agent<worker>(
// Passing mchain as mbox to worker agent.
ch->as_mbox());
...
} );
...
receive(from(ch).handle_n(1000),
[](const request & req) {...},
...);
The only difference between ordinary mbox created by so_5::environment_t::create_mbox()
and mbox created by as_mbox()
is impossibility of subscriptions creation. It means that agent worker
from the example above can't subscribe its event handlers to messages from m_request_mbox
.
The only case when mchain can be passed to SObjectizer part directly as mchain (without casting it to mbox by as_mbox()
) is a necessity of explicit closing of mchain in the SObjectizer part of an application:
class data_producer : public so_5::agent_t {
public:
data_producer(context_t ctx, so_5::mchain_t data_ch)
: so_5::agent_t{ctx}, m_data_ch{std::move(data_ch)}
{}
...
private:
const so_5::mchain_t m_data_ch;
...
void read_data_event() {
auto data = read_next_data();
if(data_read())
so_5::send<data_sample>(m_data_ch, data);
else {
// Data source is closed. No more data can be received.
// Close the data chain and finish out work.
close_retain_content(m_data_ch);
so_deregister_coop_normally();
}
}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mbox(...);
env.introduce_coop([&](so_5::coop_t & coop) {
coop.make_agent<data_reader>(
// Passing mchain as mchain to data_reader agent.
ch);
...
} );
...
// Receive and handle all data samples until the chain will be closed.
receive(from(ch).handle_all(), [](const data_sample & data) {...});
There is a possibility to specify a function which will be called automatically when a message in stored into the empty mchain:
so_5::environment_t & env = ...;
so_5::mchain_t ch = env.create_mchain(
so_5::make_unlimited_mchain_params().non_empty_notificator(
[]{ ... /* some stuff */ }));
This feature can be used in GUI applications for example. Some widget can create mchain and needs to know when there are messages in that mchain. Then widget can add notificator to the mchain and send some GUI-message/signal to itself from that notificator. Some of receive
functions will be called in the processing of that GUI-message/signal.