Pipeline Behaviors - arcanic-kit/Mediator GitHub Wiki

Pipeline behaviors in Arcanic Mediator provide a powerful way to implement cross-cutting concerns that execute around your message handlers. They follow the decorator pattern and allow you to add functionality like logging, validation, caching, authorization, and more without cluttering your business logic.

Table of Contents

Overview

Pipeline behaviors wrap around your message handlers and execute in a specific order. They can:

  • Execute code before the handler (validation, authorization)
  • Execute code after the handler (logging, notifications)
  • Wrap the entire execution (transactions, error handling)
  • Modify the request or response (transformation, enrichment)
  • Short-circuit execution (caching, authorization failures)

Pipeline Types

Arcanic Mediator provides five types of pipeline behaviors, each optimized for different scenarios:

Pipeline Type Interface Scope Best For
Generic IPipelineBehavior<TMessage, TResponse> All message types Global concerns (metrics, correlation)
Request IRequestPipelineBehavior<TRequest, TResponse> Commands + Queries Shared read/write concerns
Command ICommandPipelineBehavior<TCommand, TResponse> Commands only Write-specific concerns (transactions)
Query IQueryPipelineBehavior<TQuery, TResponse> Queries only Read-specific concerns (caching)
Event IEventPipelineBehavior<TEvent, TResponse> Events only Event-specific concerns (audit, retry)

When to Use Each Type

Generic Pipeline

Use for concerns that apply to all message types including events:

  • Global error handling
  • Correlation ID tracking
  • Performance metrics
  • Security headers
  • Global logging

Request Pipeline

Use for concerns that apply to commands and queries but not events:

  • Request/response logging
  • Input validation
  • User context setup
  • Request enrichment

Command Pipeline

Use for write operation specific concerns:

  • Database transactions
  • Authorization for modifications
  • Business rule validation
  • Optimistic concurrency
  • Domain event coordination

Query Pipeline

Use for read operation specific concerns:

  • Response caching
  • Query performance monitoring
  • Read-only authorization
  • Result transformation
  • Query optimization

Event Pipeline

Use for domain event specific concerns:

  • Event audit logging
  • Retry logic for failed events
  • Event sourcing
  • Cross-service notifications
  • Event versioning

Implementation Examples

Generic Pipeline Examples

Global Metrics Collection

using Arcanic.Mediator.Abstractions.Pipeline;

public class GlobalMetricsPipelineBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : notnull
{
    private readonly IMetricsCollector _metricsCollector;
    private readonly ILogger<GlobalMetricsPipelineBehavior<TMessage, TResponse>> _logger;

    public GlobalMetricsPipelineBehavior(
        IMetricsCollector metricsCollector,
        ILogger<GlobalMetricsPipelineBehavior<TMessage, TResponse>> logger)
    {
        _metricsCollector = metricsCollector;
        _logger = logger;
    }

    public async Task<TResponse> HandleAsync(TMessage message, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        var messageName = typeof(TMessage).Name;
        var correlationId = Guid.NewGuid();
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        using var activity = _metricsCollector.StartActivity(messageName);
        
        _logger.LogDebug("Processing {MessageName} with correlation ID {CorrelationId}", 
            messageName, correlationId);

        try
        {
            var result = await next(cancellationToken);
            
            stopwatch.Stop();
            _metricsCollector.RecordExecution(messageName, stopwatch.Elapsed, success: true);
            
            _logger.LogDebug("Completed {MessageName} in {ElapsedMs}ms", 
                messageName, stopwatch.ElapsedMilliseconds);
            
            return result;
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            _metricsCollector.RecordExecution(messageName, stopwatch.Elapsed, success: false);
            
            _logger.LogError(ex, "Failed {MessageName} after {ElapsedMs}ms", 
                messageName, stopwatch.ElapsedMilliseconds);
            
            throw;
        }
    }
}

Global Error Handling

public class GlobalExceptionPipelineBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>
    where TMessage : notnull
{
    private readonly ILogger<GlobalExceptionPipelineBehavior<TMessage, TResponse>> _logger;

    public GlobalExceptionPipelineBehavior(ILogger<GlobalExceptionPipelineBehavior<TMessage, TResponse>> logger)
    {
        _logger = logger;
    }

    public async Task<TResponse> HandleAsync(TMessage message, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        try
        {
            return await next(cancellationToken);
        }
        catch (Exception ex)
        {
            var messageName = typeof(TMessage).Name;
            
            _logger.LogError(ex, 
                "Unhandled exception in {MessageName}. Message: {@Message}",
                messageName, message);

            // Could implement circuit breaker, dead letter queue, etc.
            throw;
        }
    }
}

