NRabbitBus wiki - GasiorowskiPiotr/NRabbitBus GitHub Wiki

Before reading and using NRabbitBus please read the following document: http://www.rabbitmq.com/tutorials/amqp-concepts.html

Simple Publish

To publish a message you need to do three things:

  • Initialize Rabbit: Rabbit.Initialize();
  • Obtain an IBus instance from the container: var bus = ComponentLocator.Current.Get<IBus>(); (or through any other dependency injection mechanism).
  • Post a message: bus.Publish(new WaitMsMessage { MilisecondsToWait = r.Next() % 10000 }, "WorkQueue");

The IBus interface exposes the following Publish(...) overloads:

  • void Publish(IMessage message, string queueName); publishes a message to the specified Queue. It actually posts the message to the default Exchange with the Queue name as the Routing Key. The default Exchange routes a message to the Queue whose name equals the Routing Key.
  • void Publish(IMessage message, string exchangeName, string routingKey); publishes a message to the Exchange with the given name (exchangeName) using the specified Routing Key (routingKey).
  • void Publish(IMessage message, string queueName, Action<IMessage> replyAction); publishes a message to the specified Queue and executes the action passed as replyAction on a separate thread.

Every published message MUST implement the NRabbitBus.Framework.Shared.IMessage interface.
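The three overloads can be sketched side by side as follows. This is an illustration under assumptions, not a verified sample: it assumes the NRabbitBus assemblies are referenced, that TopicMessage and RcpRequest (the message types used elsewhere on this page) implement IMessage and have parameterless constructors, and that replyAction receives the reply message. The queue name "rpc_queue" and the class PublishExamples are hypothetical.

```csharp
// Sketch only. Namespaces other than NRabbitBus.Framework.Shared are not stated
// on this page, so any further using directives are left to the reader.
using System;
using NRabbitBus.Framework.Shared;

public static class PublishExamples // hypothetical class name
{
    public static void Run(IBus bus)
    {
        // 1. Default Exchange: the Queue name doubles as the Routing Key.
        bus.Publish(new TopicMessage(), "topic1_queue");

        // 2. Named Exchange with an explicit Routing Key.
        bus.Publish(new TopicMessage(), "topic1_exchange", "piotr.gasiorowski.programmer");

        // 3. Publish and run replyAction on a separate thread.
        //    "rpc_queue" is a hypothetical queue name; the assumption here is
        //    that the action is invoked with the reply message.
        bus.Publish(new RcpRequest(), "rpc_queue",
            reply => Console.WriteLine("Got reply: " + reply));
    }
}
```

The second call uses the exchange and routing key from the worker setup shown later on this page.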

Simple Worker

To start receiving messages, do the following:

  • Create at least one handler for a given message type by deriving a type from either MessageHandler<TMessage> or MessageHandlerWithResult<TMessage>. You don't need to register these handlers in a container; this is done automatically.
  • Call:
Rabbit
    .Initialize(typeof(Program).Assembly)
    .DeclareQueues(QueuesConfiguration.FromCode(
        new Queue
        {
            Durable = false,
            Name = "topic1_queue",
            RequiresAck = false,
            IsRcp = false,
            MaxThreads = 10
        },
        new Queue
        {
            Durable = false,
            Name = "topic2_queue",
            RequiresAck = false,
            IsRcp = false,
            MaxThreads = 10
        },
        new Queue
        {
            Durable = false,
            Name = "topic3_queue",
            RequiresAck = false,
            IsRcp = false,
            MaxThreads = 10
        }))
    .DeclareExchanges(ExchangesConfiguration.FromCode(
        new Exchange
        {
            Durable = false,
            Name = "topic1_exchange",
            Type = "topic"
        }))
    .SetupRouting(RoutesConfiguration.FromCode(
        new Route("topic1_queue", "topic1_exchange", "piotr.#"),
        new Route("topic2_queue", "topic1_exchange", "piotr.gasiorowski.*"),
        new Route("topic3_queue", "topic1_exchange", "piotr.gasiorowski.programmer")))
    .StartListeningOnDeclaredQueues();
// perform other operations...

The code does the following:

  1. It scans the assembly passed to Initialize and loads all the MessageHandlers you declared (and other types as well, as described later).

  2. It declares AMQP Queues with RabbitMQ. Declaring a queue that already exists is harmless as long as its parameters are the same.

  3. It declares AMQP Exchanges with RabbitMQ. The same rule applies here as for Queues.

  4. It sets up routing between Exchanges and Queues.

  5. It starts listening and dispatching messages to handlers for the declared Queues.
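To see which of the three queues a given routing key reaches, it helps to spell out the AMQP topic rules used by the bindings above: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. The sketch below is a minimal, self-contained illustration of those rules. TopicMatcher is a hypothetical helper written for this page, not part of NRabbitBus; in practice the RabbitMQ broker performs this matching.

```csharp
using System;

// Hypothetical helper for illustration only; the broker does the real matching.
public static class TopicMatcher
{
    // Returns true when routingKey matches the binding pattern under AMQP topic rules:
    // words are separated by '.', '*' matches exactly one word, '#' matches zero or more.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more of the remaining words.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to consume.
        if (ki == k.Length) return false;

        if (p[pi] == "*" || p[pi] == k[ki])
        {
            return Match(p, pi + 1, k, ki + 1);
        }
        return false;
    }
}
```

Under these rules a message published with the key piotr.gasiorowski.programmer reaches all three queues, piotr.gasiorowski.tester reaches topic1_queue and topic2_queue, and piotr.a.b.c reaches only topic1_queue.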

MessageHandler types

Because RabbitMQ (and thus NRabbitBus) supports posting messages, handling responses, and blocking RPC, there are two base handler types that you may need to use.

  • MessageHandler for handling messages that do not return any response:
public class TopicMessageHandler : MessageHandler<TopicMessage>
{
    public TopicMessageHandler(Logger logger) : base(logger)
    {
    }

    protected override void HandleMessage(TopicMessage message)
    {
        Console.WriteLine("Received message");
    }
}
  • MessageHandlerWithResult for handling messages that return a response (the response should implement IMessage). Note that the order in which MessageHandlers are processed matters, because only the last returned value is sent back as the result.
public class RequestReturningMessageHandler : MessageHandlerWithResult<RcpRequest>
{
    public RequestReturningMessageHandler(Logger logger) : base(logger)
    {
    }

    protected override IMessage HandleMessage(RcpRequest message)
    {
        Console.WriteLine("Handling message in RequestReturningMessageHandler which is the FIRST handler");
        return new RcpResponse
        {
            SequenceNo = message.SequenceNo
        };
    }
}

Both base message handlers expose the following properties, which control the flow of handler processing:

  • bool StopOnFailure - if an exception is thrown, processing does not proceed to the next handler.
  • bool StopProcessing - if set to true, the handlers' execution process finishes.
  • object StopProcessingReason - any object describing why the handlers' execution process stopped; it may be logged. If StopOnFailure is set to true and an exception is thrown, StopProcessingReason is set to that exception.
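A handler that uses these properties might look like the sketch below. It rests on assumptions this page does not confirm: that the three properties can be set from inside a derived handler, and that Logger is the same type used in the handler examples above. GuardedTopicMessageHandler and the null check are hypothetical and only stand in for a real validation rule.

```csharp
// Sketch only; assumes the NRabbitBus assemblies are referenced and that
// StopOnFailure, StopProcessing and StopProcessingReason are settable here.
public class GuardedTopicMessageHandler : MessageHandler<TopicMessage> // hypothetical name
{
    public GuardedTopicMessageHandler(Logger logger) : base(logger)
    {
        // If this handler throws, do not proceed to the next handler.
        StopOnFailure = true;
    }

    protected override void HandleMessage(TopicMessage message)
    {
        // Hypothetical validation rule standing in for real business logic.
        if (message == null)
        {
            // Finish the handlers' execution process and record why.
            StopProcessing = true;
            StopProcessingReason = "Received a null message";
            return;
        }

        Console.WriteLine("Message accepted");
    }
}
```

For such a guard to protect later handlers it has to run before them, which is what the MessageOrderConfigurationSection in the configuration sample is for.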

Configuration

In the previous example the configuration was passed directly from code. You can also store the configuration in the application configuration file. A sample configuration is displayed below:

<configuration>
	<configSections>
		<section name="QueueConfigurationSection" type="NRabbitBus.Framework.Configuration.QueueConfigurationSection, NRabbitBus.Framework"/>
		<section name="ExchangesConfigurationSection" type="NRabbitBus.Framework.Configuration.ExchangeConfigurationSection, NRabbitBus.Framework"/>
		<section name="RoutingConfigurationSection" type="NRabbitBus.Framework.Configuration.RoutingConfigurationSection, NRabbitBus.Framework"/>
		<section name="MessageOrderConfigurationSection" type="NRabbitBus.Framework.Configuration.MessageOrderConfigurationSection, NRabbitBus.Framework"/>
		<section name="RabbitConfigurationSection" type="NRabbitBus.Framework.Configuration.RabbitConfigurationSection, NRabbitBus.Framework"/>
	</configSections>
    <startup> 
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
    </startup>

	<RabbitConfigurationSection
		hostname="localhost"
		port="5672"
		username="guest"
		password="guest"
		/>
		
	<QueueConfigurationSection>
		<Queues>
			<Queue name="TestQueue1" ackRequired="false" durable="false" maxThreads="12" isRcp="false"/>
			<Queue name="TestQueue2" ackRequired="false" durable="false" maxThreads="12" isRcp="false"/>
		</Queues>
	</QueueConfigurationSection>

	<ExchangesConfigurationSection>
		<Exchanges>
			<Exchange name="ex1" type="direct" durable="false"></Exchange>
			<Exchange name="ex2" type="fanout" durable="true"></Exchange>
		</Exchanges>
	</ExchangesConfigurationSection>

	<RoutingConfigurationSection>
		<Routes>
			<Route queue="TestQueue1" exchange="ex1" routingKey="a1b1"/>
			<Route queue="TestQueue2" exchange="ex2" routingKey="a1b1"/>
		</Routes>
	</RoutingConfigurationSection>

	<MessageOrderConfigurationSection>
		<MessageOrder>
			<Order messageType="NRabbitBus.Framework.Tests.Subscription.MyMessage, NRabbitBus.Framework.Tests">
				<MessageHandlers>
					<Handler order="1" type="NRabbitBus.Framework.Tests.Subscription.MyMessageHandler, NRabbitBus.Framework.Tests"></Handler>
				</MessageHandlers>
			</Order>
		</MessageOrder>
	</MessageOrderConfigurationSection>
</configuration>

To be continued...
