Dispatchers - BisocM/CQRSharp GitHub Wiki

Introduction

The Dispatcher serves as the central component of the library, managing the dispatching and queuing of commands as well as invoking their associated handlers. It implements the IDispatcher interface and follows the singleton design pattern.

Alongside the standard request Dispatcher/IDispatcher, the library fully supports a notification system. Please refer to the other sections of the wiki to learn more about notifications.


Dependency Injection (DI) Integration

The Dispatcher is registered in the DI container via the AddCqrs method as follows:

services.AddCqrs(options =>
{
    // Configure the run mode (Sync or Async)
    options.RunMode = RunMode.Sync;
});

To utilize the Dispatcher within your classes, inject it through constructor dependency injection. It is important to avoid resolving the Dispatcher from the root IServiceProvider directly and to carefully consider the lifetime of your services:

public class ExampleClass(IDispatcher dispatcher)
{
    public async Task DoSomething()
    {
        var exampleCommand = new ExampleCommand()
        {
            exampleData = "EXAMPLE"
        };

        await dispatcher.ExecuteCommand(exampleCommand, CancellationToken.None);
    }
}

Run Modes

The Dispatcher supports two primary run modes:

1. Synchronous Mode

In synchronous mode, the Dispatcher does not hand the command off to a background queue. Execution is awaited inline, so the caller resumes only after command handling concludes:

CommandResult commandResult = await dispatcher.ExecuteCommand(myCommand);

2. Asynchronous Mode

In asynchronous mode, requests are handed off to a background execution queue (BackgroundTaskQueue). This enables non-blocking operation, improving scalability and responsiveness.

Retrieving Results in Asynchronous Mode

When using asynchronous mode, two primary mechanisms exist to retrieve command results:

TaskCompletionSource (TCS)

The ExecuteCommand method internally creates a TaskCompletionSource<CommandResult> as a placeholder for the result of the asynchronous execution. The caller receives a task that completes once the command execution concludes, allowing for standard asynchronous handling:

CommandResult commandResult = await dispatcher.ExecuteCommand(myCommand);
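
The TaskCompletionSource hand-off described above can be sketched with the standard System.Threading.Channels API. This is a minimal illustration of the pattern, not the library's implementation: the ExecuteInBackground helper, the unbounded channel, and the string result (standing in for CommandResult) are all assumptions made for the example.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the library's queue: an unbounded channel of work items.
var queue = Channel.CreateUnbounded<Func<Task>>();

// Hypothetical stand-in for ExecuteCommand in async mode: enqueue the work and
// hand the caller a task backed by a TaskCompletionSource.
Task<string> ExecuteInBackground(Func<Task<string>> handler)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    queue.Writer.TryWrite(async () =>
    {
        // Complete the caller's task with the handler's result or its exception.
        try { tcs.SetResult(await handler()); }
        catch (Exception ex) { tcs.SetException(ex); }
    });
    return tcs.Task;
}

// Single consumer loop: runs queued work items until the writer is completed.
var consumer = Task.Run(async () =>
{
    await foreach (var work in queue.Reader.ReadAllAsync())
        await work();
});

// The caller simply awaits the returned task, exactly as in synchronous mode.
var result = await ExecuteInBackground(async () =>
{
    await Task.Delay(10);
    return "done";
});
Console.WriteLine(result); // prints "done"

queue.Writer.Complete();
await consumer;
```

Because the caller only ever sees a Task, the calling code looks the same in both run modes; only the place where the handler actually runs differs.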

Background Task Queue (Internal)

Note: The Background Task Queue powers asynchronous run‑mode under the hood. You do not need to interact with it directly—simply configure your Dispatcher via AddCqrs, and the queue will be used automatically when RunMode.Async is selected.

Configuration via DispatcherOptions

When you call:

services.AddCqrs(opts =>
{
    opts.RunMode = RunMode.Async;
});

the library registers a BackgroundTaskQueue and its consumer(s) for you. You can further customize the queue behavior by configuring BackgroundTaskQueueOptions in DI:

services.Configure<BackgroundTaskQueueOptions>(opts =>
{
    // Bound the queue to 500 items; full queue will drop oldest entries.
    opts.Capacity = 500;
    opts.FullMode = BoundedChannelFullMode.DropOldest;

    // Control how many work items each consumer processes per batch.
    opts.DequeueBatchSize = 50;

    // How many notifications to buffer before applying back‑pressure.
    opts.CallbackChannelCapacity = 128;
});

| Option | Description | Default |
| --- | --- | --- |
| Capacity | Maximum number of pending tasks. ≤ 0 ⇒ unbounded. | 0 |
| FullMode | Policy when capacity is reached. Wait ⇒ block until space is available; DropOldest ⇒ remove the oldest item; DropNewest/DropWrite ⇒ drop the incoming item immediately. | Wait |
| DequeueBatchSize | Number of items each consumer pulls per loop. ≤ 0 ⇒ drain until empty. | 0 |
| CallbackChannelCapacity | Maximum number of notifications buffered in the internal notification channel before enqueue callers are back‑pressured or dropped. | 1024 |

How It Works
  1. Enqueuing

    • In Async mode, Dispatcher.ExecuteCommand(...) calls BackgroundTaskQueue.QueueBackgroundWorkItemAsync(...).
    • Each work item is wrapped with a monotonically increasing sequence number.
    • If you’ve set a bounded queue and it’s full, the configured FullMode is applied:
      • Wait — the caller awaits until space frees up.
      • DropOldest — the oldest item is removed before enqueuing yours.
      • DropNewest/DropWrite — your new item is rejected immediately.
  2. Decoupled Notifications

    • Rather than publishing metrics or “task enqueued/rejected” events inline, the queue writes notification objects to a separate, bounded Channel<INotification>.
    • A dedicated background loop (ProcessNotificationsAsync) reads from this channel and dispatches each notification via INotificationDispatcher.Publish().
    • That loop applies retry logic (e.g. up to 3 attempts with a 2 s back‑off) and logs both warnings and permanent failures—ensuring the enqueue path stays minimal and non‑blocking.
  3. Consumption

    • A hosted service (BackgroundTaskQueueConsumer) spins up one or more consumers (default: one per logical processor).
    • Each consumer:
      1. Waits until there’s at least one queued item.
      2. Drains up to DequeueBatchSize tasks.
      3. Uses a SemaphoreSlim to cap concurrency at ConsumerCount (default = processor count).
      4. Executes each delegate on the thread‑pool and logs any exceptions without crashing the loop.
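
The enqueue and consumption steps above can be approximated with a plain bounded channel. The sketch below is an assumption-laden illustration rather than the library's code: it uses integers as stand-ins for sequence-numbered work items, and the local names batchSize and maxConcurrency play the roles of DequeueBatchSize and ConsumerCount.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel with capacity 3 that evicts the oldest item when full.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

// Enqueue five sequence numbers; 1 and 2 are evicted to make room for 4 and 5.
for (var seq = 1; seq <= 5; seq++)
    channel.Writer.TryWrite(seq);
channel.Writer.Complete();

const int batchSize = 2;        // plays the role of DequeueBatchSize
const int maxConcurrency = 2;   // plays the role of ConsumerCount
using var gate = new SemaphoreSlim(maxConcurrency);
var processed = new List<int>();
var running = new List<Task>();

// Wait for at least one item, then drain up to batchSize items per pass.
while (await channel.Reader.WaitToReadAsync())
{
    for (var i = 0; i < batchSize; i++)
    {
        if (!channel.Reader.TryRead(out var item)) break;

        // The semaphore caps how many work items run at the same time.
        await gate.WaitAsync();
        running.Add(Task.Run(() =>
        {
            try { lock (processed) processed.Add(item); }
            catch (Exception ex) { Console.Error.WriteLine(ex); } // log, keep looping
            finally { gate.Release(); }
        }));
    }
}
await Task.WhenAll(running);

processed.Sort();
Console.WriteLine(string.Join(",", processed)); // prints "3,4,5"
```

With DropOldest the writer never waits, which keeps the enqueue path fast at the cost of silently discarding the oldest pending work; Wait trades that for back-pressure on the caller.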

When to Tweak These Settings

  • High‑throughput scenarios:
    Use a bounded queue with DropOldest to shed excess load gracefully, and lower the notification buffer if you want tighter feedback on when you’re saturating your notification channel.
  • Reliability concerns:
    The built‑in retry/back‑off in the notification loop helps surface transient publish errors without slowing your enqueue path.
  • Observability & Debugging:
    Tap into the TaskEnqueuedNotification and TaskRejectedNotification to feed metrics into Prometheus, Application Insights, or any other telemetry sink.
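
The retry and back-off behaviour of the notification loop can be pictured with a small helper like the one below. This is a sketch under stated assumptions: PublishWithRetryAsync is a hypothetical name, the delegate stands in for a call to INotificationDispatcher.Publish(), and the delay is shortened to 10 ms so the example finishes quickly (the text above mentions a 2 s back-off).

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: try a publish delegate up to maxAttempts times,
// pausing for a fixed delay between failed attempts.
async Task<bool> PublishWithRetryAsync(Func<Task> publish, int maxAttempts, TimeSpan delay)
{
    for (var attempt = 1; attempt <= maxAttempts; attempt++)
    {
        try
        {
            await publish();
            return true;
        }
        catch (Exception ex) when (attempt < maxAttempts)
        {
            // Transient failure: log a warning and back off before retrying.
            Console.Error.WriteLine($"Attempt {attempt} failed: {ex.Message}; retrying.");
            await Task.Delay(delay);
        }
        catch (Exception ex)
        {
            // Final attempt failed: log the permanent failure and give up.
            Console.Error.WriteLine($"Giving up after {attempt} attempts: {ex.Message}");
        }
    }
    return false;
}

// Simulated publisher that fails twice and succeeds on the third call.
var calls = 0;
var delivered = await PublishWithRetryAsync(() =>
{
    calls++;
    return calls < 3
        ? Task.FromException(new InvalidOperationException("transient"))
        : Task.CompletedTask;
}, maxAttempts: 3, delay: TimeSpan.FromMilliseconds(10));

Console.WriteLine($"{delivered} after {calls} calls"); // prints "True after 3 calls"
```

Running the retries on a separate loop, rather than in the enqueue call, is what keeps a slow or failing telemetry sink from delaying command submission.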

NotificationDispatcher

Every dispatched command or query emits notifications upon initiation and completion. The library defines four such notifications, two for queries and two for commands:

  • QueryInitiatedNotification<TResult>
  • QueryCompletedNotification<TResult>
  • CommandInitiatedNotification
  • CommandCompletedNotification

You can subscribe to these notifications using the notification dispatcher pattern:

public class OnCommandCompletedHandler : INotificationHandler<CommandCompletedNotification>
{
    public Task Handle(CommandCompletedNotification notification, CancellationToken cancellationToken)
    {
        // Handle the command completion notification
        Console.WriteLine($"{notification.CommandName} finished with result: {notification.Result}");
        return Task.CompletedTask;
    }
}
