redis-delayed-tasks

2.0.1 • Public • Published

This Node.js module provides simple future execution of tasks, using redis as a datastore. It removes the need to work directly with redis or another message queue, and gives you a way to schedule work for later in a distributed environment.

The task callback is unaware of the context of your codebase. Instead of calling a specific callback function per task, this module calls the same callback function for each task, relying on you to route it to the appropriate place.
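Since every task arrives at the same callback, one way to route it is to include a discriminator in the task data. The following is a minimal sketch, not part of the module: the `type` and `payload` fields and the handler names (`sendEmail`, `retryWebhook`) are hypothetical conventions you would define yourself.

```javascript
// Hypothetical handlers, keyed by a `type` field that you put in the task data yourself.
const handlers = new Map([
  ['sendEmail', (payload) => `email to ${payload.to}`],
  ['retryWebhook', (payload) => `webhook ${payload.url}`]
]);

// Has the (data, taskId, dueTime) signature, so it can be passed as `callback`.
function routeTask(data, taskId, dueTime) {
  const handler = data ? handlers.get(data.type) : undefined;
  if (!handler) {
    // Unknown task type: log and skip rather than throw inside the callback
    console.warn(`No handler for task ${taskId}`);
    return undefined;
  }
  return handler(data.payload, taskId, dueTime);
}
```

A task added with data such as `{ type: 'sendEmail', payload: { to: 'a@example.com' } }` would then reach the `sendEmail` handler when it comes due.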

Possible use cases include:

  • Retry logic (with or without backoff).
  • Delaying a task where setTimeout is risky, because a pending timer is lost if the application crashes.
  • Distributing future tasks across multiple workers.

Installation

npm i redis-delayed-tasks

Upgrading from v1 to v2

Version 2 does not introduce any new features. It uses node-redis v4 under the hood, which could introduce breaking changes if you're using an older version of the redis library elsewhere. The library is instantiated the same way and requires just one additional line afterwards: await dt.connect().

Usage

Create a new DelayedTasks object:

const { DelayedTasks } = require('redis-delayed-tasks');

const dt = new DelayedTasks({
  id: 'delayed-queue-1',
  redis: {
    host: '127.0.0.1',
    port: 6379
  },
  callback: (data, taskId, dueTime) => {
    // `data` is the JSON.stringify-able data provided when the task was added
    // `taskId` is the generated ID of the task to process
    // `dueTime` is the epoch (milliseconds) of when the task was due
  }
});

// Connect to redis (new in v2)
await dt.connect();

// Start polling
dt.start();

const newTaskId = await dt.add(2000, { foo: 'bar' });

// Don't forget to clean up later: `dt.stop()`

Example: HTTP Retry

Handling HTTP retries is a common use case that requires waiting a certain amount of time before retrying a request. You could use setTimeout, but if the application dies before the timeout function is called, you lose the request entirely.

Instead, we'll use our DelayedTasks queue to delay the task in a persisted manner until it's time to retry again.

const { DelayedTasks } = require('redis-delayed-tasks');

const dt = new DelayedTasks({
  id: 'http-retry',
  redis: {
    host: '127.0.0.1',
    port: 6379
  },
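  callback: async function (data, taskId, dueTime) {
    // A task is ready to be tried again
    try {
      // Assumes `request` returns a promise; without `await`, a rejection would escape this try/catch
      await request(data);
    } catch (e) {
      // It failed again, try in another 5 seconds
      await this.add(5000, data);
    }
  }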
});

// Connect to redis
await dt.connect();

// Start polling
dt.start();

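// The request to send; it is also the task data stored for retries
const requestData = {
  method: 'POST',
  url: '/foo',
  data: { foo: 'bar' }
};

try {
  // `request` would be your http request library of choice
  await request(requestData);
} catch (e) {
  // There was an error, try again in 5 seconds
  await dt.add(5000, requestData);
}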

// Don't forget to clean up later: `dt.stop()` or `dt.close()`, as applicable
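The retry above waits a fixed 5 seconds each time. For retry with backoff, one option is to carry an attempt counter in the task data and derive the delay from it. This is only a sketch under assumptions: the `attempt` field, the 1 second base, the 60 second cap, and the limit of 5 retries are all illustrative choices, not module features.

```javascript
// Exponential backoff for a 1-based retry number:
// baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// Decide what to do after a failed attempt. Returns null when giving up,
// otherwise the arguments to pass to `add(delayMs, data)`.
function nextRetry(data, maxRetries = 5) {
  const attempt = (data.attempt || 0) + 1;
  if (attempt > maxRetries) {
    return null;
  }
  return { delayMs: backoffDelayMs(attempt), data: { ...data, attempt } };
}
```

Inside the callback's catch block, you could call `nextRetry(data)` and, if the result is not null, pass its `delayMs` and `data` to `this.add()`.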

Methods

constructor

The constructor takes a single object. Properties are as follows:

| Property | Description | Required | Default |
| --- | --- | --- | --- |
| id | ID of the queue. This is used as a redis key, so it should be shared amongst any workers that operate within the same group. Think of it as a "consumer group" id. | Yes | |
| redis | An existing redis client to use OR connection settings for a new redis client [1]. | Yes | |
| callback | The function to call when tasks are due. When a task is due or past-due, your callback method is called asynchronously, passing the data you provided when adding, the generated taskId, and the time (in ms) that the task was due. The context of this is the DelayedTasks object. | Yes | |
| options.pollIntervalMs | How often to poll redis for tasks due (in milliseconds). The shorter the interval, the sooner after being due a task will be processed, but the more load in redis. | No | 1000 |

[1] This module uses node-redis version 4 under the hood in legacy mode. If you provide your own client, you're responsible for connecting, disconnecting, and creating an error handler.

connect()

If you provided an object with options for connecting to redis, you must call await dt.connect() to connect to redis before doing anything else. If you've provided an existing, connected redis client, this is not necessary.

start / stop polling

To begin polling for tasks, call dt.start(). It returns a boolean indicating whether polling started. If it returns false, the redis client hasn't been connected yet. If you supplied your own client, call await client.connect(). Otherwise, call await dt.connect() to create the connection.

Call dt.stop() to stop future polling.
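Because dt.start() reports failure through its return value rather than by throwing, it can be convenient to wrap the connect-then-start sequence in a helper that fails loudly. The sketch below assumes a DelayedTasks instance created with connection settings; `startQueue` is a hypothetical helper name, and it uses only the connect() and start() methods described here.

```javascript
// Connect, then start polling; throw if polling did not start.
// `dt` is a DelayedTasks instance created with redis connection settings.
async function startQueue(dt) {
  await dt.connect();
  if (!dt.start()) {
    throw new Error('Polling did not start: redis client is not connected');
  }
  return dt;
}
```

Wrapping the sequence in an async function also avoids relying on top-level await, which is not available in CommonJS modules.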

close()

Calling await dt.close() will stop polling. If a new redis client was created for the object instance (that is, it was passed an object of configuration details), that redis client will be closed, too (using disconnect() to abort pending requests). If you passed an existing redis client to the constructor, it will be left open and you'll have to close it yourself when you're finished with it.

This returns a promise that resolves when the client connection is confirmed closed.

If you just want to stop polling, but leave the connection open, call dt.stop() instead.
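One place to call close() is a process signal handler, so the queue shuts down cleanly when the worker is stopped. This is a minimal sketch: `closeOnSignal` is a hypothetical helper, and the choice of SIGINT and SIGTERM is an assumption about how your workers are terminated.

```javascript
// Register handlers that close the queue once when the process is asked to exit.
// `proc` defaults to the global `process`; it is a parameter only to make testing easier.
function closeOnSignal(dt, signals = ['SIGINT', 'SIGTERM'], proc = process) {
  let closing = null;
  const handler = () => {
    if (!closing) {
      // Stops polling and closes a client that the module created itself
      closing = dt.close();
    }
    return closing;
  };
  for (const signal of signals) {
    proc.once(signal, handler);
  }
  return handler;
}
```

If you passed your own redis client to the constructor, you would still need to close that client yourself after dt.close() resolves.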

add(delayMs, data)

Adds a task to be executed delayMs milliseconds in the future. data can be any JSON.stringify-able data that will get passed to callback when the task is due.

This function returns a promise that resolves to the generated UUID of the task. The promise resolves after the task is saved to redis, so if you don't need to wait for the save and/or don't care about the generated ID, you can call the function without awaiting it.

Example

Add a task to be processed 30 seconds in the future:

const newTaskId = await dt.add(30000, { foo: 'bar' });

// or without awaiting (handle rejections so they don't go unnoticed)
dt.add(30000, { foo: 'bar' }).catch(console.error);

poll()

To force a poll outside of the poll interval, call dt.poll(). This should be used with caution as it could potentially interfere with an active poll, therefore causing a transaction conflict in redis.

Testing

The test suite requires a local redis server on port 6379. You can run docker-compose up to launch one from this repo. Once redis is running, run npm test or npm run coverage.

Notes

When tasks are processed

Tasks are not processed at the exact moment they are due. Instead, they are processed on the next poll after their due time. If you care about processing tasks sooner after they're due, make the poll interval shorter.

Otherwise, if you just want to make sure it gets done sometime around when it's due, make the poll interval longer to give redis a break.
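To make the trade-off concrete, the lateness of a task can be estimated from the poll interval. The sketch below assumes an idealized poller that ticks at exact multiples of the interval starting from `firstPollAt` (real timers drift and polling itself takes time), and the function names are illustrative only.

```javascript
// Time of the first poll tick at or after `dueTime`, for polls at
// firstPollAt, firstPollAt + pollIntervalMs, firstPollAt + 2 * pollIntervalMs, ...
function pickupTime(dueTime, firstPollAt, pollIntervalMs) {
  if (dueTime <= firstPollAt) {
    return firstPollAt;
  }
  const ticks = Math.ceil((dueTime - firstPollAt) / pollIntervalMs);
  return firstPollAt + ticks * pollIntervalMs;
}

// How long after its due time the task is picked up.
function latenessMs(dueTime, firstPollAt, pollIntervalMs) {
  return pickupTime(dueTime, firstPollAt, pollIntervalMs) - dueTime;
}
```

Under this model, a task is picked up less than one poll interval after it is due. In a real callback, `Date.now() - dueTime` gives the observed lateness, since dueTime is passed as epoch milliseconds.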

Redis transaction conflicts

If the redis key is updated during the internal poll() call, we do not retry and, instead, wait for the next poll interval. Since polling intervals can be very short, we don't want to end up overlapping.

Future work

  • This module performs minimal error handling beyond validating its required parameters. This may be improved in the future. For now, we recommend wrapping calls in try-catch to catch everything, including redis errors.

  • Use non-legacy mode of node-redis v4.

  • Trap errors from callback()

  • Ability to cancel a task by ID or data. Will wait until there's a genuine desire for this. For now, we'll assume that a task won't be added until it should run in the future.

  • Add a flag when the class is polling to prevent conflicts on explicit polls.

  • Better handling of watch conflicts in dt.poll. Right now it just quits, but if this happens a lot, nothing would end up getting processed.

  • Find a redis mock that works with node-redis v4 and later versions of redis server.
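Until the module traps errors from callback() itself, the same effect can be approximated in user code by wrapping the callback before passing it to the constructor. This is a sketch under assumptions: `trapErrors` and `onError` are hypothetical names, and the wrapper relies only on the documented callback arguments and `this` context.

```javascript
// Wrap a task callback so that thrown errors and rejected promises are
// passed to `onError` instead of escaping into the poller.
function trapErrors(callback, onError) {
  return async function wrapped(data, taskId, dueTime) {
    try {
      // Preserve `this` so the callback can still call this.add()
      return await callback.call(this, data, taskId, dueTime);
    } catch (err) {
      onError(err, data, taskId, dueTime);
      return undefined;
    }
  };
}
```

The wrapped function must stay a regular (non-arrow) function so that `this` is still the DelayedTasks object inside the original callback.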

License

MIT License

Collaborators

  • knation