Request Pipeline Examples

Request Validation

using Arcanic.Mediator.Request.Abstractions;
using Arcanic.Mediator.Request.Abstractions.Pipeline;
using FluentValidation;

public class ValidationRequestPipelineBehavior<TRequest, TResponse> : IRequestPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;

    public ValidationRequestPipelineBehavior(IEnumerable<IValidator<TRequest>> validators)
    {
        _validators = validators;
    }

    public async Task<TResponse> HandleAsync(TRequest request, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        if (_validators.Any())
        {
            var context = new ValidationContext<TRequest>(request);
            
            var validationResults = await Task.WhenAll(
                _validators.Select(v => v.ValidateAsync(context, cancellationToken)));
            
            var failures = validationResults
                .SelectMany(r => r.Errors)
                .Where(f => f != null)
                .ToList();

            if (failures.Any())
            {
                throw new ValidationException(failures);
            }
        }

        return await next(cancellationToken);
    }
}

Command Pipeline Examples

Transaction Management

using Arcanic.Mediator.Command.Abstractions;
using Arcanic.Mediator.Command.Abstractions.Pipeline;
using Microsoft.EntityFrameworkCore;

public class TransactionCommandPipelineBehavior<TCommand, TResponse> : ICommandPipelineBehavior<TCommand, TResponse>
    where TCommand : ICommand
{
    private readonly ApplicationDbContext _context;
    private readonly ILogger<TransactionCommandPipelineBehavior<TCommand, TResponse>> _logger;

    public TransactionCommandPipelineBehavior(
        ApplicationDbContext context,
        ILogger<TransactionCommandPipelineBehavior<TCommand, TResponse>> logger)
    {
        _context = context;
        _logger = logger;
    }

    public async Task<TResponse> HandleAsync(TCommand command, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        var commandName = typeof(TCommand).Name;
        
        if (_context.Database.CurrentTransaction != null)
        {
            // Already in transaction, continue
            return await next(cancellationToken);
        }

        _logger.LogInformation("Starting transaction for {CommandName}", commandName);

        using var transaction = await _context.Database.BeginTransactionAsync(cancellationToken);
        
        try
        {
            var result = await next(cancellationToken);
            
            await transaction.CommitAsync(cancellationToken);
            _logger.LogInformation("Transaction committed for {CommandName}", commandName);
            
            return result;
        }
        catch (Exception ex)
        {
            await transaction.RollbackAsync(cancellationToken);
            _logger.LogError(ex, "Transaction rolled back for {CommandName}", commandName);
            throw;
        }
    }
}

Command Authorization

using Microsoft.AspNetCore.Authorization;
using System.Security.Claims;

public class AuthorizationCommandPipelineBehavior<TCommand, TResponse> : ICommandPipelineBehavior<TCommand, TResponse>
    where TCommand : ICommand
{
    private readonly ICurrentUser _currentUser;
    private readonly IAuthorizationService _authorizationService;

    public AuthorizationCommandPipelineBehavior(ICurrentUser currentUser, IAuthorizationService authorizationService)
    {
        _currentUser = currentUser;
        _authorizationService = authorizationService;
    }

    public async Task<TResponse> HandleAsync(TCommand command, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        var requiresAuth = typeof(TCommand).GetCustomAttributes(typeof(AuthorizeAttribute), true).Any();
        
        if (requiresAuth)
        {
            var authorizationResult = await _authorizationService.AuthorizeAsync(
                _currentUser.User, command, typeof(TCommand).Name);

            if (!authorizationResult.Succeeded)
            {
                throw new UnauthorizedAccessException($"User not authorized to execute {typeof(TCommand).Name}");
            }
        }

        return await next(cancellationToken);
    }
}

Query Pipeline Examples

Response Caching

using Arcanic.Mediator.Query.Abstractions;
using Arcanic.Mediator.Query.Abstractions.Pipeline;
using Microsoft.Extensions.Caching.Memory;

