cloud-queue2

1.0.3 • Public • Published

Cloud Queue

This library acts as a common abstraction on top of a number of popular cloud queue implementations. It provides a simple, common interface on top of SQS, IronMQ, RabbitMQ, Azure, Rackspace, Kafka and Redis. This means you can write your application code once, and with only config file changes use any one of the implementations supported by this library.

The Cloud Queue Model

The cloud queue behavioral model as far as I understand it, is as follows: When you dequeue a message, that message is not removed from the queue; it is marked "invisible" so no other possible dequeuer can dequeue the same message. This invisibility marker only lasts a certain amount of time. If that amount of time runs out before the message is explicity removed from the queue, it becomes visible again for others to dequeue. So when you dequeue, at some point you must explicity remove the message you dequeued from the queue.

The idea is that we want to deal with the problem that someone dequeues a message, but then dies or otherwise gets into a bad state before the message is actually handled. If this happens, the temporarily invisible message will become visible after a short time, allowing someone else to attempt to process the message.

All of the queue implementations in this library follow this model except for RedisQ. Redis does not have the capability to emulate this behavior, at least not in any strait forward way. So the RedisQ should not be used in production, or at least in any mission critical situation. Once a message is dequeued, its gone. If the process that is processing the message dies before handling the message, its gone forever.

Queues and Fifos

There are queues and there are fifos. SQS is a queue but it is not a fifo. That is, the order in which things are enqueued is not the order in which they are dequeued. SQS is suitable as a work queue, when you don't really care about the strict ordering of messages. RackQ, IronMQ and RabbitMQ are strict fifos. Redis emulates a strict fifo. I am not sure if Azure is a strict fifo, but I don't think it is.

Kafka is a little different than the other systems listed above. With Kafka you must define a "keyField" that is present in every message, like a deviceId or a uuid. All messages with the same value in their keyField will be handled in fifo order. Messages with different keyFields are handled in parallel (out of order). Also, if you are running multiple instances of a consumer and all instances shared the same "groupId", then messages with the same keyField will be handled by the same consumer instance every time. For these reasons, Kafka is a great choice for IoT systems.

Usage - Push Model (consumer)

// PRODUCER (enqueue)
let q = require( 'cloud-queue' )( config );
q.producer.connect( function( err ) {
  if ( err ) exit( err );
  q.producer.send( 'myQueueName', { my: "message" }, function( err ) {
    // ...
  });
});

// CONSUMER (dequeue)
let q = require( 'cloud-queue' )( config );
q.consumer.connect( 'myQueueName', function( message, cb ) {
  console.log( JSON.stringify( message ) );
  cb();  // cb() will delete the message from the queue, cb(err) will not.
});

Usage - Pull Model (dequeue)

// PRODUCER (enqueue)
let q = require( 'cloud-queue' )( config );
q.producer.connect( function( err ) {
  if ( err ) exit( err );
  q.producer.send( 'myQueueName', { my: "message" }, function( err ) {
    // ...
  });
});

// CONSUMER (dequeue)
let q = require( 'cloud-queue' )( config );
q.consumer.connect( function( err ) {
  if ( err ) exit( err );
  async.forever( function( cb ) {
    q.consumer.dequeue( 'myQueueName', function( err, messages ) {
      if ( err ) return cb( err );
      if ( ! ( messages && messages[0] ) ) {
        console.log( 'no messages available' );
        return cb();
      }
      else {
        console.log( 'dequeued', messages.length, 'messages' );
      }
      async.eachSeries( messages, function( message, cb ) {
        console.log( JSON.stringify( message.msg ) );
        // do some work ,,,
        q.consumer.remove( 'myQueueName', message.handle, function( err ) {
          cb( err );
        });
      }, cb );
    });
  }, function( err ) {
    console.log( err );
    exit( err );
  });
});

Configuration

The object that you pass when creating a cloud queue looks like:

{
  class: QUEUE-CLASS-TO-USE,
  logger: OPTIONAL-WINSTON-LIKE-LOGGER-TO-USE,
  retry_times: HOW-MANY-TIMES-TO-RETRY ( default: 6 ),
  connection: { CONNECTION-OPTIONS },
  options: {
    maxNumberOfMessages: 1,  // max number of messages to dequeue at a time
    waitTimeSeconds: 5,      // if no messages, seconds to wait for another poll
    visibilityTimeout: 30    // visibility timeout, iff applicable
  }
}

