egg-mqtt-plugin 版本 | egg 2.x |
---|---|
1.x | 😁 |
0.x | ❌ |
// config/plugin.js
exports.mqtt = {
enable: true,
package: 'egg-mqtt-plugin',
};
// config/default.js
config.mqtt = {
host: 'mqtt://xxx.xxx.x.x',
port: 1883,
username: 'username',
password: 'password',
clientId: 'client_id',
topics: {
'topic-topic-topic': { qos: 0 },
},
// options:{},
// DataBus: true,
}
详细配置
插件介绍
插件的使用
插件的使用.建立链接
插件的使用.发布
插件的使用.订阅
插件的使用.取消订阅
插件的使用.监听订阅消息
Mqtt类
示例
issue
license
请到 config/config.default.js 查看详细配置项说明。
配置项 | 类型 | 必填 | 默认值 | 描述 |
---|---|---|---|---|
host | string | √ | ||
port | number | √ | ||
username | string | √ | ||
password | string | √ | ||
clientId | string | √ | ||
DataBus | boolean | × | true | 是否需要统一的数据输出 |
topics | object | × | 需要订阅的topic,参照mqtt.subsceibe() | |
options | object | × | keeplive,connectTimeout等配置,参照mqtt.connect() |
topics
用来创建订阅,会传入mqtt.subsceibe()
方法,options
会传入mqtt.connect()
方法,更加详细的配置参数请参考文档
使用过一些mqtt插件,发现并不能满足业务开发的需求。有的插件在生产环境和多进程环境下无法使用,为了能够在多进程下使用mqtt客户端并且能够有一个比较舒服的使用方式。
为了在多进程下使用mqtt,根据egg官方文档的建议,长连接的维护一般放在agent进程中。所以egg-mqtt-plugin
插件基于mqtt.js
在agent进程中维护长连接,并且由agent进程来完成发布和订阅事件,通过进程间的通讯由agent进程将订阅的消息内容随机的发送到一个work进程,work进程将需要发布的消息发送给agent进程。agent进程不参与具体的业务逻辑,只维护长连接和监听发布订阅事件。
mqtt实例运行在agent进程上,所以在work进程上无法对mqtt实例进行操作,egg-mqtt-plugin
提供了统一数据出口(需要开启DataBus
)和Mqtt类。
插件得到publish、subscribe、unsubscribe等方法建议在agent.js或者在一个http请求的生命周期(controller、service、middleware、route)中使用。如果在非agent进程和一个http请求的生命周期中使用,多进程 环境下会导致每个worker进程都调用一次,从而导致了重复调用。这个问题会在后续版本中优化,可以在任何地方调用并且不会导致重复调用的情况。
获取订阅消息的message()方法不会多次触发,可以在任何地方使用。
开启插件并填写好相关配置,在框架启动的时候时候会自动进行mqtt链接。
因为多进程的原因最好在一个http请求的生命周期(controller、service、middleware、route)中或者agent中调用,如果其他地方使用会出现重复发布的情况。
消息的发布需要使用Mqtt实例,调用Mqtt实例的publish()
方法即可。
Mqtt类由 'egg-mqtt-start/bootstrap.js'文件暴露出来,实例化Mqtt的时候需要使用app
作为参数。
publish()方法需要传入一个对象参数,其中包括topic、message、options等信息,更详细的参数可以参考mqtt.publish()文档。
'use strict';
const Mqtt = require('egg-mqtt-start/bootstrap');
const mqtt = new Mqtt(this.app);
const topic = 'xxx-xxx-xxx';
const message = '这是我要发布的信息';
const options = { qos: 0 };
mqtt.publish(topic, message, options, err => {
console.log('发布完成');
});
因为多进程的原因最好在一个http请求的生命周期(controller、service、middleware、route)中或者agent中调用,如果其他地方使用会出现重复订阅的情况。
订阅有两种方式,一种是通过config.default.js配置文件中topics
参数进行配置,另一种是手动调用subscribe()
方法。更详细的参数可以参考mqtt.subscribe()文档。
'use strict';
const Mqtt = require('egg-mqtt-start/bootstrap');
const mqtt = new Mqtt(this.app);
const topic = 'xxx-xxx-xxx';
const options = { qos: 0 };
mqtt.subscribe(topic, options, (err, granted) => {
console.log(granted.topic);
});
取消订阅直接调用Mqtt
类的unsubscribe()
方法就可以了。
const Mqtt = require('egg-mqtt-start/bootstrap');
const mqtt = new Mqtt(this.app);
const topic = 'xxx-xxx-xxx';
const options = { qos: 0 };
mqtt.unsubscribe(topic, options, err => {
console.log('取消订阅');
});;
获取订阅的消息有两种方式,一种是开启config.default.js
配置文件中DataBus:true
选项(默认开启),订阅的所有消息和app
都会统一的被发送到/app/mqtt/DataBus.js
文件中,
开启此功能后DataBus.js
文件会在项目启动后自动创建,也可以自己手动创建,格式如下例子:
'use strict';
// /app/mqtt/DataBus.js
module.exports = async (app, data) => {
console.log('收到数据');
console.log(data.topic);
};
另一种获取订阅的消息的方式是调用message()
方法,需要传入一个回调函数,插件会将topic
和相应的消息信息传入回调函数的参数中。
const Mqtt = require('egg-mqtt-plugin/bootstrap');
const mqtt = new Mqtt(this.app);
mqtt.message((topic, message) => {
console.log(topic);
})
egg-mqtt-plugin
是基于mqtt.js
进行封装并且是在agent
进程上进行实例化的,所以在我们的业务代码中,无法使用mqtt.js
实例提供的相应的方法,为了保证正常的时候用,
egg-mqtt-plugin
通过bootstrap.js
文件导出一个Mqtt
类,在这个类中通过进程间通讯间接的实现了mqtt.js
实例上的部分方法。目前的版本暂时没有显示部分方法的回调函数功能。
const Mqtt = require('egg-mqtt-plugin/bootstrap');
// 获取mqtt实例的时候需要传入当前环境下的app对象
cosnt mqtt = new Mqtt(this.app);
// 发布
mqtt.publish('topic', '我是发布的消息体', { qos: 0 }, callback);
// 订阅
mqtt.subscribe('topic', { qos: 0 }, callback);
// 取消订阅
mqtt.unsubscribe('topic', { qos: 0 }, callback);
// 监听消息
mqtt.message((topic, message) => {
console.log(topic);
});
请到 egg issues 异步交流。