Link Caching - noodlefrenzy/node-amqp10 GitHub Wiki

This provides opt-in functionality to reuse an existing link when createSender or createReceiver is called again with the same address and options. It depends on the object-hash module (in addition to lodash and bluebird, which the file also requires).

link-cache.js:

'use strict';
var _ = require('lodash'),
    Promise = require('bluebird'),
    hash = require('object-hash');

/**
 * Wraps a client and reuses the links created through it.
 *
 * Sender and receiver links are tracked in separate maps; each entry is keyed
 * by a hash of the address and options that were used to create the link.
 *
 * @constructor
 * @param {Object} client - object whose `createSender` / `createReceiver`
 *   methods are invoked as `(address, options)` and return a promise for a
 *   link that supports `once('detached', fn)`.
 */
function LinkCache(client) {
  this._client = client;
  this._receiverLinks = {};
  this._senderLinks = {};
}

/**
 * Returns a sender link for `address`/`options`, creating it through the
 * wrapped client only if no matching link is cached.
 *
 * @param {*} address - passed through to the client's `createSender`.
 * @param {*} [options] - passed through to the client's `createSender`.
 * @returns {Promise} promise for the (possibly cached) sender link.
 */
LinkCache.prototype.createSender = function(address, options) {
  return this._createLink(address, options, this._senderLinks, 'createSender');
};

/**
 * Returns a receiver link for `address`/`options`, creating it through the
 * wrapped client only if no matching link is cached.
 *
 * @param {*} address - passed through to the client's `createReceiver`.
 * @param {*} [options] - passed through to the client's `createReceiver`.
 * @returns {Promise} promise for the (possibly cached) receiver link.
 */
LinkCache.prototype.createReceiver = function(address, options) {
  return this._createLink(address, options, this._receiverLinks, 'createReceiver');
};

/**
 * Shared implementation behind createSender / createReceiver.
 *
 * While a creation request is in flight the pending promise itself is stored
 * in `container`, so concurrent requests for the same address/options share a
 * single call to the client. When the request succeeds the entry is replaced
 * by the link and is removed again once the link emits 'detached'. When the
 * request fails the entry is removed, so a later call retries instead of
 * receiving the same cached rejection forever.
 *
 * @param {*} address - link address; part of the cache key.
 * @param {*} options - link options; part of the cache key.
 * @param {Object} container - cache map to use (sender or receiver links).
 * @param {string} method - name of the client method that creates the link.
 * @returns {Promise} promise for the link; rejects with the client's error if
 *   creation fails.
 * @private
 */
LinkCache.prototype._createLink = function(address, options, container, method) {
  var linkHash = hash({ address: address, options: options });
  if (_.has(container, linkHash)) {
    // The entry is either a link or a still-pending promise for one;
    // Promise.resolve normalizes both into a promise for the link.
    return Promise.resolve(container[linkHash]);
  }

  var linkPromise = this._client[method](address, options)
    .then(function(link) {
      link.once('detached', function() { delete container[linkHash]; });
      container[linkHash] = link;
      return link;
    })
    .catch(function(err) {
      // Never keep a failed attempt in the cache: evict it and let the caller
      // see the original error.
      delete container[linkHash];
      throw err;
    });

  container[linkHash] = linkPromise;
  return linkPromise;
};

// Expose the LinkCache constructor as this module's export.
module.exports = LinkCache;

link-cache.test.js:

'use strict';

var LinkCache = require('../lib/link-cache'),
    AMQPClient = require('amqp10').Client,
    config = require('./config'),
    expect = require('chai').expect;

// Per-test fixtures: rebuilt in beforeEach and discarded in afterEach.
var test = {};
describe('LinkCache', function() {
  beforeEach(function() {
    // Drop anything left over from a previous test before building new fixtures.
    if (!!test.client) delete test.client;
    if (!!test.cache) delete test.cache;

    test.client = new AMQPClient();
    test.cache = new LinkCache(test.client);
  });

  afterEach(function() {
    return test.client.disconnect().then(function() {
      delete test.client;
      delete test.cache;
    });
  });

  // The same expectations are run once for senders and once for receivers.
  [
    { description: 'sender links', method: 'createSender' },
    { description: 'receiver links', method: 'createReceiver' }
  ].forEach(function(testCase) {
    it('should return cached ' + testCase.description, function() {
      return test.client.connect('amqp://' + config.amqpServer)
        .then(function() {
          // Three concurrent requests with identical address/options.
          return Promise.all([
            test.cache[testCase.method]('amq.topic'),
            test.cache[testCase.method]('amq.topic'),
            test.cache[testCase.method]('amq.topic')
          ]);
        })
        .spread(function(link1, link2, link3) {
          // A cache hit must hand back the very same object, so compare by
          // reference (equal) rather than by structure (eql).
          expect(link1).to.equal(link2);
          expect(link1).to.equal(link3);
          expect(link2).to.equal(link3);
        });
    });

    it('should return different ' + testCase.description + ' based on address/options', function() {
      return test.client.connect('amqp://' + config.amqpServer)
        .then(function() {
          // Same address with different options, and a different address.
          return Promise.all([
            test.cache[testCase.method]('amq.topic'),
            test.cache[testCase.method]('amq.topic', { attach: { receiverSettleMode: false } }),
            test.cache[testCase.method]('amq.topic/testing')
          ]);
        })
        .spread(function(link1, link2, link3) {
          // Distinct cache keys must yield distinct link objects.
          expect(link1).to.not.equal(link2);
          expect(link1).to.not.equal(link3);
          expect(link2).to.not.equal(link3);
        });
    });
  });

});