The class names supported as of this writing are; SQS, IronMQ, RabbitMQ, AzureQ, RackQ, KafkaQ and RedisQ. The connection object you pass depends on the class you choose. See "config-example.json" for how the configuration should look for each class.

Consuming

As you can see from the api, to consume messages you pass a message handler function which will get called whenever there is a new message. This handler is passed a callback which you will call after processing the message. You won't receive a new message until you call the message handler callback of the current message. Still, the rate at which messages are coming in could be very fast.

Lets say that you are reading from a queue and writing DynamoDB records. If the write capacity for your table is 5 (writes per second), then you better keep at or below this value when doing the DynamoDB writes. But you may be dequeuing at 100 messages per second!

The solution is to rate limit yourself. I like this code:

let rateLimit = require('function-rate-limit');

let perSec = 5;
let putItem = rateLimit( perSec, 1000, function( table, item, cb ) {
  ddb.putItem( table, item, {}, cb );
});

Q.consumer.connect( table, function( message, cb ) {
  putItem( table, message, function( err ) {
    cb( err );
  });
});

Nice and simple. Will keep your message handling at or below 5 messages per second to match your table's write capacity.

Consuming - the "pull" model

Sometimes you just gotta pull. In this use model, you call the consumer "connect" with no queue name and just a callback function which is called when the connection has been established. From then on you must explicitly "pull" from the queue by calling dequeue(). The dequeue() function will return a list of message items, each looking like

{ msg: QUEUE-MESSAGE,
  handle: OPAQUE-MESSAGE-HANDLE }

Deal with the message as you see fit, then call remove() with the queue name and handle to delete the message from the queue.

Implementation Options

RabbitMQ

  {
    "class": "RabbitMQ",
    "connection": {
      "url": "amqp://admin:secret@rabbitmq"
    },
    "options": {
      "producerConfirm": true,
      "messageTtl": 259200000,
      "expires": 604800000,
	  "autoDelete": false,
      "autoAck": false,
      "qos": { "count": 100, "global": false }
    }
  }

producerConfirm indicates whether producer.send waits for a confirmation from rabbitmq (true, default) when enqueuing a message. If set to false, it is more of a fire-and-forget model, but total throughput may be much faster. messageTtl is the number of milliseconds an enqueued message should live before being deleted by rabbitmq. The default is 3 days. expires is the number of milliseconds of disuse of a queue before it is deleted by rabbitmq. The default is 7 days. qos is used when consuming messages in "push" mode; consumer.connect() with a message handler. qos.count is the number of messages alowed to be outstanding without being deleted. qos.global when false means that qos.count is per client, not global and is probably what you want. autoAck is used on both "push" and "pull" modes, and if set to true, tells the rabbitmq broker to consider messages to be acknowledged as soon as they are sent to the consumer. If false (the default), the consumer must explicity acknowldege (delete) messages. Setting autoAck to true can lead to considerably better consumption rates, at the risk of possibly losing some messages due to failues.
See this article for details.

autoDelete is a different way of managing queue deletion. It is set to false by default. If set to true, queues will be deleted when the number of consumers drops to zero.

Testing

You can do some testing here. You can fire up a test environment on your development machine by executing docker-compose up -d. This will get you a redis service and a rabbitmq service, and a container called "app" which you can "exec -it" into and run "push-test.js" and "pop-test.js". You can add kafka to the mix by running "sh ./launch-kafka.sh" and by editing your config.json to point the connectionString to the ip address of the machine running kafka ($DOCKER_HOST).

The other queuer services require account credentials.

Testing Hints

RabbitMQ

If you use the Dockerfile.rabbitmq here to deploy a rabbitmq instance, then you can:

docker exec -it rabbitmq ./rabbitmqadmin --username=admin --password=secret list queues

See this link for details.

Versions

Current Tags

VersionDownloads (Last 7 Days)Tag
1.0.32latest

Version History

VersionDownloads (Last 7 Days)Published
1.0.32
1.0.20
1.0.10
0.0.260
0.0.250
0.0.240
0.0.230
0.0.220
0.0.210
0.0.200
0.0.190
0.0.180
0.0.170
0.0.160
0.0.150
0.0.140
0.0.130
0.0.120
0.0.110
0.0.100
0.0.90
0.0.80
0.0.70
0.0.60
0.0.50
0.0.40
0.0.30
0.0.20
0.0.10

Package Sidebar

Install

npm i cloud-queue2

Weekly Downloads

2

Version

1.0.3

License

ISC

Unpacked Size

47.2 kB

Total Files

19

Last publish

Collaborators

  • aqpeeb