Receiving Messages - OggettoWeb/messenger GitHub Wiki

Observer classes

Each incoming message is delivered as an event that must be handled by an observer class in your code. Create observer classes in separate modules (for example, a module for ERP integration).

Say you want to observe an event named product_created that comes from a third-party system. Add the following to the module's config.xml:

<config>
    <global>
        <messenger>
            <event_observers>
                <new_products>
                    <class>mymodule/modelname</class>
                </new_products>
            </event_observers>
        </messenger>
    </global>
</config>

Here we defined a new event observer, mymodule/modelname. Each observer must implement Oggetto_Messenger_Model_Event_Observer_Interface:

  • The match function receives an event and returns true if this observer can handle it. This is where you would check the event name.
  • The observe function is called if match returns true. Put the main observer logic here.

An observer can also implement the Oggetto_Messenger_Model_Log_Loggable interface. It then receives an automatically injected logger object through its setLogger method (see the logging section for more details about logs).
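The match/observe contract described above can be pictured as a small dispatch loop. The sketch below is only an illustration: SketchObserver, ProductCreatedObserver and dispatch are hypothetical stand-ins (not the extension's real classes), events are plain arrays, and it assumes that every matching observer is called.

```php
<?php
// Hypothetical stand-in for Oggetto_Messenger_Model_Event_Observer_Interface.
// Events are plain arrays here so that the sketch stays self-contained.
interface SketchObserver
{
    public function match(array $event);
    public function observe(array $event);
}

class ProductCreatedObserver implements SketchObserver
{
    public $seen = array();

    // Accept only events with the expected name.
    public function match(array $event)
    {
        return $event['name'] === 'product_created';
    }

    // Main logic: here we just remember the payload.
    public function observe(array $event)
    {
        $this->seen[] = $event['data'];
    }
}

// Assumed dispatch rule: call observe() on every observer whose match() is true.
function dispatch(array $event, array $observers)
{
    $handled = 0;
    foreach ($observers as $observer) {
        if ($observer->match($event)) {
            $observer->observe($event);
            $handled++;
        }
    }
    return $handled;
}
```

An observer that returns false from match is simply skipped, so several observers can share one queue and each pick out only the events it understands.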

Observing daemon

To start observing messages, run the system daemon, which reads RabbitMQ queues and passes events to the defined event observers. The daemon can be started in two ways: with queues listed manually on the command line, or via a config file.

Manual queues config

$ cd /magento/root
$ php shell/messenger/receiver.php start --queues foo:1:d,bar:2:d,baz:3:s

This command will run:

  1. 1 daemon reading messages from the foo queue. d means distributed.
  2. 2 daemons reading the bar queue. Also distributed.
  3. 3 daemons reading the baz queue. s means selfish.
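To make the name:workers:type format of the --queues option concrete, here is a minimal sketch of how such a string could be split into queue definitions. The function parseQueuesOption is hypothetical and written only for illustration. It is not the parser that receiver.php actually uses.

```php
<?php
// Illustrative parser for a "name:workers:type" list (hypothetical helper,
// not the extension's own code).
function parseQueuesOption($spec)
{
    $modes = array('d' => 'distributed', 's' => 'selfish');
    $queues = array();
    foreach (explode(',', $spec) as $item) {
        $parts = explode(':', trim($item));
        // Each item needs exactly three parts and a known mode letter.
        if (count($parts) !== 3 || !isset($modes[$parts[2]])) {
            throw new InvalidArgumentException("Bad queue definition: $item");
        }
        $queues[] = array(
            'name'    => $parts[0],
            'workers' => (int) $parts[1],
            'mode'    => $modes[$parts[2]],
        );
    }
    return $queues;
}

// Print one line per queue for the command shown above.
foreach (parseQueuesOption('foo:1:d,bar:2:d,baz:3:s') as $queue) {
    printf("%s: %d worker(s), %s\n", $queue['name'], $queue['workers'], $queue['mode']);
}
```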

YAML queues config

$ cd /magento/root
$ php shell/messenger/receiver.php start --config /path/to/config.yaml

The YAML config file should have the following format:

queues:
  - name: foo
    workers: 2
    type: s # selfish
  - name: bar
    workers: 1
    type: d # distributed
  - name: baz
    workers: 1
    type: s # selfish

Distributed and selfish workers

Each queue listener daemon can work in one of two modes:

  • Selfish. All messages received from MQ are processed within the same PHP process.
  • Distributed. Each message received from MQ is processed in a separate PHP process.

Distributed mode is slower than selfish mode, because it initializes a Magento instance for every message. Selfish mode is faster, since everything runs within a single script and Magento is initialized only once. However, running Magento as a long-lived daemon causes memory leaks on some operations, so selfish processes can run out of memory. Messenger automatically respawns a selfish worker once its memory consumption reaches 110 MB.
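The respawn rule for selfish workers can be sketched roughly as follows. This is an assumption-laden illustration, not Messenger's implementation: the function names are hypothetical, and the source does not say how or when memory is measured, so the sketch checks memory_get_usage(true) after each message and leaves the loop so that a supervisor could start a fresh process.

```php
<?php
// Hypothetical helper: true once usage has reached the limit.
function shouldRespawn($usedBytes, $limitBytes)
{
    return $usedBytes >= $limitBytes;
}

// Hypothetical selfish worker loop: handles messages in one process and
// stops early when the memory limit is reached.
function runSelfishWorker(array $messages, $handler, $limitBytes)
{
    $processed = 0;
    foreach ($messages as $message) {
        call_user_func($handler, $message);
        $processed++;
        // Assumption: memory is checked after every message.
        if (shouldRespawn(memory_get_usage(true), $limitBytes)) {
            break; // exit so that a fresh process can take over
        }
    }
    return $processed;
}

// 110 MB, the threshold quoted above.
$limit = 110 * 1024 * 1024;
$count = runSelfishWorker(array('a', 'b', 'c'), function ($message) {
    // Event processing would happen here.
}, $limit);
```

Distributed workers do not need such a check in this picture, because each message gets its own short-lived PHP process.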

Stopping and reloading daemon

To stop observing messages, run:

$ cd /magento/root
$ php shell/messenger/receiver.php stop

To reload all workers:

$ cd /magento/root
$ php shell/messenger/receiver.php reload

Messages format

By default, Oggetto_Messenger understands messages in XML format and converts them to event objects. A message like this:

<?xml version="1.0" encoding="UTF-8"?>
<update>
   <event name="smth_happened">
      <foo>1</foo>
      <bar>2</bar>
      <baz>
         <xxx>3</xxx>
      </baz>
   </event>
</update>

will be converted to an Oggetto_Messenger_Model_Event object with:

$event->getName(); // smth_happened

$event->getData();
/*
[
   'foo' => 1,
   'bar' => 2,
   'baz' => 
   [
      'xxx' => 3
   ]
]
*/

The message format can be replaced with your own.
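As a rough illustration of the default XML-to-event mapping described above, the following sketch uses PHP's SimpleXML extension to extract the event name and a nested data array. The functions xmlNodeToArray and parseMessage are hypothetical and are not the extension's own converter. Note that in this sketch leaf values come out as strings ('1', '2', '3'), and repeated sibling elements with the same name would overwrite each other.

```php
<?php
// Recursively turn an element's children into a nested array.
// Leaf elements become strings; same-named siblings overwrite each other.
function xmlNodeToArray(SimpleXMLElement $node)
{
    $result = array();
    foreach ($node->children() as $child) {
        $result[$child->getName()] = count($child->children()) > 0
            ? xmlNodeToArray($child)
            : (string) $child;
    }
    return $result;
}

// Hypothetical parser: returns the event name and data from a message body.
function parseMessage($xml)
{
    $root = simplexml_load_string($xml);
    if ($root === false || !isset($root->event)) {
        throw new InvalidArgumentException('Message has no <event> element');
    }
    return array(
        'name' => (string) $root->event['name'],
        'data' => xmlNodeToArray($root->event),
    );
}

$event = parseMessage(
    '<update><event name="smth_happened">'
    . '<foo>1</foo><bar>2</bar><baz><xxx>3</xxx></baz>'
    . '</event></update>'
);
```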

Example

Suppose you are going to receive the XML message described above from a queue named happenings. Your module is called MyCompany_MyModule and its model namespace is 'mymodule'.

Define the observer in your module's config.xml:

<config>
    <global>
        <messenger>
            <event_observers>
                <something_happened>
                    <class>mymodule/happenings_observer</class>
                </something_happened>
            </event_observers>
        </messenger>
    </global>
</config>

Implement the observer class:

class MyCompany_MyModule_Model_Happenings_Observer
    implements
    Oggetto_Messenger_Model_Event_Observer_Interface,
    Oggetto_Messenger_Model_Log_Loggable
{
    /**
     * Logger
     *
     * @var Zend_Log
     */
    private $_logger;

    /**
     * Set logger
     *
     * @param Zend_Log $logger Logger
     * @return mixed
     */
    public function setLogger(Zend_Log $logger)
    {
        $this->_logger = $logger;
        return $this;
    }

    /**
     * Check if event could be observed by this class
     *
     * @param Oggetto_Messenger_Model_Event $event Event
     * @return boolean
     */
    public function match(Oggetto_Messenger_Model_Event $event)
    {
        return $event->getName() === 'smth_happened';
    }

    /**
     * Observe event
     *
     * @param Oggetto_Messenger_Model_Event $event Event
     * @return void
     */
    public function observe(Oggetto_Messenger_Model_Event $event)
    {
        $this->_logger->info('Something is happening...');
        // Your event processing logic here
    }
}

Run the messenger daemon:

$ php shell/messenger/receiver.php start --queues happenings:1:d

Once the daemon is running, you should see something like the following in your console:

2014-06-05T08:17:11+00:00 INFO (6): [71b43b80a8] Started queue listener: happenings:d
2014-06-05T08:17:11+00:00 INFO (6): [71b43b80a8] Established connection with Rabbit MQ server

Once a message is received, you should see:

2014-06-05T08:17:15+00:00 INFO (6): [71b43b80a8] Received new message: <?xml version="1.0" encoding="UTF-8"?>
<update>
   <event name="smth_happened">
      <foo>1</foo>
      <bar>2</bar>
      <baz>
         <xxx>3</xxx>
      </baz>
   </event>
</update>
2014-06-05T08:17:15+00:00 INFO (6): [70d7844c5e] Started message processing
2014-06-05T08:17:15+00:00 INFO (6): [70d7844c5e] Something is happening...