mqtt-router
An easy-to-use and flexible routing library for MQTT.
Overview
@pera-swarm/mqtt-router is a library for handling MQTT publish/subscribe capabilities with a straight forward routing architecture.
This is a Node.js library available on both npm registry and GitHub package registry.
Usage
1. Installation
Installation done using npm install
command:
npm i --save @pera-swarm/mqtt-router
Also, you need to install mqtt
library as well.
npm i --save mqtt
2. Set up Routes
Create routes for subscribe and publish:
routes.js
// Sample dynamic route list with handler functions
const SAMPLE_ROUTES = [
{
topic: 'v1/sample',
allowRetained: true,
subscribe: true,
publish: false,
handler: (msg) => {
const data = JSON.parse(msg);
console.log('Sample subscription picked up the topic', data);
}
}
];
module.exports = SAMPLE_ROUTES;
3. Start the Router
You should configure your own mqttOptions according to your mqtt broker and application settings.
index.js
// Configure mqttClient
const mqttClient = require('mqtt');
const mqttOptions = {
port: 1883,
clientId: process.env.MQTT_CLIENT,
username: process.env.MQTT_USER || '',
password: process.env.MQTT_PASS || ''
};
const mqtt = mqttClient.connect(process.env.MQTT_HOST, mqttOptions);
// Import MQTTRouter from mqtt-router
const { MQTTRouter } = require('@pera-swarm/mqtt-router');
const routes = require('./routes');
var router;
// Sample MQTT Message Options
const SAMPLE_OPTIONS = { qos: 2, rap: true, rh: true };
// Sample setup function that runs on connect
const SAMPLE_SETUP_FN = () => {
console.log('sample setup fn');
};
// Sample MQTT Error handler function
const SAMPLE_ON_ERROR_FN = (err) => {
console.log('error: mqtt');
console.log(err);
};
router = new MQTTRouter(
mqtt,
routes,
SAMPLE_OPTIONS,
SAMPLE_SETUP_FN,
SAMPLE_ON_ERROR_FN
);
router.start();
router.start()
will listen to the subscribed routes that are specified as subscribed: true
in the route
specification and then if the subscriber picked up a message for the associated topic, the MQTTRouter will call the relevant handler
funtion.
You can also wrap the routes using wrapper
function to include additional higher level attribute to the handler function as well.
index.js
// Import MQTTRouter and wrapper from mqtt-router
const { MQTTRouter, wrapper } = require('@pera-swarm/mqtt-router');
const routes = require('./routes');
var router;
// Sample MQTT Message Options
const SAMPLE_OPTIONS = { qos: 2, rap: true, rh: true };
// Sample setup function that runs on connect
const SAMPLE_SETUP_FN = () => {
console.log('sample setup fn');
};
// Sample MQTT Error handler function
const SAMPLE_ON_ERROR_FN = (err) => {
console.log('error: mqtt');
console.log(err);
};
// Sample higher level attribute for the handler function
const sampleAttrib = {
time: Date.now()
};
router = new MQTTRouter(
mqtt,
wrapper(routes, sampleAttrib),
SAMPLE_OPTIONS,
SAMPLE_SETUP_FN,
SAMPLE_ON_ERROR_FN
);
router.start();
routes.js
// Sample dynamic route list with handler functions.
// sampleAttrib will be added to the handler function as the second parameter.
const SAMPLE_ROUTES = [
{
topic: 'v1/sample',
allowRetained: true,
subscribe: true,
publish: false,
handler: (msg, attrib) => {
const data = JSON.parse(msg);
// console.log(attrib);
console.log('Sample subscription picked up the topic', data);
}
}
];
module.exports = SAMPLE_ROUTES;
Useful
Note: You can also configure a topic prefix by configuring an environment variable
MQTT_CHANNEL
. (example:MQTT_CHANNEL=beta
in a .env file locally)
Contribute
1. Install dependencies
Install project dependencies.
npm install
2. Testing
Note: Before running the test cases, you should configure environment variables
MQTT_HOST
,MQTT_USER
,MQTT_PASS
, andMQTT_CLIENT
. Please refersample.nodemon.json
file for nodemon environment variable configuration.
Manually run the test cases.
node test/index.js
or you can use nodemon script once environment variables configured correctly.
npm run client
Documentation
Route
A route definition for handling route subscription logic. Following are the properties supported on the Route
definition:
-
topic: string
The Route topic -
type: 'String' | 'JSON'
Payload type (default:String) -
allowRetained: boolean
Retain allowance -
subscribe: boolean
Subscribe flag -
publish: boolean
Publish flag -
handler: Function
The default subscribe handler function, called when subscribe:true, packet.retain:true|false and allowRetained:true. Retained messages and new messages will be handled by default. -
fallbackRetainHandler?: Function
Subscribe handler function only for retained messages, but for route specific custom logic. called when subscribe:true, packet.retain:true and allowRetained:false.
Note: If specified,
fallbackRetainHandler
function will be called. If not specified, retained messages will be discarded.
MQTTRouter (mqttConnection, routes, options, setup, onError)
The main entrypoint of mqtt-router that defines the router logic.
You have to import the MQTTRouter
class and instantiate with a mqtt client.
Parameters supported in the constructor:
-
mqttConnection {MqttClient}
: mqtt connection -
routes {Route[]}
: routes with mqtt topic, handler and allowRetained properties -
options {IClientSubscribeOptions}
: mqtt message options -
setup {Function}
: setup function that runs on successful connection -
onError {Function}
: error handler function
mqttRouter.start()
The method for starting the mqtt router.
You must call this method once, a MQTTRouter object instantiated.
mqttRouter.pushToPublishQueue(topic, data)
Add a message to the publish queue that to be scheduled to publish. Parameters supported:
-
topic {string}
: message topic -
data {string|Buffer}
: message data
mqttRouter.addRoute(route)
Add a route to the subscriber routes list. Parameter supported:
-
route {Route}
: route object to be added to the subscriber list
mqttRouter.removeRoute(topic)
Remove a route from the subscriber routes list. Parameter supported:
-
topic {string}
: route topic
wrapper(routes, property)
Wrap an array of Route
objects with a higher order property (ex: property can bethis
from the callee class) or
a separate attribute to the handler
function as a second parameter, in each route object.
Parameters supported:
-
routes {Route[]}
: routes array -
property {any}
: property that required to be wrapped with
Queue
A Queue implementation for the mqtt-router
with a scheduler that acts as a "Publish Queue".
Parameters supported in the constructor:
-
mqttClient {MqttClient}
: mqtt connection -
options {IClientSubscribeOptions}
: mqtt message options -
number {number}
: interval
queue.begin()
Begin the queue processing (scheduler).
Note: You must call this method once, a Queue object instantiated.
queue.add(topic, data)
Add a message to the publish queue. This message will be published by the scheduler. Parameters supported:
-
topic {string}
: message topic -
data {string|Buffer}
: message data
queue.remove(topic)
Remove a message in the queue by a given topic. Parameter supported:
-
topic {string}
: message topic
queue.findByTopic(topic)
Find a message with the given topic in the queue. Returns -1
if not found on the queue.
Parameter supported:
-
topic {string}
: message topic
subscribeToTopic(mqtt, topic, options)
Subscribe to a given topic with options. Parameters supported:
-
mqtt {MqttClient}
: mqtt connection -
topic {string}
: message topic -
options {IClientSubscribeOptions}
: mqtt message options
publishToTopic(mqtt, topic, message, options, callback)
Publish a message to a given message topic with options and a callback function. Parameters supported:
-
mqtt {MqttClient}
: mqtt connection -
topic {string}
: message topic -
message {string}
: message data -
options {IClientSubscribeOptions}
: mqtt message options -
callback {Function}
: callback function
secondsInterval(interval)
Generates a cron interval called in given seconds Parameter supported:
-
interval {number}
: interval in seconds
minutesInterval(interval)
Generates a cron interval called in given minutes Parameter supported:
-
interval {number}
: interval in minutes
To-Do
- Fix duplicate topic support for routing.
Licence
This project is licensed under LGPL-2.1 Licence.