SO 5.8 InDepth Message Chains - Stiffstream/sobjectizer GitHub Wiki
- Purpose
- More Details
Created by gh-md-toc
The main purpose of message chain (or just mchain) mechanism is providing a way for interacting between SObjectizer- and non-SObjectizer-part of an application. The interaction in the opposite direction is very simple: usual message passing via mboxes is used. For example:
class worker : public so_5::agent_t {
public:
...
void so_define_agent() override {
// A mbox to be used to receive requests.
auto mbox = so_environment().create_mbox("requests");
// Subscription to messages from that mbox...
so_subscribe(mbox).event(&worker::request_handler1)
.event(&worker::request_handler2)
.event(&worker::request_handler3)
...;
}
...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
launch_agents(...); // Workers will be started here.
// Create a mbox for interaction with workers.
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
so_5::send<request1>(mbox, ...);
so_5::send<request2>(mbox, ...);
...
But how to receive some messages from an agent back to non-SObjectizer-part of the application?
Message chain is the answer.
Message chain looks almost like mbox for agents. An agent can send messages to mchain exactly the same way as for mbox. So mchain can be passed to agent and agent will use it as the destination for reply messages. On the other side of a mchain will be non-SObjectizer message handler. This handler will receive messages from the mchain by using special API functions and handle them appropriately.
class worker : public so_5::agent_t {
public:
worker(context_t ctx, so_5::mbox_t reply_to)
: so_5::agent_t{ctx}, m_reply_to{std::move(reply_to)}
{...}
...
void so_define_agent() override {
// A mbox to be used to receive requests.
auto mbox = so_environment().create_mbox("requests");
// Subscription to messages from that mbox...
so_subscribe(mbox).event(&worker::request_handler1)
.event(&worker::request_handler2)
.event(&worker::request_handler3)
...;
}
private:
// A mbox for sending responses.
const so_5::mbox_t m_reply_to;
...
// Request handler.
void request_handler1(const request1 & r) {
... // Some processing.
// Sending response back as a message.
so_5::send<reply1>(m_reply_to, ...);
}
...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
// Chain for receiving responses.
// Simplest mchain without any limitation will be created.
so_5::mchain_t reply_ch = create_mchain(sobj);
launch_agents(
// Passing mchain as a parameter.
// Cast it to mbox to look exactly like ordinary mbox.
reply_ch.as_mbox(), ...);
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
// And receive responses.
so_5::send<request1>(mbox, ...);
receive(from(reply_ch).handle_n(1), [](const reply1 &) {...});
so_5::send<request2>(mbox, ...);
receive(from(reply_ch).handle_n(1), [](const reply2 &) {...});
...
There are two types of mchains:
- Unlimited mchains. Those mchains have no limitation for the number of messages to be stored inside mchain. This number is limited only by the amount of available memory and by the common sense of a developer.
- Limited mchains. Those mchains have a strict limit for the number of messages. It is impossible to just push yet another message into full size-limited mchain.
Type of mchain is specified at the creation time. Once created the type of mchain cannot be changed.
There are two approaches for creation of mchain of diffrent types. The simplest approach uses create_mchain
helper function:
// Create size-unlimited mchain.
so_5::mchain_t m1 = create_mchain(env);
// Create size-limited mchain without waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m2 = create_mchain(env,
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest);
// Create size-limited mchain with waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m3 = create_mchain(env,
std::chrono::milliseconds(500),
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest);
More complex approach uses so_5::mchain_params_t
class. Type of mchain is specified by content of so_5::mchain_params_t
instance passed to so_5::environment_t::create_mchain
method. There are three helper functions which return properly initialized mchain_params_t
instances:
// Create size-unlimited mchain.
so_5::mchain_t m1 = env.create_mchain(so_5::make_unlimited_mchain_params());
// Create size-limited mchain without waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m2 = env.create_mchain(
so_5::make_limited_without_waiting_mchain_params(
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest));
// Create size-limited mchain with waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m3 = env.create_mchain(
so_5::make_limited_with_waiting_mchain_params(
200,
so_5::mchain_props::memory_usage_t::dynamic,
so_5::mchain_props::overflow_reaction_t::drop_newest,
std::chrono::milliseconds(500)));
The type so_5::mchain_t
is like the type so_5::mbox_t
-- it is an alias for smart intrusive pointer to so_5::abstract_message_chain_t
. It means that mchain created by create_mchain
will be destroyed automatically after the destruction of the last mchain_t object pointed to it.
Size-limited mchains have a serious difference from size-unlimited mchains: a size-limited mchain can't contain more message than the max capacity of the mchain. So there should be some reaction on an attempt to add another message to full mchain.
There could be size-limited mchains which will perform waiting for some time on an attempt of pushing new message to full mchain. If there is a free place in the mchain after that waiting then the new message will be stored into the mchain. An appropriate overload reaction will be performed otherwise.
There also could be size-limited mchains without any waiting on full mchain. If there is no free room in the mchain then an appropriate overload reaction will be performed immediately.
There are four overload reactions which can be selected for a mchain at the moment of mchain creation:
- Dropping the new message. It means that a new message will be simply ignored.
- Removing of the oldest message from the mchain. It means that the oldest message in the mchain will be removed and it will not be processed.
- Throwing an exception. An exception
so_5::exception_t
with error codeso_5::rc_msg_chain_overflow
will be raised as a result of the attempt of pushing new message to full mchain. - Aborting the whole application by calling
std::abort()
.
All those variants are described as items of enumeration so_5::mchain_props::overflow_reaction_t
.
There is yet another important property which must be specified for size-limited mchain at the creation time: the type of memory usage.
Memory for storing messages inside mchain can be used dynamically: it would be allocated when mchain grows and deallocated when mchain shrinks.
Or memory for mchain could be preallocated and there will be fixed-size buffer for messages. Size of that buffer will not change during growth and shrinking of the mchain.
Types of memory usage are described as items of enumeration so_5::mchain_props::memory_usage_t
.
All those traits of size-limited mchains are controlled by mchain_params_t
object passed to create_mchain
method. They also can be passed as arguments to helper create_mchain
functions:
so_5::environment_t & env = ...;
// Size-limited without waiting mchain.
so_5::mchain_t chain1 = create_mchain(env,
// No more than 200 messages in the chain.
200,
// Memory will be allocated dynamically.
so_5::mchain_props::memory_usage_t::dynamic,
// New messages will be ignored on chain's overflow.
so_5::mchain_props::overflow_reaction_t::drop_newest));
// Size-limited with waiting mchain.
so_5::mchain_t chain2 = create_mchain(env,
// Before dropping a new message there will be 500ms timeout
std::chrono::milliseconds(500),
// No more than 200 messages in the chain.
200,
// Memory will be preallocated.
so_5::mchain_props::memory_usage_t::preallocated,
// The oldest message will be removed on chain's overflow.
so_5::mchain_props::overflow_reaction_t::remove_oldest);
Sending delayed/periodic messages to a mchain is treated differently:
-
there is no waiting if the mchain is full even if such waiting is specified for the chain. It's because the timer thread where the send is performed can't be suspended; all fired delayed/periodic messages have to be delivered by the timer thread as quickly as possible;
-
overflow_reaction_t::throw_exception is replaced by overflow_reaction_t::drop_newest. It means that exceptions are never thrown for attempts to push a message into full mchain if the timer thread performs that push.
Since v.5.6.0 there is just one variant of so_5::receive
function that allows to receive and handle messages from a mchain. This version can receive and handle more than one message from mchain. It receives a so_5::mchain_receive_params_t
objects with a list of conditions and returns control if any of those conditions become true. For example:
// Handle 3 messages.
// If there are no 3 messages in the mchain then wait for them.
// The receive returns control when 3 messages will be processed or when the mchain
// will be closed explicitly.
receive( from(chain).handle_n( 3 ),
handlers... );
// Handle 3 messages but wait no more than 200ms on empty mchain.
// If mchain is empty for more than 200ms then receive returns control even
// if less than 3 messages were handled.
receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
handlers... );
// Handle all messages which are going to the mchain. But handling will be finished
// if a pause between arrivals of messages exceeds 500ms.
receive( from(chain).handle_all().empty_timeout( milliseconds(500) ),
handlers... );
// Handle all messages for 2s.
// The return from receive will be after 2s or if the mchain is closed explicitly.
receive( from(chain).handle_all().total_time( seconds(2) ),
handlers... );
// The same as previous but the return can occurs after extraction of 1000 messages.
receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
handlers... );
There is a difference between the number of extracted messages and the number of handled messages. A message is extracted from a mchain and count of extracted messages increases. But the count of handled messages is increased only if a handler for that message type is found and called. It means that the number of extracted messages can be greater than the number of handled messages.
The receive
function returns an object of so_5::mchain_receive_result_t
type. Methods of that object allow to get the number of extracted and handled messages, and also the status of receive
operation.
The usage of receive
function could look like:
so_5::send< a_supervisor::ask_status >( req_mbox );
auto r = receive(
from( chain ).handle_n(1).empty_timeout( milliseconds(200) ),
[]( a_supervisor::status_idle ) {
cout << "status: IDLE" << endl;
},
[]( a_supervisor::status_in_progress ) {
cout << "status: IN PROGRESS" << endl;
},
[]( a_supervisor::status_finished v ) {
cout << "status: finished in " << v.m_ms << "ms" << endl;
} );
if( !r.handled() )
cout << "--- no response from supervisor ---" << endl;
A mchain can be used in several receive
from different threads at the same time. It could be used for simple load balancing scheme, for example:
void worker_thread(so_5::mchain_t ch)
{
// Handle all messages until mchain will be closed.
receive(from(ch).handle_all(), handler1, handler2, handler3, ...);
}
...
// Message chain for passing messages to worker threads.
auto ch = create_mchain(env);
// Worker thread.
thread worker1{worker_thread, ch};
thread worker2{worker_thread, ch};
thread worker3{worker_thread, ch};
// Send messages to workers.
while(has_some_work())
{
so_5::send< some_message >(ch, ...);
...
}
// Close chain and finish worker threads.
so_5::close_retain_content(so_5::terminate_if_throws, ch);
worker3.join();
worker2.join();
worker1.join();
Since v.5.6.0 there is just one variant of so_5::select
function that allows to receive and handle messages from several mchains. This version of select
can receive and handle more than one message from mchains. It receives a so_5::mchain_select_params_t
objects with list of conditions and returns control if any of those conditions becomes true. For example:
using namespace so_5;
mchain_t ch1 = env.create_mchain(...);
mchain_t ch2 = env.create_mchain(...);
// Receive and handle 3 messages.
// It could be 3 messages from ch1. Or 2 messages from ch1 and 1 message
// from ch2. Or 1 message from ch1 and 2 messages from ch2. And so on...
//
// If there is no 3 messages in mchains the select will wait infinitely.
// A return from select will be after handling of 3 messages or
// if all mchains are closed explicitely.
select( from_all().handle_n( 3 ),
receive_case( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
receive_case( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receive and handle 3 messages.
// If there is no 3 messages in chains the select will wait
// no more that 200ms.
// A return from select will be after handling of 3 messages or
// if all mchains are closed explicitely, or if there is no messages
// for more than 200ms.
select( from_all().handle_n( 3 ).empty_timeout( milliseconds(200) ),
receive_case( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
receive_case( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receive all messages from mchains.
// If there is no message in any of mchains then wait no more than 500ms.
// A return from select will be after explicit close of all mchains
// or if there is no messages for more than 500ms.
select( from_all().handle_all().empty_timeout( milliseconds(500) ),
receive_case( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
receive_case( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receve any number of messages from mchains but do waiting and
// handling for no more than 2s.
select( from_all().handle_all().total_time( seconds(2) ),
receive_case( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
receive_case( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
// Receve 1000 messages from chains but do waiting and
// handling for no more than 2s.
select( from_all().extract_n( 1000 ).total_time( seconds(2) ),
receive_case( ch1,
[]( const first_message_type & msg ) { ... },
[]( const second_message_type & msg ) { ... } ),
receive_case( ch2,
[]( const third_message_type & msg ) { ... },
[]( so_5::mhood_t< some_signal_type > ) { ... } ),
... ) );
Since v.5.7.0 the select
function returns an object of so_5::mchain_select_result_t
type. Methods of that object allow to get the number of extracted and handled messages, and also the number of sent messages.
It is possible to use a mchain in several select
on different threads at the same time.
NOTE! A mchain can be used inside just one receive_case
statement in select
. If the same mchain is used in several receive_case
statements then the behavior of select
is undefined. Please note also that select
doesn't check uniqueness of mchains in receive_case
statements (for performance reasons).
Since v.5.7.0 select()
function can be used for sending messages into mchains. For example:
using namespace so_5;
auto ch = env.create_mchain();
select( from_all().handle_n(1),
send_case(
// Target chain for the new message.
ch,
// Message instance to be sent to the target chain.
message_holder_t<Msg>::make(...),
// Handler that will be called after a successful send.
[]() {...}) );
Sending a message via select()
function can be useful because of:
- an attempt to send a message to a full mchain won't invoke overload reaction for that mchain. For example if the target mchain is created with
abort_app
overflow reaction then an attempt to send a message to the full mchain via ordinarysend()
function can lead to the abortion of the whole application. But an attempt to send a message to the same full mchain viaselect()
withsend_case()
will suspend the execution of the current threa until there will be free room in the target mchain or until the target mchain will be closed (or timeouts forselect()
expire); -
send_case()
inselect()
can be used together withreceive_case()
and that allows to handle incoming messages whileselect()
waits a possibility to send outgoing message to the target mchain.
Those benefits can be seen in the following example of calculation of Fibonacci numbers in a separate thread. Please note that work of fibonacci()
function will be suspended on select()
and send_case()
if main()
has not read the previous number from values_ch
chain.
#include <so_5/all.hpp>
#include <chrono>
using namespace std;
using namespace std::chrono_literals;
using namespace so_5;
struct quit {};
void fibonacci( mchain_t values_ch, mchain_t quit_ch )
{
int x = 0, y = 1;
mchain_select_result_t r;
do
{
r = select(
from_all().handle_n(1),
// Sends a new message of type 'int' with value 'x' inside
// when values_ch is ready for a new outgoing message.
send_case( values_ch, message_holder_t<int>::make(x),
[&x, &y] { // This block of code will be called after the send().
auto old_x = x;
x = y; y = old_x + y;
} ),
// Receive a 'quit' message from quit_ch if it is here.
receive_case( quit_ch, [](quit){} ) );
}
// Continue the loop while we send something and receive nothing.
while( r.was_sent() && !r.was_handled() );
}
int main()
{
wrapped_env_t sobj;
thread fibonacci_thr;
auto thr_joiner = auto_join( fibonacci_thr );
// The chain for Fibonacci number will have limited capacity.
auto values_ch = create_mchain( sobj, 1s, 1,
mchain_props::memory_usage_t::preallocated,
mchain_props::overflow_reaction_t::abort_app );
auto quit_ch = create_mchain( sobj );
auto ch_closer = auto_close_drop_content( values_ch, quit_ch );
fibonacci_thr = thread{ fibonacci, values_ch, quit_ch };
// Read the first 10 numbers from values_ch.
receive( from( values_ch ).handle_n( 10 ),
// And show every number to the standard output.
[]( int v ) { cout << v << endl; } );
send< quit >( quit_ch );
}
The select()
function can have several send_case()
inside:
select(from_all().handle_all(),
send_case(ch1, message_holder_t<FirstMsg>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMsg>::make(...), []{...}),
send_case(ch3, message_holder_t<ThirdMsg>::make(...), []{...}));
In that example select()
returns only when all send attempt will be completed (a message will be sent to the corresponding chain or that chain will be closed).
Attention. The behavior of select()
is undefined if there are several different send_case()
for the same mchain.
The send_case()
requires an already created instance of a message to be sent in the form of so_5::message_holder_t<T>
smart-pointer. Usually that instance is created by using message_holder_t<T>::make()
factory method:
struct my_message {
int a_;
double b_;
};
select(from_all().handle_n(1),
send_case(ch1, message_holder_t<my_message>::make(1, 12.42), []{...}),
...);
But this instance can be created earlier:
auto msg = message_holder_t<my_message>::make(1, 12.42);
...
select(from_all().handle_n(1),
send_case(ch1, std::move(msg), []{...}),
...);
NOTE. If a message wasn't sent by send_case()
it will be thrown away on return from select()
function. For example, in the following code at most one message will be sent and two other messages will be thrown away:
select(from_all().handle_n(1),
send_case(ch1, message_holder_t<FirstMessage>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMessage>::make(...), []{...}),
send_case(ch3, message_holder_t<ThirdMessage>::make(...), []{...}));
The Role Of handle_n(), handle_all(), empty_timeout() And Other Modificators For select() With send_case()
Modificators handle_n()
, handle_all()
, extract_n()
, empty_timeout()
, no_wait_on_empty()
, total_time()
play their usual role even if send_case()
is used in select()
. For example:
// Return after a successful send to just one of the chains.
// Or if all chains are closed.
select(from_all().handle_n(1),
send_case(ch1, message_holder_t<FirstMessage>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMessage>::make(...), []{...}),
send_case(ch3, message_holder_t<ThirdMessage>::make(...), []{...}));
// Return after the completion of all three sends.
// A send operation call be successful (message was sent) or failed
// (chain was closed).
select(from_all().handle_n(3),
send_case(ch1, message_holder_t<FirstMessage>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMessage>::make(...), []{...}),
send_case(ch3, message_holder_t<ThirdMessage>::make(...), []{...}));
// Same as above.
select(from_all().handle_all(),
send_case(ch1, message_holder_t<FirstMessage>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMessage>::make(...), []{...}),
send_case(ch3, message_holder_t<ThirdMessage>::make(...), []{...}));
Additional care should be taken for handle_all()
if send_case()
is used with receive_case()
:
// Return after the completion of the send operation and reading the whole
// content of chR chain (e.g. until chR chain will be closed).
// NOTE that send operation will be performed only once.
select(from_all().handle_all(),
send_case(chW, message_holder_t<MyMsg>::make(...), []{...}),
receive_case(chR, ...));
If handle_n()
is used for select()
with send_
and receive_case()
then all operations are taken into account. For example, the following select()
:
select(from_all().handle_n(2),
send_case(ch1, message_holder_t<FirstMsg>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMsg>::make(...), []{...}),
receive_case(ch3, [](...){...}),
receive_case(ch4, [](...){...}));
can be completed if:
- messages were sent to ch1 and ch2;
- an outgoing message was sent to ch1 and an incoming message was read and handled from ch3;
- incoming messages were read and handled from ch3 and ch4;
- two incoming messages were read from ch4 (or ch3);
- and so on.
NOTE. Modificator extract_n()
will be taken into the account only if at least one receive_case()
is used in select()
.
Modificators empty_timeout()
and no_wait_on_empty()
can be seen as full_timeout()
and no_wait_on_full()
:
// Try to send a message to chW.
// Wait no more than 150ms if chW is full.
select(from_all().handle_n(1).empty_timeout(150ms),
send_case(chW, message_holder_t<Msg>::make(...), []{...}));
// Try to send a message to chW.
// Do not wait if chW is full.
select(from_all().handle_n(1).no_wait_on_empty(),
send_case(chW, message_holder_t<Msg>::make(...), []{...}));
Modificator total_time()
limits the time for the whole select()
call. For example:
// This select() completes when all send operations are done or after 250ms.
select(from_all().handle_all().total_time(250ms),
send_case(ch1, message_holder_t<FirstMessage>::make(...), []{...}),
send_case(ch2, message_holder_t<SecondMessage>::make(...), []{...}),
send_case(ch3, message_holder_t<ThirdMessage>::make(...), []{...}));
Message handlers which are passed to receive
and select
functions must be lambda-functions or functional objects with format:
void handler(const message_type&);
void handler(message_type);
void handler(const so_5::mhood_t<message_type>&);
void handler(so_5::mhood_t<message_type>);
For example:
struct classical_message : public so_5::message_t { ... };
struct user_message { ... };
...
receive(from(chain).handle_all(),
[](const classical_message & m) {...},
[](const user_message & m) {...},
[](const int & m) {...},
[](long m) {...});
A handler for signal of type signal_type
should use mhood_t<signal_type>
as the type of the argument:
struct stopped : public so_5::signal_t {};
struct waiting : public so_5::signal_t {};
...
receive(chain, so_5::infinite_wait,
[](const classical_message & m) {...},
[](const user_message & m) {...},
[](so_5::mhood_t<stopped>) {...} ),
[](so_5::mhood_t<waiting>) {...} ));
If so_5::mhood_t<T>
message wrapper is used the type T can be type of message or signal:
struct check_status : public so_5::signal_t {};
struct reconfig { std::string config_file; };
...
receive(chain, so_5::infinite_wait,
[](const mhood_t<reconfig> & msg) {...},
[](mhood_t<check_status>) {...},
... );
NOTE! All message handlers must handle different message types. It is an error if some handlers are defined for the same message type.
All traditional send-functions like so_5::send
, so_5::send_delayed
and so_5::send_periodic
work with mchains the same way they work with mboxes. It allows to write code the traditional way:
so_5::mchain_t ch = env.create_mchain(...);
...
// Sending a message.
so_5::send<my_message>(ch, ... /* some args for my_message's constructor */);
// Sending a delayed message.
so_5::send_delayed<my_message>(ch, std::chrono::milliseconds(200),
... /* some args for my_message's constructor */);
// Sending a periodic message.
auto timer_id = so_5::send_periodic<my_message>(ch,
std::chrono::milliseconds(200),
std::chrono::milliseconds(250),
... /* some args for my_message's constructor */);
There is a method so_5::abstract_message_chain_t::as_mbox()
which can represent mchain as almost ordinary mbox. This method returns so_5::mbox_t
and this mbox can be used for sending messages and performing service request to the mchain.
Method as_mbox()
can be useful if there is a necessity to hide the fact of mchain existence. For example, an agent inside SObjectizer part of an application can receive mbox and think that there is another agent on the opposite side:
// SObjectizer part of an application.
class worker : so_5::agent_t {
public:
worker(context_t ctx, so_5::mbox_t request_mbox)
: so_5::agent_t{ctx}, m_request_mbox{std::move(request_mbox)}
{}
...
private:
const so_5::mbox_t m_request_mbox;
...
void some_event_handler() {
auto f = so_5::request_future<response, request>(m_request_mbox,
... /* some args for request's constructor */);
...
handle_response(f.get());
}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mbox(...);
env.introduce_coop([&](so_5::coop_t & coop) {
coop.make_agent<worker>(
// Passing mchain as mbox to worker agent.
ch->as_mbox());
...
} );
...
receive(from(ch).handle_n(1000),
[](const request & req) {...},
...);
The only difference between ordinary mbox created by so_5::environment_t::create_mbox()
and mbox created by as_mbox()
is impossibility of subscriptions creation. It means that agent worker
from the example above can't subscribe its event handlers to messages from m_request_mbox
.
The only case when mchain can be passed to SObjectizer part directly as mchain (without casting it to mbox by as_mbox()
) is a necessity of explicit closing of mchain in the SObjectizer part of an application:
class data_producer : public so_5::agent_t {
public:
data_producer(context_t ctx, so_5::mchain_t data_ch)
: so_5::agent_t{ctx}, m_data_ch{std::move(data_ch)}
{}
...
private:
const so_5::mchain_t m_data_ch;
...
void read_data_event() {
auto data = read_next_data();
if(data_read())
so_5::send<data_sample>(m_data_ch, data);
else {
// Data source is closed. No more data can be received.
// Close the data chain and finish out work.
so_5::close_retain_content(so_5::exceptions_enabled, m_data_ch);
so_deregister_coop_normally();
}
}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mbox(...);
env.introduce_coop([&](so_5::coop_t & coop) {
coop.make_agent<data_reader>(
// Passing mchain as mchain to data_reader agent.
ch);
...
} );
...
// Receive and handle all data samples until the chain will be closed.
receive(from(ch).handle_all(), [](const data_sample & data) {...});
There is a possibility to specify a function which will be called automatically when a message in stored into the empty mchain:
so_5::environment_t & env = ...;
so_5::mchain_t ch = env.create_mchain(
so_5::make_unlimited_mchain_params().non_empty_notificator(
[]{ ... /* some stuff */ }));
This feature can be used in GUI applications for example. Some widget can create mchain and needs to know when there are messages in that mchain. Then widget can add notificator to the mchain and send some GUI-message/signal to itself from that notificator. Some of receive
functions will be called in the processing of that GUI-message/signal.