How to write an augmentor - datacratic/rtbkit GitHub Wiki
In this document we'll write a simple frequency cap augmentor. For the uninitiated, an augmentor is responsible for adding data to a bid request as it goes through the router. This data is delivered to each agent that asked for the augmentation and can also be used by the router to apply custom filtering on the bid requests destined for a given agent.
The complete source code for the example is available in the rtbkit repository in the following three files:
- rtbkit/examples/augmentor_ex.h
- rtbkit/examples/augmentor_ex.cc
- rtbkit/examples/augmentor_ex_runner.cc
Note that the example is split into three files so that it can be more easily reused in the rtbkit integration test, and it is in fact quite a bit simpler than the bidding agent example. Going forward, we'll also assume that accounts follow the <campaign>:<strategy> pattern. For more information on accounts, see the banker's documentation.
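As a quick illustration of this convention, the sketch below splits an account string into its components, assuming colon-separated accounts such as the a:b used in the account-prefix example further down. splitAccount is a hypothetical helper, not RTBKit's AccountKey; it only shows why account[0] in the code that follows refers to the campaign.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper (not RTBKit's AccountKey): split an account string such
// as "campaign:strategy" into its components. Element 0 is the campaign,
// element 1 the strategy, and so on for deeper accounts.
std::vector<std::string> splitAccount(const std::string& account, char sep = ':')
{
    assert(!account.empty() && "account must not be empty");
    std::vector<std::string> parts;
    std::istringstream in(account);
    std::string part;
    while (std::getline(in, part, sep))
        parts.push_back(part);
    return parts;
}
```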
We'll go straight to the meaty part of an augmentor: answering an augmentation request sent by the router. For a frequency cap augmentor, this means adding the frequency count (the number of bids an agent's campaign has won for a given user) to the bid request and determining whether the bid request should make it to the agent. Note that the augmentation is unique to each agent, which means that we need to tailor our response for each currently active agent.
RTBKIT::AugmentationList
FrequencyCapAugmentor::
onRequest(const RTBKIT::AugmentationRequest& request)
{
recordHit("requests");
Each augmentation request starts here, at the onRequest function, which is called by either the AsyncAugmentor or the SyncAugmentor class. Both classes manage the protocol between the augmentor and the router as well as providing a thread pool to farm out the augmentation work.
const RTBKIT::UserIds& uids = request.bidRequest->userIds;
The bid request we're augmenting is available through the AugmentationRequest object. Our augmentor is only interested in the user that originated the bid request, so we extract the various ids associated with that user.
RTBKIT::AugmentationList result;
Next up is the augmentation list, which contains the augmentation for each agent. The object is a map keyed on account prefixes such that every entry that matches a given agent's account is merged together to form the final augmentation for that agent. As an example, take the account prefixes a and a:b. For the account a:b, the final augmentation is an aggregation of both entries. The account a:c, on the other hand, only receives the entry attached to the prefix a.
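To make the prefix-merging rule concrete, here is a minimal sketch that models it with hypothetical stand-in types: an account is a vector of components and an augmentation is reduced to a set of tags (the real augmentation also carries a JSON data member, which this sketch ignores). It illustrates the rule described above; it is not the router's actual merge code.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-ins: an account is a list of components and an
// augmentation is reduced to its tags (the real one also carries JSON data).
using Account = std::vector<std::string>;
struct Augmentation {
    std::set<std::string> tags;
};

// True if `prefix` equals the first prefix.size() components of `account`.
bool isPrefixOf(const Account& prefix, const Account& account)
{
    return prefix.size() <= account.size()
        && std::equal(prefix.begin(), prefix.end(), account.begin());
}

// Merge every entry whose key is a prefix of `account` into one augmentation.
Augmentation mergeFor(const std::map<Account, Augmentation>& list,
                      const Account& account)
{
    Augmentation merged;
    for (const auto& entry : list)
        if (isPrefixOf(entry.first, account))
            merged.tags.insert(entry.second.tags.begin(), entry.second.tags.end());
    return merged;
}
```

With entries for a and a:b, merging for the account a:b picks up both tag sets, while a:c only picks up the one attached to a.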
for (const string& agent : request.agents) {
RTBKIT::AgentConfigEntry config = agentConfig.getAgentEntry(agent);
if (!config.valid()) {
recordHit("unknownConfig");
continue;
}
Next we iterate through all the currently active agents using a C++11 range-based for loop and extract the corresponding AgentConfigEntry (which wraps the agent's AgentConfig) out of the AgentConfigurationListener. Because the agent configuration protocol is asynchronous, it's entirely possible that an augmentation request destined for a specific agent reaches us before our configuration listener has received that agent's configuration. There's very little we can do without the agent's configuration, so we record the miss and skip the agent altogether.
const RTBKIT::AccountKey& account = config.config->account;
size_t count = storage->get(account[0], uids);
result[account[0]].data = count;
In this part we augment the bid request with data tailored to a specific agent. We query our storage backend for the frequency count of the current agent's campaign and insert it into our augmentation list using the campaign name as the key. The storage backend of this augmentor is out of scope for this tutorial, but any low-latency persistent storage (e.g. Redis) will do.
Note that the data member of the augmentation object is a JSON blob and can follow whatever structure the current implementation requires.
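Since the storage backend is out of scope here, the sketch below is only a hypothetical in-memory stand-in that could be handy for local tests. It mirrors the get/inc calls used by the augmentor, keys counts by campaign and user, and uses a plain string where RTBKit passes UserIds. It is not persistent, and the class name is made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical in-memory stand-in for the storage backend. It is NOT
// persistent; it only mirrors the get/inc calls used by the augmentor.
// A single string stands in for RTBKit's UserIds.
class InMemoryFrequencyStorage {
public:
    // Number of wins recorded for (campaign, user); 0 if never seen.
    std::size_t get(const std::string& campaign, const std::string& userId) const
    {
        std::lock_guard<std::mutex> guard(lock);
        auto it = counts.find({campaign, userId});
        return it == counts.end() ? 0 : it->second;
    }

    // Record one more win for (campaign, user).
    void inc(const std::string& campaign, const std::string& userId)
    {
        std::lock_guard<std::mutex> guard(lock);
        ++counts[{campaign, userId}];
    }

private:
    // Lookups run on worker threads while increments arrive on the event loop.
    mutable std::mutex lock;
    std::map<std::pair<std::string, std::string>, std::size_t> counts;
};
```

The mutex matters because, in this augmentor, lookups happen on the worker threads while increments arrive on the message loop thread.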
if (count < getCap(request.augmentor, agent, config)) {
result[account].tags.insert("pass-frequency-cap-ex");
recordHit("accounts." + account[0] + ".passed");
}
else recordHit("accounts." + account[0] + ".capped");
}
Next we compare the frequency count for the campaign to the cap supplied in the agent's configuration. We add the pass-frequency-cap-ex tag to the augmentation only when the count is below the cap, which allows the router to filter out any bid request that lacks the tag, i.e. any bid request that is over the cap.
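The router side of this contract can be sketched as a simple set-inclusion check. The function below is a hypothetical illustration, assuming an agent lists pass-frequency-cap-ex as a tag it requires: a bid request reaches the agent only if its augmentation carries every required tag.

```cpp
#include <algorithm>
#include <cassert>
#include <set>
#include <string>

// Hypothetical router-side check: an agent that requires the tag
// "pass-frequency-cap-ex" only sees bid requests whose augmentation
// (for that agent) carries every required tag.
bool passesTagFilter(const std::set<std::string>& requiredTags,
                     const std::set<std::string>& augmentationTags)
{
    // std::includes works on sorted ranges, which std::set provides.
    return std::includes(augmentationTags.begin(), augmentationTags.end(),
                         requiredTags.begin(), requiredTags.end());
}
```

A consequence of tagging only the passing requests is that the filter fails closed: if the augmentor errors out or misses its deadline, the tag is absent and the bid request is dropped for agents that require it, rather than slipping past the cap.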
return result;
}
Time to send our AugmentationList object back to the router. How we do this depends on whether we're deriving from (or composing) a SyncAugmentor or an AsyncAugmentor. Here we're using a SyncAugmentor, so we can simply return the result from the onRequest function and we're done. The AsyncAugmentor is useful when we query the data from an external database whose reply comes in on a separate polling thread. In that situation, onRequest also supplies a callback that should be used to send the response back to the router.
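The difference between the two styles is mainly who sends the reply. Below is a minimal sketch of the asynchronous style using only the standard library, assuming a simplified callback type; onRequestAsync, queryDatabaseAsync and SendResponse are hypothetical names and the real AsyncAugmentor signature differs. The request handler returns immediately and the response is sent from the thread that receives the database reply.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <thread>

// Hypothetical simplified callback type; the real AsyncAugmentor API differs.
using SendResponse = std::function<void(const std::string&)>;

// Stand-in for a query to an external database whose reply arrives on a
// separate thread. Here the "lookup" just computes the key's length.
std::thread queryDatabaseAsync(const std::string& key,
                               std::function<void(int)> onReply)
{
    return std::thread([key, onReply] { onReply(static_cast<int>(key.size())); });
}

// Async style: the handler returns right away; the response is sent to the
// router from the reply thread through the supplied callback.
std::thread onRequestAsync(const std::string& request, SendResponse send)
{
    return queryDatabaseAsync(request, [send](int count) {
        send("count=" + std::to_string(count));
    });
}
```

The sketch returns the std::thread only so that a caller (or a test) can join it; a real service would keep the reply thread owned by its database client.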
Note that each augmentor has 5 milliseconds to augment a bid request, a deadline enforced by the router. If the augmentor doesn't answer within the allocated window, the router forwards the bid request down the pipeline and ignores any late responses. In other words, we don't have to do any timekeeping while writing an augmentor.
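The router's deadline can be pictured as a bounded wait on a pending reply. This hypothetical sketch uses std::future::wait_for with a 5 ms default; it illustrates the behaviour described above and is not the router's implementation. If the reply is ready in time it is used; otherwise the function reports a timeout and the late value is simply never read.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>

// Hypothetical router-side sketch: wait at most `deadline` for an
// augmentor's reply. Returns true and fills `out` if it arrived in time;
// returns false otherwise, in which case the caller forwards the bid
// request unaugmented and never reads the late reply.
bool awaitAugmentation(std::future<std::string>& pending,
                       std::string& out,
                       std::chrono::milliseconds deadline = std::chrono::milliseconds(5))
{
    if (pending.wait_for(deadline) != std::future_status::ready)
        return false;
    out = pending.get();
    return true;
}
```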
size_t
FrequencyCapAugmentor::
getCap(const string& augmentor,
const string& agent,
const RTBKIT::AgentConfigEntry& config) const
{
for (const auto& augConfig : config.config->augmentations) {
if (augConfig.name != augmentor) continue;
return augConfig.config.asInt();
}
return 0;
}
Here we read an agent's configuration and extract the cap we want to apply to the frequency count. An agent's configuration may not (yet) contain an entry for our augmentor, so we need a sane default to fall back on; in this case we simply return a cap of 0, which filters out all bid requests until we receive a proper configuration.
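The fallback is what makes this augmentor fail closed. The sketch below restates the lookup with hypothetical stand-in types (a plain struct with an integer cap instead of the JSON config RTBKit stores) so the effect of the default can be checked in isolation: with no entry, the cap is 0 and even a user we've never seen is capped.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one entry of an agent's augmentation config;
// RTBKit stores the value as JSON, here it is already an integer cap.
struct AugmentationConfig {
    std::string name;
    std::size_t cap;
};

// Same shape as getCap: use our augmentor's entry if present, else 0.
std::size_t capFor(const std::vector<AugmentationConfig>& augmentations,
                   const std::string& augmentor)
{
    for (const auto& aug : augmentations)
        if (aug.name == augmentor) return aug.cap;
    return 0;  // no entry yet: with a cap of 0, nothing passes
}

// A bid request passes when the user's count is strictly below the cap.
bool passesCap(std::size_t count, std::size_t cap) { return count < cap; }
```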
In the previous section we used a database to read the frequency counts. In this section we'll populate that database with the win events published by the post auction loop. Note that all of the code presented in this section goes in the init function, which we detail in the next section.
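// palEvents (a Datacratic::ZmqNamedMultipleSubscriber) must be a data member of
// FrequencyCapAugmentor rather than a local variable: the message loop keeps
// using it after init() returns.
palEvents.messageHandler = [&] (const vector<zmq::message_t>& msg)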
{
First up, we set up a ZmqNamedMultipleSubscriber object, which subscribes to the event stream published by the post auction loop and calls our lambda for each event it receives.
RTBKIT::AccountKey account(msg[19].toString());
RTBKIT::UserIds uids =
RTBKIT::UserIds::createFromJson(msg[15].toString());
Here we extract and parse the information our augmentor needs from the post auction event: the account that won the auction and the ids of the user we bid on. You may notice a slightly shoddy interface here: magic values are used to access the various pieces of published data. We're aware of this issue and will clean up this interface when we have a chance (patches are welcome).
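One possible way to tidy up the magic values, sketched here purely as a hypothetical, is to give the two positions names and extract them in one place with a bounds check. The indices 15 and 19 come from the code above; PalEventField, WinEvent and parseWinEvent are made-up names, and plain strings stand in for zmq::message_t frames.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical named positions for the two fields this augmentor reads;
// the values 15 and 19 are the ones used in the example above.
enum PalEventField : std::size_t {
    UserIdsField = 15,  // JSON-encoded user ids
    AccountField = 19,  // winning account, e.g. "campaign:strategy"
};

struct WinEvent {
    std::string account;
    std::string userIdsJson;
};

// Pull the fields out of the message frames (plain strings stand in for
// zmq::message_t), failing loudly if the message is shorter than expected.
WinEvent parseWinEvent(const std::vector<std::string>& frames)
{
    if (frames.size() <= AccountField)
        throw std::runtime_error("post auction event has too few frames");
    return WinEvent{frames[AccountField], frames[UserIdsField]};
}
```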
storage->inc(account[0], uids); // keyed by campaign, matching storage->get(account[0], uids) in onRequest
recordHit("wins");
};
Once the information is extracted out of the post auction event, we can register the win event in our database.
palEvents.connectAllServiceProviders(
"rtbPostAuctionService", "logger", {"MATCHEDWIN"});
addSource("FrequencyCapAugmentor::palEvents", palEvents);
}
Finally, we connect our subscriber to the post auction loop through ZooKeeper service discovery and set a filter on MATCHEDWIN events, which are sent every time an agent wins an auction for a particular spot. The call to addSource registers our subscriber with the SyncAugmentor's MessageLoop, which polls the zmq endpoint for new post auction events.
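ZeroMQ subscribers filter messages by matching the subscription string against the start of the first message frame. Assuming the {"MATCHEDWIN"} argument ends up as such a subscription, the filtering behaves like the hypothetical function below, with strings standing in for message frames.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Illustration of ZeroMQ-style subscription filtering: a message is
// delivered if its first frame starts with any subscribed prefix.
bool matchesSubscription(const std::vector<std::string>& frames,
                         const std::vector<std::string>& subscriptions)
{
    if (frames.empty()) return false;
    for (const auto& prefix : subscriptions)
        // compare() on the first prefix.size() chars; shorter frames never match
        if (frames.front().compare(0, prefix.size(), prefix) == 0)
            return true;
    return false;
}
```

Because the match is on a prefix, an empty subscription string would let every event through.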
Time for some glue code to tie everything together.
struct FrequencyCapAugmentor : public RTBKIT::SyncAugmentor { ... };
Our class derives directly from SyncAugmentor, but it's also possible to compose the class instead by assigning our own handler to SyncAugmentor's doRequest callback.
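A rough sketch of the composition alternative follows. MiniSyncAugmentor is a hypothetical, heavily simplified stand-in that only models a doRequest callback and the lifecycle calls; it is not RTBKit's SyncAugmentor. The wrapper plugs its own handler into the callback and, as is needed when composing, forwards start and shutdown to the object it owns.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical, heavily simplified stand-in for a SyncAugmentor-like class
// (not RTBKit's): it only models the doRequest callback and lifecycle calls.
struct MiniSyncAugmentor {
    std::function<std::string(const std::string&)> doRequest;
    bool running = false;

    void start() { running = true; }
    void shutdown() { running = false; }
    // In a real service this would be driven by router messages.
    std::string handle(const std::string& request) { return doRequest(request); }
};

// Composition: own the augmentor, plug our handler into its callback and
// forward the lifecycle calls to it.
class ComposedFrequencyCap {
public:
    ComposedFrequencyCap()
    {
        augmentor.doRequest = [this](const std::string& request) {
            return onRequest(request);
        };
    }
    // The callback captures `this`, so a copy would call into the original.
    ComposedFrequencyCap(const ComposedFrequencyCap&) = delete;
    ComposedFrequencyCap& operator=(const ComposedFrequencyCap&) = delete;

    void start() { augmentor.start(); }
    void shutdown() { augmentor.shutdown(); }
    MiniSyncAugmentor& base() { return augmentor; }

private:
    std::string onRequest(const std::string& request) { return "augmented:" + request; }

    MiniSyncAugmentor augmentor;
};
```

Because the callback captures this, the wrapper is made non-copyable: a copy would keep calling into the original object.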
void
FrequencyCapAugmentor::
init()
{
SyncAugmentor::init(2 /* numThreads */);
In the init function we initialize the various components that make up our service. We begin by initializing our parent class and specifying how many worker threads this augmentor will use to answer augmentation requests. The augmentor API automagically farms out all augmentation requests to a thread pool to keep its network thread responsive.
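The thread pool idea can be illustrated with a minimal, hypothetical worker pool: the network thread only enqueues work, and a fixed number of workers (for example the 2 passed to SyncAugmentor::init above) run it. This is a generic sketch of the technique, not the augmentor API's internal pool.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal worker pool: submit() only enqueues, so the caller
// (e.g. a network thread) never runs the work itself.
class WorkerPool {
public:
    explicit WorkerPool(unsigned numThreads)
    {
        for (unsigned i = 0; i < numThreads; ++i)
            workers.emplace_back([this] { run(); });
    }

    ~WorkerPool()
    {
        {
            std::lock_guard<std::mutex> guard(lock);
            stopping = true;
        }
        ready.notify_all();
        for (auto& w : workers) w.join();  // workers drain the queue first
    }

    void submit(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> guard(lock);
            tasks.push(std::move(task));
        }
        ready.notify_one();
    }

private:
    void run()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> guard(lock);
                ready.wait(guard, [this] { return stopping || !tasks.empty(); });
                if (tasks.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();  // run outside the lock
        }
    }

    std::mutex lock;
    std::condition_variable ready;
    std::queue<std::function<void()>> tasks;
    bool stopping = false;
    std::vector<std::thread> workers;
};
```

On destruction the pool lets its workers finish any queued tasks before joining them.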
Note that if we decided to compose the SyncAugmentor class instead, we would also have to provide start and shutdown functions that simply forward to the wrapped augmentor.
// Post auction loop event handler goes here.
agentConfig.init(getServices()->config);
addSource("FrequencyCapAugmentor::agentConfig", agentConfig);
}
Next we initialize our AgentConfigurationListener and attach it to the SyncAugmentor's MessageLoop. The post auction event handling code from the frequency counting section also goes in here.
int main(int argc, char** argv)
{
Home stretch!
ServiceProxies serviceProxies;
RTBKit requires a ZooKeeper instance for service discovery and a Carbon instance to dump runtime metrics. These are accessed through a ServiceProxies object, which acts as a proxy to these services. While this object can be constructed manually, it's easier to set up a bootstrap.json file and let the object initialize itself. Note that the various services in ServiceProxies have default local versions which can be used to write tests.
RTBKIT::FrequencyCapAugmentor augmentor(serviceProxies, "frequency-cap-ex");
augmentor.init();
augmentor.start();
while (true) this_thread::sleep_for(chrono::seconds(10));
}
We finish things up by instantiating our augmentor using the ServiceProxies object we just created and a unique service name, which is used for service discovery through ZooKeeper. All that's left is to initialize the augmentor, start it and put the main thread to sleep while the background message loop does its thing.