DM.Services.MessageQueue - quilin/dm GitHub Wiki

Библиотека для работы с RabbitMQ. Основная цель - лишить разработчика забот об использовании кроля. Почти. При разработке учитывалось, что возможно когда-нибудь мы захотим перейти на что-то облачное, типа SQS или ServiceBus, или что-то более продакшен-реди, вроде Kafka.

Основные абстракции

IProducer<TKey, TMessage> - отправлятор сообщений. Как и положено, принимает ключ и тело сообщения, отправляет куда там надо. IConsumer<TMessage> - приниматор сообщений. IMessageHandler<TMessage> - обработчик сообщений. Именно он получает одно сообщение из брокера и отвечает за его обработку.

Продюсер

У продюсера имеются два метода: отправить одно сообщение и отправить пакет сообщений (сообщения в пакете могут обладать разными ключами). "Ключ" в понимании разных брокеров сообщений может иметь разное значение, в случае с кролём - это ключ маршрутизации, на основании которого определяется в какие очереди попадет сообщение. В связи с особенностями кроля - ключ всегда строка.

К телу сообщения особо нет требований - желательно, чтобы это был сериализуемый POCO объект.

Как инъектить продюсер?

Есть два способа, один простой, а другой позволяет больше гибкости, но мы категорически призываем использовать именно простой - гибкий нужен для особых случаев, когда иначе ну никак.

Простой способ:

public class SomeService
{
    public readonly IProducer<string, SomeMessage> producer;

    public SomeService(IProducerBuilder<string, SomeMessage> builder)
    {
        producer = builder.BuildRabbit(new RabbitProducerParameters
        {
            ExchangeName = "some-service.messages", // Соглашение по именованию: точки разделяют пространства имен, дефисы - слова.
            ExchangeType = ExchangeType.Topic       // Если не знаете, какой тип использовать - используйте Topic. И если знаете.
        });
    }
}

Сложный способ:

public class SomeService
{
    public readonly IProducer<string, SomeMessage> producer;

    public SomeService(IProducer<string, SomeMessage> producer)
    {
        this.producer = producer;
    }
}

И почему же он сложный? А все потому, что теперь вам надо зарегистрировать свой продюсер:

public class Startup
{
    // ........
    public void ConfigureContainer(ContainerBuilder builder)
    {
        builder.Register<IProducer<string, SomeMessage>>(ctx => ctx
            .Resolve<IProducerBuilder<string, SomeMessage>>()
            .BuildRabbit(new RabbitProducerParameters
            {
                ExchangeName = "some-service.messages",
                ExchangeType = ExchangeType.Topic
            }))
            .SingleInstance();
    }
    // ........
}

Зачем такое может понадобиться? Ну, например, как в данном примере - если нас интересует экзотический цикл жизни продюсера.

Консюмер и хендлер

Консюмер отвечает за подключение к очереди: его можно запустить, можно приостановить. Консюмеры, как правило, должны быть синглтонами в приложении, и стартовать вместе с самим приложением, поэтому самое простое - инъектить консюмер прямо в Startup и запускать там же.

public class Startup
{
    // ........
    public void Configure(IApplicationBuilder appBuilder,
        IConsumerBuilder<string, SomeMessage> consumerBuilder)
    {
        consumerBuilder.BuildRabbit<SomeHandler>(new RabbitConsumerParameters("consumer-tag", ProcessingOrder.Sequential)
        {
            ExchangeName = "some-service.messages", // Избегайте тавтологии в именах очередей и бирж (не используйте слов exchange и queue)
            ExchangeType = ExchangeType.Topic,
            QueueName = "some-service.messages-to-process",
            RoutingKeys = new[] {"route1", "route2.*"},
            Exclusive = false,
            DeadLetterExchange = "some-service.messages-unprocessed",
            MaxProcessingAnticipation = TimeSpan.FromSeconds(10)
        }).Start(); // Не забудьте запустить консюмер!
    }
    // ........
}

Если очень хочется, можно инъектить сразу консюмер. Для этого нужно совершить те же телодвижения, что и для инъекции продюсера. Но в отличие от продюсера - это смысла никакого не имеет. Используйте построитель.

Как слушать сообщения?

Выше мы вызвали метод BuildRabbit<THandler>(RabbitConsumerParameters parameters), который требует generic-параметр, реализующий интерфейс IMessageHandler<TMessage>. Он должен быть зарегистрирован, и может иметь любой цикл жизни. Не забывайте, что синглтон-зависимости не должны быть stateful. В нашем случае каждое входящее сообщение будет создавать свой скоуп и резолвить хендлер в этом скоупе, поэтому Transient и Scoped регистрации технически означают одно и то же для хендлера.

Обработчик принимает сообщение, а на выход должен вернуть результат обработки, который влияет на то, какой ответ будет отправлен брокеру - ack/nack/reject.

⚠️ **GitHub.com Fallback** ⚠️