@fangjinlyx/processpool
TypeScript icon, indicating that this package has built-in type declarations

0.1.9 • Public • Published

processPool

介绍

基于cluster模块开发的进程池,只需要很简单的编码就可以最大程度的发挥多核的性能。

修复

  • 0.1.9

进程池溢出问题(严重)

调试日志

${env:NODE_DEBUG='master,worker'};

任务格式
属性名 类型 说明
id string 任务的id,要保证唯一性
args any[] 会被当做参数注入到工作进程中暴露出的run方法
executableFilePath string 工作进程的执行路径,应为一个.js文件,该值也作为工种的分类依据,所以传值时建议通过path.resolve()进行格式化
进程工种

进程工种是为了更好的细分进程池,将相似度高的任务放到同一工种进程去执行,可以减少进程环境切换带来的额外开销。举个简单的例子:

// master.js

// 创建一个8个进程的进程池
const processPool = new ProcessPool(8);
// 生成100个任务
for (let i = 0; i < 100; i++) {
  processPool.addTask({
      id: Math.random().toString(),
      args: [],
      executableFilePath: path.join(__dirname, './worker'),
  });
}

// worker.js
import { Worker } from "../src/processPool";

export default class Test extends Worker {

  private mongoClient;

  public async before() {
    // 连接mongodb
    // this.mongoClient = .....
  }

  public async run() {
    // 某些数据库操作
    // this.mongoClient...update();
  }

  public async after() {
    // 断开mongodb
  }
}

如果没有进程工种概念,这100个任务每个任务都需要建立一个mongo连接,这样的开销是非常大的,理想状态下,只需要建立8个连接,后续的任务可以复用之前已有的连接。进程工种正是为了解决这个问题,进程工种把任务进行了细分,分为初始化过程执行过程,相同工种的初始化过程是可以复用的,上面问题的初始化过程就是建立mongo连接。详细的流程如下:

  1. 主进程启动工作进程后会优先执行before方法进行初始化,初始化后的进程可以被同一工种共享。
  2. 初始化完成后才会执行run方法真正开始处理任务。
  3. 任务完成后,告知主进程,主进程会检索待处理任务列表中是否有同一工种的任务,如果存在同一工种的任务,则会分配到该进程,不在执行before,直接执行run
  4. 主进程没有检索到同一工种的其它任务,则会让该进程进入退化期进行工种退化。
工种退化

每一个新的进程都会被第一个任务绑定工种(比如进程被绑定为A工种),以便后续的同工种任务进行环境复用。假如待处理中没有同工种的任务了,那剩余的B工种任务也无法获取到进程资源。所以需要引入工种退化的概念,工种退化是指当同一工种不存在未执行的任务时,进程主动释放掉初始化资源,重新回到未绑定工种的状态,进而被其它工种重新绑定的过程。 这样仍会面临一种问题,当任务是即时交叉出现的怎么办?A1执行完后,检测没有An了,于是被下一个任务B1绑定为工种B,此时新的任务A2被创建,B1执行完以后又要被重置为A工种。这样就又回到了最初开销大的问题上。 所以只有退化还不够,还需要引入工种退化期的概念,工种退化期可以在延迟进程的退化,尽可能的减少环境切换,可以在创建进程池指定工种退化期的大小:

// 指定退化期为3000毫秒
const processPool = new ProcessPool(8, 3000);
Agent进程

如果把主进程比喻成老板,工作进程比喻成员工,那Agent进程则为老板的秘书。Agent进程可以用来处理一些定时任务等,它与常规任务并没有本质上的区别。

进程通信

工作进程之间的通信可以在工作进程中使用this.sendToWorker()进行广播。而工作进程与Agent进程可以调用this.sendToAgent(name)一对一通信.

主从之间的通信,依旧使用原始的api,例如主进程给工作进程发送通知使用 worker.send(....),工作进程给主进程发送通知使用process.send(...),秉承少即是多的理念,在模块仅对收发消息的格式进行了规范:

{
  "type": string,
  [key: string]?: any,
}

type为通信路由,在消息体中必填,消息体中其它参数可作为传参任意添加。模块内部定义使用了一些type类型,在使用是应特别注意:

type 说明
task 主进程将任务分配给工作进程
end 工作进程接受到该通知后会直接退出
finish 工作进程完成(或发生异常导致任务失败)当前任务,向主进程通知
error 工作进程发生异常,上报给主进程
check 保留选项,主进程发送通知要求工作进程上报进度
status 保留选项,所有的工作进程每隔一段时间就上报一下自己当前任务的进度

主进程接受工作进程消息时,可以选择cluster模块原始的通信监听,比如:

cluster.on('message', (worker, message) => {
  // 解析message中的内容,由于所有的消息要在这里处理,会用到很多的判断条件,可读性低
});

模块中提供了更简单的方式,可以让代码不那么臃肿:

// 主进程监听消息
processPool.messageEvent.on('example', ({ pid, msg }, worker) => {
  console.log(pid, msg);
});

// 工作进程发送消息
process.send({ type: 'example', pid: process.pid, msg: 'hello' });
任务重试机制

任务在执行过程中如果出现了未捕获的异常导致执行失败,那该任务会重新进行分配,这样的流程默认会重复三次,超过三次后任务会被放置在errorTaskList中留存,不再被处理。 如果想改变重试次数,可通过修改retryCount属性来实现:

// 重试次数改为5次
processPool.retryCount = 55;
工作进程的生命周期

如果工作进程长时间处于空闲状态,那么它会一直占用着系统资源,如果每次任务完成后就杀死进程,新任务创建进程,也会造成系统资源的浪费,为了解决这个问题,需要引入生命周期的概念。 进程空闲一段时间后,会被销毁释放资源,生命周期默认是10秒钟,你可以在创建实例时指定这个值:

// 进程池上限为8,退化期为3秒,空闲进程生命周期为1秒
const processPool = new ProcessPool(8, 3000, 1000);

特别的,如果设置生命周期为0,那么将不会进行进程的销毁。

优雅的暂停任务

调用processPool.stop()后工作进程会在当前任务完成后进入空闲状态,不再接收新的任务,保证任务不被强制中断。通过processPool.start()可以回复执行状态。

Readme

Keywords

none

Package Sidebar

Install

npm i @fangjinlyx/processpool

Weekly Downloads

1

Version

0.1.9

License

ISC

Unpacked Size

37.9 kB

Total Files

10

Last publish

Collaborators

  • jingzhe