Exchange class - abreits/amqp-ts GitHub Wiki

Exchange class

The Exchange class defines an AMQP exchange. Normally only created from within a connection with declareExchange().

Contains

methods
properties

Detailed reference

A detailed reference explaining the meaning and use of each method and property.

constructor

constructor ( connection: Connection,
              name: string,
              type?: string,
              options?: Exchange.DeclarationOptions)

Creates an exchange for a connection. Normally only called from within a connection with declareExchange().

parameters
  • connection: Connection : Connection this exchange is declared for
  • name: string : exchange name.
  • type?: string : exchange type, a valid AMQP exchange type name.
  • options?: Exchange.DeclarationOptions : exchange options as defined in amqplib. An extra exchange option has been added in v1.3: noCreate, this connects to an already existing AMQP exchange name, ignoring the exchange type and all other declaration options.
example
// normally not used directly, but from a connection
connection.declareExchange("exchangeName", "amq.topic", {durable: false});
// calls internally
var exchange = new Exchange(connection, "exchangeName", "amq.topic", {durable: false});
// expect an existing exchange
connection.declareExchange("existingExchangeName", "", {noCreate: true});

exchange.delete

exchange.delete (): Promise<void>

Delete the exchange

result
  • Promise<void> : promise that resolves when the exchange is deleted (or an error has occurred).
example
exchange.delete().then(() => {
    // do things when the exchange is deleted
});

exchange.close

exchange.close (): Promise<void>

Close the exchange only in amqp-ts, does not delete a persistent exchange

result
  • Promise<void> : promise that resolves when the exchange is closed (or an error has occurred).
example
exchange.delete().then(() => {
    // do things when the exchange is deleted
});

exchange.bind

exchange.bind ( source: Exchange,
                pattern?: string,
                args?: any)
              : Promise<void>

Bind this exchange to another exchange (RabbitMQ extension).

parameters
  • source: Exchange : source exchange this exchange is connected to.
  • pattern?: string : pattern that defines which messages will be received, defaults to "".
  • args?: any : object containing extra arguments that may be required for the particular exchange type
result
  • Promise<Binding> : promise that resolves when the binding is initialized
example
// normal use
destExchange.bind(sourceExchange);

// less frequently used, but may be useful in certain situations
destExchange.bind(sourceExchange).then((binding) => {
    // do things when the binding is initialized
});

exchange.unbind

exchange.unbind ( source: Exchange,
                  pattern?: string,
                  args?: any)
                : Promise<void>

Remove binding.

parameters
  • source: Exchange : source exchange this exchange is connected to.
  • pattern?: string : pattern that defines which messages will be received, defaults to "".
  • args?: any : object containing extra arguments that may be required for the particular exchange type
result
  • Promise<Binding> : promise that resolves when the binding is removed
example
destExchange.unbind(sourceExchange).then(() => {
    // do things when the binding is removed
});

exchange.send

exchange.send ( message: Message,
                routingKey?: string)
              : void
parameters
  • message: Message : the [message](Message class) to be sent to the exchange.
  • routingKey?: string : routing key for the message, defaults to "".
example
import * as Amqp from "amqp-ts";

var message = new Amqp.Message("ExampleMessageString");
exchange.send(message);

exchange.rpc

exchange.rpc ( requestParameters: any,
                routingKey = "")
              : Promise<Message>

Execute a RabbitMQ 'direct reply-to' remote procedure call. The return type of this method has changed in version 0.14. It now returns the full message object instead of just the processed message content.

parameters
  • requestParameters: any : the rpc parameters to be sent to the exchange. the following preprocessing takes place if it is a
  • Buffer : send the content as is (no preprocessing)
  • string : create a Buffer from the string and send that buffer
  • everything else : create a Buffer from the to JSON converted object and, if not defined, set the contentType option to "application/json"
  • routingKey?: string : routing key for the message, defaults to "".
result
  • Promise<Message> : promise that resolves when the result is received
example
exchange.rpc("Parameters").then((result) => {
    console.log("Rpc result: " + result.getContent());
});

exchange.consumerQueueName

exchange.consumerQueueName (): string

