Middleware Pipeline - PogovorovDaniil/Requestum GitHub Wiki
Middleware provides a powerful way to add cross-cutting concerns to your request processing pipeline. This guide explains how the middleware pipeline works in Requestum.
Middleware in Requestum allows you to intercept and process requests before they reach handlers. Think of it as a chain of responsibility pattern where each middleware can:
- Execute code before the handler
- Execute code after the handler
- Modify the request
- Modify the response
- Short-circuit the pipeline
- Handle exceptions
Client Request
↓
Middleware 1 (Before)
↓
Middleware 2 (Before)
↓
Middleware 3 (Before)
↓
Handler Execution
↓
Middleware 3 (After)
↓
Middleware 2 (After)
↓
Middleware 1 (After)
↓
Response to Client
Request: CreateUserCommand
Middlewares: [Logging, Validation, Transaction]
Handler: CreateUserCommandHandler
┌─────────────────────────────────────┐
│ IRequestum.ExecuteAsync │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Logging Middleware (Before) │
│ Log: "Executing CreateUserCommand" │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Validation Middleware (Before) │
│ Validate: Name, Email are valid │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Transaction Middleware (Before) │
│ Begin Database Transaction │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ CreateUserCommandHandler.Execute │
│ Create user in database │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Transaction Middleware (After) │
│ Commit Database Transaction │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Validation Middleware (After) │
│ No action needed │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Logging Middleware (After) │
│ Log: "Completed CreateUserCommand" │
└─────────────────────────────────────┘
↓
Response
Requestum supports two types of middleware:
public interface IRequestMiddleware<TRequest, TResponse>
{
TResponse Invoke(TRequest request, RequestNextDelegate<TRequest, TResponse> next);
}public interface IAsyncRequestMiddleware<TRequest, TResponse>
{
Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken = default);
}The next delegate is crucial for middleware execution:
For synchronous middleware:
public readonly struct RequestNextDelegate<TRequest, TResponse>
{
// Invokes the next middleware or handler in the pipeline
public TResponse Invoke(TRequest request);
}Usage:
public class LoggingMiddleware<TRequest, TResponse> : IRequestMiddleware<TRequest, TResponse>
{
public TResponse Invoke(TRequest request, RequestNextDelegate<TRequest, TResponse> next)
{
Console.WriteLine("Before handler");
// Call next middleware or handler
var response = next.Invoke(request);
Console.WriteLine("After handler");
return response;
}
}For asynchronous middleware:
public readonly struct AsyncRequestNextDelegate<TRequest, TResponse>
{
// Asynchronously invokes the next middleware or handler
public Task<TResponse> InvokeAsync(TRequest request);
}Usage:
public class LoggingMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
Console.WriteLine("Before handler");
// Call next middleware or handler
var response = await next.InvokeAsync(request);
Console.WriteLine("After handler");
return response;
}
}public class LoggingMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly ILogger<LoggingMiddleware<TRequest, TResponse>> _logger;
public LoggingMiddleware(ILogger<LoggingMiddleware<TRequest, TResponse>> logger)
{
_logger = logger;
}
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
var requestType = typeof(TRequest).Name;
var stopwatch = Stopwatch.StartNew();
_logger.LogInformation("Executing {RequestType}", requestType);
try
{
var response = await next.InvokeAsync(request);
stopwatch.Stop();
_logger.LogInformation(
"Completed {RequestType} in {ElapsedMs}ms",
requestType,
stopwatch.ElapsedMilliseconds);
return response;
}
catch (Exception ex)
{
stopwatch.Stop();
_logger.LogError(
ex,
"Failed {RequestType} after {ElapsedMs}ms",
requestType,
stopwatch.ElapsedMilliseconds);
throw;
}
}
}public class ValidationMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly IValidator<TRequest>? _validator;
public ValidationMiddleware(IValidator<TRequest>? validator = null)
{
_validator = validator;
}
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
// Only validate if validator is registered
if (_validator != null)
{
var validationResult = await _validator.ValidateAsync(request, ct);
if (!validationResult.IsValid)
{
throw new ValidationException(validationResult.Errors);
}
}
return await next.InvokeAsync(request);
}
}public class ExceptionHandlerMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly ILogger<ExceptionHandlerMiddleware<TRequest, TResponse>> _logger;
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
try
{
return await next.InvokeAsync(request);
}
catch (ValidationException ex)
{
_logger.LogWarning(ex, "Validation failed for {RequestType}", typeof(TRequest).Name);
throw; // Re-throw validation exceptions
}
catch (NotFoundException ex)
{
_logger.LogWarning(ex, "Resource not found for {RequestType}", typeof(TRequest).Name);
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "Unhandled exception in {RequestType}", typeof(TRequest).Name);
throw new ApplicationException("An error occurred processing your request", ex);
}
}
}public class TransactionMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly IDbContext _dbContext;
public TransactionMiddleware(IDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
// Only use transactions for commands
if (request is not ICommand)
{
return await next.InvokeAsync(request);
}
await using var transaction = await _dbContext.BeginTransactionAsync(ct);
try
{
var response = await next.InvokeAsync(request);
await transaction.CommitAsync(ct);
return response;
}
catch
{
await transaction.RollbackAsync(ct);
throw;
}
}
}public class CachingMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly IMemoryCache _cache;
public CachingMiddleware(IMemoryCache cache)
{
_cache = cache;
}
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
// Only cache queries
if (request is not IQuery<TResponse>)
{
return await next.InvokeAsync(request);
}
var cacheKey = $"{typeof(TRequest).Name}:{request.GetHashCode()}";
// Try get from cache
if (_cache.TryGetValue<TResponse>(cacheKey, out var cachedResponse))
{
return cachedResponse!;
}
// Execute and cache
var response = await next.InvokeAsync(request);
_cache.Set(cacheKey, response, TimeSpan.FromMinutes(5));
return response;
}
}public class PerformanceMonitoringMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly ILogger<PerformanceMonitoringMiddleware<TRequest, TResponse>> _logger;
private readonly IMetricsCollector _metrics;
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
var stopwatch = Stopwatch.StartNew();
var requestType = typeof(TRequest).Name;
try
{
var response = await next.InvokeAsync(request);
stopwatch.Stop();
var elapsed = stopwatch.ElapsedMilliseconds;
// Log slow requests
if (elapsed > 1000)
{
_logger.LogWarning(
"Slow request detected: {RequestType} took {ElapsedMs}ms",
requestType,
elapsed);
}
// Collect metrics
_metrics.RecordRequestDuration(requestType, elapsed);
_metrics.IncrementRequestCount(requestType, "success");
return response;
}
catch (Exception)
{
stopwatch.Stop();
_metrics.IncrementRequestCount(requestType, "failure");
throw;
}
}
}public class AuthorizationMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
private readonly ICurrentUserService _currentUser;
private readonly IAuthorizationService _authorizationService;
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
// Check if request requires authorization
var authAttribute = typeof(TRequest).GetCustomAttribute<AuthorizeAttribute>();
if (authAttribute != null)
{
var user = _currentUser.User;
if (user == null)
{
throw new UnauthorizedException("User is not authenticated");
}
if (!string.IsNullOrEmpty(authAttribute.Roles))
{
var requiredRoles = authAttribute.Roles.Split(',');
if (!requiredRoles.Any(role => user.IsInRole(role)))
{
throw new ForbiddenException("User does not have required role");
}
}
}
return await next.InvokeAsync(request);
}
}
// Usage with attribute
[Authorize(Roles = "Admin")]
public record DeleteUserCommand(int UserId) : ICommand;Execute middleware only for specific request types:
public class CommandTransactionMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
// Only apply to commands
if (request is ICommand)
{
// Transaction logic
await using var transaction = await _dbContext.BeginTransactionAsync(ct);
try
{
var response = await next.InvokeAsync(request);
await transaction.CommitAsync(ct);
return response;
}
catch
{
await transaction.RollbackAsync(ct);
throw;
}
}
// Skip middleware for non-commands
return await next.InvokeAsync(request);
}
}Middleware can stop the pipeline and return early:
public class CachingMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
var cacheKey = GetCacheKey(request);
// Check cache
if (_cache.TryGetValue<TResponse>(cacheKey, out var cachedValue))
{
// Short-circuit: return cached value without calling next
return cachedValue!;
}
// Cache miss: continue pipeline
var response = await next.InvokeAsync(request);
_cache.Set(cacheKey, response);
return response;
}
}// ✅ Good - single responsibility
public class LoggingMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
public async Task<TResponse> InvokeAsync(...)
{
// Only logging
_logger.LogInformation("Executing {RequestType}", typeof(TRequest).Name);
var response = await next.InvokeAsync(request);
_logger.LogInformation("Completed {RequestType}", typeof(TRequest).Name);
return response;
}
}
// ❌ Bad - multiple responsibilities
public class EverythingMiddleware<TRequest, TResponse>
: IAsyncRequestMiddleware<TRequest, TResponse>
{
public async Task<TResponse> InvokeAsync(...)
{
// Logging
_logger.LogInformation("...");
// Validation
Validate(request);
// Caching
if (_cache.TryGetValue(...)) return cached;
// Transaction
await using var transaction = ...;
// Too many responsibilities!
}
}// ✅ Good - always calls next (or returns early with valid reason)
public async Task<TResponse> InvokeAsync(
TRequest request,
AsyncRequestNextDelegate<TRequest, TResponse> next,
CancellationToken ct = default)
{
// Do work before
DoSomething();
// Call next
var response = await next.InvokeAsync(request);
// Do work after
DoSomethingElse();
return response;
}
// ❌ Bad - forgets to call next
public async Task<TResponse> InvokeAsync(...)
{
DoSomething();
// Forgot to call next!
return default(TResponse); // Pipeline broken!
}// ✅ Good - catches, logs, and re-throws
public async Task<TResponse> InvokeAsync(...)
{
try
{
return await next.InvokeAsync(request);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in middleware");
throw; // Re-throw to let outer middleware handle
}
}
// ❌ Bad - swallows exceptions
public async Task<TResponse> InvokeAsync(...)
{
try
{
return await next.InvokeAsync(request);
}
catch
{
return default(TResponse); // Silent failure!
}
}// ✅ Good - only applies to relevant requests
public async Task<TResponse> InvokeAsync(...)
{
if (request is not ICommand)
{
// Skip this middleware for non-commands
return await next.InvokeAsync(request);
}
// Command-specific logic
await using var transaction = ...;
// ...
}- Dependency Injection - DI with middleware