public class CachingQueryPipelineBehavior<TQuery, TResponse> : IQueryPipelineBehavior<TQuery, TResponse>
    where TQuery : IQuery<TResponse>
{
    private readonly IMemoryCache _cache;
    private readonly ILogger<CachingQueryPipelineBehavior<TQuery, TResponse>> _logger;

    public CachingQueryPipelineBehavior(IMemoryCache cache, ILogger<CachingQueryPipelineBehavior<TQuery, TResponse>> logger)
    {
        _cache = cache;
        _logger = logger;
    }

    public async Task<TResponse> HandleAsync(TQuery query, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        // Only cache queries that implement ICacheable
        if (query is not ICacheable cacheableQuery)
        {
            return await next(cancellationToken);
        }

        var cacheKey = cacheableQuery.CacheKey;
        
        if (_cache.TryGetValue(cacheKey, out TResponse cachedResult))
        {
            _logger.LogInformation("Cache hit for {QueryName}: {CacheKey}", typeof(TQuery).Name, cacheKey);
            return cachedResult;
        }

        _logger.LogInformation("Cache miss for {QueryName}: {CacheKey}", typeof(TQuery).Name, cacheKey);

        var result = await next(cancellationToken);
        
        var cacheOptions = new MemoryCacheEntryOptions
        {
            AbsoluteExpirationRelativeToNow = cacheableQuery.CacheDuration,
            Priority = CacheItemPriority.Normal
        };

        _cache.Set(cacheKey, result, cacheOptions);
        
        return result;
    }
}

public interface ICacheable
{
    string CacheKey { get; }
    TimeSpan CacheDuration { get; }
}

Query Performance Monitoring

public class PerformanceQueryPipelineBehavior<TQuery, TResponse> : IQueryPipelineBehavior<TQuery, TResponse>
    where TQuery : IQuery<TResponse>
{
    private readonly ILogger<PerformanceQueryPipelineBehavior<TQuery, TResponse>> _logger;
    private readonly IMetricsCollector _metrics;

    public PerformanceQueryPipelineBehavior(
        ILogger<PerformanceQueryPipelineBehavior<TQuery, TResponse>> logger,
        IMetricsCollector metrics)
    {
        _logger = logger;
        _metrics = metrics;
    }

    public async Task<TResponse> HandleAsync(TQuery query, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        var queryName = typeof(TQuery).Name;

        try
        {
            var result = await next(cancellationToken);
            
            stopwatch.Stop();
            var elapsed = stopwatch.ElapsedMilliseconds;

            // Record metrics
            _metrics.RecordQueryExecution(queryName, elapsed);
            
            // Log slow queries
            if (elapsed > 1000)
            {
                _logger.LogWarning("Slow query detected: {QueryName} took {ElapsedMs}ms. Query: {@Query}", 
                    queryName, elapsed, query);
            }
            else
            {
                _logger.LogDebug("Query {QueryName} completed in {ElapsedMs}ms", queryName, elapsed);
            }

            return result;
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            _metrics.RecordQueryFailure(queryName, stopwatch.ElapsedMilliseconds);
            
            _logger.LogError(ex, "Query {QueryName} failed after {ElapsedMs}ms", 
                queryName, stopwatch.ElapsedMilliseconds);
            throw;
        }
    }
}

Event Pipeline Examples

Event Audit Logging

using Arcanic.Mediator.Event.Abstractions;
using Arcanic.Mediator.Event.Abstractions.Pipeline;

public class AuditEventPipelineBehavior<TEvent, TResponse> : IEventPipelineBehavior<TEvent, TResponse>
    where TEvent : IEvent
{
    private readonly IAuditService _auditService;
    private readonly ICurrentUser _currentUser;
    private readonly ILogger<AuditEventPipelineBehavior<TEvent, TResponse>> _logger;

    public AuditEventPipelineBehavior(
        IAuditService auditService,
        ICurrentUser currentUser,
        ILogger<AuditEventPipelineBehavior<TEvent, TResponse>> logger)
    {
        _auditService = auditService;
        _currentUser = currentUser;
        _logger = logger;
    }

    public async Task<TResponse> HandleAsync(TEvent @event, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        var eventName = typeof(TEvent).Name;
        var correlationId = Guid.NewGuid();
        var userId = _currentUser.UserId;

        // Create audit entry before processing
        var auditEntry = new EventAuditEntry
        {
            EventName = eventName,
            EventData = JsonSerializer.Serialize(@event),
            UserId = userId,
            CorrelationId = correlationId,
            Timestamp = DateTimeOffset.UtcNow,
            Status = "Processing"
        };

        await _auditService.LogEventAsync(auditEntry, cancellationToken);
        
        _logger.LogInformation("Processing event {EventName} with correlation ID {CorrelationId}", 
            eventName, correlationId);

        try
        {
            var result = await next(cancellationToken);
            
            // Update audit entry with success
            auditEntry.Status = "Completed";
            auditEntry.CompletedAt = DateTimeOffset.UtcNow;
            await _auditService.UpdateEventAuditAsync(auditEntry, cancellationToken);
            
            return result;
        }
        catch (Exception ex)
        {
            // Update audit entry with failure
            auditEntry.Status = "Failed";
            auditEntry.Error = ex.Message;
            auditEntry.CompletedAt = DateTimeOffset.UtcNow;
            await _auditService.UpdateEventAuditAsync(auditEntry, cancellationToken);
            
            _logger.LogError(ex, "Failed to process event {EventName} with correlation ID {CorrelationId}", 
                eventName, correlationId);
            throw;
        }
    }
}

