SO 5.8 QuickHowTos - Stiffstream/sobjectizer GitHub Wiki
- How To Run SObjectizer's Environment?
- How To Stop SObjectizer's Environment?
- How To Run SObjectizer's Environment In Single-Threaded Mode?
- How To Make A New Cooperation?
- How To Make A New Child Cooperation?
- How To Deregister A Cooperation?
- How To Bind An Agent To A Dispatcher
- How To Create A Multi-Producer/Single-Consumer Mbox For An Agent?
- How To Create Multi-Producer/Multi-Consumer Mbox?
- How To Make Subscription For An Agent
- How To Check Presence Of A Subscription?
- How To Delete A Subscription?
- How To Setup A Delivery Filter?
- How To Remove A Delivery Filter?
- How To Setup Message Limits For An Agent?
- How To Assign The Priority For An Agent?
- How To Turn Message Delivery Tracing On?
- How To Work With mchains?
Running SObjectizer's Environment with default Environment's params:
#include <so_5/all.hpp>
int main() {
so_5::launch([](so_5::environment_t & env) {
... // Some initialization actions.
});
...
}
Running SObjectizer's Environment with modified Environment's params:
#include <so_5/all.hpp>
int main() {
so_5::launch([](so_5::environment_t & env) {
... // Some initialization actions.
},
[](so_5::environment_params_t & params) {
... // Parameters' tuning.
});
...
}
In that case the second lambda will be executed first and only then SObjectizer's Environment will be started with the first lambda as an initial action.
Running SObjectizer's Environment with default parameters and without any initial actions:
#include <so_5/all.hpp>
int main() {
so_5::wrapped_env_t sobj; // SObjectizer is started on separate thread here.
... // Some actions.
return 0; // SObjectizer will be stopped automatically here.
}
Running SObjectizer's Environment with default parameters and with an initial action:
#include <so_5/all.hpp>
int main() {
so_5::wrapped_env_t sobj{
[](so_5::environment_t & env) {
... // Some initial actions.
}}; // SObjectizer is started on separate thread here.
... // Some actions.
return 0; // SObjectizer will be stopped automatically here.
}
Running SObjectizer's Environment with modified parameters but without any initial actions:
#include <so_5/all.hpp>
so_5::environment_params_t make_params() {
so_5::environment_params_t result;
... // Parameters' tuning.
return result;
}
int main() {
so_5::wrapped_env_t sobj{ make_params() }; // SObjectizer is started on separate thread here.
... // Some actions.
return 0; // SObjectizer will be stopped automatically here.
}
Running SObjectizer's Environment with parameters tuning and an initial action:
#include <so_5/all.hpp>
int main() {
so_5::wrapped_env_t sobj{
[](so_5::environment_t & env) {
... // Some initial actions.
},
[](so_5::environment_params_t & params) {
... // Parameters' tuning.
}}; // SObjectizer is started on separate thread here.
... // Some actions.
return 0; // SObjectizer will be stopped automatically here.
}
By default SObjectizer's Environment uses autoshutdown mode. In this mode the Environment shuts down automatically when there are no more live, working cooperations.
NOTE. Autoshutdown mode can be turned off with the so_5::environment_params_t::disable_autoshutdown() method.
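The autoshutdown rule can be pictured with a small plain C++ model. This is only a sketch, not SObjectizer code: `env_model` and its methods are hypothetical stand-ins, and the model assumes the check happens when a coop is deregistered.

```cpp
#include <cassert>
#include <cstddef>

// Plain C++ model of autoshutdown; hypothetical names, not SObjectizer's API.
class env_model {
    std::size_t live_coops_ = 0;
    bool autoshutdown_ = true;
    bool stopped_ = false;

public:
    // Models environment_params_t::disable_autoshutdown().
    void disable_autoshutdown() { autoshutdown_ = false; }

    void register_coop() { ++live_coops_; }

    void deregister_coop() {
        assert(live_coops_ > 0);
        --live_coops_;
        // The last live coop is gone: shut down unless autoshutdown is off.
        if (live_coops_ == 0 && autoshutdown_) stopped_ = true;
    }

    // Models an explicit stop() call; works in both modes.
    void stop() { stopped_ = true; }

    bool stopped() const { return stopped_; }
};
```

With autoshutdown disabled, the model stays alive after its last coop is deregistered and stops only after an explicit `stop()`.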
A call to so_5::environment_t::stop() sends a shutdown signal to SObjectizer's Environment. The Environment then deregisters all working coops, waits for the deregistration process to complete (this can take some time), and finishes its work.
An example of shutting the Environment down from an agent:
class demo_agent : public so_5::agent_t {
...
void on_some_event(mhood_t<some_msg> cmd) {
...
if(some_final_condition())
// Finish work of SObjectizer.
so_environment().stop();
}
};
If so_5::wrapped_env_t is used for running SObjectizer's Environment, then the stop() and stop_then_join() methods of wrapped_env_t can be used for stopping SObjectizer.
An example of using the wrapped_env_t::stop() method:
int main() {
so_5::wrapped_env_t sobj{ ... }; // SObjectizer is started on separate thread.
... // Some work on the main work thread.
// Send a shutdown signal to SObjectizer.
sobj.stop();
... // Continue some work on the main thread.
// SObjectizer performs shutdown actions in parallel.
// Wait for the complete shutdown of SObjectizer.
sobj.join(); // Now SObjectizer is stopped.
...
}
An example of using the wrapped_env_t::stop_then_join() method:
int main() {
so_5::wrapped_env_t sobj{ ... }; // SObjectizer is started on separate thread.
... // Some work on the main work thread.
// Send a shutdown signal to SObjectizer.
// And wait for SObjectizer's shutdown to complete.
sobj.stop_then_join(); // Now SObjectizer is stopped.
...
}
NOTE. wrapped_env_t::stop_then_join() is called automatically in wrapped_env_t's destructor.
It is possible to run SObjectizer in single-threaded mode by specifying an appropriate Environment Infrastructure factory:
int main() {
so_5::launch(
[](so_5::environment_t & env) {...}, // Initial actions.
[](so_5::environment_params_t & params) {
params.infrastructure_factory(
so_5::env_infrastructures::simple_mtsafe::factory());
});
...
}
See Environment-Infrastructures for more information on this topic.
The easiest way to create a new coop is to use the so_5::environment_t::introduce_coop() helper function:
// The default dispatcher will be used for binding.
env.introduce_coop( []( so_5::coop_t & coop ) {
coop.make_agent< first_agent >(...);
coop.make_agent< second_agent >(...);
});
// For the case when a dispatcher binder is specified.
env.introduce_coop(
so_5::disp::active_obj::make_dispatcher( env ).binder(),
[]( so_5::coop_t & coop ) {
coop.make_agent< first_agent >(...);
coop.make_agent< second_agent >(...);
} );
There is an older way of creating and registering a coop, which can still be useful in some cases:
so_5::environment_t & env = ...;
{
// A binder for the default dispatcher will be used.
auto coop = env.make_coop();
coop.make_agent<first_agent>(...);
coop.make_agent<second_agent>(...);
// Registration of the coop.
env.register_coop(std::move(coop));
}
{
// Create with a dispatcher binder to be used by default.
auto coop = env.make_coop(
so_5::disp::active_obj::make_dispatcher(env).binder());
...
env.register_coop(std::move(coop));
}
If a coop should contain just one agent, the so_5::environment_t::register_agent_as_coop() method can be used:
so_5::environment_t & env = ...;
// A binder for the default dispatcher will be used.
env.register_agent_as_coop(
env.make_agent<my_agent>(...));
// A binder for a new private dispatcher will be used.
env.register_agent_as_coop(
env.make_agent<my_agent>(...),
so_5::disp::active_obj::make_dispatcher(env).binder());
The simplest way to create a child coop is to use the so_5::introduce_child_coop() helper function. It has two forms. The first one receives a reference to an agent from the parent coop:
class owner : public so_5::agent_t {
public :
...
void so_evt_start() override {
so_5::introduce_child_coop(*this, [](so_5::coop_t & coop) {
coop.make_agent<worker>();
});
}
};
The second form receives a handle of the parent coop:
// Registration of the parent coop.
auto parent_coop_obj = env.make_coop();
... // Filling the parent_coop_obj.
so_5::coop_handle_t parent_coop_handle = env.register_coop(std::move(parent_coop_obj));
// Registration of a child coop.
so_5::introduce_child_coop(
parent_coop_handle,
[](so_5::coop_t & child) {
child.make_agent<worker>();
...
});
...
A child coop can also be created the old way, with the make_coop and register_coop methods:
class owner : public so_5::agent_t {
public :
...
void so_evt_start() override {
auto child = so_environment().make_coop(so_coop()); // owner's coop as the parent.
child->make_agent<worker>();
so_environment().register_coop(std::move(child));
}
};
Calling so_5::agent_t::so_deregister_agent_coop() or so_5::agent_t::so_deregister_agent_coop_normally() Methods
An agent can deregister the coop it belongs to by calling so_5::agent_t::so_deregister_agent_coop() or so_5::agent_t::so_deregister_agent_coop_normally(). For example:
class demo : public so_5::agent_t {
...
void on_some_event(mhood_t<some_msg> cmd) {
try {
... // Some processing.
if(no_more_work_left())
// Normal deregistration of the coop.
so_deregister_agent_coop_normally();
}
catch(...) {
// Some error.
// Deregister the coop with special 'exit code'.
so_deregister_agent_coop(so_5::dereg_reason::user_defined_reason+10);
}
}
};
A coop can be deregistered by its handle via the so_5::environment_t::deregister_coop() method. This is useful when the decision about the coop's deregistration is made outside of the coop. For example:
class worker : public so_5::agent_t {
public:
// This message will be sent to the parent agent if there is no more work.
struct no_more_work { const so_5::coop_handle_t coop_; };
...
void on_some_event(mhood_t<some_msg> cmd) {
...
if(no_more_work_left())
so_5::send<no_more_work>(parent_, so_coop());
}
};
class parent : public so_5::agent_t {
...
void on_no_work_for_worker(mhood_t<worker::no_more_work> cmd) {
// Worker can be safely destroyed now.
so_environment().deregister_coop(cmd->coop_, so_5::dereg_reason::normal);
}
};
Every coop has a default binder. It is used when an agent is added without a specific dispatcher binder. The default binder for the coop is specified when the coop is created. For example:
so_5::environment_t & env = ...;
// Create a new coop with a specific binder as the default binder.
env.introduce_coop(
so_5::disp::active_obj::make_dispatcher(env).binder(),
[](so_5::coop_t & coop) {
// All agents will be bound to the same dispatcher.
// It is because they will use the default binder of the coop.
coop.make_agent<first_agent>(...);
coop.make_agent<second_agent>(...);
});
If the default binder for the coop is not specified, a binder for SObjectizer's default dispatcher is used as the default binder.
A binder can be specified for an agent at creation time via the so_5::coop_t::make_agent_with_binder() method:
so_5::environment_t & env = ...;
// Create a new coop with a binder to the default dispatcher.
env.introduce_coop([](so_5::coop_t & coop) {
// This agent will be bound to the default dispatcher.
// It is because the default binder will be used.
coop.make_agent<first_agent>(...);
// This agent will be bound to a private one_thread dispatcher.
coop.make_agent_with_binder<second_agent>(
so_5::disp::one_thread::make_dispatcher(env).binder(),
...);
});
There is an overloaded so_5::coop_t::add_agent() method which allows specifying a binder for an agent:
so_5::environment_t & env = ...;
// Create a new coop with a binder to the default dispatcher.
env.introduce_coop([](so_5::coop_t & coop) {
// This agent will be bound to the default dispatcher.
// It is because the default binder will be used.
coop.make_agent<first_agent>(...);
// This agent will be bound to a private one_thread dispatcher.
auto second = std::make_unique<second_agent>(...);
coop.add_agent(
std::move(second),
so_5::disp::one_thread::make_dispatcher(env).binder());
});
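The binder selection rule described in this section can be summarized with a plain C++ model. It is only a sketch under simplifying assumptions: `coop_model` is a hypothetical class, dispatchers are represented by plain strings, and nothing here is SObjectizer's API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Plain C++ model of binder selection; dispatcher names are plain strings.
class coop_model {
    std::string default_binder_;
    std::vector<std::string> agent_dispatchers_;

public:
    // Without an argument the coop falls back to the default dispatcher.
    explicit coop_model(std::string default_binder = "default_dispatcher")
        : default_binder_(std::move(default_binder)) {}

    // No binder given: the coop's default binder is used.
    std::size_t add_agent() {
        agent_dispatchers_.push_back(default_binder_);
        return agent_dispatchers_.size() - 1;
    }

    // An explicit binder overrides the coop's default for this agent only.
    std::size_t add_agent(std::string binder) {
        assert(!binder.empty());
        agent_dispatchers_.push_back(std::move(binder));
        return agent_dispatchers_.size() - 1;
    }

    // Returns the dispatcher name the agent with this index was bound to.
    const std::string& dispatcher_of(std::size_t agent_index) const {
        return agent_dispatchers_.at(agent_index);
    }
};
```

In the model, an agent added without a binder reports the coop's default dispatcher, and an agent added with `"one_thread"` reports that name, whatever the coop's default is.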
Each agent in SObjectizer has a personal (direct) MPSC mbox created by SObjectizer. It can be accessed via the so_5::agent_t::so_direct_mbox() method.
If a new MPSC mbox bound to the agent is required, the so_5::agent_t::so_make_new_direct_mbox() method can be used:
class my_agent final : public so_5::agent_t {
...
void so_evt_start() override {
const auto new_mbox = so_make_new_direct_mbox();
assert(new_mbox->id() != so_direct_mbox()->id());
so_subscribe( new_mbox ).event( [](mhood_t<hello>) {
std::cout << "hello from a new mbox" << std::endl;
} );
// Give a new mbox to someone else to use.
so_5::send<get_my_new_mbox>(dest, new_mbox);
}
};
An anonymous MPMC mbox is created by the so_5::environment_t::create_mbox() method without parameters:
so_5::environment_t & env = ...;
auto mbox = env.create_mbox();
A named MPMC mbox is created by the so_5::environment_t::create_mbox() method with the mbox name as its single parameter:
so_5::environment_t & env = ...;
auto named_mbox = env.create_mbox("hello_box");
NOTE. If an mbox with the same name already exists, create_mbox returns a reference to the existing mbox:
so_5::environment_t & env = ...;
auto named_mbox1 = env.create_mbox("hello_box");
auto named_mbox2 = env.create_mbox("hello_box");
assert(named_mbox1->id() == named_mbox2->id());
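The lookup-or-create behavior of named mboxes can be illustrated with a plain C++ model. This is a sketch only: `mbox_registry` and `mbox_model` are hypothetical names, and the real Environment does considerably more than keep a map.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Plain C++ model of mbox creation; not SObjectizer's implementation.
struct mbox_model {
    unsigned long id;
};
using mbox_ptr = std::shared_ptr<mbox_model>;

class mbox_registry {
    unsigned long next_id_ = 1;
    std::map<std::string, mbox_ptr> named_;

public:
    // Anonymous mbox: always a new object with a new id.
    mbox_ptr create_mbox() {
        return std::make_shared<mbox_model>(mbox_model{next_id_++});
    }

    // Named mbox: created on the first request, reused afterwards.
    mbox_ptr create_mbox(const std::string& name) {
        assert(!name.empty());
        auto it = named_.find(name);
        if (it != named_.end()) return it->second;
        mbox_ptr fresh = create_mbox();
        named_.emplace(name, fresh);
        return fresh;
    }
};
```

Two requests for `"hello_box"` yield the same id in the model, while every anonymous request yields a fresh one.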
Since v.5.8.0 there is so_5::environment_t::introduce_named_mbox(), which can be used to create a named user-supplied mbox:
class first_participant final : public so_5::agent_t {
const so_5::mbox_t broadcast_mbox_;
...
public:
first_participant(context_t ctx)
: so_5::agent_t{std::move(ctx)}
, broadcast_mbox_{so_environment().introduce_named_mbox(
so_5::mbox_namespace_name_t{"demo"},
"message-board",
[this]() { return so_5::make_unique_subscribers_mbox(so_environment()); } )
}
{}
...
};
class second_participant final : public so_5::agent_t {
const so_5::mbox_t broadcast_mbox_;
...
public:
second_participant(context_t ctx)
: so_5::agent_t{std::move(ctx)}
, broadcast_mbox_{so_environment().introduce_named_mbox(
so_5::mbox_namespace_name_t{"demo"},
"message-board",
[this]() { return so_5::make_unique_subscribers_mbox(so_environment()); } )
}
{}
...
};
There are several ways to make a subscription to a message from the direct mbox:
class demo : public so_5::agent_t {
void handler1(mhood_t<first_msg> cmd) {...}
void handler2(mhood_t<second_msg> cmd) {...}
...
public:
...
void so_define_agent() override {
// Subscription for the default state.
so_subscribe_self()
.event(&demo::handler1)
.event(&demo::handler2)
.event([](mhood_t<third_msg> cmd) {...});
// Subscription for states st_one and st_two.
so_subscribe_self()
.in(st_one)
.in(st_two)
.event(&demo::handler1)
.event(&demo::handler2)
.event([](mhood_t<third_msg> cmd) {...});
// Subscription for state st_third.
st_third
.event(&demo::handler1)
.event(&demo::handler2)
.event([](mhood_t<third_msg> cmd) {...});
}
...
};
There are several ways to make a subscription to a message from an external MPMC mbox:
class demo : public so_5::agent_t {
const so_5::mbox_t external_mbox_;
...
void handler1(mhood_t<first_msg> cmd) {...}
void handler2(mhood_t<second_msg> cmd) {...}
...
public:
...
void so_define_agent() override {
// Subscription for the default state.
so_subscribe(external_mbox_)
.event(&demo::handler1)
.event(&demo::handler2)
.event([](mhood_t<third_msg> cmd) {...});
// Subscription for states st_one and st_two.
so_subscribe(external_mbox_)
.in(st_one)
.in(st_two)
.event(&demo::handler1)
.event(&demo::handler2)
.event([](mhood_t<third_msg> cmd) {...});
// Subscription for state st_third.
st_third
.event(external_mbox_, &demo::handler1)
.event(external_mbox_, &demo::handler2)
.event(external_mbox_, [](mhood_t<third_msg> cmd) {...});
}
...
};
There are several ways to check the presence of a subscription:
class demo : public so_5::agent_t {
const so_5::mbox_t external_mbox_;
...
void handler1(mhood_t<first_msg> cmd) {...}
void handler2(mhood_t<second_msg> cmd) {...}
...
void some_method_with_subscription_checking() {
// Is subscription to message of first_msg type from
// direct mbox present? Check is performed for the default state.
if(so_has_subscription(so_direct_mbox(), &demo::handler1)) {...}
// Is subscription to message of second_msg type from external_mbox_
// present? Check is performed for the default state.
if(so_has_subscription(external_mbox_, &demo::handler2)) {...}
// Is subscription to message of third_msg type from external_mbox_
// present? Check is performed for the default state.
if(so_has_subscription<third_msg>(external_mbox_)) {...}
// Is subscription to message of first_msg type from
// direct mbox present? Check is performed for state st_one.
if(so_has_subscription(so_direct_mbox(), st_one, &demo::handler1)) {...}
// Is subscription to message of second_msg type from external_mbox_
// present? Check is performed for state st_two.
if(so_has_subscription(external_mbox_, st_two, &demo::handler2)) {...}
// Is subscription to message of third_msg type from external_mbox_
// present? Check is performed for state st_three.
if(so_has_subscription<third_msg>(external_mbox_, st_three)) {...}
}
};
There are several ways of checking the presence of a subscription via the so_5::state_t::has_subscription() method:
class demo : public so_5::agent_t {
state_t st_one{...};
...
void handler1(mhood_t<first_msg> cmd) {...}
...
void some_method_with_subscription_checking() {
// Check the presence of subscription to message of type third_msg.
if(st_one.has_subscription<third_msg>(so_direct_mbox())) {...}
// Check the presence of subscription to message of type first_msg.
if(st_one.has_subscription(so_direct_mbox(), &demo::handler1)) {...}
...
}
};
There are several ways to destroy a subscription:
class demo : public so_5::agent_t {
state_t st_one{...};
...
void handler1(mhood_t<first_msg> cmd) {...}
void handler2(mhood_t<second_msg> cmd) {...}
...
void method_which_drops_subscriptions() {
// Drop the subscription to third_msg in the default state.
so_drop_subscription<third_msg>(so_direct_mbox());
// Drop the subscription to third_msg in state st_one.
so_drop_subscription<third_msg>(so_direct_mbox(), st_one);
// Or...
st_one.drop_subscription<third_msg>(so_direct_mbox());
// Drop the subscription to first_msg in the default state.
so_drop_subscription(so_direct_mbox(), &demo::handler1);
// Drop the subscription to first_msg in state st_one.
so_drop_subscription(so_direct_mbox(), st_one, &demo::handler1);
// Or...
st_one.drop_subscription(so_direct_mbox(), &demo::handler1);
// Drop the subscription to second_msg in all states.
so_drop_subscription_for_all_states<second_msg>(so_direct_mbox());
// Or...
so_drop_subscription_for_all_states(so_direct_mbox(), &demo::handler2);
}
};
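The checks and drops shown above all address a subscription by three things: the mbox, the state, and the message type. A plain C++ model of such a table might look as follows. It is a sketch with hypothetical names (`subscription_table`, integer mbox ids, string state names), not the way SObjectizer stores subscriptions.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <tuple>
#include <typeindex>
#include <typeinfo>

// Hypothetical message types used only by this model.
struct first_msg {};
struct second_msg {};

// Plain C++ model: a subscription is the triple (mbox, state, message type).
class subscription_table {
    using key_type = std::tuple<int, std::string, std::type_index>;
    std::set<key_type> items_;

    template <typename Msg>
    static key_type make_key(int mbox_id, const std::string& state) {
        return key_type(mbox_id, state, std::type_index(typeid(Msg)));
    }

public:
    template <typename Msg>
    void subscribe(int mbox_id, const std::string& state) {
        assert(!state.empty());
        items_.insert(make_key<Msg>(mbox_id, state));
    }

    template <typename Msg>
    bool has(int mbox_id, const std::string& state) const {
        return items_.count(make_key<Msg>(mbox_id, state)) != 0;
    }

    // Drops the subscription in one state only.
    template <typename Msg>
    void drop(int mbox_id, const std::string& state) {
        items_.erase(make_key<Msg>(mbox_id, state));
    }

    // Drops the subscription to Msg from this mbox in every state.
    template <typename Msg>
    void drop_for_all_states(int mbox_id) {
        const std::type_index type(typeid(Msg));
        for (auto it = items_.begin(); it != items_.end();) {
            if (std::get<0>(*it) == mbox_id && std::get<2>(*it) == type)
                it = items_.erase(it);
            else
                ++it;
        }
    }
};
```

Dropping in one state leaves the same message type subscribed in other states; only the all-states variant removes every entry for that mbox and type.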
A delivery filter for an immutable message is set by the so_5::agent_t::so_set_delivery_filter() method:
class demo : public so_5::agent_t {
const so_5::mbox_t data_mbox_;
...
public:
...
void so_define_agent() override {
// Set a delivery filter to messages of type some_msg from data_mbox_.
// Type of message is deduced automatically from lambda's argument type.
so_set_delivery_filter(data_mbox_, [](const some_msg & msg) {...});
...
}
};
A delivery filter for a mutable message is set by the so_5::agent_t::so_set_delivery_filter_for_mutable_msg() method:
class demo : public so_5::agent_t {
const so_5::mbox_t data_mbox_;
...
public:
...
void so_define_agent() override {
// Set a delivery filter to mutable messages of type some_msg from data_mbox_.
// Type of message is deduced automatically from lambda's argument type.
// Please note that a message is mutable, but the delivery filter gets it
// by a const reference (delivery filter can't change the content of
// the message).
so_set_delivery_filter_for_mutable_msg(data_mbox_, [](const some_msg & msg) {...});
...
}
};
For more information about delivery filters see InDepth - Message Delivery Filters.
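As a rough picture of what a delivery filter does, here is a plain C++ model in which a predicate is consulted before a message is queued for a subscriber. The names (`filtered_subscriber`, `deliver`, the `value` field of `some_msg`) are hypothetical, and the sketch does not use SObjectizer's API. Like a real filter, the predicate receives the message by const reference and cannot change it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical message type with a single illustrative field.
struct some_msg {
    int value;
};

// Plain C++ model of a subscriber guarded by a delivery filter.
class filtered_subscriber {
    std::function<bool(const some_msg&)> filter_;
    std::vector<some_msg> queue_;

public:
    explicit filtered_subscriber(std::function<bool(const some_msg&)> filter)
        : filter_(std::move(filter)) {
        assert(static_cast<bool>(filter_));
    }

    // The filter decides before anything is queued:
    // a rejected message never reaches the subscriber's queue.
    bool deliver(const some_msg& msg) {
        if (!filter_(msg)) return false;
        queue_.push_back(msg);
        return true;
    }

    std::size_t queued() const { return queue_.size(); }
};
```

With a filter such as `[](const some_msg& m) { return m.value > 10; }`, a message with value 5 is rejected and a message with value 42 is queued.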
A delivery filter is removed by the so_5::agent_t::so_drop_delivery_filter() method:
class demo : public so_5::agent_t {
const so_5::mbox_t data_mbox_;
...
void some_method_where_delivery_filter_must_be_dropped() {
// Remove delivery filter to messages of type some_msg from data_mbox_.
so_drop_delivery_filter<some_msg>(data_mbox_);
...
}
};
For more information about delivery filters see InDepth - Message Delivery Filters.
Message limits are set during the agent's construction:
class demo : public so_5::agent_t {
public:
demo(context_t ctx, ... /* other arguments */)
: so_5::agent_t(ctx
// No more than 20 messages; the newest extra messages are dropped.
+ limit_then_drop<request>(20u)
// No more than 1 message; otherwise abort the whole application.
+ limit_then_abort<reconfigure>(1u)
// No more than 100 messages; otherwise abort the whole application.
// This lambda is invoked before aborting.
+ limit_then_abort<outgoing_package>(100u,
[](const so_5::agent_t & agent, const outgoing_package & msg) {
... /* Some debug logging here. */
})
// No more than 50 messages; all extra messages are
// redirected to a different mbox.
+ limit_then_redirect<transform_data>(50u,
[this]{ return reserve_destination_; })
// No more than 5 messages; all extra messages are transformed
// and resent to a different mbox.
// Type of message is deduced automatically.
+ limit_then_transform(5u,
[this](const reserve_space & cmd) {
return make_transformed<failed_allocation>(
failure_registrator_,
cmd.request_id);
})
// Set a limit for all other messages.
+ limit_then_drop<any_unspecified_message>(10u) )
...
{}
...
};
For more information about message limits see InDepth - Message Limits.
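The drop and redirect reactions can be pictured with a plain C++ model of a bounded inbox. This is a sketch under stated assumptions: `limited_inbox` and `overflow_action` are hypothetical names, messages are plain integers, and the limit is assumed to count messages still waiting in the queue. The abort and transform reactions are not modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Plain C++ model of two overflow reactions; not SObjectizer's API.
enum class overflow_action { drop, redirect };

class limited_inbox {
    std::size_t limit_;
    overflow_action action_;
    limited_inbox* reserve_;  // Destination for redirected messages.
    std::deque<int> queue_;
    std::size_t dropped_ = 0;

public:
    limited_inbox(std::size_t limit, overflow_action action,
                  limited_inbox* reserve = nullptr)
        : limit_(limit), action_(action), reserve_(reserve) {
        // A redirecting inbox needs somewhere to redirect to.
        assert(action_ != overflow_action::redirect || reserve_ != nullptr);
    }

    void push(int msg) {
        if (queue_.size() < limit_) {
            queue_.push_back(msg);
            return;
        }
        // Over the limit: apply the configured reaction to the new message.
        if (action_ == overflow_action::redirect)
            reserve_->push(msg);
        else
            ++dropped_;
    }

    // Handling a message frees one slot under the limit.
    bool pop(int& out) {
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop_front();
        return true;
    }

    std::size_t size() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }
};
```

Pushing five messages into an inbox with a limit of three keeps the first three; the remaining two are either counted as dropped or end up in the reserve inbox, depending on the reaction.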
The priority for an agent is set during the agent's construction:
class demo : public so_5::agent_t {
public:
demo(context_t ctx, ... /* other arguments */)
: so_5::agent_t(ctx
// This agent will have priority-3.
+ so_5::priority_t::p3)
...
{}
...
};
For more information about agents' priorities see InDepth - Priorities of Agents.
Message delivery tracing can be turned on in the Environment's parameters before SObjectizer's Environment starts:
int main()
{
so_5::launch([](so_5::environment_t & env) {...},
[](so_5::environment_params_t & params) {
// Turn message delivery tracing on.
// Traces will be printed via std::cout.
params.message_delivery_tracer(so_5::msg_tracing::std_cout_tracer());
} );
return 0;
}
NOTE. There are also the so_5::msg_tracing::std_cerr_tracer() and so_5::msg_tracing::std_clog_tracer() tracer factories.
By default so_5::select() returns only when all mchains are closed. But sometimes it is necessary to break message processing in so_5::select() as soon as any of the mchains is closed. It can be done this way:
// Will be turned to 'true' on channel close.
bool channel_closed = false;
so_5::select(
so_5::from_all()
// Handle all incoming messages.
.handle_all()
// Custom predicate for breaking out of select.
.stop_on([&channel_closed]{ return channel_closed; })
// A handler for closed mchain.
.on_close([&channel_closed](const so_5::mchain_t &) { channel_closed = true; }),
receive_case(...),
receive_case(...),
...);
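The interplay of the close handler and the stop predicate can be traced with a single-threaded plain C++ model. It is only a sketch: `chain_model` and `select_model` are hypothetical, messages are integers, and the model returns where a real select would block waiting for new messages.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Plain C++ model of a message chain; not SObjectizer's mchain.
struct chain_model {
    std::deque<int> items;
    bool closed = false;
};

// Polls the chains in turn. Returns the number of handled messages.
// stop_on is checked after every handled message or close notification.
template <typename StopOn, typename OnClose, typename Handler>
std::size_t select_model(const std::vector<chain_model*>& chains,
                         StopOn stop_on, OnClose on_close, Handler handler) {
    std::size_t handled = 0;
    std::vector<bool> close_reported(chains.size(), false);
    for (;;) {
        bool any_event = false;
        for (std::size_t i = 0; i < chains.size(); ++i) {
            assert(chains[i] != nullptr);
            chain_model& ch = *chains[i];
            if (!ch.items.empty()) {
                handler(ch.items.front());
                ch.items.pop_front();
                ++handled;
                any_event = true;
            } else if (ch.closed && !close_reported[i]) {
                // An empty, closed chain is reported exactly once.
                close_reported[i] = true;
                on_close(ch);
                any_event = true;
            }
            if (stop_on()) return handled;
        }
        // Nothing left to do: every chain is drained (and, in the default
        // case, closed). A real select would block on open chains instead.
        if (!any_event) return handled;
    }
}
```

With a predicate that always returns false, the model drains every closed chain before returning. With a flag set in the close handler and tested by the predicate, it returns as soon as the first chain is reported closed and leaves the other chains' remaining messages untouched.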