SO 5.8 ByExample News Board - Stiffstream/sobjectizer GitHub Wiki
Note. This example is one of the biggest and complex examples in SO-5.8 distribution. For understanding of it a good knowledge of SO-5.8 Basics, SO-5.8 InDepth - Priorities of Agents, SO-5.8 InDepth - Dispatchers is required.
Introduction
Support of agent's priorities has been added in v.5.5.8. Three new priority-respected dispatches were introduced. They implement various priority handling policies. This example demostrates usage of one of these dispatchers -- prio_one_thread::quoted_round_robin.
This example shows abitility of prio_one_thread::quoted_round_robin to group events with respect to its priorities and then do batch-processing of events with the same priority. This ability is used in imitation of some news publishing service which works with two types of client: news publishers and news readers.
Because this example is long its full source code is not represented here. The full source can be found in the repository.
What Example Does
This example is a very simple imitiation of a news publishing system which deals with two types of clients. There are news publishers and news readers. There are much more readers than publishers. Because of that there is a necessity of quoting processing of various types of requests.
This is done via usage of prio_one_thread::quoted_round_robin dispatcher. There are some requests processing agents and some clients. Clients send requests to news board and those requests are handled by different requests processors. Every requests processor has its own priority and there is a quote for every priority. It leads to processing requests in series: one serie of requests of one type, then one serie of requests of another type and so on.
Working Scheme
There is a single MPMC-mbox representing news board. There are three agents behind this mbox. Those agents handle requests from clients. They work under control of prio_one_thread::quoted_round_robin dispacher.
There are five news publisher agents. They work on common working thread under control of one_thread dispatcher. Every publisher works by simple scheme: it sleeps for some time, then generates new story and sends it to news board mbox, then waits for acknowledgements and goes to sleep again.
There are fifty news reader agents. They work on a common working thread under the control of one_thread dispatcher. Every reader works by the following scheme: it asks for a list of new stories from news board, when it receives this list, the reader requests a full story text for no more than three recent stories, when it receives these texts, it asks a list of new stories and so on. If reader receives an empty list (no new stories since the last update) it goes to sleep for some time.
There is also a helper logger agent. It works on the default dispatcher and is used for synchronized logging by other agents.
Request-Response Specific Details
This example uses a request-response scheme for interaction between the news board and clients. There is information which is distributed inside requests and responses that is used for demonstration purposes. It is a timestamp of an initial request. This timestamp is sent with an initial request and then this value is returned with the response. This allows to show how much time was taken by request processing.
This means that timestamp field must be present in every request-response pair. To reduce amount of code in definitions of request-response messages there is a base class with timestamp field. All request-response messages are derived from it. For example:
// Base class for all messages. It stores timestamp.
struct news_board_message_base : public so_5::message_t
{
// Time at which an operation was started.
// This time will be used for calculation of operation duration.
const clock_type::time_point m_timestamp;
news_board_message_base( clock_type::time_point timestamp )
: m_timestamp( std::move(timestamp) )
{}
};
...
struct msg_publish_story_resp final : public news_board_message_base
{
story_id_type m_id;
msg_publish_story_resp(
clock_type::time_point timestamp,
story_id_type id )
: news_board_message_base( std::move(timestamp) )
, m_id( id )
{}
};
There is yet another base type for requests: news_board_request_base. This base class is used for storing a reply mbox which is necessary for every request (reply mbox will be used for sending a reply to a client). Because of that every request type is derived from news_board_request_base:
// Base class for all request messages. It stores reply_to value.
struct news_board_request_base : public news_board_message_base
{
// Mbox of request initiator.
// Response will be sent to that mbox.
const so_5::mbox_t m_reply_to;
news_board_request_base(
clock_type::time_point timestamp,
so_5::mbox_t reply_to )
: news_board_message_base( std::move(timestamp) )
, m_reply_to( std::move(reply_to) )
{}
};
...
// Request for publishing new story.
struct msg_publish_story_req final : public news_board_request_base
{
const std::string m_title;
const std::string m_content;
msg_publish_story_req(
clock_type::time_point timestamp,
so_5::mbox_t reply_to,
std::string title,
std::string content )
: news_board_request_base(
std::move(timestamp), std::move(reply_to) )
, m_title( std::move(title) )
, m_content( std::move(content) )
{}
};
New Board Impementation Principle
There are three agents which use the shared data block. Those agents are represented as instances of one_event_handler_t
with different priorities.
Agent news_receiver has the lowest priority p1. It handles requests for new stories publishing. It also removes too old stories. There is a smallest quote for the priority p1.
Agent news_director has the next priority p2. It handles requests for list of new stories. It receives last ID known to a reader and replies with a list of new stories (if any) or with empty list (if there is no new stories). The quote for p2 is greater than quote for p1.
Agent story_extractor has the next priority p3. It handles requests for full text of a specified story. It receives ID of story and replies with story text by msg_story_content_resp_ack
if story is still present or by msg_story_content_resp_nack
if story is not found (it could be deleted by news_receiver).
All three agents use the shared data block -- an instance of news_board_data
. The lifetime of that object is controlled by coop for news board agents:
so_5::mbox_t create_board_coop(
so_5::environment_t & env,
const so_5::mbox_t & logger_mbox )
{
auto board_mbox = env.create_mbox();
using namespace so_5::disp::prio_one_thread::quoted_round_robin;
using namespace so_5::prio;
// Board cooperation will use quoted_round_robin dispatcher
// with different quotes for agents.
env.introduce_coop(
make_dispatcher( env,
quotes_t{ 1 }
.set( p1, 10 ) // 10 events for news_receiver.
.set( p2, 20 ) // 20 events for news_directory.
.set( p3, 30 ) // 30 events for story_extractor.
).binder(),
[&]( so_5::coop_t & coop )
{
// Lifetime of news board data will be controlled by cooperation.
auto board_data = coop.take_under_control(
std::make_unique< news_board_data >() );
define_news_receiver_agent(
coop, *board_data, board_mbox, logger_mbox );
define_news_directory_agent(
coop, *board_data, board_mbox, logger_mbox );
define_story_extractor_agent(
coop, *board_data, board_mbox, logger_mbox );
} );
return board_mbox;
}
Work with shared data object is completely safe because all three agents are working on the context of the same working thread.
Example Result
There is a sample output from running example. It is possible to see that various types of requests are handled in batchs: some events for news_receiver, some events for news_directory, some events for story_extractor and so on:
{publisher3}: Publish new story: A story from publisher3 #1
{board.receiver}: new story published, id=1, title=A story from publisher3 #1
{publisher3}: Publish finished, id=1, publish took 54ms
{publisher4}: Publish new story: A story from publisher4 #1
{board.receiver}: new story published, id=2, title=A story from publisher4 #1
{reader44}: requesting updates, last_id=0
{board.directory}: request for updates received, last_id=0
{publisher4}: Publish finished, id=2, publish took 65ms
{reader2}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader44}: 2 updates received, took 151ms
{reader44}: requesting story {1}
{reader30}: requesting updates, last_id=0
{reader31}: requesting updates, last_id=0
{publisher5}: Publish new story: A story from publisher5 #1
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader2}: 2 updates received, took 168ms
{reader2}: requesting story {1}
{board.directory}: 2 new stories found
{reader30}: 2 updates received, took 143ms
{reader30}: requesting story {1}
{board.directory}: request for updates received, last_id=0
{reader25}: requesting updates, last_id=0
{publisher3}: Publish new story: A story from publisher3 #2
{reader34}: requesting updates, last_id=0
{publisher2}: Publish new story: A story from publisher2 #1
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader31}: 2 updates received, took 209ms
{reader31}: requesting story {1}
{reader35}: requesting updates, last_id=0
{reader9}: requesting updates, last_id=0
{reader11}: requesting updates, last_id=0
{reader28}: requesting updates, last_id=0
{reader43}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader25}: 2 updates received, took 126ms
{reader25}: requesting story {1}
{reader46}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader34}: 2 updates received, took 112ms
{reader34}: requesting story {1}
{reader41}: requesting updates, last_id=0
{reader19}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader35}: 2 updates received, took 158ms
{reader35}: requesting story {1}
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader9}: 2 updates received, took 197ms
{reader9}: requesting story {1}
{reader7}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader11}: 2 updates received, took 314ms
{reader11}: requesting story {1}
{reader13}: requesting updates, last_id=0
{reader17}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{reader28}: 2 updates received, took 425ms
{reader28}: requesting story {1}
{board.directory}: request for updates received, last_id=0
{reader10}: requesting updates, last_id=0
{reader22}: requesting updates, last_id=0
{publisher1}: Publish new story: A story from publisher1 #1
{reader33}: requesting updates, last_id=0
{reader16}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader43}: 2 updates received, took 507ms
{reader43}: requesting story {1}
{reader6}: requesting updates, last_id=0
{reader50}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader46}: 2 updates received, took 580ms
{reader46}: requesting story {1}
{reader29}: requesting updates, last_id=0
{reader18}: requesting updates, last_id=0
{reader3}: requesting updates, last_id=0
{reader36}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader41}: 2 updates received, took 615ms
{reader41}: requesting story {1}
{reader40}: requesting updates, last_id=0
{reader4}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader19}: 2 updates received, took 608ms
{reader19}: requesting story {1}
{reader45}: requesting updates, last_id=0
{reader20}: requesting updates, last_id=0
{reader23}: requesting updates, last_id=0
{reader26}: requesting updates, last_id=0
{reader38}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader7}: 2 updates received, took 572ms
{reader7}: requesting story {1}
{reader24}: requesting updates, last_id=0
{reader39}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader13}: 2 updates received, took 599ms
{reader13}: requesting story {1}
{reader47}: requesting updates, last_id=0
{reader32}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader17}: 2 updates received, took 675ms
{reader17}: requesting story {1}
{reader1}: requesting updates, last_id=0
{publisher4}: Publish new story: A story from publisher4 #2
{reader5}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader10}: 2 updates received, took 621ms
{reader10}: requesting story {1}
{board.directory}: 2 new stories found
{board.directory}: request for updates received, last_id=0
{reader22}: 2 updates received, took 641ms
{reader22}: requesting story {1}
{reader8}: requesting updates, last_id=0
{reader27}: requesting updates, last_id=0
{reader14}: requesting updates, last_id=0
{board.directory}: 2 new stories found
{board.receiver}: new story published, id=3, title=A story from publisher5 #1
{reader33}: 2 updates received, took 726ms
{reader33}: requesting story {1}
{board.receiver}: new story published, id=4, title=A story from publisher3 #2
{publisher5}: Publish finished, id=3, publish took 1477ms
{reader21}: requesting updates, last_id=0
{reader37}: requesting updates, last_id=0
{publisher3}: Publish finished, id=4, publish took 1422ms
{board.receiver}: new story published, id=5, title=A story from publisher2 #1
{reader49}: requesting updates, last_id=0
{board.receiver}: new story published, id=6, title=A story from publisher1 #1
{publisher2}: Publish finished, id=5, publish took 1498ms
{reader12}: requesting updates, last_id=0
{reader48}: requesting updates, last_id=0
{reader42}: requesting updates, last_id=0
{board.receiver}: new story published, id=7, title=A story from publisher4 #2
{publisher1}: Publish finished, id=6, publish took 1050ms
{reader15}: requesting updates, last_id=0
{board.extractor}: request for story content received, id=1
{publisher4}: Publish finished, id=7, publish took 587ms
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader44}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1937ms
{reader44}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader2}: read story {1} 'A story from publisher3 #1': "This is a ...er3", took 1927ms
{reader2}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader30}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1956ms
{reader30}: requesting story {2}
{publisher3}: Publish new story: A story from publisher3 #3
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader31}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1930ms
{reader31}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader25}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1971ms
{reader25}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader34}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 2003ms
{reader34}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader35}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1964ms
{reader35}: requesting story {2}
{publisher2}: Publish new story: A story from publisher2 #2
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader9}: read story {1} 'A story from publisher3 #1': "This is a ...er3", took 1995ms
{reader9}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader11}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1949ms
{reader11}: requesting story {2}
{publisher5}: Publish new story: A story from publisher5 #2
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader28}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1882ms
{reader28}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader43}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1824ms
{reader43}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader46}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1852ms
{reader46}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader41}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1879ms
{reader41}: requesting story {2}
{publisher1}: Publish new story: A story from publisher1 #2
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader19}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1966ms
{reader19}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader7}: read story {1} 'A story from publisher3 #1': "This is a ...er3", took 1929ms
{reader7}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader13}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1942ms
{reader13}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader17}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1964ms
{reader17}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader10}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 1968ms
{reader10}: requesting story {2}
{publisher4}: Publish new story: A story from publisher4 #3
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=1
{reader22}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 2042ms
{reader22}: requesting story {2}
{board.extractor}: story {1} found
{board.extractor}: request for story content received, id=2
{reader33}: read story {1} 'A story from publisher3 #1': "This is a...her3", took 2004ms
{reader33}: requesting story {2}
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader44}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1682ms
{reader44}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader2}: read story {2} 'A story from publisher4 #1': "This is a ...er4", took 1627ms
{reader2}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader30}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1675ms
{reader30}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader31}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1701ms
{reader31}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader25}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1649ms
{reader25}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader34}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1631ms
{reader34}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader35}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1666ms
{reader35}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader9}: read story {2} 'A story from publisher4 #1': "This is a ...er4", took 1664ms
{reader9}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader11}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1654ms
{reader11}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.directory}: request for updates received, last_id=0
{reader28}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 1636ms
{reader28}: requesting updates, last_id=2
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader16}: 7 updates received, took 3529ms
{reader16}: requesting story {5}
{board.directory}: 7 new stories found
{reader6}: 7 updates received, took 3558ms
{reader6}: requesting story {5}
{board.directory}: request for updates received, last_id=0
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader50}: 7 updates received, took 3550ms
{reader50}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader29}: 7 updates received, took 3593ms
{reader29}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader18}: 7 updates received, took 3701ms
{reader18}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader3}: 7 updates received, took 3691ms
{reader3}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader36}: 7 updates received, took 3777ms
{reader36}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader40}: 7 updates received, took 3823ms
{reader40}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader4}: 7 updates received, took 3918ms
{reader4}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader45}: 7 updates received, took 3988ms
{reader45}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader20}: 7 updates received, took 4097ms
{reader20}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader23}: 7 updates received, took 4157ms
{reader23}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader26}: 7 updates received, took 4255ms
{reader26}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader38}: 7 updates received, took 4273ms
{reader38}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader24}: 7 updates received, took 4344ms
{reader24}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader39}: 7 updates received, took 4344ms
{reader39}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader47}: 7 updates received, took 4335ms
{reader47}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader32}: 7 updates received, took 4436ms
{reader32}: requesting story {5}
{board.directory}: 7 new stories found
{board.directory}: request for updates received, last_id=0
{reader1}: 7 updates received, took 4455ms
{reader1}: requesting story {5}
{board.directory}: 7 new stories found
{reader5}: 7 updates received, took 4498ms
{reader5}: requesting story {5}
{board.receiver}: new story published, id=8, title=A story from pub...
{board.receiver}: new story published, id=9, title=A story from pub...
{publisher3}: Publish finished, id=8, publish took 3770ms
{board.receiver}: new story published, id=10, title=A story from pu...
{publisher2}: Publish finished, id=9, publish took 3576ms
{publisher5}: Publish finished, id=10, publish took 3593ms
{board.receiver}: new story published, id=11, title=A story from pu...
{board.receiver}: new story published, id=12, title=A story from pu...
{publisher1}: Publish finished, id=11, publish took 3317ms
{board.extractor}: request for story content received, id=2
{publisher4}: Publish finished, id=12, publish took 2891ms
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader43}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 3698ms
{reader43}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader46}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 3614ms
{reader46}: requesting updates, last_id=2
{board.extractor}: story {2} found
{board.extractor}: request for story content received, id=2
{reader41}: read story {2} 'A story from publisher4 #1': "This is a...her4", took 3595ms
{reader41}: requesting updates, last_id=2
{board.extractor}: story {2} found