Queue class - abreits/amqp-ts GitHub Wiki
The Queue class defines an AMQP queue. It is normally only created from within a connection with declareQueue().
- constructor
- delete
- close
- bind
- unbind
- send
- rpc
- activateConsumer
- stopConsumer
- prefetch
- recover
- publish - deprecated, use send instead!
- startConsumer - deprecated, use activateConsumer instead!
A detailed reference explaining the meaning and use of each method and property.
constructor ( connection: Connection,
name: string,
options?: Queue.StartConsumerOptions)
Creates a queue for a connection. Normally only called from within a connection with declareQueue().
connection: Connection
: Connection this queue is declared for.
name: string
: queue name.
options?: Queue.StartConsumerOptions
: queue options as defined in amqplib. An extra queue declaration option has been added in v1.3: noCreate, which connects to an already existing AMQP queue called name, ignoring all other declaration options.

    // normally not used directly, but from a connection
    connection.declareQueue("queueName", {durable: false});
    // calls internally
    var queue = new Queue(connection, "queueName", {durable: false});

    // expect an existing queue
    connection.declareQueue("existingQueueName", {noCreate: true});

queue.delete (): Promise<void>
Delete the queue.
Promise<void>
: promise that resolves when the queue is deleted (or an error has occurred).

    queue.delete().then(() => {
      // do things when the queue is deleted
    });

queue.close (): Promise<void>
Close the queue in amqp-ts only; this does not delete a persistent queue.
Promise<void>
: promise that resolves when the queue is closed (or an error has occurred).

    queue.close().then(() => {
      // do things when the queue is closed
    });

queue.bind ( source: Exchange,
pattern?: string,
args?: any)
: Promise<Binding>
Bind this queue to an exchange.
source: Exchange
: source exchange this queue is connected to.
pattern?: string
: pattern that defines which messages will be received, defaults to "".
args?: any
: object containing extra arguments that may be required for the particular exchange type.
Promise<Binding>
: promise that resolves when the binding is initialized.

    // normal use
    destQueue.bind(sourceExchange);

    // less frequently used, but may be useful in certain situations
    destQueue.bind(sourceExchange).then((binding) => {
      // do things when the binding is initialized
    });

queue.unbind ( source: Exchange,
pattern?: string,
args?: any)
: Promise<void>
Remove binding.
source: Exchange
: source exchange this queue is connected to.
pattern?: string
: pattern that defines which messages will be received, defaults to "".
args?: any
: object containing extra arguments that may be required for the particular exchange type.
Promise<void>
: promise that resolves when the binding is removed.

    destQueue.unbind(sourceExchange).then(() => {
      // do things when the binding is removed
    });

queue.send ( message: Message,
routingKey?: string)
: void
message: Message
: the [message](Message class) to be sent to the queue.
routingKey?: string
: routing key for the message, defaults to "".

    import * as Amqp from "amqp-ts";

    var message = new Amqp.Message("ExampleMessageString");
    queue.send(message);

queue.rpc ( requestParameters: any): Promise<Message>
Execute a RabbitMQ 'direct reply-to' remote procedure call. The return type of this method has changed in version 0.14. It now returns the full message object instead of just the processed message content.
requestParameters: any
: the rpc parameters to be sent to the queue. The following preprocessing takes place if it is a:
- Buffer : send the content as is (no preprocessing).
- string : create a Buffer from the string and send that buffer.
- everything else : convert the object to JSON, create a Buffer from the JSON string and, if not already defined, set the contentType option to "application/json".

Promise<Message>
: promise that resolves when the result is received.

    queue.rpc("Parameters").then((result) => {
      console.log("Rpc result: " + result.getContent());
    });

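The three preprocessing rules listed for requestParameters can be illustrated with a small standalone sketch. The encodeContent helper and the EncodedContent interface are hypothetical names introduced here for illustration; they are not part of amqp-ts and only mirror the cases described above, assuming a Node.js runtime where Buffer is available globally.

```typescript
// Hypothetical helper, NOT part of amqp-ts: mirrors the documented
// preprocessing rules for Buffer, string and "everything else".
interface EncodedContent {
  content: Buffer;
  contentType?: string;
}

function encodeContent(value: unknown, contentType?: string): EncodedContent {
  if (Buffer.isBuffer(value)) {
    // Buffer: sent as is, no preprocessing
    return { content: value, contentType };
  }
  if (typeof value === "string") {
    // string: wrapped in a Buffer
    return { content: Buffer.from(value), contentType };
  }
  // everything else: JSON-encoded; contentType is only set if not already defined
  return {
    content: Buffer.from(JSON.stringify(value)),
    contentType: contentType ?? "application/json",
  };
}

const encoded = encodeContent({ name: "test", value: "This is a test!" });
console.log(encoded.contentType);
console.log(encoded.content.toString());
```

Keeping the rules in one pure function makes it easy to see that a caller-supplied contentType is never overwritten.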
queue.activateConsumer ( onMessage: (msg: Message) => any,
options?: Queue.ActivateConsumerOptions)
: Promise<void>
Define the function that can process messages for this queue. Only one consumer can be active per queue.
onMessage: (msg: Message) => any
: function that processes the messages. For RPC calls, the return value of this function is returned to the RPC caller; if the function returns a Promise, the resolved result is returned.
options?: Queue.ActivateConsumerOptions
: consumer options as defined in amqplib.
Promise<any>
: promise that resolves when the consumer is started.

    // 'simple' consumer
    queue.activateConsumer((msg) => {
      console.log(msg.getContent()); // the preprocessed content of the message received
    }, {noAck: true});

    // message consumer example
    function consumerFunction(msg) {
      console.log(msg.getContent());
      console.log(msg.fields);
      console.log(msg.properties);
      msg.ack();
    }
    queue.activateConsumer(consumerFunction);

    // simple rpc server
    queue.activateConsumer((msg) => {
      var rpcParameters = msg.getContent();
      return rpcParameters.value;
    });
    // rpc client
    var param = {
      name: "test",
      value: "This is a test!"
    };
    queue.rpc(param).then((result) => {
      console.log(result.getContent()); // should result in 'This is a test!'
    });

    // rpc server that returns a Message
    queue.activateConsumer((msg) => {
      var rpcParameters = msg.getContent();
      return new Amqp.Message(rpcParameters.value, {});
    });
    // rpc client
    var param = {
      name: "test",
      value: "This is a Message test!"
    };
    queue.rpc(param).then((result) => {
      console.log(result.getContent()); // should result in 'This is a Message test!'
    });

    // rpc server that returns a Promise
    queue.activateConsumer((msg) => {
      var rpcParameters = msg.getContent();
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          resolve(rpcParameters.value);
        }, 1000);
      });
    });
    // rpc client
    var param = {
      name: "test",
      value: "This is a Promise test!"
    };
    queue.rpc(param).then((result) => {
      console.log(result.getContent()); // should result in 'This is a Promise test!'
    });

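The rule that an RPC handler may return either a plain value or a Promise can be sketched without a broker. The names RpcHandler and invokeHandler below are hypothetical and do not come from amqp-ts; the sketch only shows one common way to treat both kinds of return value uniformly, and the library may do this differently.

```typescript
// Hypothetical names; a sketch of the described behaviour, NOT amqp-ts source.
type RpcHandler<Req, Res> = (request: Req) => Res | Promise<Res>;

function invokeHandler<Req, Res>(
  handler: RpcHandler<Req, Res>,
  request: Req
): Promise<Res> {
  // Resolving with the handler's return value passes a plain value through
  // and waits for a returned Promise to settle. A synchronous throw inside
  // the executor turns into a rejection.
  return new Promise<Res>((resolve) => resolve(handler(request)));
}

const request = { name: "test", value: "This is a test!" };

// handler that returns a plain value
invokeHandler((req: typeof request) => req.value, request)
  .then((reply) => console.log(reply));

// handler that returns a Promise which resolves later
invokeHandler(
  (req: typeof request) =>
    new Promise<string>((res) => setTimeout(() => res(req.value), 10)),
  request
).then((reply) => console.log(reply));
```

In both cases the caller only ever sees a Promise of the reply, which matches how queue.rpc() is consumed in the examples above.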
queue.stopConsumer (): Promise<void>
Stops the consumer function and deletes the queue and binding created in startConsumer.
Promise<any>
: promise that resolves when the consumer is stopped.

    queue.stopConsumer();

queue.prefetch (count: number): void
Set the prefetch count for this queue. The count given is the maximum number of messages sent to the queue that can be awaiting acknowledgement. Once there are count messages outstanding, the server will not send more messages until one or more have been acknowledged.

    queue.prefetch(50);

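The effect of the prefetch count can be modelled with a few lines of bookkeeping. The PrefetchWindow class below is a hypothetical illustration of the rule described above (delivery stops once count messages are unacknowledged); it is not part of amqp-ts or amqplib and does not talk to a broker.

```typescript
// Hypothetical model of the prefetch rule; NOT part of amqp-ts.
class PrefetchWindow {
  private outstanding = 0;
  private readonly count: number;

  constructor(count: number) {
    this.count = count;
  }

  // A message is only delivered while fewer than `count` are unacknowledged.
  tryDeliver(): boolean {
    if (this.outstanding >= this.count) {
      return false;
    }
    this.outstanding += 1;
    return true;
  }

  // Acknowledging a message frees one slot in the window.
  ack(): void {
    if (this.outstanding > 0) {
      this.outstanding -= 1;
    }
  }
}

const prefetchWindow = new PrefetchWindow(2);
console.log(prefetchWindow.tryDeliver()); // true
console.log(prefetchWindow.tryDeliver()); // true
console.log(prefetchWindow.tryDeliver()); // false, two messages are outstanding
prefetchWindow.ack();
console.log(prefetchWindow.tryDeliver()); // true again after one acknowledgement
```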
queue.recover (): Promise<void>
Requeue unacknowledged messages on this channel.
Promise<void>
: promise that resolves when all messages are requeued.

    queue.recover();

queue.initialized: Promise<Queue.InitializeResult>
Indicates whether the queue initialization is resolved (or rejected).

    queue.initialized.then((result) => {
      console.log("Queue initialized!");
      console.log("Queue name", result.name);
      console.log("Queue messageCount", result.messageCount);
      console.log("Queue consumerCount", result.consumerCount);
      // stuff to do
    });
    queue.initialized.catch((err) => {
      // something went wrong
    });

queue.name: string
name of the queue (read only)
queue.publish - deprecated, use send instead!
queue.publish ( content: any,
routingKey?: string,
options?: any)
: void
Publish a message to a queue.
content: any
: the content to be sent to the queue. The following preprocessing takes place if it is a:
- Buffer : send the content as is (no preprocessing).
- string : create a Buffer from the string and send that buffer.
- everything else : convert the object to JSON, create a Buffer from the JSON string and, if not already defined, set the contentType option to "application/json".

routingKey?: string
: routing key for the message, defaults to "".
options?: any
: publish options as defined in amqplib.

    queue.publish("ExampleMessageString");

queue.startConsumer - deprecated, use activateConsumer instead!
queue.startConsumer ( onMessage: (msg: any, channel?: AmqpLib.Channel) => any,
options?: Queue.StartConsumerOptions)
: Promise<void>
Define the function that can process messages for this queue. Only one consumer can be active per queue. Under the hood it creates a consumer queue named consumerQueueName that is bound to the queue, from which the messages are read.
onMessage: (msg: any, channel?: AmqpLib.Channel) => any
: function that processes the messages. If the rawMessage option is set to true in the options, the 'raw' message, as defined in amqplib, is sent and the amqplib channel is passed as an extra parameter to allow acknowledgement of messages. Otherwise extra processing is done:
- the raw message content is converted from a buffer to a string or Object and sent as the msg parameter.
- if the raw message contains a 'replyTo' property, the result of the onMessage function is sent back to a queue with that name (used with rpc).

options?: Queue.StartConsumerOptions
: consumer options as defined in amqplib. An extra property, rawMessage, has been added to allow more low level message processing.
Promise<any>
: promise that resolves when the consumer is started.

    // 'normal' consumer
    queue.startConsumer((msg) => {
      console.log(msg); // receives the processed content of the message sent
    });

    // 'raw' message consumer example
    function rawConsumerFunction(msg, channel) {
      console.log(msg.content);
      console.log(msg.fields);
      console.log(msg.properties);
      channel.ack(msg);
    }
    queue.startConsumer(rawConsumerFunction, {rawMessage: true});

    // rpc server
    queue.startConsumer((rpcParameters) => {
      return rpcParameters.value;
    });
    // rpc client
    var param = {
      name: "test",
      value: "This is a test!"
    };
    queue.rpc(param).then((result) => {
      console.log(result.getContent()); // should result in 'This is a test!'
    });

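The conversion "from a buffer to a string or Object" mentioned for non-raw consumers can be sketched as follows. The decodeContent helper is hypothetical, and the rule it uses (parse as JSON only when contentType is "application/json", otherwise return the string) is an assumption chosen to mirror the sending-side preprocessing; the library's actual logic may differ.

```typescript
// Hypothetical helper, NOT part of amqp-ts. Assumption: content is parsed as
// JSON only when contentType is "application/json", otherwise kept as a string.
function decodeContent(content: Buffer, contentType?: string): unknown {
  const text = content.toString();
  return contentType === "application/json" ? JSON.parse(text) : text;
}

// a plain string message stays a string
console.log(decodeContent(Buffer.from("ExampleMessageString")));

// a JSON message becomes an object again
console.log(
  decodeContent(Buffer.from('{"value":"This is a test!"}'), "application/json")
);
```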