amqp-plus is a wrapper around the amqplib npm library that adds extra functionality.
- Promise based
- Message acknowledgement functions on the message itself
Thanks to amqp-connection-manager:
- Automatically reconnect when your amqplib broker dies in a fire.
- Round-robin connections between multiple brokers in a cluster.
- If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
npm install amqp-plus
For code snippets, check out the examples folder.
const AmqpPlus = require('amqp-plus');
// Create the client. The options object lists the broker URLs together with
// the exchanges, queues and bindings that the rest of this example refers to.
const amqpClient = new AmqpPlus({
// Two broker URLs are supplied here (one per host).
urls: [
'amqp://guest:guest@firsthost:5672',
'amqp://guest:guest@secondhost:5672'
],
// Exchange definitions: a direct, a fanout and a topic exchange.
// 'ex-3' sets only name and type; the other options are left unspecified.
exchanges: [
{
name: 'ex-1', type: 'direct', durable: true, autoDelete: false
},
{
name: 'ex-2', type: 'fanout', durable: false, autoDelete: true
},
{
name: 'ex-3', type: 'topic'
}
],
// Queue definitions. 'q-3' sets only a name; all other options are left unspecified.
queues: [
{
name: 'q-1', durable: true, exclusive: false
},
{
name: 'q-2', durable: false, autoDelete: true
},
{
name: 'q-3'
}
],
// Exchange-to-queue bindings. `keys` appears in three forms below:
// an array of keys, omitted entirely, and a single string pattern.
bindings: [
{
exchange: 'ex-1',
queue: 'q-1',
keys: ['key-1', 'key-2', 'key-3']
},
{
// No keys given for the binding on the fanout exchange 'ex-2'.
exchange: 'ex-2', queue: 'q-3'
},
{
// Single key given as a plain string rather than an array.
exchange: 'ex-3', queue: 'q-2', keys: 'key.#'
}
]
});
// Register listeners for the client's lifecycle events; each one just logs.

// Connection-level events.
amqpClient.on('connect', () => {
console.log('connected');
});
// The 'disconnect' listener receives an error argument, logged via console.error.
amqpClient.on('disconnect', (err) => {
console.error('disconnected', err);
});
// Channel-level events, namespaced with the 'channel:' prefix.
amqpClient.on('channel:connect', () => {
console.log('channel connected');
});
// The 'channel:error' listener receives two arguments: the error and a name.
amqpClient.on('channel:error', (error, name) => {
console.error('channel error: ', error, name);
});
amqpClient.on('channel:close', () => {
console.log('channel closed');
});
// Publish a single message to exchange 'ex-1' with routing key 'key-1'.
// The payload here is a plain object; the last argument carries publish options.
// The call returns a promise, so the outcome is handled with then/catch.
amqpClient.publish(
'ex-1',
'key-1',
{ data: 'json msg' },
{ persistent: true, expiration: 5000 }
)
.then(() => console.log('Message delivered'))
.catch((err) => console.error('Message rejected:', err));
// Payloads may also be a string or a Buffer; the options argument is omitted here.
amqpClient.publish('ex-2', '', 'string msg');
amqpClient.publish('ex-3', 'key.3', Buffer.from('buffer msg'));
// Send a single message to queue 'q-1' by name (no exchange argument).
amqpClient.sendToQueue(
'q-1',
{ data: 'json msg' },
{ persistent: false }
)
.then(() => console.log('Message delivered'))
.catch((err) => console.error('Message rejected:', err));
// Bulk publish: one call with an array of messages (object, string and Buffer
// payloads are mixed in the same array). The returned promise is handled with
// then/catch; the catch handler logs when at least one message was rejected.
amqpClient.bulkPublish(
'ex-2',
'routing-key-is-ignored-for-fanout-exchange',
[
{ msg: 'json msg' },
'string msg',
Buffer.from('buffer msg')
]
)
.then(() => console.log('All messages delivered'))
.catch((err) => console.error('Atleast one of the messages rejected', err));
// Bulk publish with a single routing key given as a string.
amqpClient.bulkPublish(
'ex-1',
'key-2', // All messages get sent with routing key "key-2"
[
{ msg: 'json msg' },
'string msg',
Buffer.from('buffer msg')
]
);
// Bulk publish with an array of routing keys: as the message texts indicate,
// the key at each index is paired with the message at the same index.
amqpClient.bulkPublish(
'ex-3',
['key.1', 'key.2', 'key.3'],
[
{ msg: 'msg with routing key "key.1"' },
'msg with routing key "key.2"',
Buffer.from('msg with routing key "key.3"')
]
);
// Bulk send to a single queue, given by name.
amqpClient.bulkSendToQueue(
'q-3',
[
{ msg: 'json msg' },
'string msg',
Buffer.from('buffer msg')
]
)
.then(() => console.log('All messages delivered'))
.catch((err) => console.error('Atleast one of the messages rejected', err));
// Bulk send with an array of queue names: as the message texts indicate,
// the queue at each index receives the message at the same index.
amqpClient.bulkSendToQueue(
['q-1', 'q-2', 'q-3'],
[
{ msg: 'msg goes to queue "q-1"' },
'msg goes to queue "q-2"',
Buffer.from('msg goes to queue "q-3"')
]
);
/**
 * Example message handler: logs the incoming message, then settles it using
 * the acknowledgement helpers attached to the message itself.
 *
 * The decision is a strict-equality match on `msg.content`:
 *   'ok'      -> msg.ack()
 *   'failure' -> msg.nack()
 *   'bad'     -> msg.reject()
 *
 * NOTE(review): a message whose content matches none of these values is only
 * logged and is not settled by this handler.
 *
 * @param {object} msg - Received message exposing `content`, `ack`, `nack` and `reject`.
 * @returns {void}
 */
function consumer(msg) {
  console.log(msg);

  switch (msg.content) {
    case 'ok':
      // Processing succeeded: acknowledge so the message is dequeued.
      msg.ack();
      break;
    case 'failure':
      // Processing failed: negative-acknowledge so the message is requeued.
      msg.nack();
      break;
    case 'bad':
      // Malformed message: reject so it is dequeued.
      msg.reject();
      break;
    default:
      // Unrecognised content: nothing to settle here.
      break;
  }
}
// Subscribe `consumer` to queue 'q-1'. `noAck: false` is passed explicitly, and
// the handler settles each message itself via msg.ack()/msg.nack()/msg.reject().
amqpClient.subscribe('q-1', consumer, { noAck: false });