Module Quickstart (Protocol Buffers) - FREEDM-DGI/FREEDM GitHub Wiki
This is incomplete. When it's done, it should be edited into the original document for revision purposes. -Tom
Here's a checklist for creating a module:
- Create the source files
- Implement your handlers and methods
- Hook the new module into the execution cycle
- Add your new implementation files to Broker/src/CMakeLists.txt
Here is a sample for creating a module with some examples to help get started. A module can interact with other modules (such as GroupManagement and StateCollection) via DGI's Message Passing Interface:
Here is a sample header.
#ifndef YOURAGENT_HPP
#define YOURAGENT_HPP
#include "CBroker.hpp"
#include "CPeerNode.hpp"
#include "PeerSets.hpp"
#include "IDGIModule.hpp"
#include "messages/ModuleMessage.pb.h"
#include <boost/shared_ptr.hpp>
namespace freedm {
namespace broker {
namespace ym {
class YourAgent
: public IDGIModule
{
public:
/// Constructor
YourAgent();
/// Called to start the module
void Run();
private:
/// Creates a new SomeType message to send to another DGI module
ModuleMessage MessageSomeType(int value);
/// Adds header information to the SomeType message prior to sending
ModuleMessage PrepareForSending(const YourAgentMessage & m, std::string recipient = "ym");
/// Sends received messages to the appropriate message handlers
void HandleIncomingMessage(boost::shared_ptr<const ModuleMessage> m, CPeerNode peer);
/// Handles receiving peer lists; recommended
void HandlePeerList(const gm::PeerListMessage & m, CPeerNode peer);
/// You want one handler for each type of message you expect to receive
void HandleSomeType(const SomeTypeMessage & m, CPeerNode peer);
/// An example of how to use timers; called when our timer expires
void OnTimerExpired(const boost::system::error_code& ec);
/// A timer, to run a function at some point in the future
CBroker::TimerHandle m_timer;
/// Set containing all peer DGI
PeerSet m_peers;
};
} // namespace ym
} // namespace broker
} // namespace freedm
#endif
Here are the highlights:
- The Run Function: The starting point for your module. It takes no arguments because it is inserted as a scheduled task, and scheduled tasks take no arguments by definition. Pass any arguments your object needs through the constructor.
- Message Handlers: The functions that handle incoming messages. Each expected message type should have its own handler function. The possible message types are defined in the ModuleMessage file included in the header, which will be discussed later.
- Timer Handle: (Optional) If you need to run tasks periodically, you'll need a timer. The header shows an example timer handle. You must ask the Broker to allocate one for you (see the constructor implementation).
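The point about Run() taking no arguments can be illustrated with a small standalone sketch. It does not use any DGI classes: TaskQueueSketch and CounterAgentSketch are hypothetical stand-ins, and the assumption that the real scheduler stores tasks as argument-free callables is taken only from the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>

// Hypothetical stand-in for a scheduler that only accepts argument-free tasks.
class TaskQueueSketch
{
public:
    typedef std::function<void()> Task;

    void Schedule(Task t) { m_tasks.push(t); }

    // Runs the tasks that were queued when RunOnce() was called, in FIFO order.
    // Tasks scheduled while running are left for the next call.
    void RunOnce()
    {
        std::size_t pending = m_tasks.size();
        for (std::size_t i = 0; i < pending; ++i)
        {
            Task t = m_tasks.front();
            m_tasks.pop();
            t();
        }
    }

private:
    std::queue<Task> m_tasks;
};

// Hypothetical module: everything Run() needs arrives through the constructor.
class CounterAgentSketch
{
public:
    explicit CounterAgentSketch(int step) : m_step(step), m_total(0) {}
    void Run() { m_total += m_step; }
    int Total() const { return m_total; }

private:
    int m_step;
    int m_total;
};

// Schedules Run() twice and returns the accumulated total.
int RunTwiceExample()
{
    TaskQueueSketch queue;
    CounterAgentSketch agent(5);
    queue.Schedule(std::bind(&CounterAgentSketch::Run, &agent));
    queue.Schedule(std::bind(&CounterAgentSketch::Run, &agent));
    queue.RunOnce();
    assert(agent.Total() == 10);
    return agent.Total();
}
```

Binding the member function to the object yields a callable with no parameters, which is why the step size has to be stored in the object rather than passed to Run().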
And here is a sample .cpp file:
#include "YourModule.hpp"
#include "CBroker.hpp"
#include "CLogger.hpp"
#include "gm/GroupManagement.hpp"
#include <iostream>
#include <stdexcept>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
namespace freedm {
namespace broker {
namespace ym {
namespace {
/// This file's logger.
CLocalLogger Logger(__FILE__);
}
///////////////////////////////////////////////////////////////////////////////
/// Creates a new YourAgent
///////////////////////////////////////////////////////////////////////////////
YourAgent::YourAgent()
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* When requesting a timer from the Broker, it is ours forever, so
we store it in a member variable. Be careful not to leak these. */
m_timer = CBroker::Instance().AllocateTimer("ym");
}
///////////////////////////////////////////////////////////////////////////////
/// Runs the module. This function is called the first time the module starts
/// because we schedule it using CBroker.Schedule() in main(). It is called
/// again in the future because it schedules itself.
///
/// The only real work we do here is to send a message to ourself.
///////////////////////////////////////////////////////////////////////////////
void YourAgent::Run()
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* Create a new SomeType message that stores the value 42 */
ModuleMessage m = MessageSomeType(42);
/* Send the message to ourself. The GetMe() function is provided by
IDGIModule, which all modules must inherit from. */
GetMe().Send(m);
/* Schedule ourself so that we run again. This will occur almost
immediately, unless we have passed the end of our phase, in which case
it will happen at the start of our next phase. */
CBroker::Instance().Schedule("ym",
boost::bind(&YourAgent::Run, this),
false);
}
///////////////////////////////////////////////////////////////////////////////
/// Creates a SomeType message that stores the passed value. In general, all
/// messages should have a similar function that creates and returns a properly
/// formatted message object.
///
/// The message classes in this function are from the ModuleMessage header file
/// and must be defined by us before we complete this example.
///////////////////////////////////////////////////////////////////////////////
ModuleMessage YourAgent::MessageSomeType(int value)
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* This is the default message type for our module. All other messages, such
as the SomeType message we are creating, are stored inside this type. */
YourAgentMessage msg;
/* This creates and returns a SomeType message inside the module message. */
SomeTypeMessage * submsg = msg.mutable_some_type_message();
/* We then must initialize the message and append header information. */
submsg->set_stored_value(value);
return PrepareForSending(msg);
}
///////////////////////////////////////////////////////////////////////////////
/// Each module must have a PrepareForSending function that wraps its messages
/// into a ModuleMessage. The ModuleMessage is the unified interface for all
/// DGI message types and contains additional header information required to
/// route messages to the correct message handlers.
///
/// The recipient parameter refers to the identifier for the module that will
/// receive this message. In the header, the recipient is always set to the
/// sending module ("ym" in this instance, for YourModule) as a default value.
/// The recipient can be changed by calling PrepareForSending with a second
/// argument.
///////////////////////////////////////////////////////////////////////////////
ModuleMessage YourAgent::PrepareForSending(const YourAgentMessage & m, std::string recipient)
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* Store our message inside a ModuleMessage and set its destination */
ModuleMessage mm;
mm.mutable_your_agent_message()->CopyFrom(m);
mm.set_recipient_module(recipient);
return mm;
}
///////////////////////////////////////////////////////////////////////////////
/// This function is called by the DGI when a new message has arrived for your
/// module. It is given the message that was sent, as well as the peer who
/// sent the message. The purpose of this function is to figure out what type
/// of message was sent, and to send it to the appropriate handler function.
///////////////////////////////////////////////////////////////////////////////
void YourAgent::HandleIncomingMessage(boost::shared_ptr<const ModuleMessage> m, CPeerNode peer)
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* We first need to check what module sent the message, as this will
determine what possible types the message could be. */
if(m->has_your_agent_message())
{
/* We extract our YourAgentMessage from the ModuleMessage that
was created in the PrepareForSending function above. */
YourAgentMessage yam = m->your_agent_message();
/* Then we must determine what type of message the module sent.
In this case, there is only one type of message we expect to
receive from our own module. */
if(yam.has_some_type_message())
{
/* And all we have to do is send it to our handler function. */
HandleSomeType(yam.some_type_message(), peer);
}
else
{
Logger.Warn << "Dropped unknown message:\n" << m->DebugString();
}
}
else if(m->has_group_management_message())
{
gm::GroupManagementMessage gmm = m->group_management_message();
if(gmm.has_peer_list_message())
{
HandlePeerList(gmm.peer_list_message(), peer);
}
else
{
Logger.Warn << "Dropped unknown message:\n" << m->DebugString();
}
}
else
{
/* We only expect to receive a PeerList message from Group Management,
and the SomeType message from ourself. If the message was sent by
any other module, we aren't expecting it, so we drop it. */
Logger.Warn << "Dropped message of unexpected type:\n" << m->DebugString();
}
}
///////////////////////////////////////////////////////////////////////////////
/// Receives and processes a SomeType message.
///////////////////////////////////////////////////////////////////////////////
void YourAgent::HandleSomeType(const SomeTypeMessage & m, CPeerNode peer)
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* We can extract information about the sender from the peer parameter,
and our data from the message parameter. */
Logger.Notice << "Received SomeType message from "
<< peer.GetUUID() << ": "
<< m.stored_value()
<< std::endl;
/* Schedule a function to be called in the future. If this time is past the
end of our phase, or is not_a_date_time, then the function will be called
during our next phase. */
CBroker::Instance().Schedule(m_timer,
boost::posix_time::seconds(1),
boost::bind(&YourAgent::OnTimerExpired,
this,
boost::asio::placeholders::error));
}
///////////////////////////////////////////////////////////////////////////////
/// Receives an updated list of peers that we are connected to.
///////////////////////////////////////////////////////////////////////////////
void YourAgent::HandlePeerList(const gm::PeerListMessage & m, CPeerNode /* peer */)
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* In this example, we don't do anything with this list of peers,
but it's usually pretty important to know who's in your group. */
m_peers = gm::GMAgent::ProcessPeerList(m);
}
///////////////////////////////////////////////////////////////////////////////
/// Scheduled to run in the future by HandleSomeType()
///////////////////////////////////////////////////////////////////////////////
void YourAgent::OnTimerExpired(const boost::system::error_code& ec)
{
Logger.Trace << __PRETTY_FUNCTION__ << std::endl;
/* All functions scheduled by CBroker can be called with some error condition.
One of these error conditions, operation aborted, is benign and merely states
that the DGI is closing. All other cases are exceptional conditions. */
if (ec && ec != boost::asio::error::operation_aborted)
{
throw boost::system::system_error(ec);
}
Logger.Notice << "Received a message some time ago" << std::endl;
}
} // namespace ym
} // namespace broker
} // namespace freedm
Note that CBroker::Instance() was added after the DGI 1.5 release. If you are working with 1.5 or earlier, you will need to pass a reference to the broker in the constructor and store the reference as a member variable.
Similarly, GMAgent::ProcessPeerList() requires a second argument in DGI 1.5 and earlier. For the additional argument, just pass GetConnectionManager().
Timers are objects that set a minimum "deadline" for a function to be executed. A timer specifies an interval after which the given function will be called. The term "minimum" is used because a timer may expire during another module's phase, in which case the function is placed into a ready queue until the owning module regains control of the worker.
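To make the "minimum deadline" idea concrete, here is a rough standalone sketch. It assumes a fixed round-robin of equal-length phases, which is a simplification chosen for illustration and not a description of the actual DGI scheduler. The function EarliestRunTime and all of its parameters are hypothetical.

```cpp
#include <cassert>

// Assumption for illustration only: moduleCount modules take turns in a fixed
// round-robin, each owning one phase of phaseLength milliseconds per round.
// Returns the earliest time (ms) at which a handler owned by module `owner`
// (0-based) could run if its timer expires at `expiry` ms (expiry >= 0).
long EarliestRunTime(long expiry, int owner, int moduleCount, long phaseLength)
{
    long round = moduleCount * phaseLength;
    long offset = expiry % round;           // position inside the current round
    long roundStart = expiry - offset;
    long phaseStart = owner * phaseLength;  // start of owner's phase in a round
    long phaseEnd = phaseStart + phaseLength;

    if (offset >= phaseStart && offset < phaseEnd)
    {
        return expiry;                      // expired during our own phase
    }
    if (offset < phaseStart)
    {
        return roundStart + phaseStart;     // our phase comes later this round
    }
    return roundStart + round + phaseStart; // wait for our phase next round
}

void PhaseExample()
{
    // Three modules with 1000 ms phases: module 0 owns [0, 1000),
    // module 1 owns [1000, 2000), module 2 owns [2000, 3000), then repeat.
    assert(EarliestRunTime(600, 0, 3, 1000) == 600);   // inside own phase
    assert(EarliestRunTime(1500, 0, 3, 1000) == 3000); // deferred to next round
    assert(EarliestRunTime(500, 2, 3, 1000) == 2000);  // deferred to later phase
}
```

In the second case the timer expires 1500 ms in, during module 1's phase, so the handler waits in the ready queue until module 0's next phase begins at 3000 ms.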
Real-Time timers are an abstraction of Boost timers, and are allocated using the TimerHandle type. Timers can be allocated by requesting an allocation from the broker:
TimerHandle m_timer = broker.AllocateTimer("yourmodule");
Timers are set by providing a boost::posix_time::time_duration and a functor to execute (typically via boost::bind):
m_broker.Schedule(m_timer, boost::posix_time::milliseconds(600),
boost::bind(&YourModule::Method, this, boost::asio::placeholders::error));
Note that the handler requirements are the same as for Boost's normal timer implementation: the function referenced by the functor takes exactly one argument, of type boost::system::error_code. That is, it must have a signature of the form:
YourModule::Method( const boost::system::error_code& err)
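A handler with this signature usually begins by inspecting the error code. The sketch below shows that pattern using only the standard library, with std::error_code and std::errc::operation_canceled standing in for boost::system::error_code and boost::asio::error::operation_aborted. The function name TimerHandlerSketch and its return value are hypothetical. Unlike the OnTimerExpired example earlier, which carries on after a benign abort, this variant skips its work in that case.

```cpp
#include <cassert>
#include <system_error>

// Returns true if the handler did its normal work, false if the timer was
// cancelled (treated as benign). Any other error is rethrown as an exception.
bool TimerHandlerSketch(const std::error_code& ec)
{
    if (ec == std::errc::operation_canceled)
    {
        return false; // benign: the timer was cancelled, e.g. during shutdown
    }
    if (ec)
    {
        throw std::system_error(ec); // exceptional condition
    }
    // ... the module's periodic work would go here ...
    return true;
}

void TimerHandlerExample()
{
    // A default-constructed error_code means "no error".
    assert(TimerHandlerSketch(std::error_code()));
    // A cancelled timer is reported but not treated as a failure.
    assert(!TimerHandlerSketch(std::make_error_code(std::errc::operation_canceled)));
}
```

Whether a cancelled timer should skip its work or continue is a design decision for each module.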
Timers can also take a special timing value, boost::posix_time::not_a_date_time, as the expiration time. Such a timer expires immediately after the module loses control of execution, so its function runs the next time the module's phase is available.
TimerHandles only need to be allocated once, and it is recommended to do so in the constructor, although they can be created at any time.
The DGI provides an object for interacting with peers: the PeerNode object. Typically you are provided with a PeerNodePtr, a pointer to a peer object that facilitates interacting with other nodes. A PeerNode provides GetUUID() and Send() methods, which return the node's UUID and send messages to the node.
PeerSets are containers which store a number of PeerNodePtrs. The IAgent class provides several functions for manipulating PeerSets.
GroupManagement provides a static function for processing the "any.PeerList" message it sends when the group status changes. This function, GMAgent::ProcessPeerList, takes a message and a connection manager (available through the GetConnectionManager() method provided by the PeerNode class your module inherits from) and returns a PeerSet.
Message handlers select message subtypes for processing. All messages contain a special key that indicates which handler should process a message. (See Messages) We recommend creating a handler for each message type you expect to receive. A message handler should have a function signature:
void YourModule::HandleSomeType(const MessageType & m, CPeerNode peer)
MessageType must be defined in the ModuleMessage header before it can be used in the code. This will be discussed further in the section below.
You must call each individual message handler in the HandleIncomingMessage function shown in the example above. This HandleIncomingMessage function is called for you each time your module receives a new message, but you must determine which type of message has been received and define how the module should handle that message type.
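The dispatch step described above can be sketched without Protocol Buffers. In the standalone example below, plain structs with boolean flags stand in for the generated has_...() accessors. Every type and function name here is hypothetical and exists only to show the shape of the logic.

```cpp
#include <cassert>

// Hypothetical payloads standing in for generated protobuf message classes.
struct SomeTypeSketch { int stored_value; };
struct PeerListSketch { int peer_count; };

// Hypothetical wrapper: the flags play the role of has_some_type_message() etc.
struct ModuleMessageSketch
{
    bool has_some_type;
    SomeTypeSketch some_type;
    bool has_peer_list;
    PeerListSketch peer_list;
};

ModuleMessageSketch MakeSomeType(int value)
{
    ModuleMessageSketch m = ModuleMessageSketch(); // zero-initialize all fields
    m.has_some_type = true;
    m.some_type.stored_value = value;
    return m;
}

ModuleMessageSketch MakePeerList(int count)
{
    ModuleMessageSketch m = ModuleMessageSketch();
    m.has_peer_list = true;
    m.peer_list.peer_count = count;
    return m;
}

class DispatchSketch
{
public:
    DispatchSketch() : m_lastValue(0), m_peerCount(0), m_dropped(0) {}

    // Works out which payload is present and forwards it to its handler.
    void HandleIncomingMessage(const ModuleMessageSketch& m)
    {
        if (m.has_some_type)
        {
            HandleSomeType(m.some_type);
        }
        else if (m.has_peer_list)
        {
            HandlePeerList(m.peer_list);
        }
        else
        {
            ++m_dropped; // unexpected message: count it and drop it
        }
    }

    int LastValue() const { return m_lastValue; }
    int PeerCount() const { return m_peerCount; }
    int Dropped() const { return m_dropped; }

private:
    void HandleSomeType(const SomeTypeSketch& m) { m_lastValue = m.stored_value; }
    void HandlePeerList(const PeerListSketch& m) { m_peerCount = m.peer_count; }

    int m_lastValue;
    int m_peerCount;
    int m_dropped;
};

void DispatchExample()
{
    DispatchSketch agent;
    agent.HandleIncomingMessage(MakeSomeType(42));
    agent.HandleIncomingMessage(MakePeerList(3));
    agent.HandleIncomingMessage(ModuleMessageSketch()); // nothing set
    assert(agent.LastValue() == 42);
    assert(agent.PeerCount() == 3);
    assert(agent.Dropped() == 1);
}
```

Keeping one small handler per message type means the dispatcher contains only routing logic, so supporting a new message type needs one new branch and one new handler.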
The DGI uses Google Protocol Buffers for its messages.
Data is transported in messages using the boost::property_tree container, which allows for a complex key-value structure. Here is an example of creating a message:
CMessage m_;
m_.SetHandler("gm.Invite");
m_.m_submessages.put("gm.source", m_GroupLeader);
m_.m_submessages.put("gm.groupid",m_GroupID);
m_.m_submessages.put("gm.groupleader",m_GroupLeader);
m_.m_submessages.put("gm.groupleaderhost",p->GetHostname());
m_.m_submessages.put("gm.groupleaderport",p->GetPort());
m_submessages is a public member of CMessage (the property tree). Refer to the Boost documentation on property trees for details on manipulating the tree. Note that you must call SetHandler to specify what should process the message (module, then message type).
Here is an example of accessing values from a CMessage:
ptree &pt = msg->GetSubMessages();
unsigned int GroupID = pt.get<unsigned int>("gm.groupid");
std::string m_GroupLeader = pt.get<std::string>("gm.groupleader");
We get the property_tree by reference because it is faster than making a copy of the entire structure.
Modules are registered in PosixMain.cpp. First, in PosixMain, make an instance of your class. Find the others:
gm::GMAgent GM(id);
sc::SCAgent SC(id);
lb::LBAgent LB(id);
Add your own:
ym::YourAgent YM(id);
Look for the block where the other modules are registered. Here is a sample of registering a module:
CBroker::Instance().RegisterModule("ym",boost::posix_time::seconds(3));
CDispatcher::Instance().RegisterReadHandler("ym", "ym", &YM);
The first line registers the module with the scheduler. The first parameter is a short string that names the owner of the queues and timers the broker creates. The second is the amount of time the module is given to execute. The execution time should be sufficient for your module to complete its entire periodic task. (Note that it would be better to use the existing timings interface so that the timing is configurable; see Timer Settings in the Real Time Code.)
The second line registers the module with the Dispatcher. The first parameter is a string that identifies the module that will receive the messages the dispatcher delivers. It should be the same as the name given as the first parameter of the first line. The second parameter specifies the token that the module will ask for (as part of CMessage.SetHandler); typically this should be the same as the first argument. The string "any" delivers EVERY message received by the DGI to the given module. The third argument is the instance of your module that you created earlier.
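The token matching described above, including the special "any" token, can be sketched in a few lines. This is a standalone illustration and not the CDispatcher implementation. DispatcherSketch, ReceiverSketch, and their methods are hypothetical, and the module-identifier argument is left out for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical module stand-in that records every payload it is handed.
class ReceiverSketch
{
public:
    void Handle(const std::string& payload) { received.push_back(payload); }
    std::vector<std::string> received;
};

// Hypothetical dispatcher: maps tokens to receivers; "any" matches everything.
class DispatcherSketch
{
public:
    void Register(const std::string& token, ReceiverSketch* receiver)
    {
        m_handlers.push_back(std::make_pair(token, receiver));
    }

    // Delivers the payload to every receiver whose token matches the
    // recipient, plus every receiver registered under "any".
    // Returns the number of deliveries made.
    int Deliver(const std::string& recipient, const std::string& payload)
    {
        int delivered = 0;
        for (std::size_t i = 0; i < m_handlers.size(); ++i)
        {
            const std::string& token = m_handlers[i].first;
            if (token == recipient || token == "any")
            {
                m_handlers[i].second->Handle(payload);
                ++delivered;
            }
        }
        return delivered;
    }

private:
    std::vector<std::pair<std::string, ReceiverSketch*> > m_handlers;
};

void DispatcherExample()
{
    ReceiverSketch ym;
    ReceiverSketch monitor;
    DispatcherSketch dispatcher;
    dispatcher.Register("ym", &ym);
    dispatcher.Register("any", &monitor);

    assert(dispatcher.Deliver("ym", "hello") == 2); // ym and monitor
    assert(dispatcher.Deliver("gm", "invite") == 1); // monitor only
    assert(ym.received.size() == 1);
    assert(monitor.received.size() == 2);
}
```

A module registered under "any" sees traffic meant for every other module, so it is best reserved for cases that need it.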