Consumer Middleware - messagebus/lapine GitHub Wiki

The Lapine consumer process is built around a middleware stack, similar to Rack.

Defining Middleware

Middleware classes are created as follows:

class MyLapineMiddleware
  def initialize(app)
    @app = app
  end

  def call(message)
    @app.call(message)
  end
end

You can do work before or after calling through to the rest of the middleware stack.

The message passed into the #call method is a Lapine::Consumer::Message, which is a glorified Hash. It responds to the following methods:

  • #payload - the raw JSON payload as a String
  • #decoded_payload - the payload converted into a Hash
  • #metadata - message metadata passed in from the ruby-amqp gem
  • #logger

Messages can be added to with arbitrary data as you would with a Hash:

def call(message)
  message[:foo] = 'bar'
end

Registering Middleware

Lapine::Consumer::Middleware.add MyMiddleware
Lapine::Consumer::Middleware.add_before MyMiddleware, MyOtherMiddleware
Lapine::Consumer::Middleware.add_after MyMiddleware, AnotherMiddleware
Lapine::Consumer::Middleware.delete MyMiddleware

Existing Middleware

The default consumer middleware is as follows:

  • MessageAckHandler - acknowledges the message in RabbitMQ after processing
  • ErrorHandler - catches errors and runs through the dispatcher Error Handling
  • JsonDecoder - converts the payload into decoded_payload

Middleware with configuration

You can define middleware that is initialized with application-specific configuration:

class MyConfigurableMiddleware
  def initialize(app, options = {})
    @app = app
    @options = options
  end

  def call(message)
    message.merge(options)
    @app.call(message)
  end
end

Then register it:

Lapine::Consumer::Middleware.add MyMiddleware, {foo: 'bar'}