SO 5.7 InDepth Custom Worker Threads - Stiffstream/sobjectizer GitHub Wiki
- What Is It?
- abstract_work_thread_t Interface
- abstract_work_thread_factory_t Interface
- How To Specify Own Thread Factory To SObjectizer
Created by gh-md-toc
Until v.5.7.3 there wasn't a possibility to specify a custom thread to be used by SObjectizer dispatchers. But sometimes such a feature could be useful. For example, a specific thread stack size has to be set by pthread_attr_setstacksize
in POSIX Thread, or some signal handlers have to be set for a new worker thread on Unix. But SObjectizer created worked thread by itself by using std::thread
and didn't provide a way to tune new worker threads.
Since v.5.7.3 it is possible to instruct SObjectizer to use custom worker threads. This feature is based on two interfaces added in v.5.7.3: so_5::disp::abstract_work_thread_t
and so_5::disp::abstract_work_thread_factory_t
.
Since v.5.7.3 a user can define his/her own worker thread type by implementing so_5::disp::abstract_work_thread_t
interface. The user also has to define own thread factory by implementing so_5::disp::abstract_work_thread_factory_t
interface. An instance of such a factory has to be created and specified in the params to a SObjectizer dispatcher or to the whole SObjectizer Environment.
A user has to define his/her own class that inherits so_5::disp::abstract_work_thread_t
type and implements its pure virtual methods.
At the moment so_5::disp::abstract_work_thread_t
defines just two virtual methods that have to be implemented in a derived class:
virtual void start( body_func_t thread_body ) = 0;
virtual void join() = 0;
where body_func_t
is defined that way:
using body_func_t = std::function< void() >;
Method start
has to start the execution of thread_body
in the context of a separate thread. There are no limitations: it can be a newly created thread or preallocated one. There is no need to guarantee that thread_body
is already started its execution before the return from start
, but it is required to guarantee that all required resources are allocated and thread_body
has been passed to a separate execution context.
It should also be guaranteed that join
can safely be called just after the return from start
.
The join
method is an analog of std::thread::join
: once called it should return only when the separate execution context finished the execution of thread_body
passed to the previous call to start
.
SObjectizer guarantees that join
is called for a thread only once after the successful call to start
. If start
throws then join
isn't called.
Methods of abstract_work_thread_t
can be called from different threads. For example, start
method can be called from one thread, but release
will be called from another thread.
SObjectizer guarantees that there won't be parallel calls to start
and/or release
from different threads. There will be a single call to start
from some thread and, if that call success, there will be just a single call to release
method from some (probably another) thread.
At the moment SObjectizer can't guarantee that thread_func
passed to start
won't throw.
There is no requirement that a custom thread should intercept such an exception. If an exception from thread_body
is intercepted then the call to join
should be completed successfully.
Method start
can throw if it can't start execution of thread_body
in a separate worker context. That exception has to be inherited from std::exception
. That exception will be handled by SObjectizer. The join
method won't be called if start
throws.
Please note that join
isn't marked as noexcept
. It's because std::thread::join
is also not noexcept
. Thus, join
can throw. But the current version of SObjectizer can't cope with such an exception in most cases. It means that if join
throws then the application will probably be terminated because of an exception from a noexcept
method (like destructors of SObjectizer's dispatchers and so on).
A very simple implementation of a custom thread can look like this:
class simple_custom_thread final : public so_5::disp::abstract_work_thread_t
{
// Actual thread.
std::thread thread_;
public:
simple_custom_thread() = default;
void start(body_func_t thread_body) override
{
// Start a new thread and run thread_body on it.
thread_ = std::thread{ [tb = std::move(thread_body)] () {
... // Do some tuning of the new thread.
// Run thread_body without intercepting exceptions.
// It mean a crash of the whole application if thread_body throws,
// but we don't care in such a simple implementation.
tb();
}
};
}
void join() override
{
thread_.join();
}
};
A user has to define his/her own class that inherits so_5::disp::abstract_work_thread_factory_t
type and implements its pure virtual methods.
At the moment so_5::disp::abstract_work_thread_factory_t
defines just two virtual methods that have to be implemented in a derived class:
virtual so_5::disp::abstract_work_thread_t &
acquire( so_5::environment_t & env ) = 0;
virtual void release( so_5::disp::abstract_work_thread_t & thread ) noexcept = 0;
The acquire
method should return a valid reference (not a pointer!) to an instance that implements the abstract_work_thread_t
interface. It can be a reference to a dynamically allocated object, or to member of some aggregate, or something else. SObjectizer doesn't care about the nature of that instance.
SObjectizer will call start
method for a returned thread instance.
The acquire
method has to throw an exception derived from std::exception
if a new thread can't be acquired.
The release
method receives a reference acquired by a previous call to acquire
method. The release
method has to take all necessary actions for utilizing the returned thread object. For example, if acquire
allocates a new thread object on every call then release
has to deallocate the object passed to release
.
The release
method must not throw because SObjectizer has no way to restore if release
throws. Because of that release
is marked as noexcept
.
If acquire
successes then SObjectizer guarantees that obtained thread object will be returned to the factory by calling release
at the moment when SObjectizer no more needs that thread. The return occurs even if the thread object throws in its start
method.
Methods of abstract_work_thread_factory_t
can and in most cases will be called from different threads. For example, acquire
method can be called from one thread, but release
will be called from another thread.
Several calls to acquire
and release
methods can be performed at the same time from different threads.
A very simple example of a custom thread factory can look like this:
class simple_custom_factory final : public so_5::disp::abstract_work_thread_factory_t
{
public:
simple_custom_factory() = default;
so_5::disp::abstract_work_thread_t & acquire(so_5::environment_t & /*env*/) override
{
// Just allocate a new object on every call.
return *(new simple_custom_thread{});
}
void release(so_5::disp::abstract_work_thread_t & thread) noexcept override
{
// It's a reference to an object created in a previous call to acquire.
// So just deallocate it.
delete &thread;
}
};
To specify an own thread factory to SObjectizer it's required to create an instance of that thread factory and then pass this instance to:
- the params of a SObjectizer's dispatcher;
- the params of the whole SObjectizer Environment.
For example:
class my_thread final : public so_5::disp::abstract_work_thread_t {...};
class my_thread_factory final : public so_5::disp::abstract_work_thread_factory_t {...};
...
int main()
{
// Create an instance of the custom factory.
auto factory = std::make_shared<my_thread_factory>(...);
// Launch SObjectizer.
so_5::launch([factory](so_5::environment_t & env) {
// Create some agents as one coop.
env.introduce_coop([factory](so_5::coop_t & coop) {
// This agent will work on the default dispatcher.
// The default dispatcher will use a standard SObjectizer's thread.
coop.make_agent<some_agent>(...);
// Create an active_obj dispatcher that will use the custom thread factory.
auto ao_disp = so_5::disp::active_obj::make_dispatcher(
coop.environment(),
"disp_with_my_threads",
so_5::disp::active_obj::disp_params_t{}.work_thread_factory(factory));
// The following agents with be bound to a dispatcher with custom threads.
coop.make_agent_with_binder<another_agent>(ao_disp.binder(), ...);
coop.make_agent_with_binder<yet_another_agent>(ao_disp.binder(), ...);
...
});
});
}
In the example above only one dispatcher will use a custom thread factory because that factory is specified in the parameters for that dispatcher. All other dispatchers (including the default one) will use the standard factory provided by SObjectizer.
It is possible to set a custom factory for the whole SObjectizer Environment -- via the environment's parameters:
int main()
{
// Create an instance of the custom factory.
auto factory = std::make_shared<my_thread_factory>(...);
// Launch SObjectizer.
so_5::launch([](so_5::environment_t & env) {
// Create some agents as one coop.
env.introduce_coop([](so_5::coop_t & coop) {
// This agent will work on the default dispatcher.
// The default dispatcher will use a custom thread.
coop.make_agent<some_agent>(...);
// Create an active_obj dispatcher that will use the factory from SObjectizer.
auto ao_disp = so_5::disp::active_obj::make_dispatcher(
coop.environment());
// The following agents with be bound to a dispatcher with custom threads.
coop.make_agent_with_binder<another_agent>(ao_disp.binder(), ...);
coop.make_agent_with_binder<yet_another_agent>(ao_disp.binder(), ...);
...
});
},
[factory](so_5::environment_params_t & params) {
// Set a custom thread factory to the whole environment.
params.work_thread_factory(factory);
});
}
All standard SObjectizer's dispatchers have their own disp_params_t
types and since v.5.7.3 there are work_thread_factory
methods in all those types. The work_thread_factory
method allows specifying a thread factory to be used by an instance of the dispatcher. If that method isn't called by a user then the dispatcher gets thread factory from the SObjectizer Environment. It means that the factory specified directly to a dispatcher has precedence on the default factory from the environment.
class my_thread final : public so_5::disp::abstract_work_thread_t {...};
class my_thread_factory final : public so_5::disp::abstract_work_thread_factory_t {...};
class another_factory final : public so_5::disp::abstract_work_thread_factory_t {...};
...
int main()
{
// Create an instance of the custom factory.
auto factory = std::make_shared<my_thread_factory>(...);
// Launch SObjectizer.
so_5::launch([](so_5::environment_t & env) {
// Create some agents as one coop.
env.introduce_coop([](so_5::coop_t & coop) {
// This agent will work on the default dispatcher.
// The default dispatcher will use the default factory from Environment.
coop.make_agent<some_agent>(...);
// Create an active_obj dispatcher that will use the custom thread factory.
auto ao_disp = so_5::disp::active_obj::make_dispatcher(
coop.environment(),
"disp_with_my_threads",
so_5::disp::active_obj::disp_params_t{}
.work_thread_factory(std::make_shared<another_factory>(..)));
// The following agents with be bound to a dispatcher with custom threads.
coop.make_agent_with_binder<another_agent>(ao_disp.binder(), ...);
coop.make_agent_with_binder<yet_another_agent>(ao_disp.binder(), ...);
...
});
},
[factory](so_5::environment_params_t & params) {
// Set a custom thread factory to the whole environment.
params.work_thread_factory(factory);
});
}