MessageTransport - henk52/knowledgesharing GitHub Wiki

Message transport

Introduction

Purpose

References

Vocabulary

  • AMQP: Advanced Message Queuing Protocol is the protocol used by RabbitMQ for messaging.

Overview

RabbitMQ

RabbitMQ vocabulary

  • Binding - connects an exchange with a queue using 'binding key'
    • Bindings are rules that exchanges use (among other things) to route messages to queues (bindings).
  • Binding key - A key that the exchange looks at to decide how to route the message to queues(Joh19a).
    • Think of the routing key like an address for the message(Joh19a).
    • The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. In other words, the routing key acts like a filter (bindings).
  • Broker - The (AMQP)server that distributes the messages
  • Channel - A virtual connection inside a connection(Joh19a).
    • Channels are there to avoid the TCP overhead if an application needs multiple connections.
      • An application would simply create multiple channels (Channels).
    • When publishing or consuming messages from a queue - it's all done over a channel(Joh19a).
    • On the same channel, one queue can be bound to an exchange several times, each time with a different binding key.
  • Connection - A TCP connection between your application and the RabbitMQ broker(Joh19a).
  • Consumer - receives messages from a queue.
  • default (nameless) exchange -
    • compares routing key with the queue name
    • indirectly allows sending directly to queues
  • direct, exchange type - The message is routed to the queues whose binding key exactly matches the routing key of the message(Joh19a).
  • Exchange - is responsible for routing the messages to different queues with the help of bindings and routing keys(Joh19a).
    • match and distribute messages across queues.
    • Exchange compares routing key with binding key
    • receives messages from producers and pushes them on to zero or more queues.
  • Fanout, exchange type - routes messages to all of the queues bound to the exchange(Joh19a).
  • headers, exchange type - use the message header attributes for routing(Joh19a). TODO elaborate
  • Message - Information that is sent from the producer to a consumer through RabbitMQ(Joh19a).
  • Producer - emits messages to an exchange.
    • Sends messages to the exchange.
  • publishing -
    • Publishing messages as persistent affects performance (just like with data stores, durability comes at a certain cost in performance).
      • Delivery mode (persistent or not)
        • delivery-mode (octet): non-persistent (1) or persistent (2) (basic).
  • Queue - Buffer that stores messages.
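  • Routing key - The key a producer attaches to a message; the exchange compares it with the binding keys to decide which queues receive the message.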
  • topic, exchange type - does a wildcard match between the routing key and the routing pattern specified in the binding(Joh19a).

Installing RabbitMQ via Docker

  • docker run --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always rabbitmq:3-management
  • docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

RabbitMQ overview

RabbitMQ messages are one-way. If you need to send a reply, you need to create another exchange or another queue for it.

how the client establishes a flow with the server

  • create a connection
    • to e.g. amqp://guest:guest@localhost:5672/
  • create a channel on the connection
  • declare a queue on the channel
  • bind the queue to the exchange
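
The steps above can be sketched with pika, the Python client used elsewhere on this page. This is a minimal sketch rather than a definitive setup: the exchange name `logs_direct`, the queue name `hello`, the binding key `info`, and the helper names `open_channel` and `declare_and_bind` are placeholders chosen for illustration, and the exchange is declared first on the assumption that it does not exist yet.

```python
def open_channel(url="amqp://guest:guest@localhost:5672/"):
    """Create a connection to the broker and open a channel on it."""
    # Imported here so the rest of the sketch loads without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.URLParameters(url))
    return connection, connection.channel()


def declare_and_bind(channel, exchange="logs_direct", queue="hello", binding_key="info"):
    """Declare a queue on the channel and bind it to the exchange."""
    # Assumption: the exchange may not exist yet, so declare it before binding.
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    channel.queue_declare(queue=queue)
    # pika calls the binding key argument 'routing_key'.
    channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key)
```

With a broker running, `connection, channel = open_channel()` followed by `declare_and_bind(channel)` performs all four steps; call `connection.close()` when done.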

Exchange types

  • direct - a message goes to the queues whose binding key exactly matches the routing key of the message.
    • if multiple queues are bound with a matching key, each of those queues receives a copy of the message; round-robin applies only between consumers of the same queue
  • fanout - broadcasts all the messages it receives to all the queues it knows. Ignores the binding key in the bind operation.
  • headers -
  • topic - enables producers to use routing keys made of multiple words, separated by '.'
    • e.g. 'cron.error' or 'kernel.info'
    • the subscriber could then listen to all cron messages with 'cron.*'
      • * wildcards a single word.
    • or each host could send out 'my_name.cron.warning' etc
    • and a subscriber could listen to all messages from the 'my_name' host with 'my_name.#'
      • # substitutes for zero or more words.
    • or listen to any cron message with: '*.cron.*'
    • When a queue is bound with '#' as the binding key, it receives every message regardless of the routing key, so the binding behaves like one on a 'fanout' exchange
    • When a binding key contains neither '*' nor '#', the topic exchange behaves like a 'direct' exchange for that binding
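
The wildcard rules of the topic exchange can be checked with a small pure-Python matcher. This is an illustrative sketch of the matching rules only, not the broker's implementation; the function name `topic_matches` is made up for this page.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if a topic binding key matches a message routing key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern used up: match only if all words are used up too.
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # '*' stands for exactly one word; otherwise words must be equal.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


print(topic_matches("cron.*", "cron.error"))               # True
print(topic_matches("cron.*", "cron.error.fatal"))         # False
print(topic_matches("my_name.#", "my_name.cron.warning"))  # True
print(topic_matches("*.cron.*", "my_name.cron.warning"))   # True
```

A binding key without wildcards, such as 'kernel.info', only matches the identical routing key, which is the 'direct' behaviour described above.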

Queue operations

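  • by default, messages are dispatched round-robin to the consumers of a queue
  • without ack a message/task can get lost
  • with ack a message/task can risk being handled out of order, because an unacknowledged message is requeued and redelivered later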
    • Acknowledgement must be sent on the same channel that received the delivery. Attempts to acknowledge using a different channel will result in a channel-level protocol exception.
  • Giving a queue a name is important when you want to share the queue between producers and consumers.

Ensure that a message is not lost if a worker is lost

  • delay the ack until the message/task has been completed (the handler must send the ack manually)
    • channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
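
A manual-ack handler could look like the following sketch. The callback signature is the one pika passes to `on_message_callback`; `process` is a hypothetical stand-in for the real work and is not part of pika.

```python
def process(body):
    """Hypothetical stand-in for the real work; raises if the task fails."""
    if not body:
        raise ValueError("empty task")
    return body.decode().upper()


def on_message(ch, method, properties, body):
    """Consumer callback: send the ack only after the work succeeded."""
    process(body)
    # Reached only if process() did not raise. The ack is sent on the
    # same channel that delivered the message.
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

The handler is registered with `channel.basic_consume(queue='hello', on_message_callback=on_message, auto_ack=False)`. If the worker dies before the ack is sent, the broker redelivers the message once the channel or connection closes.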

Ensure that messages survive a RabbitMQ restart

  • Configure the queue to survive a RabbitMQ restart
    • channel.queue_declare(queue='task_queue', durable=True)
  • delay the ack until the message/task has been completed (the handler must send the ack manually)
    • channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
  • At the publisher set the message to PERSISTENT_DELIVERY_MODE
    • channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE))
    • Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet.
  • In environments and use cases where durability is important, applications must use durable queues and make sure that publishers mark published messages as persistent (AMQP 0-9-1 Model Explained).

Ensure that messages are distributed fairly among workers

  • channel.basic_qos(prefetch_count=1)
    • IMPORTANT: the basic_qos() call must come before the basic_consume() call.
    • it must be combined with manually sending an ack when the message has been handled.
    • plain round-robin sends an equal number of messages to each worker, regardless of how long each message takes to process.
    • with prefetch_count=1 each worker holds only one message at a time: the one it is working on.
      • when all workers do this, messages are sent to workers as the workers become ready to handle them.
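
The difference between plain round-robin and prefetch_count=1 can be illustrated with a small simulation. This is a simplified model that does not talk to a broker: task durations are plain numbers, and prefetch_count=1 with manual acks is approximated as "the next task goes to whichever worker becomes free first". The function names are made up for this page.

```python
import heapq


def round_robin(durations, workers):
    """Task i goes to worker i % workers, regardless of how busy it is."""
    load = [0] * workers
    for i, duration in enumerate(durations):
        load[i % workers] += duration
    return load


def prefetch_one(durations, workers):
    """Each task goes to the worker that becomes free first."""
    free_at = [(0, w) for w in range(workers)]  # (time the worker is free, worker id)
    heapq.heapify(free_at)
    load = [0] * workers
    for duration in durations:
        time_free, w = heapq.heappop(free_at)
        load[w] += duration
        heapq.heappush(free_at, (time_free + duration, w))
    return load


tasks = [5, 1, 5, 1, 5, 1]  # alternating slow and fast tasks
print(round_robin(tasks, 2))   # [15, 3]
print(prefetch_one(tasks, 2))  # [11, 7]
```

With round-robin one worker ends up with all the slow tasks, while the prefetch model spreads the total work more evenly.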

Working with RabbitMQ from different languages

Using RabbitMQ from PERL

Semi-official Perl examples

  • Examples:

  • get

  • Install Math::Int64

    • Fedora: package {'perl-Math-Int64': ensure => present }
    • Ubuntu: package {'libmath-int64-perl': ensure => present }
  • perl Makefile.PL

  • make

  • make test

  • sudo make install

testing AnyEvent-RabbitMQ for perl

  1. cd
  2. mkdir source
  3. cd source
  4. git clone AnyEvent-RabbitMQ
  5. cd AnyEvent-RabbitMQ/
  6. perl Makefile.PL
  7. make && make test
  8. sudo make install
package {'perl-Module-Build': ensure => present }
package {'perl-Module-Install': ensure => present }
package {'perl-Module-Install-AuthorTests': ensure => present }

# Required by: perl Makefile.PL
package {'perl-Test-Exception': ensure => present }
package {'perl-List-MoreUtils': ensure => present }
package {'perl-namespace-clean': ensure => present }
package {'perl-File-ShareDir': ensure => present }
package {'perl-Readonly': ensure => present }

# Required during install
package {'perl-Class-Accessor': ensure => present }
package {'perl-Class-Data-Inheritable': ensure => present }