@chandre/egg-amqp

1.1.4 • Public • Published

@change/egg-amqp

Install

$ npm i @change/egg-amqp --save

Usage

// {app_root}/config/plugin.js
exports.amqp = {
  enable: true,
  package: '@change/egg-amqp',
};

Configuration

// {app_root}/config/config.default.js
exports.amqp = {
  client: {
    url: 'amqp://127.0.0.1:5672',
  }
};

see config/config.default.js for more detail.

Example

Controller

// {app_root}/app/amqp/controller/queue.js
const Controller = require('egg').Controller;
class QueueController extends Controller {
  async test(ctx){
    const channel = ctx.channel; //amqp 频道
    const message = ctx.message; //amqp 消息
    const params = ctx.request.body; //消息内容
    ctx.body = 'test' 
    channel.ack(message); // 队列处理完成
  }
}
return QueueController

Middleware

// {app_root}/app/amqp/middleware/handlerQueue.js
module.exports = (app) => {
  return async(ctx, next) => {
    await next();
    return ctx.body; //rpc调用回复结果
  }
}

Router

module.exports = app => {
  const { middleware, controller } = app.amqp;
  // 创建RPC app.amqp.createRpcConsumer("前缀", ['全局中间件']);
  const RPC = app.amqp.createRpcConsumer('app.rpc', middleware.handlerQueue(app));
  // 添加RPC队列方法
  RPC.add({
    'test': {
      handler: controller.queue.test //控制器
      options: { durable: false, autoDelete: true } //队列配置选项
      consume: { noAck: false } // 消费者配置选项
    }
  })

  // 创建队列 app.amqp.createQueueConsumer("前缀", ['全局中间件']);
  const QUEUE = app.amqp.createQueueConsumer('app.queue');
  // 添加队列
  QUEUE.add({
    'test': {
      handler: controller.queue.test //控制器
      prefetch: 1, // 预取队列数据
      options: { durable: false, autoDelete: true } //队列配置选项
      consume: { noAck: false } // 消费者配置选项
      middleware: [], //控制器中间件
    }
  })

  // 创建交换机 app.amqp.createExchange("前缀", 交换机配置参数,['交换机队列全局中间件'] )
  const EXCHANGE = app.amqp.createExchange('app', { 
    name: 'fanout', // 交换机名
    type: 'fanout', //交换机类型
    durable: false, 
    autoDelete: true,
  })
  // 添加交换机队列
  EXCHANGE.add({
    'queue.test': {
      handler: controller.queue.test //控制器
      prefetch: 1, // 预取队列数据
      options: { durable: false, autoDelete: true } //队列配置选项
      consume: { noAck: false } // 消费者配置选项
      exchange: {}, // 交换机绑定队列参数
      routerKey: [], // 交换机队列路由键
      middleware: [], //控制器中间件
    }
  })
}

发送消息到指定队列

// this.app.amqp.send(队列名, 消息内容, [队列属性])
this.app.amqp.send('app.queue.test', {
  data: 'hello queue'
}, {
  timestamp: 1655360041817,
})

RPC队列调用

// this.app.amqp.call(rpc队列, 消息内容, [队列属性])
this.app.amqp.call('app.rpc.test', {
  data: 'hello queue'
} ).then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
});

广播消息到交换机中所有队列

// this.app.amqp.publish(交换机, 消息内容, [路由键,队列属性])
this.app.amqp.publish('app.fanout', {
  data: 'hello queue'
})

RPC消息到交换机中指定路由键

// this.app.amqp.callTopic(交换机, 消息内容, 路由键, [队列属性])
const result = await this.app.amqp.callTopic('app.fanout', {
  data: 'hello queue'
}, 'routerKey')
console.log(result)

License

MIT

Package Sidebar

Install

npm i @chandre/egg-amqp

Weekly Downloads

0

Version

1.1.4

License

MIT

Unpacked Size

30.2 kB

Total Files

14

Last publish

Collaborators

  • chandre