Pub sub messaging - rebus-org/Rebus GitHub Wiki
Implementing publish/subscribe with Rebus is pretty easy, but the configuration has a few caveats that you need to be aware of depending on your choice of technology.
Generally, transports can be divided into
- those that DO support pub/sub natively, and
- those that DO NOT support pub/sub natively
The first kind of transport is really nice to work with because pub/sub "just works" and requires very little configuration. It usually requires some kind of central broker though, which comes with its own set of challenges.
The second kind of transport can be more distributed in nature, as it is capable of working without a central broker, which also means that it can be more scalable. E.g. with Rebus on MSMQ and e.g. a local database as a subscription storage, you can have real pub/sub messaging without any central bottlenecks.
With Rebus, transports with native pub/sub support insert themselves as ISubscriptionStorage implementations in addition to the usual ITransport. This means that the transport gets to handle subscribe/unsubscribe requests and can manage subscriptions accordingly.
This also means that you need not configure owners for events, because subscriptions can be managed at the transport level. There is simply no need for a subscriber to know beforehand who is going to publish a given event type, because anyone can publish any event, and the transport takes care of distributing it to whoever subscribed.
For example, with RabbitMQ, a call to await bus.Subscribe<SomeEvent>()
will immediately bind the topic SomeNamespace.SomeEvent, SomeAssembly
to the subscriber's input queue, and this will naturally be the topic that the publisher will publish messages to when it does an await bus.Publish(new SomeEvent(...))
.
Similarly, with Azure Service Bus, subscriptions are established by ensuring that an appropriate topic exists, and then a subscriber will create a subscription that forwards messages to its input queue by setting the ForwardTo
property of the subscription. With the SomeEvent
event mentioned above, a topic named somenamespace_someevent__someassembly
would be created (Azure Service Bus has case-insensitive topic and does not support .
, +
, ´
and other characters that can occur in .NET type names), and then a subscription for the subscriber with input queue subscriber1
would be created with the name subscriber1
.
Transports without native pub/sub support cannot manage subscriptions and do multicast sends. This means that Rebus must "manually" send a copy of a published event to each subscriber's input queue, using ordinary point-to-point messaging underneath the covers. This process is completely transparent to the programmer, however it is a thing that you need to know when you configure Rebus.
The way subscribe works with this kind of transport, is that Rebus will send a SubscribeRequest
/UnsubscribeRequest
upon subscribing/unsubscribing respectively. The subscribe/unsubscribe request will then be sent to the publisher of that particular event type, and then the publisher can save the subscription into its local subscription storage. Again, this happens transparently to the programmer, but it means two things:
- the publisher needs to have a subscription storage configured, and
- the subscriber needs to know where to send the subscribe/unsubscribe request
The first thing is done by configuring a subscription storage at the publisher's end, e.g. like so in order to use SQL Server as the subscription storage:
Configure.With(...)
.(...)
.Subscriptions(s => s.StoreInSqlServer(...))
.Start();
There is a slight twist to the subscription storage configuration though, as some subscription storages are capable of being configured to be CENTRALIZED. What this means is that you can get the same behavior as a real pub/sub-capable transport by using a central database (i.e. one that can be reached by all subscribers and all publishers) as a subscription storage.
You can read more about the subscription storage configuration on the ISubscriptionStorage page. Whether it is available, and how you enable it, varies with the chosen subscription storage database. With SQL Server it looks somewhat like this:
Configure.With(...)
.(...)
.Subscriptions(s => s.StoreInSqlServer(..., isCentralized: true))
.Start();
i.e. you can enable it by setting the optional isCentralized
parameter to true
when you configure it.