A rate-limited queue implementation in Node.js/TypeScript. The `RateLimitedQueue` class processes asynchronous tasks concurrently while respecting a rate limit. The class depends on an instance of `ITokenBucket` being provided; the `ITokenBucket` interface defines the rate-limiting strategy used to govern the concurrent execution of tasks in the queue. The queue exposes methods for enqueueing tasks, as well as for requesting immediate execution.
```typescript
import { InMemoryBucket, RateLimitedQueue } from 'more-slowly'

// 10 requests/sec
const rate = { capacity: 10, intervalMilliseconds: 1000 }
const tokenBucket = new InMemoryBucket(rate)
const queue = new RateLimitedQueue({ tokenBucket })
```
Adds a task to the queue for processing. The task is guaranteed to eventually execute, provided the queue is drained.
```typescript
const task: AsyncTask = async () => {
  console.log('watermelons!')
}

queue.enqueue(task)
```
| Option | Required? | Type |
|---|---|---|
| `task` | Yes | `Function` - a function that accepts no arguments and returns a `Promise` |
| `options` | No | `IEnqueueOptions` - see below |
IEnqueueOptions
Options that control the execution of the task. Setting `maxRetries` to any number means that the task may not execute if it is throttled more than `maxRetries` times. `retryIntervalMilliseconds` controls the rate at which the task will poll the queue.
| Option | Type |
|---|---|
| `maxRetries` | `number` |
| `retryIntervalMilliseconds` | `number` |
Attempts to execute a task immediately by requesting a token from the `tokenBucket`. Throws `ThrottleError` if a token could not be obtained.
```typescript
const task: AsyncTask = async () => {
  console.log('watermelons!')
}

await queue.immediate(task)
```
Returns a promise that resolves once all enqueued tasks have settled.
```typescript
const task: AsyncTask = async () => {
  console.log('watermelons!')
}

queue.enqueue(task)
await queue.drain()
```
`RateLimitedQueue` extends the `EventEmitter` class and emits the following events:
| Event | Handler Type |
|---|---|
| `error` | `(err: Error) => void` |
`RateLimitedQueue` accepts instances of any object that implements `ITokenBucket`:
```typescript
export interface ITokenBucket extends EventEmitter {
  /**
   * The maximum number of tasks to be processed per interval
   */
  capacity: number

  /**
   * The amount of time during which no more than `capacity`
   * tasks can be processed, in milliseconds
   */
  intervalMilliseconds: number

  /**
   * Consume `n` tokens, if available. If the bucket does not
   * have sufficient capacity, should throw a `ThrottleError`
   */
  consume(n?: number): Promise<void>

  /**
   * Starts the drip of tokens out of the bucket
   */
  start(): Promise<void>

  /**
   * Stops the drip of tokens out of the bucket
   */
  stop(): Promise<void>
}
```
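As a sketch of what a custom implementation might look like, here is a minimal bucket that refills in full once per interval. The class name is hypothetical, and the real `InMemoryBucket` may drip tokens differently; `ThrottleError` is defined locally here only to keep the sketch self-contained:

```typescript
import { EventEmitter } from 'events'

// Local stand-in for the error type the library throws on throttling.
class ThrottleError extends Error {}

// A minimal ITokenBucket: rather than dripping tokens continuously,
// the bucket resets to full capacity once per interval (a fixed window).
class FixedWindowBucket extends EventEmitter {
  capacity: number
  intervalMilliseconds: number
  private tokens: number
  private timer?: ReturnType<typeof setInterval>

  constructor(opts: { capacity: number; intervalMilliseconds: number }) {
    super()
    this.capacity = opts.capacity
    this.intervalMilliseconds = opts.intervalMilliseconds
    this.tokens = opts.capacity
  }

  async consume(n: number = 1): Promise<void> {
    if (n > this.tokens) {
      throw new ThrottleError(`insufficient tokens: wanted ${n}, have ${this.tokens}`)
    }
    this.tokens -= n
  }

  async start(): Promise<void> {
    // Refill the bucket to capacity at the start of each window
    this.timer = setInterval(() => {
      this.tokens = this.capacity
    }, this.intervalMilliseconds)
  }

  async stop(): Promise<void> {
    if (this.timer) clearInterval(this.timer)
  }
}
```

An instance of such a class would be passed to the queue the same way as the built-in bucket, i.e. `new RateLimitedQueue({ tokenBucket: new FixedWindowBucket(rate) })`.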