queue-adapter-pkg

0.1.77 • Public • Published

QueueAdapter

A generic adapter for interacting with message queues using AMQP (RabbitMQ). The QueueAdapter class provides functionality to connect to an AMQP provider, consume messages from a queue, handle retries, and manage dead-letter queues (DLQ). It supports automatic reconnection, message acknowledgment, and logging with customizable options.

Features

  • AMQP Connectivity: Connects to AMQP brokers (such as RabbitMQ) and manages message queues.

  • Retries: Automatic retries for failed messages with exponential backoff.

  • Dead Letter Queue (DLQ): Support for dead-letter exchanges and queues.

  • Logging: Customizable logging via the ILogger interface.

  • Message Timeout: Support for message processing timeouts.

  • Auto-queue Creation: Option to automatically create queues and dead-letter queues.

  • Graceful Shutdown: Handles connection errors and reconnections automatically.
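
The retry feature above mentions exponential backoff, but this README does not document the delay schedule the package uses. The following is only a minimal sketch of the general technique. The helper name `backoffDelayMs`, the 1-second base delay, and the 60-second cap are illustrative assumptions, not values taken from the package.

```typescript
// Hypothetical helper: not part of queue-adapter-pkg.
// Computes the wait before retry number `attempt` (0-based),
// doubling on each attempt and capping at `maxDelayMs`.
function backoffDelayMs(
  attempt: number,
  baseDelayMs = 1000, // assumed base delay
  maxDelayMs = 60000 // assumed upper bound
): number {
  if (!Number.isInteger(attempt) || attempt < 0) {
    throw new RangeError("attempt must be a non-negative integer");
  }
  return Math.min(maxDelayMs, baseDelayMs * 2 ** attempt);
}

// Delays for the first four retries: 1000, 2000, 4000, 8000 ms.
const delays = [0, 1, 2, 3].map((attempt) => backoffDelayMs(attempt));
console.log(delays);
```

Capping the delay keeps a message that fails many times from waiting indefinitely between attempts.
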

Installation

To install the package, run:

```bash
npm install queue-adapter-pkg
```

Usage
1. Logger Setup
Provide a logger for the adapter's queue-related output by implementing the ILogger interface with your preferred logging mechanism:

```typescript
import { ILogger } from "@/types/ILogger";

// Minimal console-backed logger; swap in any logging library.
const baseLogger: ILogger = {
  debug: (message, metadata) => console.debug(message, metadata),
  info: (message, metadata) => console.info(message, metadata),
  warn: (message, metadata) => console.warn(message, metadata),
  error: (message, metadata) => console.error(message, metadata),
};
```

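
Because the logger is a plain object, it can be wrapped to attach fixed context (such as the queue name) to every entry. The sketch below assumes ILogger has the four methods shown above, each taking a message and optional metadata. The local `ILogger` declaration and the `withContext` helper are illustrative and are not exports of the package.

```typescript
// Assumed shape of ILogger, inferred from the logger example above.
type Metadata = Record<string, unknown>;
interface ILogger {
  debug(message: string, metadata?: Metadata): void;
  info(message: string, metadata?: Metadata): void;
  warn(message: string, metadata?: Metadata): void;
  error(message: string, metadata?: Metadata): void;
}

// Hypothetical helper: returns a logger that merges `context`
// into the metadata of every log call.
function withContext(logger: ILogger, context: Metadata): ILogger {
  const wrap =
    (level: keyof ILogger) =>
    (message: string, metadata?: Metadata): void =>
      logger[level](message, { ...context, ...metadata });
  return {
    debug: wrap("debug"),
    info: wrap("info"),
    warn: wrap("warn"),
    error: wrap("error"),
  };
}

const consoleLogger: ILogger = {
  debug: (message, metadata) => console.debug(message, metadata),
  info: (message, metadata) => console.info(message, metadata),
  warn: (message, metadata) => console.warn(message, metadata),
  error: (message, metadata) => console.error(message, metadata),
};

// Every entry from queueLogger now carries { queue: "myQueue" }.
const queueLogger = withContext(consoleLogger, { queue: "myQueue" });
queueLogger.info("Listener started", { prefetchCount: 10 });
```
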
2. Create Queue Adapter
Create an instance of the QueueAdapter by using the createQueueAdapter function. Pass in the necessary configuration options like the queue name, provider URL, and logging instance:

```typescript
import { createQueueAdapter } from "@/adapters/queueAdapter";
import { ILogger } from "@/types/ILogger";
import { IMessage } from "@/types/IMessage";

const queueName = "myQueue";
const queueProviderUrl = "amqp://localhost"; // Example URL for RabbitMQ
const isToHandleDLQ = true;
const isToEnableRetry = true;
const isToAutoCreateQueues = true;

const queueAdapter = createQueueAdapter<IMessage>(
  queueName,
  queueProviderUrl,
  isToHandleDLQ,
  isToEnableRetry,
  isToAutoCreateQueues,
  baseLogger
);
```

3. Message Handler Logic
Define your message handler logic. The handler will be called every time a new message is received:

```typescript
async function handleMessage(msgContent: IMessage, headers: any) {
  baseLogger.info("Received message", { msgContent, headers });

  try {
    if (msgContent.Subject === "Special Message") {
      baseLogger.info("Special handling for this message", { msgContent });
    }
  } catch (err) {
    baseLogger.error("Error processing message", { error: err, msgContent });
    throw err; // Rethrow to trigger a retry if needed
  }
}
```

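
As the number of special cases grows, the `if` on `Subject` can be replaced by a lookup table. This is a sketch only. It assumes messages have a string `Subject` field (the only field the handler above relies on), and `SubjectMessage`, `SubjectHandler`, and `pickHandler` are hypothetical names, not part of the package.

```typescript
// Assumed minimal message shape; the real IMessage may have more fields.
interface SubjectMessage {
  Subject: string;
}
type SubjectHandler = (message: SubjectMessage) => void;

// Returns the handler registered for the message's Subject,
// or the fallback when no route matches.
function pickHandler(
  message: SubjectMessage,
  routes: Map<string, SubjectHandler>,
  fallback: SubjectHandler
): SubjectHandler {
  return routes.get(message.Subject) ?? fallback;
}

const routes = new Map<string, SubjectHandler>([
  ["Special Message", (message) => console.info("Special handling", message)],
]);
const defaultHandler: SubjectHandler = (message) =>
  console.info("Default handling", message);

const incoming: SubjectMessage = { Subject: "Special Message" };
pickHandler(incoming, routes, defaultHandler)(incoming);
```
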
4. Start Listening for Messages
Start the queue listener to continuously listen for incoming messages and process them using the handler:

```typescript
async function startQueueListener() {
  await queueAdapter.startListening(async (msgContent, headers) => {
    await handleMessage(msgContent, headers);
  });
  baseLogger.info(`Listening on RabbitMQ queue: ${queueName}`);
}
```

5. Process Dead Letter Queue (DLQ)
Optionally, you can handle messages from the Dead Letter Queue (DLQ). This is useful for dealing with failed messages or messages that couldn’t be processed successfully:

```typescript
async function startDLQListener() {
  if (isToHandleDLQ) {
    await queueAdapter.processDLQ(async (msgContent, headers) => {
      baseLogger.warn("Processing DLQ message", { msgContent, headers });
    });
  }
}
```

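
When triaging dead-lettered messages, it can help to summarize how often and for how long a message has been failing. The sketch below assumes the headers carry `x-retry-count` and `x-first-failed-at`, the header names used in the publishing example at the end of this README. Whether the adapter sets them on DLQ messages is an assumption, and `summarizeFailure` is a hypothetical helper.

```typescript
type MessageHeaders = Record<string, unknown>;

interface FailureSummary {
  retryCount: number;
  firstFailedAt: Date | null;
  failingForMs: number | null;
}

// Hypothetical helper: extracts retry information from message headers,
// tolerating missing or malformed values.
function summarizeFailure(
  headers: MessageHeaders,
  now: Date = new Date()
): FailureSummary {
  const rawCount = Number(headers["x-retry-count"] ?? 0);
  const retryCount = Number.isFinite(rawCount) ? rawCount : 0;

  const rawDate = headers["x-first-failed-at"];
  const parsed = typeof rawDate === "string" ? new Date(rawDate) : null;
  const firstFailedAt =
    parsed !== null && !Number.isNaN(parsed.getTime()) ? parsed : null;

  return {
    retryCount,
    firstFailedAt,
    failingForMs:
      firstFailedAt !== null ? now.getTime() - firstFailedAt.getTime() : null,
  };
}

console.log(summarizeFailure({ "x-retry-count": 3 }));
```
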
6. Start Queue and DLQ Listeners
Start both the main queue listener and the DLQ listener:

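```typescript
// Both functions return promises, so surface startup failures explicitly.
startQueueListener().catch((err) => baseLogger.error("Queue listener failed", { error: err }));
startDLQListener().catch((err) => baseLogger.error("DLQ listener failed", { error: err }));
```
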
Customizing Queue Options
You can customize several options when creating the QueueAdapter, such as message timeouts, retry counts, and DLQ settings. Here’s how you can define custom queue options:

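```typescript
// Durations below are assumed to be in milliseconds.
const queueOptions: IQueueAdapterOptions = {
  prefetchCount: 10, // Max unacknowledged messages delivered at once
  messageTimeout: 30000, // 30 seconds
  maxRetries: 3,
  enableDeadLetter: true,
  messageTtl: 1000 * 60 * 60 * 24, // 24 hours
  deadLetterTtl: 1000 * 60 * 60 * 24 * 7, // 7 days
};
```
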
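
Long products of numeric literals are easy to get wrong, so durations are often easier to audit when written with named units. A small sketch, assuming (as the values above suggest) that the adapter expects milliseconds. `seconds`, `hours`, and `days` are hypothetical helpers, not part of the package.

```typescript
// Hypothetical unit helpers that convert to milliseconds.
const seconds = (n: number): number => n * 1000;
const hours = (n: number): number => n * 60 * 60 * 1000;
const days = (n: number): number => hours(n * 24);

const durations = {
  messageTimeout: seconds(30), // 30000
  deadLetterTtl: days(7), // 604800000
};
console.log(durations);
```
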
Pass queueOptions as the last argument to createQueueAdapter when creating the QueueAdapter instance:

```typescript
const queueAdapter = createQueueAdapter<IMessage>(
  queueName,
  queueProviderUrl,
  isToHandleDLQ,
  isToEnableRetry,
  isToAutoCreateQueues,
  baseLogger,
  queueOptions // Custom options
);
```

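
A processing deadline such as messageTimeout is commonly enforced by racing the handler against a timer. The following is a sketch of that general pattern and not the adapter's actual implementation, which this README does not describe. `withTimeout` is a hypothetical helper.

```typescript
// Hypothetical helper: rejects if `work` does not settle within `timeoutMs`.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  // Whichever promise settles first wins; the timer is always cleared after.
  return Promise.race([work, deadline]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}

// A task that finishes in 10 ms, well inside a 1000 ms deadline.
const quickTask = new Promise<string>((resolve) =>
  setTimeout(() => resolve("done"), 10)
);
withTimeout(quickTask, 1000).then((result) => console.log(result));
```

Note that the race only stops waiting for the handler. It does not cancel the underlying work, so handlers that must stop early need their own cancellation mechanism.
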
API Reference
createQueueAdapter<T>(queueName, queueProviderUrl, isToHandleDLQ, isToEnableRetry, isToAutoCreateQueues, logger, options?)

Parameters:

  • queueName (string): The name of the queue.
  • queueProviderUrl (string): The URL of the AMQP broker (e.g., a RabbitMQ URL).
  • isToHandleDLQ (boolean): Whether to enable dead-letter queue (DLQ) handling.
  • isToEnableRetry (boolean): Whether to enable the retry mechanism.
  • isToAutoCreateQueues (boolean): Whether to create the queues automatically.
  • logger (ILogger): The logger instance used for queue operations.
  • options (IQueueAdapterOptions, optional): Custom queue options (e.g., retries, timeouts).

Returns:

  • An instance of the QueueAdapter class.

startListening(onMessage)

Starts listening for messages from the queue.

Parameters:

  • onMessage (function): The callback invoked for each incoming message. In the usage examples above it receives (msgContent, headers).

processDLQ(onMessage)

Processes messages from the Dead Letter Queue (DLQ).

Parameters:

  • onMessage (function): The callback invoked for each DLQ message. In the usage examples above it receives (msgContent, headers).

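
Both startListening and processDLQ take a callback. Judging by the usage examples, it receives the message content and its headers and may be async. A sketch of a type alias for that shape follows. `MessageHandler`, the header type, and the `logged` wrapper are assumptions for illustration rather than types exported by the package.

```typescript
// Assumed callback shape, inferred from the usage examples.
type MessageHandler<T> = (
  msgContent: T,
  headers: Record<string, unknown>
) => Promise<void>;

// Hypothetical wrapper that logs before delegating to another handler.
function logged<T>(name: string, inner: MessageHandler<T>): MessageHandler<T> {
  return async (msgContent, headers) => {
    console.info(`[${name}] handling message`);
    await inner(msgContent, headers);
  };
}

const mainHandler = logged<{ Subject: string }>("main", async (msg) => {
  console.info("Subject:", msg.Subject);
});
```

A handler built this way could then be passed wherever an onMessage callback is expected, provided the adapter's real callback type is compatible.
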
Contributing
We welcome contributions to this project! If you'd like to contribute, please fork the repository, make your changes, and submit a pull request. Ensure that your changes are well-tested and documented.

License
This project is licensed under the MIT License - see the LICENSE file for details.

Publishing Messages

```typescript
// Publish a message to the queue.
// `message` is the serialized payload (it was undefined in the original snippet).
async function publishMessage(message: string) {
  try {
    // Initialize the connection and setup
    await queueAdapter.initializeConnection();

    // Publish the message via the default exchange, routed by queue name
    await queueAdapter.channel.publish("", "myQueue", Buffer.from(message), {
      persistent: true, // Make the message persistent
      headers: {
        "x-retry-count": 0, // Initial retry count
        "x-first-failed-at": new Date().toISOString(),
      },
    });

    console.log("Message published to the queue successfully.");
  } catch (error) {
    console.error("Failed to publish message:", error);
  }
}
```
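
The `x-retry-count` header in the example above suggests a per-message retry counter. As a sketch of how such a counter could drive the choice between retrying and dead-lettering (an assumption about the mechanism, not the adapter's documented behavior), with `planRetry` as a hypothetical helper:

```typescript
type MessageHeaders = Record<string, unknown>;

type RetryPlan =
  | { action: "retry"; headers: MessageHeaders }
  | { action: "dead-letter" };

// Hypothetical helper: decides what to do with a failed message
// based on its retry counter and the configured maximum.
function planRetry(
  headers: MessageHeaders,
  maxRetries: number,
  now: Date = new Date()
): RetryPlan {
  const previous = Number(headers["x-retry-count"] ?? 0);
  const retryCount = Number.isFinite(previous) ? previous : 0;

  if (retryCount >= maxRetries) {
    return { action: "dead-letter" };
  }
  return {
    action: "retry",
    headers: {
      ...headers,
      "x-retry-count": retryCount + 1,
      // Keep the original failure time if one was already recorded.
      "x-first-failed-at": headers["x-first-failed-at"] ?? now.toISOString(),
    },
  };
}

console.log(planRetry({ "x-retry-count": 2 }, 3).action); // "retry"
console.log(planRetry({ "x-retry-count": 3 }, 3).action); // "dead-letter"
```
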

Collaborators

  • eliltz