Event.Subscribe - dotnet-shashlik/shashlik.eventbus GitHub Wiki
事件订阅使用接口IEventHandler<>
,泛型参数即为事件类型,即表示该事件处理类订阅哪个事件,事件处理类将自动注册为Transient
,不再需要手动注册服务。事件处理类不关心是否为延迟事件,延迟是否由发布方决定。
EventBus不能保证业务的消息的幂等性,为了保证消息的可靠传输,EventBus以及消息中间件对消息QOS处理等级必须为At least once
:至少达到一次,一般消息中间件都需要开启消息持久化避免消息丢失。简单的说就是一个事件处理类可能处理多次同一个事件,比如用户订单付款完成为一个事件,付款完成后需要修改订单状态为待发货,那么在事件处理类中可能收到多次这个订单的付款完成事件,那么业务的幂等性处理就可以使用悲观锁,判断订单状态,如果订单状态已经为待发货,则直接返回并忽略本次事件响应。
IEventHandler:
public interface IEventHandler<in TEvent> where TEvent : IEvent
{
/// <summary>
/// 执行事件处理
/// </summary>
/// <param name="event">事件实例</param>
/// <param name="items">附加数据</param>
/// <returns></returns>
Task Execute(TEvent @event, IDictionary<string, string> items);
}
}
-
TEvent @event
: 事件实例对象 -
IDictionary<string, string> items
: 附加数据,EventBus已自动添加以下键值:- "eventbus-msg-id": 消息id
- "eventbus-send-at": 消息发送时间
- "eventbus-delay-at": 消息延迟执行时间
- "eventbus-event-name": 事件名称
例:
// 一个事件可以有多个处理类,可以分布在不同的微服务中,但类名不可重复,实际上默认的类名会定义为消息队列中的queue,同CAP的group
// 用于发送短信的事件处理类
public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>
{
public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
{
// 发送短信...
}
}
// 用于发放消费券的事件处理类
public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>
{
public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
{
// 业务处理...
}
}
// 用于新用户延迟活动的事件处理类,将在指定时间自行
public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>
{
public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items)
{
// 业务处理...
}
}
事件名称/事件处理类名称定义,对应消息队列中的routingKey/topic、queue/groupId,不使用此特性时,默认命名规则为Type.Name
.Environment
。
例:[EventBusName("new_user_event_for_sms_handler")]public class NewUserEventForSmsHandler
,以RabbitMQ/Production为例,将绑定到队列new_user_event_for_sms_handler.Production
。
例:public class NewUserEventForSmsHandler
,以RabbitMQ/Production为例,将绑定到队列NewUserEventForSmsHandler.Production
。