CDP4ServicesMessaging - STARIONGROUP/COMET-SDK-Community-Edition GitHub Wiki
The CDP4ServicesMessaging library provides convenient access to messaging functionality and managed sessions for the AMQP 0-9-1 protocol. It depends on the RabbitMQ .NET client, which is maintained by the RabbitMQ team at VMware. It offers the following features:
- Support for Direct and Broadcast messages.
- Session reuse per thread, using a client-level disposable pattern.
- IObservable listener.
- Pushing messages in a 'fire and forget' manner.
- Background service to offload publishing messages.
- Auto-disposal of sessions when pushing messages.
- Routing for broadcast messages.
For every exchange type, message acknowledgement is set to automatic, meaning that a message is never delivered twice to the same queue. Not exposing the acknowledgement mode is a deliberate design choice.
Beyond exchanging messages between services over a network, CDP4ServicesMessaging provides a `ThingMessageProducer` and a `ThingMessageConsumer`. These two distinct services are designed specifically to send and receive ThingsChangedMessages, which carry a collection of things that have been created, updated or deleted.
The `ThingMessageProducer` and the `ThingMessageConsumer` depend on an implementation of the `ICdp4MessageSerializer`.
```CSharp
/// <summary>
/// Broadcast a message about the changed <see cref="Thing"/> instances
/// </summary>
/// <param name="changedThings">The collection of changed <see cref="Thing"/> instances.</param>
/// <param name="thingMessageProducer">The <see cref="IThingMessageProducer"/></param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/></param>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task Send(IEnumerable<Thing> changedThings, [FromServices] IThingMessageProducer thingMessageProducer, CancellationToken cancellationToken = default)
{
    var message = new ThingChangedMessage()
    {
        ChangedThings = changedThings.ToList()
    };

    await thingMessageProducer.Push(message, cancellationToken);
}
```
```CSharp
/// <summary>
/// Queue a message about the changed <see cref="Thing"/> instances, to be broadcast later
/// </summary>
/// <param name="changedThings">The collection of changed <see cref="Thing"/> instances.</param>
/// <param name="thingsMessageProducer">The <see cref="IBackgroundThingsMessageProducer"/></param>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task Send(IEnumerable<Thing> changedThings, [FromServices] IBackgroundThingsMessageProducer thingsMessageProducer)
{
    var message = new ThingChangedMessage()
    {
        ChangedThings = changedThings.ToList()
    };

    // The message is queued here; the background service publishes it later
    await thingsMessageProducer.EnqueueAsync(message);
}
```
```CSharp
/// <summary>
/// Adds an observable that yields messages of type <see cref="string"/> whenever any are available
/// </summary>
/// <param name="messageQueueClient">The <see cref="IMessageClientService"/></param>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task AddListener([FromServices] IMessageClientService messageQueueClient)
{
    IObservable<string> observable = await messageQueueClient.Listen<string>("QueueName", ExchangeType.Direct);
    observable.Subscribe(x => Console.WriteLine(x));
}
```
When using the following method, as in the example above,

```CSharp
Task<IObservable<TMessage>> Listen<TMessage>(string queueName, ExchangeType exchangeType = ExchangeType.Default, CancellationToken cancellationToken = default) where TMessage : class;
```

the implementation takes care of opening or getting a connection, opening or reusing a channel, ensuring that the proper exchange is declared depending on the provided `string queueName` and `ExchangeType exchangeType`, and lastly de-serializing the messages.
Below is an example of using the other method to receive messages:

```CSharp
Task<IDisposable> AddListener(string queueName, EventHandler<BasicDeliverEventArgs> onReceive, ExchangeType exchangeType = ExchangeType.Default, CancellationToken cancellationToken = default);
```

This method does the same as the previous one, except that it does not de-serialize the message. The whole message envelope is therefore available for inspection or custom de-serialization.
```CSharp
using System.Text;
using RabbitMQ.Client.Events;

/// <summary>
/// Adds a listener and assigns an event handler that is called whenever a message is available
/// </summary>
/// <param name="messageQueueClient">The <see cref="IMessageClientService"/></param>
/// <returns>An asynchronous task representing the operation.</returns>
public async Task AddListener([FromServices] IMessageClientService messageQueueClient)
{
    // Keep the returned IDisposable: disposing of it stops the listener
    IDisposable channel = await messageQueueClient.AddListener("QueueName", (_, envelope) => this.MessageReceived(envelope));
}

private void MessageReceived(BasicDeliverEventArgs envelope)
{
    string message = Encoding.UTF8.GetString(envelope.Body.ToArray()); // Convert the body from a byte array to a string
    Console.WriteLine($"Received message: {message}");
}
```
Note that when the channel initialized in the above example is disposed of, the connection and the channel are closed and disposed, and the event handler is no longer called.
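Because `AddListener` hands the handler the raw `BasicDeliverEventArgs`, the body can be decoded in whatever way the application needs. The following is a minimal sketch of custom de-serialization, assuming the payload is UTF-8 encoded JSON and using `System.Text.Json`. The `Greeting` type and the `BodyDecoder` helper are hypothetical names introduced for illustration; they are not part of CDP4ServicesMessaging, and the library's own `ICdp4MessageSerializer` may work differently.

```CSharp
using System;
using System.Text.Json;

/// <summary>
/// Hypothetical payload type, used for illustration only
/// </summary>
public sealed class Greeting
{
    public string Text { get; set; } = string.Empty;
}

/// <summary>
/// Hypothetical helper (not part of the library) that decodes a raw message body
/// </summary>
public static class BodyDecoder
{
    /// <summary>
    /// Decodes a message body, assuming it holds UTF-8 encoded JSON
    /// </summary>
    public static TMessage Decode<TMessage>(ReadOnlyMemory<byte> body) where TMessage : class
    {
        // JsonSerializer reads UTF-8 bytes directly, so no intermediate string is needed
        return JsonSerializer.Deserialize<TMessage>(body.Span)
            ?? throw new InvalidOperationException("The message body deserialized to null.");
    }
}
```

In a handler such as `MessageReceived`, this could be called as `BodyDecoder.Decode<Greeting>(envelope.Body)`, where `envelope` is the received `BasicDeliverEventArgs`.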