How to test incoming and outgoing steps - rebus-org/Rebus GitHub Wiki

If you are extending Rebus by inserting steps into either the incoming or the outgoing pipelines, you might want to test that those steps work too.

An example could be this step that copies the value of the current principal's http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn claim to an outgoing header:

[StepDocumentation("Sets the 'x-upn' header on outgoing messages to the current user's UPN.")]
class SetUsernameOutgoingStep : IOutgoingStep
{
    public Task Process(OutgoingStepContext context, Func<Task> next)
    {
        var upn = ClaimsPrincipal.Current?.FindFirst(ClaimTypes.Upn)?.Value;

        if (upn != null)
        {
            var message = context.Load<Message>();
            var headers = message.Headers;

            headers["x-upn"] = upn;
        }

        return next();
    }
}

While it would certainly be possible to simply new up the step and exercise it in isolation in a unit test, it's much more interesting to see if it works in conjunction with all the other steps in the pipeline, so we create an integration test for it instead.

In this case, because we're testing an outgoing step, our test should consist of

  • a one-way client configured with our step
  • a message sent to a queue
  • an in-mem network, so we can inspect the sent message

The test could look like this (assuming we're using NUnit):

[Test]
public async Task VerifyUpnHeaderIsSet()
{
    // this is the UPN
    var upn = Guid.NewGuid().ToString();

    // establish identity with UPN claim
    var claimsIdentity = new ClaimsIdentity(new[] { new Claim(ClaimTypes.Upn, upn), });
    ClaimsPrincipal.ClaimsPrincipalSelector = () => new ClaimsPrincipal(claimsIdentity);

    // create one-way client
    using (var activator = new BuiltinHandlerActivator())
    {
        var network = new InMemNetwork();

        var bus = Configure.With(activator)
            .Transport(t => t.UseInMemoryTransportAsOneWayClient(network))
            .Options(o => o.Decorate<IPipeline>(c =>
            {
                var pipeline = c.Get<IPipeline>();
                var step = new SetUsernameOutgoingStep();
                return new PipelineStepInjector(pipeline)
                    .OnSend(step, PipelineRelativePosition.Before, typeof(AssignDefaultHeadersStep));
            }))
            .Options(o => o.LogPipeline(verbose: true))
            .Start();

        // create destination queue
        network.CreateQueue("dst-queue");

        // send string message to destination queue
        await bus.Advanced.Routing.Send("dst-queue", "hej med dig");

        // retrieve message that was sent
        var message = network.GetNextOrNull("dst-queue");

        Assert.That(message, Is.Not.Null, "Expected to receive a message");
        Assert.That(message.Headers, Contains.Key("x-upn").And.ContainValue(upn));
    }
}

As you can see, it's slightly more involved this way, but we can now be pretty sure that our step works as intended.

Btw., did you notice the little .Options(o => o.LogPipeline(verbose: true)) thing above? It instructs Rebus to output the contents of its send/receive pipelines to the log, in this case resulting in the following output:

------------------------------------------------------------------------------
Message pipelines
------------------------------------------------------------------------------
Send pipeline:
    RebusWikiCodeSamples.SetUsernameOutgoingStep
        Sets the 'x-upn' header on outgoing messages to the current
        user's UPN.

    Rebus.Pipeline.Send.AssignDefaultHeadersStep
        Assigns these default headers to the outgoing message:
        
        1) a new GUID as the 'rbs2-msg-id' header (*).
        
        2) a 'rbs2-return-address' (unless the bus is a one-way
        client) (*).
        
        3) a 'rbs2-senttime' with the current time.
        
        4) 'rbs2-msg-type' with the message's simple assembly-qualified
        type name (*).
        
        (*) Unless explicitly set to something else

    Rebus.Pipeline.Send.FlowCorrelationIdStep
        Sets the 'rbs2-corr-id' header of the outgoing message
        to one of the following three things:
        
        1) The correlation ID of the message currently being handled.
        2) The message ID of the message currently being handled.
        3) The message's own message ID.

    Rebus.Pipeline.Send.AutoHeadersOutgoingStep
        If the outgoing message type has [HeaderAttribute(...,
        ...)] on it, the found headers will automatically
        be picked up and added to the outgoing message.
        
        Headers already on the message will not be overwritten.

    Rebus.Pipeline.Send.SerializeOutgoingMessageStep
        Serializes the outgoing message using the configured
        serializer, storing the resulting transport message back
        to the context.

    Rebus.Pipeline.Send.ValidateOutgoingMessageStep
        Checks the consistency of the outgoing message

    Rebus.Pipeline.Send.SendOutgoingMessageStep
        Final step that uses the current transport to send the transport
        message found in the context to all addresses found by looking
        up the DestinationAddress object from the context.


Receive pipeline:
    Rebus.Retry.Simple.SimpleRetryStrategyStep
        Wraps the invocation of the entire receive pipeline in an exception
        handler, tracking the number of times the received message
        has been attempted to be delivered.
        
        If the maximum number of delivery attempts is reached,
        the message is moved to the error queue.

    Rebus.Retry.FailFast.FailFastStep
        Checks if a message has failed with specific exceptions
        and marks it as "failed too many times". This allows the SimpleRetryStrategyStep
        to move it to the error queue.

    Rebus.Pipeline.Receive.HandleDeferredMessagesStep
        If the incoming message should not be handled now, this step
        saves the message until it is time to deliver the message.
        
        This is done by checking if the incoming message has a 'rbs2-deferred-until'
        header with a desired time to be delivered.

    Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep
        Incoming step that 'hydrates' big messages, if the payload
        was stored as a data bus attachment.

    Rebus.Pipeline.Receive.DeserializeIncomingMessageStep
        Deserializes the current transport message using the configured
        serializer, saving the deserialized message back to the context.

    Rebus.Pipeline.Receive.HandleRoutingSlipsStep
        If the message being handled is a routing slip with an itinerary,
        this step ensures that the routing slip is forwarded
        to the next destination.

    Rebus.Pipeline.Receive.ActivateHandlersStep
        Looks at the incoming message and decides how to handle
        it. A HandlerInvokers object is saved to the context to be
        invoked later.

    Rebus.Sagas.LoadSagaDataStep
        Looks at the handler invokers in the context and sees if there's
        one or more saga handlers in there.
        
        If that's the case, relevant saga data is loaded/created,
        and the rest of the pipeline gets invoked.
        
        Afterwards, all the created/loaded saga data is updated
        appropriately.

    Rebus.Pipeline.Receive.DispatchIncomingMessageStep
        Gets all the handler invokers from the current context
        and invokes them in order.
        
        Please note that each invoker might choose to ignore the invocation
        internally.
        
        If no invokers were found, a RebusApplicationException
        is thrown.

------------------------------------------------------------------------------
⚠️ **GitHub.com Fallback** ⚠️