messaging filters - grecosoft/NetFusion GitHub Wiki

IMAGE Message Filters

Message filters provide extension points invoked before and after a message is dispatched to the consumer. This allows code to be injected for altering the message before it is sent to the consumer or before the result is returned.

A message filter is a class implementing one or both of the below interfaces:

  • IPreMessageFilter: Provides a method passed the message before the consumer is called.
  • IPostMessageFilter: Provides a method passed the message after the consumer is called but before the result is returned to the caller.

Creating and registering a message filter is similar to adding a message enricher.

While to example used within this topic is not something that would be needed, it shows how to create and register a filter. The example filter checks if the message implements the IScoredMessage interface and applies pre and post filter logic.

Define Interface

Define the following interface used by the filter to determine if the filter applies to the message. Add the interface to the following location: Examples.Messaging.Domain/Commands.

using System.Collections.Generic;

namespace Examples.Messaging.Domain.Commands;

public interface IScoredMessage
{
    public IEnumerable<int> Scores { get; }
    public IEnumerable<int> Results { get; }
}

Define Command

The following defines a command conforming to the IScoredMessage interface:

using System;
using System.Collections.Generic;
using Examples.Messaging.Domain.Entities;
using NetFusion.Messaging.Types;

namespace Examples.Messaging.Domain.Commands;

public class PublishScoresCommand : Command<PublishedScoreResult>,
    IScoredMessage
{
    public IEnumerable<int> Scores { get; }
    
    public PublishScoresCommand(IEnumerable<int> scores)
    {
        Scores = scores;
    }

    public IEnumerable<int> Results => Result?.PublishedScores ?? Array.Empty<int>();
}

Define Consumer

Add the consumer to the following location: Examples.Messaging.App/Handlers

using System;
using System.Linq;
using Examples.Messaging.Domain.Commands;
using Examples.Messaging.Domain.Entities;

namespace Examples.Messaging.App.Handlers;

public class PublishScoreHandler
{
    public PublishedScoreResult OnPublish(PublishScoresCommand command)
    {
        var random = new Random();

        var updatedScores = command.Scores.Select(s => s + random.Next(0, 50)).ToArray();
        return new PublishedScoreResult(updatedScores);
    }
}

Add the following line to the InMemoryRouter to route the command to the consumer:

OnCommand<PublishScoresCommand, PublishedScoreResult>(route => 
    route.ToConsumer<PublishScoreHandler>(c => c.OnPublish));

Define Filter

At this point in the example, creating and routing the command to a consumer is the same as all prior examples. The following defines a filter checking if the received message implements the IScoredMessage interface and adds additional message attributes. Define the filter at the following location: Examples.Messaging.Infra/ScoreMessageFilter.cs

using System.Linq;
using System.Threading.Tasks;
using Examples.Messaging.Domain.Commands;
using NetFusion.Messaging.Filters;
using NetFusion.Messaging.Types.Attributes;
using NetFusion.Messaging.Types.Contracts;

namespace Examples.Messaging.Infra;

public class ScoreMessageFilter : IPreMessageFilter, IPostMessageFilter
{
    public Task OnPreFilterAsync(IMessage message)
    {
        if (message is IScoredMessage scoredMessage)
        {
            message.Attributes.SetIntValue("PreMaxScore", scoredMessage.Scores.Max());
        }
        
        return Task.CompletedTask;
    }

    public Task OnPostFilterAsync(IMessage message)
    {
        if (message is IScoredMessage scoredMessage)
        {
            message.Attributes.SetIntValue("PostMaxScore", scoredMessage.Results.Max());
        }
        
        return Task.CompletedTask;
    }
    
}

Register Filter

The filler is registered by adding to the MessageDispatchConfig when the microservice is bootstrapped:

// Add Plugins to the Composite-Container:
    builder.Services.CompositeContainer(builder.Configuration, new SerilogExtendedLogger())
        .AddSettings()
        .AddMessaging()
        .InitPluginConfig<MessageDispatchConfig>(c =>
        {
            c.AddFilter<ScoreMessageFilter>();  // <-- Add this line
        })

        .AddPlugin<InfraPlugin>()
        .AddPlugin<AppPlugin>()
        .AddPlugin<DomainPlugin>()
        .AddPlugin<WebApiPlugin>()
        .Compose();

Define WebApi Model

Define the following module to which data is posted:

namespace Examples.Messaging.WebApi.Models;

public class ScoreModel
{
    public IEnumerable<int> Scores { get; set; } = Array.Empty<int>();
}

Define WebApi Controller

Define the following controller method to create the command from the posed data:

using Examples.Messaging.Domain.Commands;
using Examples.Messaging.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Messaging;

namespace Examples.Messaging.WebApi.Controllers;

[ApiController, Route("api/messaging")]
public class MessageController : ControllerBase
{
    private readonly IMessagingService _messaging;

    public MessageController(IMessagingService messaging)
    {
        _messaging = messaging;
    }
    [HttpPost("score/filter")]
    public async Task<IActionResult> FilterTest([FromBody]ScoreModel model)
    {
        var command = new PublishScoresCommand(model.Scores);
        var result = await _messaging.SendAsync(command);

        return Ok(new { result, command.Attributes});
    }
}

Execute Example

Post the following to http://localhost:5670/api/messaging/score/filter

{
    "scores": [10, 30, 50, 60]
}

IMAGE

Filter Logs

IMAGE

The above shows the log entries expanded written by the message pipeline when filters are applied. Below shows the bootstrap log listing the configure filters:

IMAGE

⚠️ **GitHub.com Fallback** ⚠️