integration redis pubsub service - grecosoft/NetFusion GitHub Wiki

Redis: PubSub Service

This topic shows how to use ISubscriptionService to subscribe dynamically to a channel whose name might not be known until runtime. ISubscriptionService is registered in the microservice's dependency-injection container when the plugin is added to the composite application, and it can be injected into other registered components.

This example exposes WebApi methods that subscribe a method delegate to a specified channel and unsubscribe it again.

Define Domain-Event

Define the following domain-event in the directory src/Components/Examples.Redis.Domain/Events:

using NetFusion.Messaging.Types;

namespace Examples.Redis.Domain.Events;

public class LogEntryCreated : DomainEvent
{
    public string LogLevel { get; }
    public int Severity { get; }
    public string Message { get; }

    public LogEntryCreated(string logLevel, int severity, string message)
    {
        LogLevel = logLevel;
        Severity = severity;
        Message = message;
    }
}

Publishing Microservice

The publishing microservice specifies that a published LogEntryCreated domain-event is delivered to a channel named log-entries. Add the following code to the OnDefineEntities method:

DefineChannel<LogEntryCreated> (channel =>
{
    channel.ChannelName = "log-entries";
    channel.AppliesWhen(log => 2 < log.Severity);
    channel.SetEventData(log => $"{log.LogLevel}.{log.Severity}");
});

When SetEventData is called, as in the above example, the string it returns from the domain-event's data is appended to the channel name. Here, "log-entries.error.5" is the channel name for a domain-event with a log level of "error" and a severity of 5. The call to AppliesWhen is optional; it specifies a predicate that determines whether the domain-event is published to the channel.
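The naming rule described above can be sketched in plain C#. This is a minimal illustration and not NetFusion's implementation: TryResolveChannel is a hypothetical helper, and joining the event data to the channel name with a "." separator is inferred from the "log-entries.error.5" example.

```csharp
using System;

// Hypothetical helper (not part of NetFusion): mirrors the AppliesWhen and
// SetEventData settings of the channel definition above.
bool TryResolveChannel(string logLevel, int severity, out string channel)
{
    channel = string.Empty;

    // AppliesWhen: only events with a severity greater than 2 are published.
    if (severity <= 2)
    {
        return false;
    }

    // SetEventData: the returned string is appended to the channel name.
    // The "." separator is an assumption based on the example channel name.
    string eventData = $"{logLevel}.{severity}";
    channel = $"log-entries.{eventData}";
    return true;
}

if (TryResolveChannel("error", 5, out string resolved))
{
    Console.WriteLine(resolved); // log-entries.error.5
}

if (!TryResolveChannel("debug", 1, out _))
{
    Console.WriteLine("severity 1 is filtered out by the predicate");
}
```

Under these assumptions, a subscriber interested in every error entry would subscribe to a name covering all severities, while a subscriber to "log-entries.error.5" receives only that exact combination.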

Subscribing Microservice

The subscribing microservice defines a service component into which ISubscriptionService is injected. Since the use of Redis is an infrastructure concern, add the following class to the Examples.Redis.Infra project:

using System;
using System.Threading.Tasks;
using Examples.Redis.App.Services;
using Examples.Redis.Domain.Events;
using NetFusion.Common.Extensions;
using NetFusion.Integration.Redis;

namespace Examples.Redis.Infra;

public class EventLogSubscriber : IEventLogSubscriber
{
    private readonly ISubscriptionService _subscription;
    
    public EventLogSubscriber(
        ISubscriptionService subscription)
    {
        _subscription = subscription;
    }

    public void Subscribe(string channel)
    {
        _subscription.Subscribe<LogEntryCreated>("testDb", channel, OnEventLog);
    }

    public void UnSubscribe(string channel)
    {
        _subscription.UnSubscribe("testDb", channel);
    }

    private void OnEventLog(LogEntryCreated domainEvent)
    {
        Console.WriteLine(domainEvent.ToIndentedJson());
    }
}

Define the IEventLogSubscriber interface within the Services directory of the Examples.Redis.App project:

namespace Examples.Redis.App.Services;

public interface IEventLogSubscriber
{
    void Subscribe(string channel);
    void UnSubscribe(string channel);
}

Since this service will be injected into the WebApi controller, it must be added to the dependency-injection container. This can be done by defining the following module within the Examples.Redis.Infra project at the following location: ./src/Components/Examples.Redis.Infra/Plugin/Modules

using Examples.Redis.App.Services;
using Microsoft.Extensions.DependencyInjection;
using NetFusion.Core.Bootstrap.Plugins;

namespace Examples.Redis.Infra.Plugin.Modules;

public class ServiceModule : PluginModule
{
    public override void RegisterServices(IServiceCollection services)
    {
        services.AddSingleton<IEventLogSubscriber, EventLogSubscriber>();
    }
}

Define Api Controller

Add the following methods to the ExamplesController: one to publish domain-events to the channel, and two to subscribe to and unsubscribe from channels:

using Examples.Redis.App.Services;
using Examples.Redis.Domain.Events;
using Examples.Redis.WebApi.Models;
using Microsoft.AspNetCore.Mvc;
using NetFusion.Integration.Redis;
using NetFusion.Messaging;
using StackExchange.Redis;

namespace Examples.Redis.WebApi.Controllers;

[ApiController, Route("api/[controller]")]
public class ExamplesController : ControllerBase
{
    private readonly IMessagingService _messaging;
    private readonly IEventLogSubscriber _eventLogSubscriber;
    
    public ExamplesController(
        IMessagingService messaging,
        IEventLogSubscriber eventLogSubscriber)
    {
        _messaging = messaging;
        _eventLogSubscriber = eventLogSubscriber;
    }

    [HttpPost("log/entries")]
    public async Task<IActionResult> PublishLogEntry([FromBody]LogEntryModel model)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(ModelState);
        }

        // LogEntryCreated has get-only properties, so all values go through its constructor.
        var domainEvent = new LogEntryCreated(model.LogLevel, model.Severity, model.Message);
        await _messaging.PublishAsync(domainEvent);

        return Ok();
    }

    [HttpPost("log/entries/subscribe/{channel}")]
    public IActionResult SubscribeToLog(string channel)
    {
        _eventLogSubscriber.Subscribe(channel);
        return Ok();
    }

    [HttpDelete("log/entries/subscribe/{channel}")]
    public IActionResult UnSubscribeFromLog(string channel)
    {
        _eventLogSubscriber.UnSubscribe(channel);
        return Ok();
    }
}

Execute Example

Complete the following to run the example microservice and send requests to the example controller:

Execute Microservice

cd ./Examples.Redis/src/Examples.Redis.WebApi/
dotnet run

Send Requests

Post the following requests to subscribe to multiple channel names:

POST http://localhost:5005/api/examples/log/entries/subscribe/log-entries.error.20

POST http://localhost:5005/api/examples/log/entries/subscribe/log-entries.warning.*

Next, post several messages matching the above channel names to the controller's publish route (POST http://localhost:5005/api/examples/log/entries):

{
    "logLevel": "error",
    "severity": 20,
    "message": "Invalid input"
}

{
    "logLevel": "warning",
    "severity": 30,
    "message": "Invalid input"
}

{
    "logLevel": "warning",
    "severity": 80,
    "message": "Invalid input"
}
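Which of these messages reach the subscriber depends on how the subscribed names match the published channel names. The sketch below assumes that a subscription name containing "*" is handled as a Redis glob-style pattern, where "*" matches any run of characters. GlobMatches is a hypothetical helper written for this illustration; it handles only "*" and none of the other glob characters that Redis patterns support.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper: translates a pattern containing only '*' wildcards
// into an anchored regular expression and tests the channel name against it.
bool GlobMatches(string pattern, string channel)
{
    string regex = "^" + Regex.Escape(pattern).Replace("\\*", ".*") + "$";
    return Regex.IsMatch(channel, regex);
}

string[] subscriptions = { "log-entries.error.20", "log-entries.warning.*" };

// Channel names assumed to be produced for the three posted messages.
string[] published =
{
    "log-entries.error.20",   // logLevel "error", severity 20
    "log-entries.warning.30", // logLevel "warning", severity 30
    "log-entries.warning.80"  // logLevel "warning", severity 80
};

foreach (string channel in published)
{
    foreach (string subscription in subscriptions)
    {
        if (GlobMatches(subscription, channel))
        {
            Console.WriteLine($"{channel} is delivered via {subscription}");
        }
    }
}
```

With these inputs, the error message matches only the exact subscription, and both warning messages match only the wildcard subscription.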

The subscribed method delegate is called for each of these messages and writes the received domain-event to the console as indented JSON.

Next, send the following request to the controller to unsubscribe from the log-entries.warning.* channel:

DELETE http://localhost:5005/api/examples/log/entries/subscribe/log-entries.warning.*

Post the following message again and verify that the domain-event is no longer received:

{
    "logLevel": "warning",
    "severity": 80,
    "message": "Invalid input"
}

However, the following message will still be delivered:

{
    "logLevel": "error",
    "severity": 20,
    "message": "Invalid input"
}
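The request sequence of this walkthrough can also be scripted. The following is a minimal sketch that only builds the requests and prints them; the Subscribe, Unsubscribe, and Publish helpers are hypothetical names introduced here, and the publish URL is derived from the controller's "log/entries" route shown earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;

// Base address taken from the requests shown in this walkthrough.
const string BaseUrl = "http://localhost:5005/api/examples/log/entries";

// Hypothetical helpers that build, but do not send, the requests.
HttpRequestMessage Subscribe(string channel) =>
    new HttpRequestMessage(HttpMethod.Post, $"{BaseUrl}/subscribe/{channel}");

HttpRequestMessage Unsubscribe(string channel) =>
    new HttpRequestMessage(HttpMethod.Delete, $"{BaseUrl}/subscribe/{channel}");

HttpRequestMessage Publish(string logLevel, int severity, string message) =>
    new HttpRequestMessage(HttpMethod.Post, BaseUrl)
    {
        // The anonymous type serializes to the JSON bodies used in the walkthrough.
        Content = new StringContent(
            JsonSerializer.Serialize(new { logLevel, severity, message }),
            Encoding.UTF8,
            "application/json")
    };

var requests = new List<HttpRequestMessage>
{
    Subscribe("log-entries.error.20"),
    Subscribe("log-entries.warning.*"),
    Publish("error", 20, "Invalid input"),
    Publish("warning", 30, "Invalid input"),
    Publish("warning", 80, "Invalid input"),
    Unsubscribe("log-entries.warning.*"),
    Publish("warning", 80, "Invalid input"), // expected to be no longer received
    Publish("error", 20, "Invalid input")    // expected to still be delivered
};

foreach (HttpRequestMessage request in requests)
{
    Console.WriteLine($"{request.Method} {request.RequestUri?.OriginalString}");
}
```

To execute the sequence against the running microservice, each request could be passed in order to HttpClient.SendAsync; the sketch stops short of that so it runs without the service being available.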