Configuration - dotnet-shashlik/shashlik.eventbus GitHub Wiki

配置参考

Shashlik EventBus 主要依赖消息存储介质和消息传输中间件。消息存储介质通常使用关系型数据库,因为对 ACID 事务有良好的支持,对数据库的事务隔离级别最低要求为 Read Committed,否则会影响对事务已提交的判断。

注册方式

最简配置(内存存储 + 内存队列,仅适用于测试):

services.AddEventBus()
    .AddMemoryQueue()
    .AddMemoryStorage();

自定义配置:

services.AddEventBus(options => {
    options.Environment = "Production";
    options.RetryFailedMax = 60;
});

通过配置文件:

services.AddEventBus(configuration.GetSection("EventBus"))
    .AddRelationDb(options => options.UseConnection(DataType.MySql, "连接字符串"))
    .AddRabbitMQ(configuration.GetSection("RabbitMQ"));

EventBusOptions 配置项

Environment 运行环境

  • 默认值:Production
  • 用于区分当前应用运行的环境。存储的数据和注册到消息中间件的事件/处理器名称后都会加上此后缀。如多个环境共用同一套消息中间件和存储,可通过此值区分。

TransactionCommitTimeout 事务提交等待超时

  • 默认值:60
  • 单位:秒
  • 发布事件后,EventBus 轮询 ITransactionContext.IsDone() 来确认事务是否已提交。该值为此轮询的超时时间。必须小于 StartRetryAfter

StartRetryAfter 重试器介入时间

  • 默认值:300
  • 单位:秒
  • 消息首次发送或消费失败后,经过此时间重试器开始介入处理。

RetryLimitCount 重试器单次处理数据量

  • 默认值:200
  • 重试器单次执行时读取的消息数量上限。

RetryMaxDegreeOfParallelism 重试器并行度

  • 默认值:5
  • 重试器并发执行数量,数据量大时可根据 CPU 内核数调整。

RetryFailedMax 最大重试次数

  • 默认值:60
  • 最小值:5
  • 超过最大重试次数后需要人工介入。达到最大失败次数的记录不会被过期删除。

RetryInterval 重试间隔

  • 默认值:5
  • 单位:秒
  • 重试器两次执行之间的间隔时间。

LockTime 重试锁定时长

  • 默认值:60
  • 单位:秒
  • 重试器处理数据时通过乐观锁锁定消息,防止多实例重复处理。必须小于 RetryInterval

MessageExpireHour 成功消息过期时间

  • 默认值:72
  • 单位:小时
  • 处理成功的消息在此时间后被过期清理器删除,减少存储压力。失败消息永不过期。

DelayedMessageToleranceSeconds 延迟消息容忍度

  • 默认值:3
  • 单位:秒
  • 延迟消息到达执行时间点后,若距离执行时间已不足此秒数,则变成立即执行,避免 Timer 调度精度问题。

HandlerServiceLifetime 处理器 DI 生命周期

  • 默认值:Transient
  • 事件处理器注册到 DI 容器的生命周期类型。

ID 生成器

Shashlik EventBus 内涉及两类 ID:

  • IMsgIdGenerator:传输消息 ID,全局唯一,字符串类型,默认 GuidMsgIdGenerator(GUID)。一般无需调整。
  • IIdGenerator:存储消息主键 ID,long 类型,作为关系型存储/消息存储表的主键列。默认实现基于雪花算法,多实例部署时需要保证各实例 ID 不重复。

默认实现

  • 包:Shashlik.EventBus(已默认引用,无需额外安装)
  • 框架启动时自动注册为 IIdGenerator 的默认实现
  • 适用于单实例部署;多实例部署时各实例必须使用不同的 WorkerId,否则可能产生重复 ID

WorkerId 取值规则

按以下优先级选取 WorkerId:

  1. 环境变量 WORKER_ID:必须是 0 ~ 1023 之间的整数。未设置或解析失败时进入下一步。
  2. 随机值:框架启动时从 0 ~ 1023 之间随机分配一个 WorkerId。

多实例部署必须显式设置 WORKER_ID 环境变量(或使用下文 Redis 自动分配方案),否则随机值在多实例之间可能冲突,导致 ID 重复。

通过环境变量指定 WorkerId

# bash / linux
export WORKER_ID=3
# powershell / windows
$env:WORKER_ID = "3"

docker-compose.yml 示例:

services:
  order-service:
    environment:
      - WORKER_ID=1   # 同一服务的不同实例必须分配不同值

替换默认实现

实现 IIdGenerator 接口并在 AddEventBus 之前注册:

public class MyIdGenerator : IIdGenerator
{
    public long NextId() => /* 自定义生成逻辑 */;
}

services.AddSingleton<IIdGenerator, MyIdGenerator>();

services.AddEventBus()
    .AddRelationDb(/* ... */)
    .AddRabbitMQ(/* ... */);

分布式部署:Redis WorkerId 自动分配

多实例部署时,手动管理 WORKER_ID 环境变量容易出错。框架提供基于 Redis 的 WorkerId 自动分配扩展,通过 Redis 协调各实例自动获得唯一 WorkerId,无需手动配置。

扩展方法
Shashlik.EventBus.Redis AddRedisWorkerId
Shashlik.EventBus.StackExchangeRedis AddStackExchangeRedisWorkerId

WorkerId 分配与 MQ 传输共用同一个 Redis 连接和同一个 options 类,可在同一次链式调用中开启。

注册示例(FreeRedis)

services.AddSingleton(sp => new RedisClient("localhost:6379"));

services.AddEventBus()
    .AddRedisMQ()
    .AddRedisWorkerId(r =>
    {
        r.AppName = "OrderService";
    });

注册示例(StackExchange.Redis)

services.AddSingleton<IConnectionMultiplexer>(_ =>
    ConnectionMultiplexer.Connect("localhost:6379,abortConnect=false"));

services.AddEventBus()
    .AddStackExchangeRedisMQ()
    .AddStackExchangeRedisWorkerId(r =>
    {
        r.AppName = "OrderService";
    });

配置项说明

字段 默认值 说明
AppName "DEFAULT" 当前应用的标识。不同 AppName 的 WorkerId 互不干扰,可多应用共用同一 Redis
RedisClientFactory / ConnectionMultiplexerFactory 从 DI 容器获取 Redis 连接工厂,与 MQ 共用

何时该用 Redis 自动分配:生产环境、Kubernetes 多副本部署、无法固定 WORKER_ID 的场景。何时可以不用:本地开发、单实例部署、可控的固定环境变量场景。

参数约束关系

启动时通过 EventBusOptionsValidation 校验参数,以下约束必须满足:

约束 说明
TransactionCommitTimeout > 0 事务等待必须有效
StartRetryAfter > 0 重试介入时间必须有效
TransactionCommitTimeout < StartRetryAfter 否则发布路径在重试路径启动前就放弃了
RetryInterval > 0 重试间隔必须有效
LockTime > 0 锁定时长必须有效
LockTime < RetryInterval 否则同一行在下次重试前锁未释放,会被重复拾取
RetryFailedMax >= 5 最大重试次数不能太小
RetryLimitCount > 0 单次数量必须有效
RetryMaxDegreeOfParallelism > 0 并行度必须有效
MessageExpireHour > 0 过期时间必须有效
Environment 非空 环境标识不能为空