Event Retry Logic

using Polly;
using Polly.Extensions.Http;

public class RetryEventPipelineBehavior<TEvent, TResponse> : IEventPipelineBehavior<TEvent, TResponse>
    where TEvent : IEvent
{
    private readonly ILogger<RetryEventPipelineBehavior<TEvent, TResponse>> _logger;
    private readonly IAsyncPolicy _retryPolicy;

    public RetryEventPipelineBehavior(ILogger<RetryEventPipelineBehavior<TEvent, TResponse>> logger)
    {
        _logger = logger;
        _retryPolicy = Policy
            .Handle<HttpRequestException>()
            .Or<TaskCanceledException>()
            .Or<SocketException>()
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                onRetry: (outcome, timespan, retryCount, context) =>
                {
                    _logger.LogWarning("Retry {RetryCount} for {EventName} in {DelaySeconds}s. Exception: {Exception}",
                        retryCount, typeof(TEvent).Name, timespan.TotalSeconds, outcome.Exception?.Message);
                });
    }

    public async Task<TResponse> HandleAsync(TEvent @event, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        return await _retryPolicy.ExecuteAsync(async () =>
        {
            return await next(cancellationToken);
        });
    }
}

Registration and Configuration

Basic Registration

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddArcanicMediator()
    // Generic pipelines - execute for ALL message types
    .AddPipelineBehavior(typeof(GlobalMetricsPipelineBehavior<,>))
    .AddPipelineBehavior(typeof(GlobalExceptionPipelineBehavior<,>))
    
    // Request pipelines - execute for commands and queries
    .AddRequestPipelineBehavior(typeof(ValidationRequestPipelineBehavior<,>))
    .AddRequestPipelineBehavior(typeof(LoggingRequestPipelineBehavior<,>))
    
    // Command-specific pipelines
    .AddCommandPipelineBehavior(typeof(AuthorizationCommandPipelineBehavior<,>))
    .AddCommandPipelineBehavior(typeof(TransactionCommandPipelineBehavior<,>))
    
    // Query-specific pipelines  
    .AddQueryPipelineBehavior(typeof(CachingQueryPipelineBehavior<,>))
    .AddQueryPipelineBehavior(typeof(PerformanceQueryPipelineBehavior<,>))
    
    // Event-specific pipelines
    .AddEventPipelineBehavior(typeof(AuditEventPipelineBehavior<,>))
    .AddEventPipelineBehavior(typeof(RetryEventPipelineBehavior<,>))
    
    // Register modules
    .AddCommands(Assembly.GetExecutingAssembly())
    .AddQueries(Assembly.GetExecutingAssembly())
    .AddEvents(Assembly.GetExecutingAssembly());

Conditional Registration

builder.Services.AddArcanicMediator()
    .AddCommands(Assembly.GetExecutingAssembly())
    .AddQueries(Assembly.GetExecutingAssembly());

// Only add caching in production
if (builder.Environment.IsProduction())
{
    builder.Services
        .AddMemoryCache()
        .AddArcanicMediatorQueryPipelineBehavior(typeof(CachingQueryPipelineBehavior<,>));
}

// Only add transaction support if using Entity Framework
if (builder.Configuration.GetConnectionString("DefaultConnection") != null)
{
    builder.Services
        .AddDbContext<ApplicationDbContext>()
        .AddArcanicMediatorCommandPipelineBehavior(typeof(TransactionCommandPipelineBehavior<,>));
}

Pipeline Configuration with Options

// Configure pipeline with options
builder.Services.Configure<CachingOptions>(options =>
{
    options.DefaultCacheDuration = TimeSpan.FromMinutes(5);
    options.MaxCacheSize = 1000;
});

builder.Services.Configure<RetryOptions>(options =>
{
    options.MaxRetries = 3;
    options.BaseDelay = TimeSpan.FromSeconds(1);
});

