Transfer.StackExchangeRedis - dotnet-shashlik/shashlik.eventbus GitHub Wiki

Redis Stream (StackExchange.Redis)

NuGet:Shashlik.EventBus.StackExchangeRedis

Shashlik.EventBus.StackExchangeRedis 使用 StackExchange.Redis 作为底层 Redis 客户端,提供:

  • AddStackExchangeRedisMQ:基于 Redis Streams 的消息传输
  • AddStackExchangeRedisWorkerId:基于 Redis 分布式协调的雪花 ID WorkerId 自动分配

两个扩展方法共用同一个 EventBusStackExchangeRedisOptions 实例和同一个 IConnectionMultiplexer 连接。

FreeRedis 版本 功能完全等价,仅底层客户端库不同。推荐场景:项目已注册 IConnectionMultiplexer、使用 StackExchange.Redis 集群/Sentinel/SSL 高级特性,或希望选用社区更活跃的标准库。

注册方式

方式一:MQ + WorkerId 完整配置

// 注册 IConnectionMultiplexer 单例
services.AddSingleton<IConnectionMultiplexer>(_ =>
    ConnectionMultiplexer.Connect("localhost:6379,abortConnect=false"));

services.AddEventBus()
    .AddStackExchangeRedisMQ(r =>
    {
        r.MaxLength = 10000;
        r.ConsumerPoolSize = 4;
    })
    .AddStackExchangeRedisWorkerId(r =>
    {
        r.AppName = "OrderService";
    });

方式二:仅使用 MQ

services.AddEventBus()
    .AddStackExchangeRedisMQ();

AddStackExchangeRedisMQ 默认从 DI 容器中获取 IConnectionMultiplexer 实例。

方式三:仅使用 WorkerId

services.AddEventBus()
    .AddStackExchangeRedisWorkerId(r =>
    {
        r.AppName = "OrderService";
    });

方式四:自定义 IConnectionMultiplexer 工厂

services.AddEventBus()
    .AddStackExchangeRedisMQ(r =>
    {
        r.ConnectionMultiplexerFactory = sp =>
        {
            var options = ConfigurationOptions.Parse("localhost:6379,abortConnect=false");
            options.Password = "your-password";
            options.ConnectTimeout = 5000;
            return ConnectionMultiplexer.Connect(options);
        };
    });

配置项 EventBusStackExchangeRedisOptions

配置项 默认值 适用 说明
ConnectionMultiplexerFactory sp => sp.GetService<IConnectionMultiplexer>() MQ + WorkerId IConnectionMultiplexer 实例获取工厂,MQ 与 WorkerId 共享
AppName "DEFAULT" WorkerId WorkerId 分配的应用标识,不同 AppName 互不干扰
MaxLength 10000 MQ 每个 Stream 消息堆积最大数量,防止内存溢出
ConsumerPoolSize 4 MQ 每个 EventHandler 的并发消费者数量
MaxLengthFactory null MQ 消息堆积最大数量动态配置器,优先级高于 MaxLength

消息传输 (MQ)

  • 基于 Redis Streams 的 Consumer Group 机制
  • 同一 EventHandler 对应一个 Consumer Group,组内多消费者自动分担消息
  • 启动时自动创建 Consumer Group(已存在则忽略)
  • 消息处理成功后自动确认(XAck
  • MaxLength 通过 StreamAddAsyncmaxLength + useApproximateMaxLength 限制 Stream 长度

WorkerId 分配

通过 Redis 协调多实例自动获得唯一 WorkerId,免去手动管理 WORKER_ID 环境变量。详见 配置参考 - ID 生成器

核心行为:

  • 启动时自动从 Redis 抢占空闲 WorkerId(0~1023)
  • 后台定时续约避免租约过期
  • 进程退出时主动释放占用的 WorkerId
  • 不同 AppName 之间互不干扰
  • 所有 WorkerId 都被占用时启动失败

完整示例

services.AddSingleton<IConnectionMultiplexer>(sp =>
{
    var config = ConfigurationOptions.Parse("localhost:6379,abortConnect=false");
    config.Password = Configuration["Redis:Password"];
    return ConnectionMultiplexer.Connect(config);
});

services.AddEventBus(options =>
{
    options.Environment = "Production";
})
    .AddStackExchangeRedisMQ(r =>
    {
        r.AppName = "OrderService";           // 与 WorkerId 共享
        r.MaxLength = 50000;
        r.ConsumerPoolSize = 8;
        r.MaxLengthFactory = msg =>
            msg.EventName.Contains("HighVolume") ? 100000 : 10000;
    })
    .AddStackExchangeRedisWorkerId(r =>
    {
        r.AppName = "OrderService";           // 与 MQ 共享
    })
    .AddRelationDb(opt => opt.UseConnection(DataType.MySql, "..."));

选型建议

场景 推荐
已使用 FreeRedis Shashlik.EventBus.Redis
已使用 StackExchange.Redis(已注册 IConnectionMultiplexer Shashlik.EventBus.StackExchangeRedis
需要 Redis 集群、Sentinel、SSL、Streams 高级特性 Shashlik.EventBus.StackExchangeRedis
新项目,无历史包袱 Shashlik.EventBus.StackExchangeRedis(社区更活跃、API 更标准)