SO 5.8 ByExample Queue Size Stats
Note. This example is not a simple one. It shows more complex usage of SObjectizer's features, so it is recommended to read SO-5.8 Basics, SO-5.8 InDepth Run-Time Monitoring, SO-5.8 InDepth Message Limits, and SO-5.8 InDepth Dispatchers first.
Version 5.5.4 of SObjectizer added a simple way to receive run-time statistics from SObjectizer's internals. A user can turn on run-time monitoring for the SObjectizer Environment and receive messages with various data such as the number of registered agents, the number of delayed and periodic messages, queue sizes and so on.
The main reason for adding run-time statistics in v.5.5.4 was to make it possible to obtain and store run-time monitoring information with tools like statsd+Graphite, Zabbix, Nagios and so on. For example, a user can write an agent that receives run-time monitoring information from SObjectizer and redirects it to the appropriate monitoring system.
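As an illustration of what such a redirecting agent might do with a received value, here is a minimal sketch that formats one data source as a statsd gauge line (`<name>:<value>|g`). The helper name to_statsd_gauge and the character mapping are assumptions made for this sketch only; they are not part of SObjectizer or of this example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: turns a data-source name such as
// "disp/ot/DEFAULT/wt-0/demands.count" into a statsd gauge line.
// '/' is mapped to '.' (the hierarchy separator used by Graphite).
// ':' and '|' are delimiters in the statsd line format, so they are
// replaced with '_' (an assumption of this sketch).
std::string to_statsd_gauge(std::string name, std::size_t value)
{
    assert(!name.empty());
    std::replace(name.begin(), name.end(), '/', '.');
    std::replace(name.begin(), name.end(), ':', '_');
    std::replace(name.begin(), name.end(), '|', '_');
    return name + ":" + std::to_string(value) + "|g";
}
```

Sending the resulting line over UDP to a statsd daemon is outside the scope of this sketch.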
This example shows a simple way to write an agent that receives run-time monitoring information from the SObjectizer Environment. The received information is filtered and printed to the standard output.
There is one cooperation in the example. It contains several different agents.
There is a logger agent. It works on the default dispatcher and is used for printing log messages to the standard output.
There is a listener for run-time monitoring information. It is implemented by the type a_stats_listener_t. It receives messages with run-time statistics from the SObjectizer Environment and filters them by the data-source suffix. Only messages with information about event queue sizes are processed.
The information from every message with an event queue size is converted to a log message, and this log message is sent to the logger agent.
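The filtering and conversion steps can be sketched without SObjectizer at all. In the sketch below, quantity_sample is a simplified stand-in for the monitoring message (it is not the library's type), and the suffix text "/demands.count" is an assumption inferred from the sample output of this example.

```cpp
#include <cstddef>
#include <sstream>
#include <string>

// Simplified stand-in for a run-time monitoring message: a data source
// is identified by a prefix and a suffix and carries a value.
struct quantity_sample
{
    std::string prefix;
    std::string suffix;
    std::size_t value;
};

// Assumed suffix of queue-size data sources (taken from the sample output).
const std::string queue_size_suffix = "/demands.count";

// The filter: only queue-size samples pass.
bool is_queue_size(const quantity_sample & s)
{
    return s.suffix == queue_size_suffix;
}

// The conversion: the same text layout as in the listener's log messages.
std::string to_log_line(const quantity_sample & s)
{
    std::ostringstream ss;
    ss << "stats: '" << s.prefix << s.suffix << "': " << s.value;
    return ss.str();
}
```

In the real agent the filter runs as a delivery filter, so messages that are rejected never reach the listener's event queue.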
There is a work generator agent. It is implemented by the type a_generator_t. Every 600 milliseconds it generates a bunch of messages (between 100 and 200). The messages are sent to the worker agents in turn, one after another. The generator agent works on an active_obj dispatcher.
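The "in turn" distribution is a plain round-robin: request number i goes to worker number i modulo the number of workers. A minimal sketch, assuming a hypothetical helper distribute that only counts how many requests each worker would receive:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Returns how many of `requests` messages each of `worker_count`
// workers receives when request i goes to worker (i % worker_count).
std::vector<std::size_t> distribute(
    std::size_t requests, std::size_t worker_count)
{
    assert(worker_count != 0);
    std::vector<std::size_t> per_worker(worker_count, 0);
    for (std::size_t i = 0; i != requests; ++i)
        ++per_worker[i % worker_count];
    return per_worker;
}
```

For example, 120 requests spread over 5 workers give 24 requests per worker; when the count is not divisible by 5, the first workers receive one request more than the rest.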
There are five worker agents. They are implemented by the type a_worker_t. They work on the thread_pool dispatcher with three working threads. When a worker receives a msg_start_thinking signal, it sleeps for 10 milliseconds and blocks the working thread for this time (this is just an imitation of some time-consuming work). Because the number of workers is greater than the number of working threads, some workers will be waiting for a working context and their event queues will grow. The maximum size of an event queue is bounded by message limits: each worker accepts at most 50 msg_start_thinking signals and drops the rest.
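Two things from the description above can be checked with a small stand-alone sketch. First, the arithmetic: three threads that each spend 10 milliseconds per signal can handle at most 3 * (600 / 10) = 180 signals per 600-millisecond turn, while the generator may send up to 200, so queues can build up. Second, the drop-on-overflow behaviour. The class bounded_queue below is a toy model of that idea, not how SObjectizer implements message limits; all names in it are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Upper bound on signals processed per turn by `threads` threads when
// each signal blocks a thread for `work_ms` milliseconds.
std::size_t max_processed_per_turn(
    std::size_t threads, std::size_t turn_ms, std::size_t work_ms)
{
    assert(work_ms != 0);
    return threads * (turn_ms / work_ms);
}

// Toy queue with a drop-on-overflow reaction: a message that does not
// fit under the limit is discarded.
class bounded_queue
{
public:
    explicit bounded_queue(std::size_t limit) : m_limit(limit)
    {
        assert(limit != 0);
    }

    // Returns true if the message was stored, false if it was dropped.
    bool push(int msg)
    {
        if (m_items.size() >= m_limit)
            return false;
        m_items.push_back(msg);
        return true;
    }

    // Removes the oldest message; returns false if the queue is empty.
    bool pop()
    {
        if (m_items.empty())
            return false;
        m_items.pop_front();
        return true;
    }

    std::size_t size() const { return m_items.size(); }

private:
    const std::size_t m_limit;
    std::deque<int> m_items;
};
```

Pushing 60 messages into a queue with a limit of 50 leaves 50 messages stored and 10 dropped, which is why the queue sizes reported by the example never grow without bound.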
This example shows the current state of event queue sizes in almost real time. The output can look like this:
[+4.293ms] -- --- DISTRIBUTION STARTED ---
[+4.293ms] -- stats: 'disp/ot/DEFAULT/wt-0/demands.count': 0
[+4.293ms] -- stats: 'disp/ot/stats_listener/wt-0/demands.count': 2
[+4.293ms] -- stats: 'disp/tp/workers/aq/0x558a2ceed490/demands.count': 20
[+4.293ms] -- stats: 'disp/tp/workers/aq/0x558a2ceedab0/demands.count': 24
[+4.293ms] -- stats: 'disp/tp/workers/aq/0x558a2ceee080/demands.count': 27
[+4.293ms] -- stats: 'disp/tp/workers/aq/0x558a2ceee680/demands.count': 23
[+4.293ms] -- stats: 'disp/tp/workers/aq/0x558a2ceeec30/demands.count': 23
[+4.293ms] -- stats: 'disp/ao/generator/wt-0x558a2ceef2f0/demands.count': 0
[+4.293ms] -- --- DISTRIBUTION FINISHED ---
[+4.623ms] -- --- DISTRIBUTION STARTED ---
[+4.623ms] -- stats: 'disp/ot/DEFAULT/wt-0/demands.count': 0
[+4.623ms] -- stats: 'disp/ot/stats_listener/wt-0/demands.count': 2
[+4.623ms] -- stats: 'disp/tp/workers/aq/0x558a2ceed490/demands.count': 0
[+4.623ms] -- stats: 'disp/tp/workers/aq/0x558a2ceedab0/demands.count': 3
[+4.623ms] -- stats: 'disp/tp/workers/aq/0x558a2ceee080/demands.count': 11
[+4.623ms] -- stats: 'disp/tp/workers/aq/0x558a2ceee680/demands.count': 2
[+4.623ms] -- stats: 'disp/tp/workers/aq/0x558a2ceeec30/demands.count': 2
[+4.623ms] -- stats: 'disp/ao/generator/wt-0x558a2ceef2f0/demands.count': 0
[+4.623ms] -- --- DISTRIBUTION FINISHED ---
[+4.803ms] -- 120 requests are sent
[+4.953ms] -- --- DISTRIBUTION STARTED ---
[+4.953ms] -- stats: 'disp/ot/DEFAULT/wt-0/demands.count': 0
[+4.953ms] -- stats: 'disp/ot/stats_listener/wt-0/demands.count': 2
[+4.953ms] -- stats: 'disp/tp/workers/aq/0x558a2ceed490/demands.count': 12
[+4.953ms] -- stats: 'disp/tp/workers/aq/0x558a2ceedab0/demands.count': 16
[+4.953ms] -- stats: 'disp/tp/workers/aq/0x558a2ceee080/demands.count': 14
[+4.953ms] -- stats: 'disp/tp/workers/aq/0x558a2ceee680/demands.count': 18
[+4.953ms] -- stats: 'disp/tp/workers/aq/0x558a2ceeec30/demands.count': 18
[+4.953ms] -- stats: 'disp/ao/generator/wt-0x558a2ceef2f0/demands.count': 0
[+4.953ms] -- --- DISTRIBUTION FINISHED ---
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <deque>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <so_5/all.hpp>

// A signal to worker agent to do something.
struct msg_start_thinking : public so_5::signal_t {};

// Message for logger.
struct log_message
{
    // Text to be logged.
    std::string m_what;
};

// Logger agent.
class a_logger_t final : public so_5::agent_t
{
public :
    a_logger_t( context_t ctx )
        :   so_5::agent_t( ctx
                // Limit the count of messages.
                // Because we can't lose log messages, exceeding the limit
                // must lead to an application crash.
                + limit_then_abort< log_message >( 100 ) )
        ,   m_started_at( std::chrono::steady_clock::now() )
    {}

    void so_define_agent() override
    {
        so_default_state().event(
            [this]( const log_message & evt ) {
                std::cout << "[+" << time_delta()
                        << "] -- " << evt.m_what << std::endl;
            } );
    }

private :
    const std::chrono::steady_clock::time_point m_started_at;

    // Formats the time passed since the agent was created.
    // The elapsed milliseconds are divided by 1000.0, so the printed
    // number is in seconds even though the suffix is "ms".
    std::string time_delta() const
    {
        auto now = std::chrono::steady_clock::now();

        std::ostringstream ss;
        ss << double(
                std::chrono::duration_cast< std::chrono::milliseconds >(
                        now - m_started_at ).count()
                ) / 1000.0 << "ms";

        return ss.str();
    }
};

// Agent for receiving run-time monitoring information.
class a_stats_listener_t final : public so_5::agent_t
{
public :
    a_stats_listener_t(
        // Environment to work in.
        context_t ctx,
        // Address of logger.
        so_5::mbox_t logger )
        :   so_5::agent_t( ctx )
        ,   m_logger( std::move( logger ) )
    {}

    void so_define_agent() override
    {
        using namespace so_5::stats;

        auto & controller = so_environment().stats_controller();

        // Set up a filter for messages with run-time monitoring information.
        so_set_delivery_filter(
            // Message box to which the delivery filter must be set.
            controller.mbox(),
            // Delivery predicate.
            []( const messages::quantity< std::size_t > & msg ) {
                // Process only messages related to dispatcher's queue sizes.
                return suffixes::work_thread_queue_size() == msg.m_suffix;
            } );

        // We must receive messages from the run-time monitor.
        so_default_state()
            .event(
                // This is the mbox to which run-time statistics will be sent.
                controller.mbox(),
                &a_stats_listener_t::evt_quantity )
            .event( controller.mbox(),
                [this]( const messages::distribution_started & ) {
                    so_5::send< log_message >( m_logger, "--- DISTRIBUTION STARTED ---" );
                } )
            .event( controller.mbox(),
                [this]( const messages::distribution_finished & ) {
                    so_5::send< log_message >( m_logger, "--- DISTRIBUTION FINISHED ---" );
                } );
    }

    void so_evt_start() override
    {
        // Change the speed of run-time monitor updates.
        so_environment().stats_controller().set_distribution_period(
                std::chrono::milliseconds( 330 ) );
        // Turn the run-time monitoring on.
        so_environment().stats_controller().turn_on();
    }

private :
    const so_5::mbox_t m_logger;

    void evt_quantity(
        const so_5::stats::messages::quantity< std::size_t > & evt )
    {
        std::ostringstream ss;

        ss << "stats: '" << evt.m_prefix << evt.m_suffix << "': " << evt.m_value;

        so_5::send< log_message >( m_logger, ss.str() );
    }
};

// Load generation agent.
class a_generator_t final : public so_5::agent_t
{
public :
    a_generator_t(
        // Environment to work in.
        context_t ctx,
        // Address of logger.
        so_5::mbox_t logger,
        // Addresses of worker agents.
        std::vector< so_5::mbox_t > workers )
        :   so_5::agent_t( ctx )
        ,   m_logger( std::move( logger ) )
        ,   m_workers( std::move( workers ) )
        ,   m_turn_pause( 600 )
    {}

    void so_define_agent() override
    {
        so_default_state()
            .event( &a_generator_t::evt_next_turn );
    }

    void so_evt_start() override
    {
        // Start work cycle.
        so_5::send< msg_next_turn >( *this );
    }

private :
    // Signal about start of the next turn.
    struct msg_next_turn : public so_5::signal_t {};

    // Logger.
    const so_5::mbox_t m_logger;

    // Workers.
    const std::vector< so_5::mbox_t > m_workers;

    // Pause between working turns.
    const std::chrono::milliseconds m_turn_pause;

    void evt_next_turn(mhood_t< msg_next_turn >)
    {
        // Create and send new requests.
        generate_new_requests( random( 100, 200 ) );

        // Schedule the next turn.
        so_5::send_delayed< msg_next_turn >( *this, m_turn_pause );
    }

    void generate_new_requests( unsigned int requests )
    {
        const auto size = m_workers.size();

        // Requests are spread over the workers in round-robin order.
        for( unsigned int i = 0; i != requests; ++i )
            so_5::send< msg_start_thinking >( m_workers[ i % size ] );

        so_5::send< log_message >( m_logger,
                std::to_string( requests ) + " requests are sent" );
    }

    static unsigned int
    random( unsigned int left, unsigned int right )
    {
        std::random_device rd;
        std::mt19937 gen{ rd() };
        return std::uniform_int_distribution< unsigned int >{left, right}(gen);
    }
};

// Worker agent.
class a_worker_t final : public so_5::agent_t
{
public :
    a_worker_t( context_t ctx )
        :   so_5::agent_t( ctx
                // Limit the maximum count of messages.
                + limit_then_drop< msg_start_thinking >( 50 ) )
    {}

    void so_define_agent() override
    {
        so_default_state().event( [](mhood_t< msg_start_thinking >) {
                std::this_thread::sleep_for(
                        std::chrono::milliseconds( 10 ) );
            } );
    }
};

void init( so_5::environment_t & env )
{
    env.introduce_coop( [&env]( so_5::coop_t & coop ) {
        // Logger will work on the default dispatcher.
        auto logger = coop.make_agent< a_logger_t >();

        // Run-time stats listener will work on a dedicated
        // one-thread dispatcher.
        coop.make_agent_with_binder< a_stats_listener_t >(
                so_5::disp::one_thread::make_dispatcher(
                        env, "stats_listener" ).binder(),
                logger->so_direct_mbox() );

        // Bunch of workers.
        // Must work on dedicated thread_pool dispatcher.
        auto worker_disp = so_5::disp::thread_pool::make_dispatcher(
                env,
                "workers", // Name of dispatcher (for convenience of monitoring).
                3 ); // Count of working threads.
        const auto worker_binding_params = so_5::disp::thread_pool::bind_params_t{}
                .fifo( so_5::disp::thread_pool::fifo_t::individual );

        std::vector< so_5::mbox_t > workers;
        for( int i = 0; i != 5; ++i )
        {
            auto w = coop.make_agent_with_binder< a_worker_t >(
                    worker_disp.binder( worker_binding_params ) );
            workers.push_back( w->so_direct_mbox() );
        }

        // Generators will work on dedicated active_obj dispatcher.
        auto generator_disp = so_5::disp::active_obj::make_dispatcher(
                env, "generator" );

        coop.make_agent_with_binder< a_generator_t >(
                generator_disp.binder(),
                logger->so_direct_mbox(),
                std::move( workers ) );
    });

    // Take some time to work.
    std::this_thread::sleep_for( std::chrono::seconds(50) );

    env.stop();
}

int main()
{
    try
    {
        so_5::launch( &init );
    }
    catch( const std::exception & ex )
    {
        std::cerr << "Error: " << ex.what() << std::endl;
        return 1;
    }

    return 0;
}