Execution Order

Pipeline behaviors execute in a nested fashion, with the order determined by registration sequence:

Execution Flow

1. Generic Pipeline (Metrics) - Start
  2. Generic Pipeline (Exception Handling) - Start
    3. Request Pipeline (Validation) - Start  
      4. Command Pipeline (Authorization) - Start
        5. Command Pipeline (Transaction) - Start
          6. Pre-Handler (if any)
            7. Main Handler - Execute business logic
          8. Post-Handler (if any)
        9. Command Pipeline (Transaction) - End
      10. Command Pipeline (Authorization) - End  
    11. Request Pipeline (Validation) - End
  12. Generic Pipeline (Exception Handling) - End
13. Generic Pipeline (Metrics) - End

Visual Representation

┌─ Global Metrics ────────────────────────────────────────┐
│ ┌─ Global Exception Handling ─────────────────────────┐ │
│ │ ┌─ Request Validation ──────────────────────────────┐ │ │
│ │ │ ┌─ Command Authorization ─────────────────────────┐ │ │ │
│ │ │ │ ┌─ Command Transaction ─────────────────────────┐ │ │ │ │
│ │ │ │ │ ┌─ Pre-handlers ──────────────────────────────┐ │ │ │ │ │
│ │ │ │ │ │     🎯 MAIN HANDLER (Business Logic)       │ │ │ │ │ │
│ │ │ │ │ └─ Post-handlers ─────────────────────────────┘ │ │ │ │ │
│ │ │ │ └───────────────────────────────────────────────┘ │ │ │ │
│ │ │ └─────────────────────────────────────────────────┘ │ │ │
│ │ └───────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────┘

Best Practices

1. Keep Behaviors Focused

Each pipeline behavior should have a single responsibility:

// ✅ Good - Single responsibility
public class ValidationPipelineBehavior<TRequest, TResponse> : IRequestPipelineBehavior<TRequest, TResponse>
{
    // Only handles validation
}

// ❌ Bad - Multiple responsibilities  
public class ValidationAndLoggingPipelineBehavior<TRequest, TResponse> : IRequestPipelineBehavior<TRequest, TResponse>
{
    // Handles both validation AND logging
}

2. Use Appropriate Pipeline Types

Choose the most specific pipeline type for your use case:

// ✅ Good - Use command pipeline for transaction management
public class TransactionCommandPipelineBehavior<TCommand, TResponse> : ICommandPipelineBehavior<TCommand, TResponse>

// ❌ Bad - Using generic pipeline when command-specific would be better
public class TransactionPipelineBehavior<TMessage, TResponse> : IPipelineBehavior<TMessage, TResponse>

3. Handle Exceptions Properly

Always consider exception handling in your behaviors:

public async Task<TResponse> HandleAsync(TCommand command, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
{
    try
    {
        // Setup logic
        var result = await next(cancellationToken);
        // Success logic
        return result;
    }
    catch (Exception ex)
    {
        // Cleanup logic
        throw; // Re-throw to maintain exception flow
    }
}

4. Use Dependency Injection

Leverage DI for testability and flexibility:

public class CachingPipelineBehavior<TQuery, TResponse> : IQueryPipelineBehavior<TQuery, TResponse>
{
    private readonly IDistributedCache _cache; // Injected dependency
    private readonly ILogger<CachingPipelineBehavior<TQuery, TResponse>> _logger;
    private readonly IOptions<CachingOptions> _options;
    
    // Constructor injection
}

5. Consider Performance

Pipeline behaviors add overhead, so optimize for frequently used paths:

public async Task<TResponse> HandleAsync(TQuery query, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
{
    // Quick exit if caching not applicable
    if (query is not ICacheable cacheableQuery)
    {
        return await next(cancellationToken);
    }
    
    // Expensive caching logic only when needed
}

6. Make Behaviors Configurable

Use configuration to control behavior:

public class LoggingPipelineBehavior<TRequest, TResponse> : IRequestPipelineBehavior<TRequest, TResponse>
{
    private readonly IOptions<LoggingOptions> _options;
    
    public async Task<TResponse> HandleAsync(TRequest request, PipelineDelegate<TResponse> next, CancellationToken cancellationToken = default)
    {
        if (!_options.Value.EnableRequestLogging)
        {
            return await next(cancellationToken);
        }
        
        // Logging logic
    }
}

Next Steps

Related Documentation

⚠️ **GitHub.com Fallback** ⚠️