mqlobber
Streaming message queue with pub-sub, work queues, wildcards and back-pressure. Just Node and a filesystem required.
mqlobber basically remotes qlobber-fsq over one or more connections.
Say you have a server and a number of clients, with the clients connected to the server using some mechanism which provides a stream for each connection. Create a QlobberFSQ instance on the server and for each stream, pass the instance and the stream to MQlobberServer.
On each client, pass the other end of the stream to MQlobberClient. Clients can then publish and subscribe to topics (including wildcard subscriptions). Work queues are also supported - when publishing a message, a client can specify that only one subscriber should receive it.
All data is transferred on streams multiplexed over each connection using bpmux, with full back-pressure support on each stream. Clients get a Writable when publishing a message and a Readable when receiving one.
You can scale out horizontally by creating a number of QlobberFSQ instances (e.g. one per CPU core), all sharing the same message directory.
No other backend services are required - just Node and a filesystem.
The API is described below.
Example
First, let's create a server program which listens on a TCP port specified on the command line:
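// server.js
var net = require('net'),
    QlobberFSQ = require('qlobber-fsq').QlobberFSQ,
    MQlobberServer = require('mqlobber').MQlobberServer,
    fsq = new QlobberFSQ();

// Wait for the file system queue to start before accepting connections
fsq.on('start', function ()
{
    var server = net.createServer().listen(parseInt(process.argv[2], 10));
    server.on('connection', function (c)
    {
        // One MQlobberServer per client connection, all sharing fsq
        new MQlobberServer(fsq, c);
    });
});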
Next, a program which connects to the server and subscribes to messages published to a topic:
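// client_subscribe.js
var assert = require('assert'),
    MQlobberClient = require('mqlobber').MQlobberClient,
    c = require('net').createConnection(parseInt(process.argv[2], 10)),
    mq = new MQlobberClient(c),
    topic = process.argv[3];

mq.subscribe(topic, function (s, info)
{
    var msg = '';
    // Read the message stream until it ends
    s.on('readable', function ()
    {
        var data;
        while ((data = this.read()) !== null)
        {
            msg += data.toString();
        }
    });
    s.on('end', function ()
    {
        console.log('received', info.topic, msg);
        assert.equal(msg, 'hello');
        // Close the connection once the message has been received
        c.end();
    });
});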
Finally, a program which connects to the server and publishes a message to a topic:
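// client_publish.js
var MQlobberClient = require('mqlobber').MQlobberClient,
    c = require('net').createConnection(parseInt(process.argv[2], 10)),
    mq = new MQlobberClient(c);

// publish returns a Writable; end it with the message data
mq.publish(process.argv[3], function ()
{
    // Close the connection once the server has published the message
    c.end();
}).end('hello');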
Run two servers listening on ports 8600 and 8601:
node server.js 8600 &
node server.js 8601 &
Subscribe to two topics, foo.bar and wildcard topic foo.*, one against each server:
node client_subscribe.js 8600 foo.bar &
node client_subscribe.js 8601 'foo.*' &
Then publish a message to the topic foo.bar:
node client_publish.js 8600 foo.bar
You should see the following output, one line from each subscriber:
received foo.bar hello
received foo.bar hello
Only the servers should still be running and you can now terminate them:
$ jobs
[1]-  Running                 node server.js 8600 &
[2]+  Running                 node server.js 8601 &
$ kill %1 %2
[1]-  Terminated              node server.js 8600
[2]+  Terminated              node server.js 8601
Installation
npm install mqlobber
Licence
Test
grunt test
Lint
grunt lint
Code Coverage
grunt coverage
Istanbul results are available here.
Coveralls page is here.
API
- MQlobberClient
- MQlobberClient.prototype.subscribe
- MQlobberClient.prototype.unsubscribe
- MQlobberClient.prototype.publish
- MQlobberClient.events.handshake
- MQlobberClient.events.backoff
- MQlobberClient.events.drain
- MQlobberClient.events.full
- MQlobberClient.events.removed
- MQlobberClient.events.error
- MQlobberClient.events.warning
- MQlobberServer
- MQlobberServer.prototype.subscribe
- MQlobberServer.prototype.unsubscribe
- MQlobberServer.events.subscribe_requested
- MQlobberServer.events.unsubscribe_requested
- MQlobberServer.events.unsubscribe_all_requested
- MQlobberServer.events.publish_requested
- MQlobberServer.events.message
- MQlobberServer.events.handshake
- MQlobberServer.events.backoff
- MQlobberServer.events.drain
- MQlobberServer.events.full
- MQlobberServer.events.removed
- MQlobberServer.events.ack
- MQlobberServer.events.error
- MQlobberServer.events.warning
MQlobberClient(stream, [options])
Create a new MQlobberClient object for publishing and subscribing to messages via a server.
Parameters:
- {Duplex} stream
  Connection to a server. The server should use MQlobberServer on its side of the connection. How the connection is made is up to the caller - it just has to supply a Duplex. For example, net.Socket or PrimusDuplex.
- {Object} [options]
  Configuration options. This is passed down to QlobberDedup (which matches messages received from the server to handlers) and BPMux (which multiplexes message streams over the connection to the server). It also supports the following additional property:
  - {Buffer} [handshake_data]
    Application-specific handshake data to send to the server. The server-side MQlobberServer object will emit this as a handshake event to its application.
Throws:
- {Error}
  If an error occurs before initiating the multiplex with the server.
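Because handshake_data has to be a Buffer, an application that wants to send structured data needs to serialize it first. The following is a minimal sketch, assuming JSON is an acceptable encoding for your application. encodeHandshake and decodeHandshake are hypothetical helper names and are not part of mqlobber.

```javascript
// Hypothetical helpers (not part of mqlobber): carry a small JSON object
// in the Buffer that MQlobberClient accepts as options.handshake_data.
function encodeHandshake(obj) {
    return Buffer.from(JSON.stringify(obj), 'utf8');
}

function decodeHandshake(buf) {
    // An empty or missing buffer is treated as "no data"
    if (!buf || buf.length === 0) {
        return null;
    }
    return JSON.parse(buf.toString('utf8'));
}

// Options object as it could be passed to new MQlobberClient(stream, options)
const clientOptions = { handshake_data: encodeHandshake({ user: 'alice' }) };
```

On the server, decodeHandshake could then be applied to the handshake_data argument of the handshake event.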
MQlobberClient.prototype.subscribe(topic, handler, [cb])
Subscribe to messages published to the server.
Parameters:
- {String} topic
  Which messages you're interested in receiving. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note these are the default separator and wildcard characters. They can be changed on the server when constructing the QlobberFSQ object passed to MQlobberServer.
- {Function} handler
  Function to call when a new message is received from the server due to its topic matching against topic. handler will be passed the following arguments:
  - {Readable} stream
    The message content as a Readable. Note that all subscribers will receive the same stream for each message.
  - {Object} info
    Metadata for the message, with the following properties:
    - {String} topic
      Topic to which the message was published.
    - {Boolean} single
      Whether this message is being given to at most one handler (across all clients connected to all servers).
    - {Integer} expires
      When the message expires (number of seconds after 1 January 1970 00:00:00 UTC). This is only present if the server's MQlobberServer instance is configured with send_expires set to true.
    - {Integer} size
      Size of the message in bytes. This is only present if the server's MQlobberServer instance is configured with send_size set to true.
  - {Function} done
    Function to call once you've handled the message. Note that calling this function is only mandatory if info.single === true, in order to clean up the message on the server. done takes one argument:
    - {Object} err
      If an error occurred then pass details of the error, otherwise pass null or undefined.
- {Function} [cb]
  Optional function to call once the subscription has been registered with the server. This will be passed the following argument:
  - {Object} err
    If an error occurred then details of the error, otherwise null.
Throws:
- {Error}
  If an error occurs before sending the subscribe request to the server.
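The handler arguments described above can be put together as follows. This is only a sketch: makeTextHandler and its onMessage callback are hypothetical names introduced for illustration, and it assumes messages are small enough to buffer in memory as UTF-8 text.

```javascript
// Build a handler suitable for passing to MQlobberClient.prototype.subscribe.
// onMessage is a hypothetical application callback: (text, info) => void.
function makeTextHandler(onMessage) {
    return function handler(stream, info, done) {
        const chunks = [];
        stream.on('data', function (chunk) {
            chunks.push(Buffer.from(chunk));
        });
        stream.on('error', function (err) {
            // Pass details of the error to done, as the API asks
            done(err);
        });
        stream.on('end', function () {
            onMessage(Buffer.concat(chunks).toString('utf8'), info);
            // Mandatory when info.single is true, optional otherwise
            done();
        });
    };
}
```

It could be used as mq.subscribe('foo.*', makeTextHandler(function (text, info) { console.log(info.topic, text); })), where mq is an MQlobberClient.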
MQlobberClient.prototype.unsubscribe([topic], [handler], [cb])
Unsubscribe from messages published to the server.
Parameters:
- {String} [topic]
  Which messages you're no longer interested in receiving via the handler function. If topic is undefined then all handlers for all topics are unsubscribed.
- {Function} [handler]
  The function you no longer want to be called with messages published to topic. This should be a function you've previously passed to subscribe. If you subscribed handler to a different topic then it will still be called for messages which match that topic. If handler is undefined, all handlers for topic are unsubscribed.
- {Function} [cb]
  Optional function to call once handler has been unsubscribed from topic on the server. This will be passed the following argument:
  - {Object} err
    If an error occurred then details of the error, otherwise null.
Throws:
- {Error}
  If an error occurs before sending the unsubscribe request to the server.
MQlobberClient.prototype.publish(topic, [options], [cb])
Publish a message to the server for interested clients to receive.
Parameters:
- {String} topic
  Message topic. The topic should be a series of words separated by . (or whatever you configured QlobberFSQ with on the server).
- {Object} [options]
  Optional settings for this publication:
  - {Boolean} single
    If true then the message will be given to at most one handler (across all clients connected to all servers). If you don't specify this then all interested handlers (across all clients) will receive it.
  - {Integer} ttl
    Time-to-live (in seconds) for this message. If you don't specify this then the default is taken from the QlobberFSQ instance on the server. In any case, QlobberFSQ's configured time-to-live is used to constrain ttl's maximum value.
- {Function} [cb]
  Optional function to call once the server has published the message. This will be passed the following argument:
  - {Object} err
    If an error occurred then details of the error, otherwise null.
Return:
- {Writable}
  Stream to which to write the message's data. Make sure you end it when you're done.
Throws:
- {Error}
  If an error occurs before sending the publish request to the server.
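As an illustration of the single and ttl options, here is a small sketch of publishing a work-queue item. publishJob is a hypothetical helper name, and the 60 second time-to-live is an arbitrary value chosen for the example (the server may constrain it further).

```javascript
// Publish a string as a work-queue item: options.single asks for the message
// to be given to at most one handler. mq is expected to be an MQlobberClient.
function publishJob(mq, topic, text, cb) {
    const stream = mq.publish(topic, { single: true, ttl: 60 }, cb);
    // publish returns a Writable; ending it completes the message
    stream.end(text);
    return stream;
}
```

For larger payloads you would typically pipe a Readable into the returned stream instead of calling end with the whole message, so that back-pressure is respected.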
MQlobberClient.events.handshake(handshake_data)
handshake event
Emitted by a MQlobberClient object after it successfully completes an initial handshake with its peer MQlobberServer object on the server.
Parameters:
- {Buffer} handshake_data
  Application-specific data which the MQlobberServer object sent along with the handshake.
MQlobberClient.events.backoff()
backoff event
Emitted by a MQlobberClient object when it delays a request to the server because the connection is at full capacity. If you want to avoid buffering further requests, don't call subscribe, unsubscribe or publish until a drain event is emitted.
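One way to follow this advice is to hold requests back between backoff and drain. The sketch below is an assumption about how an application might do that, not something mqlobber provides; createRequestGate is a hypothetical name and the queue it keeps is unbounded.

```javascript
// Returns a function which runs a request immediately, or queues it if the
// client has emitted backoff and not yet emitted drain.
// mq is expected to be an MQlobberClient (any EventEmitter works here).
function createRequestGate(mq) {
    let blocked = false;
    const pending = [];
    mq.on('backoff', function () {
        blocked = true;
    });
    mq.on('drain', function () {
        blocked = false;
        // Stop again if running a queued request triggers another backoff
        while (!blocked && pending.length > 0) {
            pending.shift()();
        }
    });
    return function run(request) {
        if (blocked) {
            pending.push(request);
        } else {
            request();
        }
    };
}
```

A caller would wrap each request, for example run(function () { mq.publish('foo.bar').end('hello'); }).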
MQlobberClient.events.drain()
drain event
Emitted by a MQlobberClient object when the multiplexing layer emits a drain event.
MQlobberClient.events.full()
full event
Emitted by a MQlobberClient object when the multiplexing layer emits a full event.
MQlobberClient.events.removed(duplex)
removed event
Emitted by a MQlobberClient object when the multiplexing layer emits a removed event.
Parameters:
- {Duplex} duplex
  The multiplexed stream which has closed.
MQlobberClient.events.error(err, obj)
error event
Emitted by a MQlobberClient object if an error is emitted by the multiplexing layer (bpmux), preventing proper communication with the server.
Parameters:
- {Object} err
  The error that occurred.
- {Object} obj
  The object on which the error occurred.
MQlobberClient.events.warning(err, obj)
warning event
Emitted by a MQlobberClient object when a recoverable error occurs. This will usually be due to an error on an individual request or multiplexed stream.
Note that if there are no warning event listeners registered then the error will be displayed using console.error.
Parameters:
- {Object} err
  The error that occurred.
- {Object} obj
  The object on which the error occurred.
MQlobberServer(fsq, stream, [options])
Create a new MQlobberServer object for publishing and subscribing to messages on behalf of a client.
Parameters:
- {QlobberFSQ | QlobberPG} fsq
  File system queue - an instance of QlobberFSQ. This does the heavy-lifting of reading and writing messages to a directory on the file system. Alternatively, you can pass an instance of QlobberPG, which uses PostgreSQL to process messages.
- {Duplex} stream
  Connection to the client. The client should use MQlobberClient on its side of the connection. How the connection is made is up to the caller - it just has to supply a Duplex. For example, net.Socket or PrimusDuplex.
- {Object} [options]
  Configuration options. This is passed down to BPMux (which multiplexes message streams over the connection to the client). It also supports the following additional properties:
  - {Boolean} send_expires
    Whether to include message expiry time in metadata sent to the client. Defaults to false.
  - {Boolean} send_size
    Whether to include message size in metadata sent to the client. Defaults to false.
  - {Boolean} defer_to_final_handler
    If true then a message stream is only considered finished when all MQlobberServer objects finish processing it. Defaults to false.
MQlobberServer.prototype.subscribe(topic, [options], [cb])
Subscribe the connected client to messages.
Note: If the client is already subscribed to topic, this function will do nothing (other than call cb).
Parameters:
- {String} topic
  Which messages the client should receive. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note these are the default separator and wildcard characters. They can be changed when constructing the QlobberFSQ instance passed to MQlobberServer's constructor.
- {Object} [options]
  Optional settings for this subscription:
  - {Boolean} subscribe_to_existing
    If true then the client will be sent any existing, unexpired messages that match topic, as well as new ones. Defaults to false (only new messages).
- {Function} [cb]
  Optional function to call once the subscription has been made. This will be passed the following arguments:
  - {Object} err
    If an error occurred then details of the error, otherwise null.
  - {Integer} n
    The number of subscriptions made (0 if topic was already subscribed to, 1 if not).
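To make the wildcard rules concrete, the sketch below reimplements them for the default separator and wildcard characters. It is for illustration only: the real matching is done by the QlobberFSQ instance on the server, and topicMatches is a hypothetical function which makes no claim about how that matching is implemented.

```javascript
// Returns true if topic matches pattern, where words are separated by '.',
// '*' matches exactly one word and '#' matches zero or more words.
function topicMatches(pattern, topic) {
    const p = pattern.split('.');
    const t = topic.split('.');

    function match(i, j) {
        if (i === p.length) {
            // Pattern exhausted: succeed only if the topic is too
            return j === t.length;
        }
        if (p[i] === '#') {
            // Try letting '#' consume 0, 1, 2, ... remaining words
            for (let k = j; k <= t.length; k += 1) {
                if (match(i + 1, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j === t.length) {
            return false;
        }
        if (p[i] === '*' || p[i] === t[j]) {
            return match(i + 1, j + 1);
        }
        return false;
    }

    return match(0, 0);
}
```

With this sketch, topicMatches('foo.*', 'foo.bar') is true and topicMatches('foo.*', 'foo') is false, while foo.# matches foo, foo.bar and foo.bar.wup.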
MQlobberServer.prototype.unsubscribe([topic], [cb])
Unsubscribe the connected client from messages.
Parameters:
- {String} [topic]
  Which messages the client should no longer receive. If topic is undefined then the client will receive no more messages at all.
- {Function} [cb]
  Optional function to call once the subscription has been removed. This will be passed the following arguments:
  - {Object} err
    If an error occurred then details of the error, otherwise null.
  - {Integer} n
    The number of subscriptions removed.
MQlobberServer.events.subscribe_requested(topic, cb)
subscribe_requested event
Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to subscribe to messages published to a topic.
If there are no listeners on this event, the default action is to call subscribe(topic, cb).
If you add a listener on this event, the default action will not be called. This gives you the opportunity to filter subscription requests in the application.
Parameters:
- {String} topic
  The topic to which the client is asking to subscribe.
- {Function} cb
  Function to call after processing the subscription request. This function must be called even if you don't call subscribe yourself. It takes the following arguments:
  - {Object} err
    If null then a success status is returned to the client (whether you called subscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.
  - {Integer} n
    The number of subscriptions made.
  - {Buffer} [data]
    Optional data to return to the client.
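A listener on subscribe_requested can be used to restrict what a client may subscribe to. The following sketch assumes a simple policy where only topics whose first word equals a given value are allowed. restrictSubscriptions and allowedPrefix are hypothetical names, and the policy itself is just an example.

```javascript
// Only allow the client to subscribe to topics under a given first word.
// server is expected to be an MQlobberServer.
function restrictSubscriptions(server, allowedPrefix) {
    server.on('subscribe_requested', function (topic, cb) {
        // Wildcards in the first word (e.g. '#' or '*.x') are rejected too
        if (topic.split('.')[0] !== allowedPrefix) {
            // cb must always be called; passing an error fails the request
            return cb(new Error('subscription not allowed: ' + topic));
        }
        // Same as the default action
        server.subscribe(topic, cb);
    });
}
```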
MQlobberServer.events.unsubscribe_requested(topic, cb)
unsubscribe_requested event
Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to unsubscribe from messages published to a topic.
If there are no listeners on this event, the default action is to call unsubscribe(topic, cb).
If you add a listener on this event, the default action will not be called. This gives you the opportunity to filter unsubscription requests in the application.
Parameters:
- {String} topic
  The topic from which the client is asking to unsubscribe.
- {Function} cb
  Function to call after processing the unsubscription request. This function must be called even if you don't call unsubscribe yourself. It takes the following arguments:
  - {Object} err
    If null then a success status is returned to the client (whether you called unsubscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.
  - {Integer} n
    The number of subscriptions removed.
  - {Buffer} [data]
    Optional data to return to the client.
MQlobberServer.events.unsubscribe_all_requested(cb)
unsubscribe_all_requested event
Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to unsubscribe from all messages published to any topic.
If there are no listeners on this event, the default action is to call unsubscribe(cb). If you add a listener on this event, the default action will not be called. This gives you the opportunity to filter unsubscription requests in the application.
Parameters:
- {Function} cb
  Function to call after processing the unsubscription request. This function must be called even if you don't call unsubscribe yourself. It takes the following arguments:
  - {Object} err
    If null then a success status is returned to the client (whether you called unsubscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.
  - {Integer} n
    The number of subscriptions removed.
  - {Buffer} [data]
    Optional data to return to the client.
MQlobberServer.events.publish_requested(topic, stream, options, cb)
publish_requested event
Emitted by a MQlobberServer object when it receives a request from its peer MQlobberClient object to publish a message to a topic.
If there are no listeners on this event, the default action is to call stream.pipe(fsq.publish(topic, options, cb)), where fsq is the QlobberFSQ instance you passed to MQlobberServer's constructor.
Parameters:
- {String} topic
  The topic to which the message should be published.
- {Readable} stream
  The message data as a Readable. This is multiplexed over the connection to the client - back-pressure is applied to the sender MQlobberClient object according to when you call read.
- {Object} options
  Optional settings for this publication:
  - {Boolean} single
    If true then the message should be published to at most one client (across all servers). Otherwise, it should be published to all interested clients.
  - {Integer} ttl
    Time-to-live (in seconds) for this message.
- {Function} cb
  Function to call after processing the publication request. This function must be called even if you don't call publish yourself. It takes the following arguments:
  - {Object} err
    If null then a success status is returned to the client (whether you called publish or not). Otherwise, the client gets a failed status and a warning event is emitted with err.
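A listener on publish_requested can carry out the default action with adjusted arguments. The sketch below, which is only one possible use, publishes every message from a client under a fixed first word. namespacePublishes and namespace are hypothetical names, and subscribers would need to use the prefixed topics for this to be useful.

```javascript
// Publish every message from this client under a fixed first word.
// server is expected to be an MQlobberServer and fsq the QlobberFSQ
// instance that was passed to its constructor.
function namespacePublishes(server, fsq, namespace) {
    server.on('publish_requested', function (topic, stream, options, cb) {
        // The default action, but with a rewritten topic
        stream.pipe(fsq.publish(namespace + '.' + topic, options, cb));
    });
}
```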
MQlobberServer.events.message(stream, info, multiplex, done)
message event
Emitted by a MQlobberServer object when its QlobberFSQ object passes it a message published to a topic to which its peer MQlobberClient object has subscribed.
If there are no listeners on this event, the default action is to call stream.pipe(multiplex()).
You can add a listener on this event to insert processing between the message stream and the client.
Parameters:
- {Readable} stream
  The message content as a Readable. Note that all subscribers will receive the same stream for each message.
- {Object} info
  Metadata for the message, with the following properties:
  - {String} topic
    Topic to which the message was published.
  - {Boolean} single
    Whether this message is being given to at most one handler (across all clients connected to all servers).
  - {Integer} expires
    When the message expires (number of seconds after 1 January 1970 00:00:00 UTC). This is only present if the MQlobberServer object was configured with send_expires set to true.
  - {Integer} size
    Size of the message in bytes. This is only present if the server's MQlobberServer instance is configured with send_size set to true.
- {Function} multiplex
  Function to call in order to multiplex a new stream over the connection to the client. It returns the multiplexed stream, to which the data from stream should be written - after the application applies whatever transforms and processing it requires.
- {Function} done
  If you don't call multiplex then you should call this function to indicate you have finished handling the message. done takes the following optional argument:
  - {Object} [err]
    If an error occurred while handling the message, pass it here.
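As an example of inserting processing between the message stream and the client, the sketch below upper-cases message data using a Transform from Node's stream module. upperCaseMessages is a hypothetical name, and the transform assumes single-byte text (a multi-byte character split across chunks would not be handled correctly).

```javascript
const { Transform } = require('stream');

// Upper-case message data on its way to the client.
// server is expected to be an MQlobberServer.
function upperCaseMessages(server) {
    server.on('message', function (stream, info, multiplex, done) {
        const upper = new Transform({
            transform(chunk, encoding, callback) {
                callback(null, chunk.toString().toUpperCase());
            }
        });
        // multiplex() returns the stream which carries data to the client;
        // done is not called here because multiplex is
        stream.pipe(upper).pipe(multiplex());
    });
}
```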
MQlobberServer.events.handshake(handshake_data, delay_handshake)
handshake event
Emitted by a MQlobberServer object after it receives an initial handshake message from its peer MQlobberClient object on the client.
Parameters:
- {Buffer} handshake_data
  Application-specific data which the MQlobberClient object sent along with the handshake.
- {Function} delay_handshake
  By default, MQlobberServer replies to MQlobberClient's handshake message as soon as your event handler returns and doesn't attach any application-specific handshake data. If you wish to delay the handshake message or provide handshake data, call delay_handshake. It returns another function which you can call at any time to send the handshake message. The returned function takes a single argument:
  - {Buffer} [handshake_data]
    Application-specific handshake data to send to the client. The client-side MQlobberClient object will emit this as a handshake event to its application.
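The delay_handshake mechanism can be sketched as follows. delayHandshakeUntilReady and prepareReply are hypothetical names: prepareReply stands for whatever asynchronous work your application does with the client's handshake data before it replies.

```javascript
// Reply to the client's handshake only after some asynchronous work.
// server is expected to be an MQlobberServer. prepareReply is a hypothetical
// application function: (handshake_data, callback(replyBuffer)).
function delayHandshakeUntilReady(server, prepareReply) {
    server.on('handshake', function (handshake_data, delay_handshake) {
        // Calling delay_handshake stops the automatic reply and returns
        // the function which sends the handshake message later
        const sendHandshake = delay_handshake();
        prepareReply(handshake_data, function (reply) {
            // reply is sent to the client as its handshake_data
            sendHandshake(reply);
        });
    });
}
```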
MQlobberServer.events.backoff()
backoff event
Emitted by a MQlobberServer object when it delays a message to the client because the connection is at full capacity.
If you want to avoid buffering further messages, use a filter function (see QlobberFSQ's constructor) to prevent messages being sent until a drain event is emitted. In the filter function, a handler owned by a MQlobberServer object will have a property named mqlobber_server set to the MQlobberServer object.
You can also use event listeners on subscribe_requested, unsubscribe_requested, unsubscribe_all_requested and publish_requested to prevent responses being sent to the client until a drain event is emitted.
Depending on your application, you might also terminate the connection if it can't keep up.
MQlobberServer.events.drain()
drain event
Emitted by a MQlobberServer object when the multiplexing layer emits a drain event.
MQlobberServer.events.full()
full event
Emitted by a MQlobberServer object when the multiplexing layer emits a full event.
MQlobberServer.events.removed(duplex)
removed event
Emitted by a MQlobberServer object when the multiplexing layer emits a removed event.
Parameters:
- {Duplex} duplex
  The multiplexed stream which has closed.
MQlobberServer.events.ack(info)
ack event
Emitted by a MQlobberServer object when the client has acknowledged receipt of a message.
Parameters:
- {Object} info
  Metadata for the message, with the following properties:
  - {String} topic
    Topic to which the message was published.
  - {Boolean} single
    Always true because acknowledgements are only supported for messages which were given to a single handler (across all clients connected to all servers).
  - {Integer} expires
    When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).
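Note that, as documented here, expires on an ack event is in milliseconds, whereas the expires value passed to client handlers is documented in seconds. A small sketch of converting each to a Date, using hypothetical helper names:

```javascript
// info.expires on an ack event is documented as milliseconds since the
// epoch, so it can be passed straight to Date.
function ackExpiryDate(info) {
    return new Date(info.expires);
}

// info.expires passed to a client handler is documented as seconds since
// the epoch, so it is multiplied by 1000 first.
function clientExpiryDate(info) {
    return new Date(info.expires * 1000);
}
```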
MQlobberServer.events.error(err, obj)
error event
Emitted by a MQlobberServer object if an error is emitted by the multiplexing layer (bpmux), preventing proper communication with the client.
Parameters:
- {Object} err
  The error that occurred.
- {Object} obj
  The object on which the error occurred.
MQlobberServer.events.warning(err, obj)
warning event
Emitted by a MQlobberServer object when a recoverable error occurs. This will usually be due to an error on an individual request or multiplexed stream.
Note that if there are no warning event listeners registered then the error will be displayed using console.error.
Parameters:
- {Object} err
  The error that occurred.
- {Object} obj
  The object on which the error occurred.
—generated by apidox—