Auto flowing user context extensibility example - rebus-org/Rebus GitHub Wiki
In a similar way to how Correlation IDs work, it can be useful to configure a user context that also flows through message handlers - i.e. if the special header username has been added to a handled message, the header should automatically be transferred to all outgoing messages when they're sent.
This page describes how that can be achieved, while also serving as an example of how Rebus can be extended.
Extend OptionsConfigurer by attaching a configuration extension method to it:
    public static class UsernameFlowConfigurationExtensions
    {
        public static void EnableUsernameFlow(this OptionsConfigurer configurer)
        {
            // we'll put some code in here in a second
        }
    }
which in the end will provide you with a nice, fluent configuration experience like this:
    Configure.With(activator)
        .Transport(t => t.UseMsmq("username-flow-test"))
        .Options(o => o.EnableUsernameFlow())
        .Start();
The extension method can of course easily be augmented to accept arguments or even to return a fluent builder of some sort.
All outgoing messages must be provided with a username if Thread.CurrentPrincipal is set. With Rebus, a task like that is usually solved by inserting a pipeline step into the outgoing messages pipeline. Outgoing pipeline steps must implement IOutgoingStep, and in this case it could look like this:
    class SetUsernameOutgoingStep : IOutgoingStep
    {
        public async Task Process(OutgoingStepContext context, Func<Task> next)
        {
            // null if no principal, identity, or name has been set
            var currentUsername = Thread.CurrentPrincipal?.Identity?.Name;

            if (currentUsername != null)
            {
                var message = context.Load<Message>();
                var headers = message.Headers;

                headers["username"] = currentUsername;
            }

            // invoke the rest of the outgoing pipeline
            await next();
        }
    }
As you can see, we used C# 6's null-conditional operator (?.) to extract the username of the current thread's identity if it has been set.
If the result is not null, we add it as the username header on the outgoing message.
In order to properly identify the user when handling a received message, we will now set the current principal from the username header of the handled message (if it exists). We do this in a similar manner with an incoming pipeline step by implementing IIncomingStep like so:
    class SetCurrentPrincipalIncomingStep : IIncomingStep
    {
        public async Task Process(IncomingStepContext context, Func<Task> next)
        {
            var message = context.Load<Message>();
            var headers = message.Headers;

            // remember the principal so it can be restored afterwards
            var originalPrincipal = Thread.CurrentPrincipal;

            try
            {
                string username;

                if (headers.TryGetValue("username", out username))
                {
                    Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(username), new string[0]);
                }

                // invoke the rest of the incoming pipeline, including the handlers
                await next();
            }
            finally
            {
                Thread.CurrentPrincipal = originalPrincipal;
            }
        }
    }
As you can see, we do a little extra work to restore the current thread's principal after having invoked the rest of the pipeline.
Rebus gets its pipeline steps by asking the IPipeline service for them, so in order to insert new steps into either pipeline we must decorate IPipeline and change what is returned from SendPipeline() and ReceivePipeline() respectively.
Since this operation is such a common Rebus extensibility scenario, a special pipeline decorator exists for that purpose: PipelineStepInjector. The step injector is capable of injecting a step into either pipeline, positioning the step relative to another existing step.
You can get an overview of the contents of the two pipelines by having Rebus log their contents at startup. In this case we will position the incoming step right after deserialization and the outgoing step right before serialization. We can now fill in the EnableUsernameFlow method with an implementation that looks like this:
    public static class UsernameFlowConfigurationExtensions
    {
        public static void EnableUsernameFlow(this OptionsConfigurer configurer)
        {
            configurer.Decorate<IPipeline>(c =>
            {
                var outgoingStep = new SetUsernameOutgoingStep();
                var incomingStep = new SetCurrentPrincipalIncomingStep();

                var pipeline = c.Get<IPipeline>();

                return new PipelineStepInjector(pipeline)
                    .OnReceive(incomingStep, PipelineRelativePosition.After, typeof(DeserializeIncomingMessageStep))
                    .OnSend(outgoingStep, PipelineRelativePosition.Before, typeof(SerializeOutgoingMessageStep));
            });
        }
    }
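To check where the two steps ended up, the pipeline contents can be logged at startup. The following is a minimal sketch, assuming the LogPipeline option on OptionsConfigurer and the BuiltinHandlerActivator found in recent Rebus versions; the queue name is taken from the example above. Whether the injected steps show up in the output may depend on the order in which the options are registered, so calling LogPipeline last is an assumption rather than a documented requirement.

```csharp
using Rebus.Activation;
using Rebus.Config;

// Assumption: BuiltinHandlerActivator is used as the handler activator here;
// any other activator would do.
var activator = new BuiltinHandlerActivator();

Configure.With(activator)
    .Transport(t => t.UseMsmq("username-flow-test"))
    .Options(o =>
    {
        o.EnableUsernameFlow();

        // Assumption: LogPipeline writes the send and receive pipeline
        // layouts to the configured Rebus logger at startup.
        o.LogPipeline(verbose: true);
    })
    .Start();
```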
This concludes the username-flow extensibility example.
In case you would like to use another header key, it should be easy to extend the code to use a custom header key provided to it from the extension method.
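As a rough sketch of that idea, the outgoing side could accept the header key as an argument. The names SetUserHeaderOutgoingStep and EnableUserHeaderFlow are hypothetical and chosen only for this illustration; the incoming step would take the same key through its constructor in the same way.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Rebus.Config;
using Rebus.Messages;
using Rebus.Pipeline;
using Rebus.Pipeline.Send;

// Hypothetical variant of SetUsernameOutgoingStep with a configurable header key.
class SetUserHeaderOutgoingStep : IOutgoingStep
{
    readonly string _headerKey;

    public SetUserHeaderOutgoingStep(string headerKey)
    {
        if (headerKey == null) throw new ArgumentNullException(nameof(headerKey));
        _headerKey = headerKey;
    }

    public async Task Process(OutgoingStepContext context, Func<Task> next)
    {
        var currentUsername = Thread.CurrentPrincipal?.Identity?.Name;

        if (currentUsername != null)
        {
            var message = context.Load<Message>();
            message.Headers[_headerKey] = currentUsername;
        }

        await next();
    }
}

public static class UserHeaderFlowConfigurationExtensions
{
    // Hypothetical extension method; "username" remains the default key.
    public static void EnableUserHeaderFlow(this OptionsConfigurer configurer, string headerKey = "username")
    {
        configurer.Decorate<IPipeline>(c =>
        {
            var pipeline = c.Get<IPipeline>();
            var outgoingStep = new SetUserHeaderOutgoingStep(headerKey);

            // Only the outgoing step is shown; the incoming step is injected as in the example above.
            return new PipelineStepInjector(pipeline)
                .OnSend(outgoingStep, PipelineRelativePosition.Before, typeof(SerializeOutgoingMessageStep));
        });
    }
}
```

It would then be enabled with something like .Options(o => o.EnableUserHeaderFlow("x-user")), where "x-user" is just an example key.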
Another possible extension could be the ability to serialize claims, thus picking up the current claims identity and flowing that one instead of the old-school GenericPrincipal/GenericIdentity.
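One way the claims idea might be approached is to turn the claims into a string that fits in a message header and rebuild a ClaimsPrincipal from it on the receiving side. The sketch below is not part of Rebus: ClaimsHeaderCodec and ClaimDto are hypothetical names, System.Text.Json is assumed to be available, and only the claim type and value are carried over. As with the username header, the receiver simply trusts whatever the header contains.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Security.Claims;
using System.Text.Json;

// Hypothetical DTO holding the two parts of a claim that are flowed.
public sealed class ClaimDto
{
    public string Type { get; set; }
    public string Value { get; set; }
}

// Hypothetical helper for moving claims in and out of a header value.
public static class ClaimsHeaderCodec
{
    // Serializes the type and value of each claim to a JSON array.
    public static string Serialize(ClaimsIdentity identity)
    {
        var dtos = identity.Claims
            .Select(c => new ClaimDto { Type = c.Type, Value = c.Value })
            .ToList();

        return JsonSerializer.Serialize(dtos);
    }

    // Rebuilds a principal from the header value; the authentication type
    // is an arbitrary label chosen for this sketch.
    public static ClaimsPrincipal Deserialize(string headerValue, string authenticationType = "message-header")
    {
        var dtos = JsonSerializer.Deserialize<List<ClaimDto>>(headerValue) ?? new List<ClaimDto>();
        var claims = dtos.Select(d => new Claim(d.Type, d.Value));

        return new ClaimsPrincipal(new ClaimsIdentity(claims, authenticationType));
    }
}
```

The outgoing step would then store ClaimsHeaderCodec.Serialize(...) in a header of its own, and the incoming step would assign the result of Deserialize(...) to Thread.CurrentPrincipal, restoring the original principal afterwards as before.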