Returns a meaningfull unique name for the default consumer queue of the exchange. The default unique names generated by RabbitMQ are rather cryptic for an administrator, this can help.

exchange.activateConsumer

exchange.activateConsumer ( onMessage: (msg: Message) => any,
                            options?: Queue.ActivateConsumerOptions)
                          : Promise<void>

Define the function that can process messages for this exchange. Only one consumer can be active per exchange. Under water it creates a consumer queue with consumerQueueName that is bound to the exchange, from which the messages are read.

parameters
  • onMessage: (msg: Message) => any : function that processes the messages.
  • options?: Queue.ActivateConsumerOptions : consumer options as defined in amqplib.
result
  • Promise<any> : promise that resolves when the consumer is started
example
// 'simple' consumer
queue.activateConsumer((msg) => {
    console.log(msg.getContent()); // the preprocessed content of the message received
}, {noAck: true});

// message consumer example
function consumerFunction(msg) {
    console.log(msg.getContent());
    console.log(msg.fields);
    console.log(msg.properties);
    msg.ack();
}
queue.activateConsumer(consumerFunction);

// simple rpc server
queue.activateConsumer((msg) => {
    var rpcParameters = msg.getContent();
    return rpcParameters.value;
});
// rpc client
var param = {
    name: "test",
    value: "This is a test!"
}
queue.rpc(param).then((result) => {
    console.log(result.getContent()); // should result in 'This is a test!'
}

// rpc server that returns a Message
queue.activateConsumer((msg) => {
    var rpcParameters = msg.getContent();
    return new Amqp.Message(rpcParameters.value, {});
});
// rpc client
var param = {
    name: "test",
    value: "This is a test!"
}
queue.rpc(param).then((result) => {
    console.log(result.getContent()); // should result in 'This is a test!'
}

exchange.stopConsumer

exchange.stopConsumer (): Promise<void>

Stops the consumer function and deletes the queue and binding created in startConsumer.

result
  • Promise<any> : promise that resolves when the consumer is stopped
example
exchange.stopConsumer();

exchange.initialized:

exchange.initialized: Promise<Exchange.InitializeResult>

indicates whether the exchange initialization is resolved (or rejected)

example
exchange.initialized.then((result) => {
    console.log("Exchange initialized: " + result.exchange);
    // stuff to do
}
exchange.initialized.catch((err) => {
    // something went wrong
}

exchange.name:

exchange.name: string

name of the exchange (read only)

exchange.type:

exchange.type: string

type of the exchange (read only)

Deprecated methods and properties

exchange.publish - deprecated: use send instead!

exchange.publish ( content: any,
                    routingKey?: string,
                    options?: any)
                  : void

Publish a message to an exchange

parameters
  • content: any : the content to be sent to the exchange. the following preprocessing takes place if it is a
  • Buffer : send the content as is (no preprocessing)
  • string : create a Buffer from the string and send that buffer
  • everything else : create a Buffer from the to JSON converted object and, if not defined, set the contentType option to "application/json"
  • routingKey?: string : routing key for the message, defaults to "".
  • options?: any : publish options as defined in amqplib.
example
exchange.publish("ExampleMessageString");

exchange.startConsumer - deprecated: use activateConsumer instead!

exchange.startConsumer ( onMessage: (msg: any, channel?: AmqpLib.Channel) => any,
                          options?: Queue.StartConsumerOptions)
                        : Promise<void>

Define the function that can process messages for this exchange. Only one consumer can be active per exchange. Under water it creates a consumerqueue with consumerQueueName that is bound to the exchange, from which the messages are read.

parameters
  • onMessage: (msg: any, channel?: AmqpLib.Channel) => any : function that processes the messages.
  • options?: Queue.StartConsumerOptions : consumer options as defined in amqplib. An extra property rawMessage has been added to allow more low level message processing, see [queue.startConsumer](Queue class#startConsumer) for more details.
result
  • Promise<any> : promise that resolves when the consumer is started
example
exchange.startConsumer((msg) => {
    console.log(msg);
};
⚠️ **GitHub.com Fallback** ⚠️