SO 5.7 InDepth Coops - Stiffstream/sobjectizer GitHub Wiki
- Intro
- Parent-Child Relationship
- Resource Lifetime Management
- Reg/Dereg Notificators
- Registration And Deregistration Of A Cooperation Are Asynchronous And Potentially Long Lasting Procedures
This part of the InDepth series is dedicated to cooperations, an important feature of SObjectizer. In particular:
- parent-child relationship;
- resource lifetime management;
- reg/dereg notificators;
- some details of register_coop and deregister_coop operations.
If you don't understand what a cooperation is then please return and re-study the Basic part of SObjectizer's tutorials.
Cooperation (or coop in short) is a way for binding several tightly related agents into a whole entity.
A coop is registered in the SObjectizer Environment in a transactional manner: either all agents from the cooperation are registered successfully or none of them is.
When a coop is being deregistered all its agents are deregistered and destroyed at the same time.
There could be agents which require creation of additional coop(s) for performing their work.
Let's imagine an agent which is responsible for receiving and processing some requests. Payment requests, for example.
Processing of each payment request requires several operations:
- checking payments parameters,
- checking the operation risks,
- checking the availability of funds
- and so on...
The request receiver could do those actions for every payment by itself. But this will lead to very complicated logic of the request receiver.
There is a much simpler approach: delegation of processing of one request to a separate processor agent.
In that case the request receiver only receives new requests and creates a new coop with an actual request processor for each of them. Both the receiver and the processor agents get simpler logic, which is good. But a new question arises: who controls the lifetime of all the cooperations with request processors, and how?
Very simple view of that problem:
Someone could call deregister_coop() for the request receiver’s coop. As a result, all coops with request processors must be deregistered too.
But how could it be done?
This is where child coops come into play.
SObjectizer-5 allows marking a new coop as a child of any existing coop. In this case SObjectizer guarantees that all child coops will be deregistered and destroyed before their parent coop.
It means that if we have, for example, a parent coop with a child coop and do call:
env.deregister_coop( parent_coop_id, so_5::dereg_reason::normal );
Then SObjectizer-5 will deregister and destroy the child coop and only then the parent coop will be deregistered and destroyed.
A child coop could have its own child coops too.
It means that a request_receiver coop could have any number of child coops like request_processor_1, request_processor_2 and so on.
All of them will be automatically destroyed when the top-level parent coop, manager, is deregistered.
The parent-child relationship between coops allows building coop hierarchies: a top-level coop creates child coops, those create their own child coops and so on.
If a top-level coop is being deregistered for some reason then all its child coops (and children of children and so on) will be deregistered too. The programmer doesn't need to take care of this.
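The destruction order in such a hierarchy can be illustrated with a small standalone model. This is only a sketch in plain C++17 and not SObjectizer code: coop_node, destroy_with_children and demo_destruction_order are hypothetical names introduced for the illustration, and the order in which sibling coops are destroyed here is an artifact of the model, not a documented guarantee.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a coop: a name plus the child coops it owns.
// Hypothetical type, not part of SObjectizer.
struct coop_node
{
    std::string name;
    std::vector< std::unique_ptr< coop_node > > children;

    explicit coop_node( std::string n ) : name( std::move( n ) ) {}

    // Attach a new child and return a reference to it.
    coop_node & add_child( std::string child_name )
    {
        children.push_back(
            std::make_unique< coop_node >( std::move( child_name ) ) );
        return *children.back();
    }
};

// Post-order walk: every child (and its own children) is destroyed
// before the coop that owns it.
void destroy_with_children( coop_node & coop, std::vector< std::string > & log )
{
    for( auto & child : coop.children )
        destroy_with_children( *child, log );
    coop.children.clear();
    log.push_back( coop.name );
}

// Builds manager -> request_receiver -> two request processors
// and returns the order in which the coops are destroyed.
std::vector< std::string > demo_destruction_order()
{
    coop_node manager( "manager" );
    auto & receiver = manager.add_child( "request_receiver" );
    receiver.add_child( "request_processor_1" );
    receiver.add_child( "request_processor_2" );

    std::vector< std::string > log;
    destroy_with_children( manager, log );
    assert( manager.children.empty() );
    return log;
}
```

In this model the processors always appear in the log before request_receiver, and manager always comes last, which mirrors the guarantee described above.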
There are several ways to make a child coop...
The first way is to specify the ID of the parent coop during the creation of a coop instance:
// Now we can create a child coop.
auto child = env.make_coop(parent_id); // We tell that this coop has the parent.
... // Fill the child.
env.register_coop(std::move(child));
The second way is to use the so_5::create_child_coop free functions. For example:
class owner : public so_5::agent_t
{
public :
...
void so_evt_start() override
{
auto child = so_5::create_child_coop( *this );
child->make_agent< worker >();
...
so_environment().register_coop( std::move( child ) );
}
};
And the third way is to use the so_5::introduce_child_coop free functions. For example:
class owner : public so_5::agent_t
{
public :
...
void so_evt_start() override
{
so_5::introduce_child_coop( *this, []( so_5::coop_t & coop ) {
coop.make_agent< worker >();
} );
}
};
Sometimes it is necessary to manage the lifetime of some resources. For example all agents from your coop should have a reference to the same DB connection object.
If this connection object is allocated dynamically you can pass a shared_ptr to every agent's constructor. Something like:
class first_agent : public so_5::agent_t { ...
public :
first_agent( context_t ctx, std::shared_ptr< db_connection > conn, ... );
...
private :
std::shared_ptr< db_connection > connection_;
};
class second_agent : public so_5::agent_t { ...
public :
second_agent( context_t ctx, std::shared_ptr< db_connection > conn, ... );
...
private :
std::shared_ptr< db_connection > connection_;
};
env.introduce_coop( []( so_5::coop_t & coop ) {
std::shared_ptr< db_connection > connection = connect_to_db(...);
coop.make_agent< first_agent >( connection, ... );
coop.make_agent< second_agent >( connection, ... );
...
} );
In such a case you tightly couple the application domain logic of your agents with DB connection lifetime management.
What if this lifetime management needs to be changed in the future? What if the connection object is controlled by someone else and you have a simple reference to it (not shared_ptr)?
You will need to do some rewriting...
class first_agent : public so_5::agent_t { ...
public :
first_agent( context_t ctx, db_connection & conn, ... ); // The constructor has to be changed.
...
private :
db_connection & connection_; // Declaration of the member has to be changed.
};
class second_agent : public so_5::agent_t { ...
public :
second_agent( context_t ctx, db_connection & conn, ... ); // The constructor has to be changed.
...
private :
db_connection & connection_; // Declaration of the member has to be changed.
};
...
db_connection & conn = receive_connection_from_somewhere();
env.introduce_coop( [&conn]( so_5::coop_t & coop ) {
coop.make_agent< first_agent >( conn, ... );
coop.make_agent< second_agent >( conn, ... );
...
} );
SObjectizer-5 has a tool for decoupling resource lifetime management from an agent's domain-specific logic: coop_t::take_under_control().
The coop_t::take_under_control() method passes a dynamically allocated object under the control of the cooperation. The object placed under control will be deallocated only after all agents of the coop have been destroyed.
The behavior of take_under_control() allows us to use the reference to a controlled object even from the agent's destructor...
class request_processor : public so_5::agent_t
{
public :
request_processor( context_t ctx, db_connection & connection );
~request_processor() {
if( !commited() )
// Assume that this reference is still valid.
connection_.commit();
}
...
private :
db_connection & connection_;
};
...
env.introduce_coop( []( so_5::coop_t & coop )
{
// Place DB connection under the control of the cooperation.
auto & connection = *coop.take_under_control(create_connection(...));
// Reference to DB connection will be valid even in request_processor's destructor.
coop.make_agent< request_processor >( connection );
...
} );
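The ordering that makes this safe (agents first, controlled objects afterwards) can be modelled in plain C++17. The sketch below is not how SObjectizer implements take_under_control(); controlled_coop_model, db_connection_model and processor_model are hypothetical types that only reproduce the destruction order described above.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using event_log = std::vector< std::string >;

// Hypothetical resource that reports its own destruction.
struct db_connection_model
{
    event_log & log;
    explicit db_connection_model( event_log & l ) : log( l ) {}
    ~db_connection_model() { log.push_back( "connection closed" ); }
    void commit() { log.push_back( "commit" ); }
};

// Hypothetical agent that still uses the connection in its destructor.
struct processor_model
{
    db_connection_model & connection;
    explicit processor_model( db_connection_model & c ) : connection( c ) {}
    ~processor_model() { connection.commit(); }
};

// Toy coop: owns agents and arbitrary user resources.
class controlled_coop_model
{
    std::vector< std::shared_ptr< void > > resources_;
    std::vector< std::unique_ptr< processor_model > > agents_;

public:
    controlled_coop_model() = default;
    controlled_coop_model( const controlled_coop_model & ) = delete;
    controlled_coop_model & operator=( const controlled_coop_model & ) = delete;

    ~controlled_coop_model()
    {
        agents_.clear();    // agents are destroyed first...
        resources_.clear(); // ...and only then the controlled objects.
    }

    // Takes ownership of the object and returns a raw pointer to it.
    template< typename T >
    T * take_under_control( std::unique_ptr< T > resource )
    {
        assert( resource != nullptr );
        T * raw = resource.get();
        resources_.push_back( std::shared_ptr< void >( std::move( resource ) ) );
        return raw;
    }

    processor_model & make_agent( db_connection_model & connection )
    {
        agents_.push_back( std::make_unique< processor_model >( connection ) );
        return *agents_.back();
    }
};

// Returns the sequence of events produced by destroying the coop.
event_log demo_resource_lifetime()
{
    event_log log;
    {
        controlled_coop_model coop;
        auto & connection = *coop.take_under_control(
            std::make_unique< db_connection_model >( log ) );
        coop.make_agent( connection );
    } // The coop is destroyed here.
    return log;
}
```

In the model the agent's destructor can still call commit() because the connection is released only after the agent container has been emptied.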
Parent-child relationship could also be used for resource lifetime management...
class request_receiver : public so_5::agent_t
{
std::unique_ptr< db_connection > connection_;
...
public :
void so_evt_start() override {
connection_ = make_db_connection(...);
...
}
...
private :
void evt_new_request( const request & evt ) {
// New request handler must be created.
so_5::introduce_child_coop( *this, [this]( so_5::coop_t & coop ) {
// Reference to DB connection will be valid even in request_processor's destructor.
// It is because agents from child cooperation will be destroyed before any of
// agents from the parent cooperation.
coop.make_agent< request_processor >( *connection_ );
...
} );
}
};
It is not easy to detect precise moments when a coop is completely registered or completely deregistered.
The biggest problem is a detection of complete coop deregistration.
It is because a call to environment_t::deregister_coop() just initiates the coop deregistration process. But the entire process of deregistration could take a long time.
To simplify that, there are registration and deregistration notificators.
A notificator could be bound to a coop and it will be called when coop registration/deregistration process is finished.
The simplest reg/dereg notificators:
env.introduce_coop( []( so_5::coop_t & coop ) {
coop.add_reg_notificator(
[]( so_5::environment_t &, const so_5::coop_handle_t & handle ) {
std::cout << "registered: " << handle << std::endl;
} );
coop.add_dereg_notificator(
[]( so_5::environment_t &,
const so_5::coop_handle_t & handle,
const so_5::coop_dereg_reason_t & ) {
std::cout << "deregistered: " << handle << std::endl;
} );
...
} );
The ID of coop will be printed to stdout on coop registration and deregistration.
Usually reg/dereg notifications are used for sending messages to some mboxes.
Because this scenario is widely used there are two ready-to-use notificators in SObjectizer-5.
They are created by the make_coop_reg_notificator() and make_coop_dereg_notificator() functions:
auto notify_mbox = env.create_mbox();
env.introduce_coop( [&]( so_5::coop_t & coop ) {
// An instance of so_5::msg_coop_registered will be sent to notify_mbox
// when the cooperation is registered.
coop.add_reg_notificator( so_5::make_coop_reg_notificator( notify_mbox ) );
// An instance of so_5::msg_coop_deregistered will be sent to
// notify_mbox when the cooperation is deregistered.
coop.add_dereg_notificator( so_5::make_coop_dereg_notificator( notify_mbox ) );
...
} );
Coop dereg notificators can be used for implementation of Erlang-like supervisors.
For example, a request receiver could receive notification about deregistration of child coops and restart them if they fail:
class request_receiver : public so_5::agent_t
{
public :
virtual void so_define_agent() override {
// msg_coop_deregistered must be handled.
so_subscribe_self().event( &request_receiver::evt_child_finished );
...
}
...
private :
void evt_new_request( const request & req ) {
// Create a child coop instance.
auto child_coop = so_5::create_child_coop(*this);
... // Filling the coop with agents.
// Dereg notificator is necessary to receive info about child disappearance.
// Standard notificator will be used.
child_coop->add_dereg_notificator(
// We want a message to request_receiver direct_mbox.
so_5::make_coop_dereg_notificator( so_direct_mbox() ) );
// Register coop and store its handle.
auto child_handle = so_environment().register_coop( std::move(child_coop) );
store_request_info( req, child_handle );
}
void evt_child_finished( const so_5::msg_coop_deregistered & evt ) {
// If child cooperation failed its dereg reason will differ
// from the normal value.
if( so_5::dereg_reason::normal != evt.m_reason.reason() )
recreate_child_coop( evt.m_coop );
else
remove_request_info( evt.m_coop );
}
...
};
Registration And Deregistration Of A Cooperation Are Asynchronous And Potentially Long Lasting Procedures
It's important to note that register_coop and deregister_coop are asynchronous operations that can take a long time.
During the registration of a new coop several actions have to be performed by SObjectizer Environment:
1. Some resources should be preallocated for agents from the new coop.
2. The so_define_agent method has to be called for all agents from the new coop.
3. Agents from the new coop have to be bound to the corresponding dispatchers.
4. Special internal message has to be sent to every agent from the new coop.
5. If there are coop_reg_notificators they have to be called.
6. Agents from the new coop should receive this internal message and call the so_evt_start method.
Steps from 1 to 5 are performed inside register_coop and the completion of step 4 means that the coop is registered in SObjectizer. But the return from register_coop doesn't mean that agents have already started their work.
Deregistration of a coop is a less deterministic operation. When deregister_coop is called, SObjectizer only does the following things:
- Marks the cooperation as being deregistered.
- Sends a special internal message to every agent from the coop and marks the agents in a special way (prohibiting delivery of new messages to agents from the deregistering coop).
And that's all.
Agents from the deregistering coop will continue their work as long as they have previously received messages to process.
Only when all agents from the coop have received the special internal message and called the so_evt_finish method will the coop be finally destroyed. The destruction will be performed in several steps:
- Unbinding agents from their dispatchers.
- Destruction of agents (e.g. delete is called for every agent object).
- If there are coop_dereg_notificators they are called.
- The coop object is destroyed.
All of that means that the actual destruction of the coop can happen at any time after the call to deregister_coop: sometimes the coop can be physically destroyed before the return from deregister_coop, sometimes it will be destroyed a long time after the return from deregister_coop.
To make things more complex it's necessary to mention the presence of child coops. For example, if deregister_coop is called for a parent coop then SObjectizer first deregisters and destroys its children and only then the parent coop will be destroyed.
If you need to know when a coop is destroyed you can use coop_dereg_notificators.
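The deferred nature of deregistration can be illustrated with a single-threaded model. This is a sketch in plain C++17 under simplifying assumptions, not SObjectizer's implementation: dereg_coop_model, queued_agent_model and the "<finish>" marker are hypothetical, and dispatchers are replaced by explicit process_one() calls.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy agent: a queue of pending messages plus a "finished" flag.
struct queued_agent_model
{
    std::deque< std::string > queue;
    bool finished = false;
};

// Toy coop that mimics the two-phase deregistration described above.
class dereg_coop_model
{
    std::vector< queued_agent_model > agents_;
    std::function< void() > dereg_notificator_;
    bool deregistering_ = false;
    bool destroyed_ = false;

public:
    dereg_coop_model( std::size_t agent_count, std::function< void() > notificator )
        : agents_( agent_count ), dereg_notificator_( std::move( notificator ) )
    {
        assert( agent_count > 0 );
    }

    // Returns false when the message is rejected.
    bool send( std::size_t agent, std::string msg )
    {
        if( deregistering_ || agent >= agents_.size() )
            return false; // no new messages for a deregistering coop
        agents_[ agent ].queue.push_back( std::move( msg ) );
        return true;
    }

    // Only initiates deregistration; nothing is destroyed here.
    void deregister()
    {
        if( deregistering_ )
            return;
        deregistering_ = true;
        for( auto & a : agents_ )
            a.queue.push_back( "<finish>" );
    }

    // One step of a worker: handle one queued message of one agent.
    void process_one( std::size_t agent )
    {
        if( destroyed_ || agent >= agents_.size() )
            return;
        auto & a = agents_[ agent ];
        if( a.queue.empty() )
            return;
        const std::string msg = std::move( a.queue.front() );
        a.queue.pop_front();
        if( msg == "<finish>" )
        {
            a.finished = true;
            complete_if_all_finished();
        }
    }

    bool destroyed() const { return destroyed_; }

private:
    void complete_if_all_finished()
    {
        for( const auto & a : agents_ )
            if( !a.finished )
                return;
        agents_.clear(); // agents are destroyed first...
        destroyed_ = true;
        if( dereg_notificator_ )
            dereg_notificator_(); // ...then the notificator is called.
    }
};
```

In this model a call to deregister() returns while the coop is still alive; the notificator fires only after every agent has drained its queue and handled the finish marker, which is why a notificator is the reliable way to learn about the actual destruction.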
The registration of a coop is an asynchronous operation that can take some time. The return from register_coop tells a user that the new coop is fully registered inside the SObjectizer Environment and agents from the registered coop are about to start. But due to the asynchronous and multithreading nature of SObjectizer a user can't know if agents have already started or not yet. Calls of so_evt_start can be completed before the return from register_coop or some time after.
Sometimes the execution of so_evt_start may start some time after the return from register_coop. This is because SObjectizer sends a special initial message for each new agent, and this message is queued in the agent's queue, and we don't know when it will be dequeued and processed ("processed" means that so_evt_start is called for the agent). For example, if we bind our agent to a dispatcher that is currently busy (has many unprocessed messages in the queue), then the initial message for our agent will wait while all previous messages stored in the queue are dequeued and processed.
The only agent's method that is guaranteed to be called and completed inside the register_coop call is the so_define_agent method.
Sometimes it creates a temptation to use the so_define_agent method for starting agent's work. This temptation can be especially strong in cases where new agents have to perform some important actions and we can't continue our work without the results of those actions. Let's imagine something like that:
void parent_agent::evt_init_device(mhood_t<msg_dev_params> cmd)
{
// We have to create a device manager that initializes the device.
so_environment().introduce_coop([&](so_5::coop_t & coop) {
auto disp = ... // Creating a dispatcher for a new agent.
coop.make_agent_with_binder<device_manager>(
disp.binder(),
... /* agent's parameters */ );
});
// Trying to read data from the device.
auto data = read_device();
}
If we think that a new device_manager has completed all required actions in its so_evt_start before we call read_device, we're wrong. The so_evt_start for the new agent may not even have started yet!
In that case one may think: "Let's start the work in so_define_agent()".
And this is a dangerous path. Don't go that way.
First of all, so_define_agent is called in a different working context. It's called on the thread where the register_coop is called. But this may not be the worker thread of the new agent. So so_evt_start, so_evt_finish, and all other event handlers will be called on a different worker thread. This can be important if you're working with thread-local values (directly or indirectly).
But the main reason is that an agent doesn't have its event queue when so_define_agent is working. The binding of an agent to an event queue is performed after all so_define_agent methods are completed.
It may look a bit odd at first: we make subscriptions in so_define_agent but an agent has no event queue at that moment, and any messages delivered because of those subscriptions will be ignored. But that's the way SObjectizer works and there are some reasons for this behavior. Exception safety, for example: if there are multiple agents in our coop and one of them throws in so_define_agent, then we can roll back the whole registration procedure.
Anyway, an agent doesn't have an event queue while so_define_agent is working.
Let's imagine that we have two agents in our coop that do something like that:
class first_agent final : public so_5::agent_t
{
so_5::mbox_t m_target;
public:
...
void set_target(const so_5::mbox_t & v) { m_target = v; }
void so_define_agent() override {
so_subscribe_self().event(...);
// Sending a message to another agent.
so_5::send<msg_do_something>(m_target, ...);
}
};
class second_agent final : public so_5::agent_t
{
so_5::mbox_t m_target;
public:
...
void set_target(const so_5::mbox_t & v) { m_target = v; }
void so_define_agent() override {
so_subscribe_self().event(...);
// Sending a message to another agent.
so_5::send<msg_do_something_else>(m_target, ...);
}
};
And we register those agents in the same coop:
env.introduce_coop([](so_5::coop_t & coop) {
auto * first = coop.make_agent<first_agent>(...);
auto * second = coop.make_agent<second_agent>(...);
first->set_target(second->so_direct_mbox());
second->set_target(first->so_direct_mbox());
});
In that case messages msg_do_something and msg_do_something_else will be lost because the destinations won't have event queues to store the messages sent.
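Why those messages are lost can be shown with a small model of the registration order. The sketch below is plain C++17 and only imitates the sequence described in this section; late_bound_agent_model, register_coop_model and the "<start>" marker are hypothetical names and not part of SObjectizer.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Toy agent: it has no event queue until the coop binds it.
struct late_bound_agent_model
{
    std::optional< std::deque< std::string > > queue; // empty before binding
    late_bound_agent_model * target = nullptr;
    std::string message;         // what this agent sends to its target
    bool send_in_define = true;  // false: postpone sending to the start event
    int lost = 0;                // messages dropped because no queue existed

    // Delivery succeeds only if the receiver already has a queue.
    void deliver( std::string msg )
    {
        if( queue )
            queue->push_back( std::move( msg ) );
        else
            ++lost;
    }

    // Imitates so_define_agent(): runs before any queue exists.
    void define()
    {
        if( send_in_define && target )
            target->deliver( message );
    }

    // Imitates so_evt_start(): runs when "<start>" is dequeued.
    void handle_start()
    {
        assert( queue && !queue->empty() && queue->front() == "<start>" );
        queue->pop_front();
        if( !send_in_define && target )
            target->deliver( message );
    }
};

// Imitates the order of actions inside register_coop.
void register_coop_model( const std::vector< late_bound_agent_model * > & agents )
{
    for( auto * a : agents )
        a->define();              // the so_define_agent phase
    for( auto * a : agents )
        a->queue.emplace();       // binding agents to event queues
    for( auto * a : agents )
        a->deliver( "<start>" );  // sending the initial messages
}
```

With send_in_define left at true both agents end up with lost == 1 and only "<start>" in their queues. With send_in_define set to false the same message sent from handle_start() reaches the target's queue, because every agent of the coop is already bound at that point.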
All of that means that so_define_agent has to be used only for subscribing an agent to messages and for changing the agent's initial state. All actual agent's work has to be postponed to the so_evt_start event.
The so_evt_start method is the first agent's method that is called on the agent's working context. For example, if an agent is bound to the active_obj dispatcher then so_evt_start will be called on a new worker thread created for that agent by the active_obj dispatcher.
The so_evt_start method is called as a result of handling the initial internal message sent to the agent during registration procedure (at step 4 of the description above). This means that register_coop first calls the so_define_agent method for all agents from the coop (and agents have no event queue at this point), then register_coop binds all agents to their event queues and sends initial internal messages to those queues.
But message sending is an asynchronous action and that means that the first agent can receive the initial internal message before register_coop sends the same message to the next agent from the same coop. It may be dangerous because some agents from one coop may complete so_evt_start before other agents even receive the initial internal message.
To protect a user from such a situation SObjectizer handles the initial internal messages in a special way: so_evt_start isn't called until the initial messages have been sent to all of coop's agents.
Let's imagine that we have three agents in the coop and all agents are bound to different contexts. The register_coop sends the initial message to the first agent and this message is processed without any delay (e.g. it will be dequeued immediately after enqueuing). SObjectizer will try to call so_evt_start for the first agent, but it'll see that not all initial messages have been sent yet. So SObjectizer will suspend processing the initial message for the first agent until all initial messages have been sent. Only then will SObjectizer resume message processing for the first agent and call so_evt_start.
Note that this waiting within the initial message processing will suspend the agent's worker thread for some time (and that thread can't process other messages until so_evt_start has been called and has finished).
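The effect of such a delay can be imitated with a one-shot gate built on a condition variable. This is only a sketch of the idea in plain C++17; it makes no claim about how SObjectizer implements the suspension internally, and start_gate and demo_start_gate are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// One-shot gate: closed while initial messages are still being sent.
class start_gate
{
    std::mutex lock_;
    std::condition_variable opened_cv_;
    bool opened_ = false;

public:
    void open()
    {
        {
            std::lock_guard< std::mutex > guard( lock_ );
            opened_ = true;
        }
        opened_cv_.notify_all();
    }

    // Blocks the calling worker thread until the gate is opened.
    void wait()
    {
        std::unique_lock< std::mutex > guard( lock_ );
        opened_cv_.wait( guard, [this] { return opened_; } );
    }
};

// Returns, for every agent, how many initial messages had been sent
// at the moment its start handler ran.
std::vector< std::size_t > demo_start_gate( std::size_t agent_count )
{
    assert( agent_count > 0 );
    start_gate gate;
    std::atomic< std::size_t > sent{ 0 };
    std::vector< std::size_t > seen_at_start( agent_count, 0 );
    std::vector< std::thread > workers;
    workers.reserve( agent_count );

    for( std::size_t i = 0; i != agent_count; ++i )
    {
        ++sent; // the initial message for agent i is "sent"
        workers.emplace_back( [&gate, &sent, &seen_at_start, i] {
            gate.wait(); // the start handler is held back here
            seen_at_start[ i ] = sent.load();
        } );
    }

    gate.open(); // all initial messages have been sent
    for( auto & w : workers )
        w.join();
    return seen_at_start;
}
```

Every worker in the model observes the full count of sent messages, no matter how early its own thread was started. The price is the same as noted above: a worker blocked in wait() can do nothing else until the gate opens.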