RQueue is a powerful and flexible task queue library for Node.js and TypeScript. It supports features like task prioritization, task grouping, pausing and resuming, rate limiting, timeout management, error handling, and progress tracking. Designed with Clean Code and SOLID principles in mind, RQueue helps you manage task execution efficiently and effectively.
- Task Prioritization: Assign priorities to tasks to control execution order.
- Task Grouping: Group similar tasks for better management.
- Pause and Resume: Pause and resume task processing as needed.
- Rate Limiting: Control the rate of task execution to avoid overloading resources.
- Timeout Management: Set timeouts for tasks to ensure timely completion.
- Error Handling: Comprehensive error management and retry mechanisms.
- Progress Tracking: Track the progress of task execution.
- Event Emission: Detailed event emission for monitoring queue activities and task execution.
npm install @owsla/r-queue
First, create an instance of `RQueue` with the desired options. You can specify options such as `concurrency`, `autoStart`, `delayMs`, `rateLimit`, and `timeoutMs`.
import { RQueue } from '@owsla/r-queue';
// Create a queue with a concurrency of 2
const queue = new RQueue({ concurrency: 2 });
queue.on('success', (results) => {
console.log('Tasks completed successfully:', results);
});
queue.on('error', (error) => {
console.error('Error processing tasks:', error);
});
async function task1() {
// Your async task logic
return 'Task 1 completed';
}
async function task2() {
// Your async task logic
return 'Task 2 completed';
}
// Enqueue tasks
queue.enqueue(task1);
queue.enqueue(task2);
You can assign priorities to tasks to control their execution order. Higher priority tasks will be executed first.
queue.enqueue(() => {
// High priority task
return 'High priority task completed';
}, 10); // Priority 10
queue.enqueue(() => {
// Normal priority task
return 'Normal priority task completed';
}, 1); // Priority 1
Grouping similar tasks lets you manage them together. You can process or cancel tasks based on their group.
queue.enqueue(() => {
// Group A task
return 'Group A task completed';
}, 1, 'groupA');
queue.enqueue(() => {
// Group B task
return 'Group B task completed';
}, 1, 'groupB');
You can pause the queue processing and resume it later.
queue.pause();
setTimeout(() => {
queue.resume();
}, 5000); // Resume after 5 seconds
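To confirm when the queue actually pauses and resumes, you can also listen for the `'pause'` and `'resume'` events listed in the events section below:
// Log queue state transitions via the documented 'pause' and 'resume' events
queue.on('pause', () => console.log('Queue paused'));
queue.on('resume', () => console.log('Queue resumed'));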
Control the rate of task execution to avoid overloading resources. This is useful when interacting with rate-limited APIs.
const rateLimitedQueue = new RQueue({
concurrency: 2,
rateLimit: { count: 5, duration: 1000 } // Max 5 tasks per second
});
rateLimitedQueue.enqueue(async () => {
// Your async task logic
return 'Rate limited task completed';
});
rateLimitedQueue.on('rateLimitReached', (waitTime: number) => console.log(`Rate limit reached. Waiting for ${waitTime}ms`));
rateLimitedQueue.on('rateLimitReset', () => console.log('Rate limit reset'));
rateLimitedQueue.on('rateLimitCheck', (processedCount: number) => console.log(`Rate limit check. Processed count: ${processedCount}`));
Set timeouts for tasks to ensure they complete within a specified duration. If a task exceeds the timeout, it will be rejected.
const timeoutQueue = new RQueue({
concurrency: 2,
timeoutMs: 3000 // 3 seconds timeout for each task
});
timeoutQueue.enqueue(async () => {
// Your async task logic
await new Promise((resolve) => setTimeout(resolve, 4000)); // Simulating long task
return 'This will timeout';
});
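The rejection from a timed-out task should surface through the queue's `'error'` event (the exact error shape and message are determined by RQueue), so a listener like this can be used to observe timeouts:
timeoutQueue.on('error', (error) => {
  // The timed-out task's rejection is reported here; the exact message depends on RQueue
  console.error('Task failed (possibly timed out):', error);
});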
Comprehensive error management allows you to handle and retry tasks when errors occur.
queue.on('error', (error) => {
console.error('Error occurred:', error.message);
});
queue.enqueue(async () => {
throw new Error('Simulated task error');
});
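If you need automatic retries on top of the queue, one simple pattern is to wrap the task and re-enqueue it on failure. The `enqueueWithRetry` helper below is a hypothetical sketch built only on the documented `enqueue` method; it is not part of the RQueue API:
// Hypothetical helper: re-enqueue a failing task up to `retries` more times.
// Built only on queue.enqueue(); RQueue itself is not assumed to provide retries.
function enqueueWithRetry<T>(
  task: () => Promise<T>,
  retries: number,
  priority?: number,
  group?: string
): void {
  queue.enqueue(async () => {
    try {
      return await task();
    } catch (error) {
      if (retries > 0) {
        // Put the task back in the queue with one fewer attempt remaining
        enqueueWithRetry(task, retries - 1, priority, group);
      }
      throw error; // still surface each failure through the 'error' event
    }
  }, priority, group);
}

// Retry the simulated failing task up to 3 more times
enqueueWithRetry(async () => {
  throw new Error('Simulated task error');
}, 3);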
Track the progress of tasks in the queue, including the number of remaining and active tasks.
queue.on('progress', (progress) => {
console.log(`Tasks remaining: ${progress.remaining}, Active tasks: ${progress.active}`);
});
`new RQueue(options?: RQueueOptions)`
- `options`: `RQueueOptions` (optional)
  - `concurrency`: Number of tasks to run concurrently (default: 1)
  - `autoStart`: Automatically start processing the queue (default: true)
  - `delayMs`: Delay between task executions (default: 0)
  - `rateLimit`: Rate limit configuration `{ count: number, duration: number }`
  - `timeoutMs`: Timeout for each task in milliseconds
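As an illustration, a queue using all of the options above might be configured like this (the values are arbitrary examples chosen for the sketch):
const fullyConfiguredQueue = new RQueue({
  concurrency: 4,                           // run up to 4 tasks at once
  autoStart: false,                         // wait for an explicit start() call
  delayMs: 100,                             // 100 ms delay between task executions
  rateLimit: { count: 10, duration: 1000 }, // at most 10 tasks per second
  timeoutMs: 5000                           // reject tasks that run longer than 5 seconds
});

fullyConfiguredQueue.start(); // required here because autoStart is false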
- `enqueue<T>(transaction: RCallback<T>, priority?: number, group?: string): void`: Add a task to the queue.
- `pause(): void`: Pause the queue processing.
- `resume(): void`: Resume the queue processing.
- `clear(): void`: Clear all tasks in the queue.
- `start(): void`: Start processing the queue.
- `length: number`: Get the number of tasks in the queue.
- `processing: boolean`: Check if the queue is currently processing tasks.
- `paused: boolean`: Check if the queue is paused.
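A minimal sketch tying the methods and properties together, assuming a queue created with `autoStart: false` so that tasks wait until `start()` is called:
import { RQueue } from '@owsla/r-queue';

const manualQueue = new RQueue({ autoStart: false });

manualQueue.enqueue(async () => 'manual task');
console.log(manualQueue.length);     // expected: 1 (one task waiting)
console.log(manualQueue.processing); // expected: false (not started yet)
console.log(manualQueue.paused);     // expected: false

manualQueue.start(); // begin processing the queued task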
- `'start'`: Emitted when processing starts.
- `'success'`: Emitted when tasks complete successfully.
- `'error'`: Emitted when an error occurs.
- `'end'`: Emitted when processing ends.
- `'drain'`: Emitted when the queue is empty.
- `'pause'`: Emitted when the queue is paused.
- `'resume'`: Emitted when the queue is resumed.
- `'progress'`: Emitted to track progress.
- `'rateLimitReached'`: Emitted when the rate limit is reached.
- `'rateLimitReset'`: Emitted when the rate limit is reset.
- `'rateLimitCheck'`: Emitted to check the number of processed tasks.
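For example, the lifecycle events can be used to observe when the queue finishes its work, reusing the `queue` instance from the usage examples above:
queue.on('start', () => console.log('Processing started'));
queue.on('drain', () => console.log('Queue is empty'));
queue.on('end', () => console.log('Processing ended'));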
This project is licensed under the MIT License.
Feel free to open issues or submit pull requests for new features, bug fixes, or improvements.
Thanks to all the contributors who helped make RQueue a great project!