Creating a Consumer - messagebus/lapine GitHub Wiki

The Lapine consumer process registers queues in RabbitMQ exchanges. Messages published to those queues are then dispatched to custom handler classes.

See Basic Configuration for general info on configuring the Lapine consumer.

Running the consumer daemon

The Lapine consumer must be run with a --config file specified. Other flags are optional.

bundle exec lapine consume --config /path/to/config.yml

To see all available command line options, run:

bundle exec lapine consume --help

The Lapine consumer process does not currently include built-in daemonization. When running under an init system, background it with &.

Configuring topics, queues and handlers

The YAML file should contain one or more topics and one or more queues to bind to.

topics:
  - efrafa.topic
queues:
  - q: my-handler
    topic: efrafa.topic
    routing_key: stuff.i.care.about
    handlers:
      - MyHandler

Note that multiple handlers can be bound to the same queue. Messages read from the queue will be run through each handler, in turn.

Defining a handler

Handler classes should respond to ::handle_lapine_payload with the arguments :payload, and :metadata. Payloads are automatically converted from JSON into ruby hashes.

class MyHandler
  def self.handle_lapine_payload(payload, metadata)
    # do something with the payload
  end
end

Using Lapine and Sidekiq together

Lapine is built to serve as the micro-framework for passing messages between systems, or between decoupled components of a system. The Lapine consumer process should not be bogged down by doing actual work, however. RabbitMQ does not deal well when it has a large backlog of messages.

Combining Lapine and a background-worker framework like Sidekiq can provide the best of both worlds, however. Lapine can be used to pass messages between systems, and Sidekiq can be used to perform work.

class MyHandler
  include Sidekiq::Worker

  def perform(payload)
    # do real work
  end

  def self.handle_lapine_payload(payload, metadata)
    perform_async(payload)
  end
end

In this case, the handler method may do simple introspection of the payload or metadata. Message metadata may also include much more information than the worker cares about, so very likely should not be passed as-is into the Sidekiq #perform_async method.