node-oop-worker-pool is a NodeJS library for dealing with multi threaded programing in a OOP approach.
Use the package manager npm to install node-oop-worker-pool.
npm install node-oop-worker-pool
Use WorkerPool.runTask() run task in a new thread, if there are more tasks then available workers it will wait in the queue.
Example:
import { WorkerPool } from "node-oop-worker-pool";
import ComputeService from "./computeService";
class MainService{
async handleCPUIntensiveTask(dataToProcess: number[]){
// wait all workers to finish
const result = await Promise.all(dataToProcess.map((data: number)=>{
return WorkerPool.runTask(data, ComputeService.path);
}));
console.log("finish all tasks", result);
WorkerPool.destroy();
}
}
const mainService = new MainService();
mainService.handleCPUIntensiveTask([1,2,3,4,5,6,7,8,9,10,11]);
Must inherit from AbstractWorker, and implement the method runTask() the method receive the data from the main file, and will also return the result to the main file.
import { AbstractWorker } from 'node-oop-worker-pool';
export default class ComputeService extends AbstractWorker {
// optional way to expose the path to the main file
static path = __filename;
// when the worker starts, this function will be called automatically.
async runTask(data: number): Promise<number> {
console.log("start processing data: ", data);
// example of thread blocking task
for (let i = 0; i < 9999999999; i++);
// modified the data
const modifiedData = data * 2;
//return data to main file
return modifiedData;
}
}
import { chunkArray } from "node-oop-worker-pool";
const CHUNK_SIZE = 3;
const rawData : number[] = [1,2,3,4,5,6,7,8,9,10,11];
const dataToProcess = chunkArray(rawData, CHUNK_SIZE);
const result = await Promise.all(dataToProcess.map((data: number[])=>{
return WorkerPool.runTask(data, ComputeService.path);
}));
console.log("finish all tasks", result);
WorkerPool.destroy();
export default class ComputeService extends AbstractWorker {
static path = __filename;
async runTask(chunk: number[]): Promise<boolean[]> {
console.log("start processing data: ", chunk);
const modifiedChunk = data.map((num: number)=>{
return this.processData(num);
});
return modifiedChunk;
}
processData(data: number) : boolean{
if(!data){
return false;
}
for (let i = 0; i < 9999999999; i++);
return true;
}
}
import OS from 'os';
//default
WorkerPool.setTotalAvailableWorkers(OS.cpus().length);
//custom
WorkerPool.setTotalAvailableWorkers(30);
-
setTotalAvailableWorkers() - Change number of available workers.
-
destroy() - Terminate all workers, force end of all tasks and empty the queue.
-
chunkArray() - Allows you to split array to chunks, in order to handle large array using workers.