The RabbitMQConsumer is a lightweight Node.js class that simplifies consuming messages from RabbitMQ queues, providing a reliable mechanism for handling message retries, dead-lettering, and graceful connection management. The class abstracts the complexity of RabbitMQ setup, connection management, and message consumption, allowing developers to focus on processing business logic.
- Persistent Connection: Automatically establishes and maintains a persistent RabbitMQ connection, reconnecting if the connection drops.
- Retry Logic with Exponential Backoff: Automatically retries failed messages with exponential backoff up to a configurable number of retries.
- Dead-Letter Queue: Supports dead-lettering for messages that have exceeded the retry limit.
- Graceful Connection Handling: Manages RabbitMQ connections and channels, including graceful shutdown and error recovery.
- Durable Queues: Ensures queues are durable, making the messages persistent across RabbitMQ restarts.
- Node.js: The core runtime environment for building the package.
- RabbitMQ: A message broker for sending messages between microservices.
- amqplib: A Node.js library for working with RabbitMQ.
You can install the package via npm by running the following command:
npm install @heavybit/hb-queue-consumer-package
- Node.js version 12 or higher.
- RabbitMQ installed locally or accessible via a cloud instance.
Here’s how to integrate the RabbitMQConsumer package into your project:
- Import the Package
To start using RabbitMQConsumer, import it into your project:
const RabbitMQConsumer = require('@heavybit/hb-queue-consumer-package');
- Initialize the Consumer
Create an instance of RabbitMQConsumer with required options like queue name, dead-letter queue, maximum retries, and retry backoff time:
const consumer = new RabbitMQConsumer({
queue: 'yourQueue',
deadLetterQueue: 'yourDeadLetterQueue',
maxRetries: 5, // optional, default is 3
retryBackoff: 2000 // optional, default is 1000ms
});
- Start Consuming Messages Start consuming messages from the RabbitMQ queue by providing a callback function that processes each message.
(async () => {
try {
await consumer.consume(async (message) => {
// Business logic to process the message
console.log('Message received:', message);
});
} catch (error) {
console.error('Failed to start consuming messages:', error);
}
})();
- Configuring RabbitMQ Connection The consumer automatically connects to RabbitMQ using the environment variables:
RABBITMQ_HOST
RABBITMQ_PORT
RABBITMQ_USER
RABBITMQ_PASSWORD
Here's an example .env file configuration:
# Example environment variables
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
If a message fails to be processed, it will be retried according to the retry configuration with exponential backoff. Once the maximum retry attempts are exhausted, the message will be sent to the dead-letter queue.
// Example of message retry logic with dead-lettering
consumer.handleRetry(msg);
Ensure a clean shutdown by closing the RabbitMQ connection and channel when your application stops:
process.on('SIGINT', async () => {
console.log('Shutting down consumer...');
await consumer.close();
process.exit(0);
});
Here's an example of how you can integrate the RabbitMQConsumer into a microservice using Express:
const express = require('express');
const RabbitMQConsumer = require('@yourorg/rabbitmq-consumer');
const app = express();
const consumer = new RabbitMQConsumer({
queue: 'taskQueue',
deadLetterQueue: 'taskQueue.deadLetter',
maxRetries: 5,
retryBackoff: 2000
});
app.use(express.json());
app.post('/process-task', async (req, res) => {
try {
await consumer.consume(async (message) => {
// Process the message
console.log('Task processed:', message);
});
res.status(200).send('Task processing started');
} catch (error) {
console.error('Failed to start task processing:', error);
res.status(500).send('Error occurred');
}
});
app.listen(3000, () => {
console.log('Microservice running on port 3000');
});
RabbitMQConsumer
constructor(options)
Creates an instance of RabbitMQConsumer
.
- Options
-
queue
: The name of the RabbitMQ queue from which to consume messages. -
deadLetterQueue
: The name of the dead-letter queue for failed messages. -
maxRetries
: (Optional) The maximum number of retry attempts for failed messages. Defaults to 3. -
retryBackoff
: (Optional) The base retry backoff time in milliseconds. Defaults to 1000ms.
-
async connect()
Establishes a connection to RabbitMQ and creates a channel for message consumption. Automatically called during message consumption.
- Returns:
Promise<void>
- Throws: Error if the connection fails.
async consume(callback)
Begins consuming messages from the queue, calling the provided callback function to process each message.
-
Parameters:
- callback: A function that processes received messages. Should return a Promise.
- Returns: Promise
- Throws: Error if consumption fails.
async handleRetry(msg)
Handles message retries using exponential backoff. If the retry count exceeds the maximum retries, the message is moved to the dead-letter queue.
-
Parameters:
- msg: The message object to retry.
-
Returns:
Promise<void>
async close() Closes the RabbitMQ connection and channel gracefully.
- Returns:
Promise<void>
- Throws: Error if closing the connection or channel fails.
RabbitMQConsumer.publishToQueue(queueName, data, retries = 3, initialDelay = 1000)
Publishes a message to the specified RabbitMQ queue with retry logic and exponential backoff.
Parameters:
-
queueName (string)
: The name of the RabbitMQ queue. -
data (Object)
: The message payload to be sent to the queue. -
retries (number)
: The maximum number of retry attempts (default is 3). -
initialDelay (number)
: The initial delay (in milliseconds) before retrying (default is 1000 ms).
Returns:
Promise<void>
Throws:
- Error if queueName or data is missing.
- Error after all retry attempts fail.
RabbitMQConsumer.close()
Closes the RabbitMQ channel and connection. This method is automatically called after message publishing is completed. You can also call it manually if needed.
Returns:
Promise<void>