Event.Publish - dotnet-shashlik/shashlik.eventbus GitHub Wiki
首先需要定义事件,事件分为普通事件
和延迟事件
,普通事件就是发出去以后,事件订阅方会立刻收到事件(不考虑网络等延迟),延迟事件是需要在指定时间执行的事件,比如订单30分钟未付款需要关闭订单。事件类需要实现接口IEvent
,IEvent
用于约束与规范事件定义和处理,没有实际业务意义。
延迟事件是基于本地延迟,并非基于消息中间件延迟,这也是为了最大程度保证消息不丢失,避免延迟过程中因为消息中间件挂掉而导致的消息无法消费。延迟事件同样适用于分布式事务最终一致性,但如果延迟事件处理类处理异常由重试器介入处理后,那么最终的延迟执行时间和期望的延迟时间就会产生较大的差异,是否忽略这里的时间差需要由具体的业务来决定。比如订单30分钟未付款需要关闭订单,30分钟后关闭订单出现了异常,最后由重试器到了40分钟后才关闭,也不影响订单,那么认为这个时间差可以容忍。又比如双11啦,发布一个延迟事件,晚上12点叫醒我起来买买买,只有1分钟时间,过了就买不到了,那么这种情况可以在事件处理类中,自行根据当前时间、事件发送时间、延迟执行时间等要素,自行决定业务如何处理。
例:
// 新用户注册完成事件,实现接口IEvent
public class NewUserEvent : IEvent
{
public string Id { get;set; }
public string Name { get; set; }
}
// 定义新用户注册延迟活动推送事件
public class NewUserPromotionEvent : IEvent
{
public string Id { get;set; }
public string Name { get; set; }
public string PromotionId { get; set; }
}
事件名称/事件处理类名称定义,对应消息队列中的routingKey/topic、queue/groupId,不使用此特性时,默认命名规则为{Type.Name}
.{Environment}
。
例1:[EventBusName("new_user_event")]public class NewUserEvent
,以RabbitMQ/Production为例,将路由到new_user_event.Production
。
例2:public class NewUserEvent
,以RabbitMQ/Production为例,将路由到NewUserEvent.Production
。
如图所示,消息数据需要和业务数据在同一的事务中进行提交或者回滚,最后Shashlik.EventBus会检查消息数据是否已提交,如果已提交才会执行真正的消息发送。所以要求事务的隔离级别最低为读已提交(RC)。
ITransactionContext
为抽象的事务上下文接口,Shashlik已实现RelationDbStorageTransactionContext
和XaTransactionContext
,用于关系型数据库事务和XA分布式事务。
源码定义如下:
public interface ITransactionContext
{
bool IsDone();
}
关系型数据库推荐使用RelationDbStorageTransactionContext
,基于本地数据库事务IDbTransaction
,EF Core可以使用Shashlik.EventBus.Extensions.EfCore
包扩展以更加方便的使用事务上下文,详见下方Demo。
XaTransactionContext
主要用于TransactionScope
创建的分布式事务,获取方式:XaTransactionContext.Current
,这里对事务的是否提交判断会延迟到TransactionScope
的Dispose后,所以你创建TransactionScope
必须Dispose,这也是应该的。
事件的发布主要使用IEventPublisher接口,其定义如下:
public interface IEventPublisher
{
/// <summary>
/// 普通事件发布
/// </summary>
/// <param name="event">事件实例</param>
/// <param name="transactionContext">事务和连接信息</param>
/// <param name="additionalItems">附加事件数据</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <returns></returns>
Task PublishAsync<TEvent>(
TEvent @event,
ITransactionContext? transactionContext,
IDictionary<string, string>? additionalItems = null,
CancellationToken cancellationToken = default
) where TEvent : IEvent;
/// <summary>
/// 延迟事件发布
/// </summary>
/// <param name="event">事件实例</param>
/// <param name="transactionContext">事务和连接信息</param>
/// <param name="delayAt">延迟执行时间</param>
/// <param name="additionalItems">附加事件数据</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <returns></returns>
Task PublishAsync<TEvent>(
TEvent @event,
DateTimeOffset delayAt,
ITransactionContext? transactionContext,
IDictionary<string, string>? additionalItems = null,
CancellationToken cancellationToken = default)
where TEvent : IEvent;
}
-
TEvent @event
: 事件实例对象。 -
DateTimeOffset delayAt
: 事件延迟执行时间,小于等于当前时间时,将作为非延迟事件处理。 -
ITransactionContext? transactionContext
:本地事务上下文对象,如果仅仅作为事件总线使用,不需要考虑分布式事务最终一致性,可以传null
。关系型数据库此接口的实现类为RelationDbStorageTransactionContext
(需引入包Shashlik.EventBus.RelationDbStorage
)。 -
IDictionary<string, string>? items
: 事件发布的附加数据,一些额外的数据或者同一事件需要一些动态的数据传输可以使用此参数,EventBus已默认添加以下附加数据,调用时请勿重复:- "eventbus-msg-id": 消息id
- "eventbus-send-at": 消息发送时间
- "eventbus-delay-at": 消息延迟执行时间
- "eventbus-event-name": 事件名称
-
CancellationToken cancellationToken
:取消发送token。
例 RelationDbStorageTransactionContext
:
using var conn = new MySqlConnection("...");
await conn.OpenAsync();
using var tran = connection.BeginTransactionAsync();
try
{
// ....业务代码
var @event = new NewUserEvent{
Id = "1",
Name = "张三"
};
await EventPublisher.PublishAsync(
@event,
new RelationDbStorageTransactionContext(tran),
new Dictionary<string, string>{
{"user-from", "baidu"}
},
CancellationToken.None
);
await tran.CommitAsync();
}
catch(Exception){
await tran.RollbackAsync();
}
例 XaTransactionContext
:
using var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
using var conn = new MySqlConnection("...");
await conn.OpenAsync();
try
{
// ....业务代码
var @event = new NewUserEvent{
Id = "1",
Name = "张三"
};
await EventPublisher.PublishAsync(
@event,
XaTransactionContext.Current,
new Dictionary<string, string>{
{"user-from", "baidu"}
},
CancellationToken.None
);
scope.Complete();
}
catch(Exception){
// log
}
为了方便使用EF作为事件发布和事务上下文的使用,需要引入包Shashlik.EventBus.Extensions.EfCore
。可以通过DbContext直接发布事务,当然你也可以通过扩展方法将DbContext转换为RelationDbStorageTransactionContext
。
/// <summary>
/// 通过DbContext发布事件,自动使用DbContext事务上下文和连接信息
/// </summary>
/// <param name="dbContext">DbContext上下文</param>
/// <param name="event">事件实例</param>
/// <param name="additionalItems">附加数据</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <returns></returns>
/// <exception cref="InvalidOperationException">Can't resolve service of <see cref="IEventPublisher"/></exception>
/// <exception cref="ArgumentNullException">DbContext/@event can't be null</exception>
public static async Task PublishEventAsync<TEvent>(
this DbContext dbContext,
TEvent @event,
IDictionary<string, string>? additionalItems = null,
CancellationToken cancellationToken = default
) where TEvent : IEvent;
/// <summary>
/// 通过DbContext发布延迟事件,自动使用DbContext事务上下文和连接信息
/// </summary>
/// <param name="dbContext">DbContext上下文</param>
/// <param name="event">事件实例</param>
/// <param name="delayAt">延迟执行时间</param>
/// <param name="additionalItems">附加数据</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <typeparam name="TEvent">事件类型</typeparam>
/// <returns></returns>
/// <exception cref="InvalidOperationException">Can't resolve service of <see cref="IEventPublisher"/></exception>
/// <exception cref="ArgumentNullException">DbContext/@event can't be null</exception>
public static async Task PublishEventAsync<TEvent>(
this DbContext dbContext,
TEvent @event,
DateTimeOffset delayAt,
IDictionary<string, string>? additionalItems = null,
CancellationToken cancellationToken = default) where TEvent : IEvent;
/// <summary>
/// 从DbContext中获取ITransactionContext
/// </summary>
/// <param name="dbContext"></param>
/// <returns>事务上下文,如果事务未开启,返回null</returns>
/// <exception cref="ArgumentNullException"></exception>
public static ITransactionContext? GetTransactionContext(this DbContext dbContext);
例:
// 通过IEventPublisher发布事件,DbContext.GetTransactionContext()将DbContext转换为ITransactionContext
await EventPublisher.PublishAsync(new NewUserEvent{
Id = user.Id,
Name = input.Name
}, DbContext.GetTransactionContext());
// 通过ef扩展,直接使用DbContext发布事件,自动使用当前上下文事务
await DbContext.PublishEventAsync(new NewUserPromotionEvent{
Id = user.Id,
Name = input.Name,
PromotionId = "1"
}, DatetimeOffset.Now.AddMinutes(30));