@arrows/worker
Introduction
The library provides a simple, promise-based API on top of the Node.js native worker_threads module. It allows you to use thread pools and treat thread messages as simple promises, without worrying about the underlying events.
Main benefits:
- no boilerplate,
- using worker pools is as simple as calling functions that return promises,
- worker pools are spawned and managed automatically,
- errors inside workers are automatically caught and passed as rejected promises.
The library has built-in type definitions, which provide excellent IDE support.
The library works with:
- Node 10 LTS or higher - with the --experimental-worker flag,
- Node 12 LTS or higher - out of the box.
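For example, on Node 10 you would start your app (a hypothetical index.js here) like this:
node --experimental-worker index.js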
Quick example
Create worker file:
/* myWorker.js */
const { worker } = require("@arrows/worker")
/**
* Let's create our handler for a CPU-intensive task.
* The returned value will be passed as a promise to our caller
* (we don't have to return anything if it's a fire-and-forget task).
*
* For example, let's calculate the sum of integers from 0 up to (but not including) the provided value:
*/
const handler = (payload) => {
return Array.from({ length: payload }, (_, i) => i).reduce((a, b) => a + b)
}
/**
* We can now execute the `worker` function
* that will set up worker threads for us.
*
* We can pass custom configuration as a second argument,
* but in this case, we will use the default one (one worker per CPU).
*/
module.exports = worker(handler)
We can now use our worker wherever we need it, for example inside a simple Express server:
const express = require("express")
const myWorker = require("./myWorker")
express()
.get("/sum/:end", async (req, res) => {
const payload = Number(req.params.end)
const sum = await myWorker(payload)
res.send({ sum })
})
.listen(3000)
That's it!
Installation
Via NPM:
npm i @arrows/worker
Via Yarn:
yarn add @arrows/worker
All modules can be imported independently (to reduce bundle size). Here are some of the ways to import them (you can use either CommonJS or ES modules):
import worker from "@arrows/worker"
import { spawn } from "@arrows/worker"
import spawn from "@arrows/worker/spawn"
Rationale
Note: If you're not interested in implementation details, you can jump right into the usage section.
Starting from version 12 LTS, Node.js comes with a stable version of the worker_threads module. It's a long-anticipated functionality that complements the existing cluster module (multi-process) with a native multi-threading solution.
The built-in API is fairly low-level, and while powerful, it is rather awkward to work with when you want promises. That comes from the fact that worker communication is event-based, so we have multiple events that we want to marry with promises, which can only resolve once. If we create a worker per request, that's fairly straightforward:
/* myWorker.js */
const { Worker, isMainThread, parentPort } = require("worker_threads")
if (isMainThread) {
module.exports = (payload) => {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename)
.once("message", resolve)
.once("error", reject)
worker.postMessage(payload)
})
}
} else {
parentPort.once("message", (payload) => {
// Do some CPU-intensive calculations.
// For now we will keep it simple:
const result = payload * 2
parentPort.postMessage(result)
})
}
const myWorker = require("./myWorker")
const main = async () => {
const result = await myWorker(7)
console.log(result) // -> 14
}
main()
In addition to being verbose, the code above has a big disadvantage - if we call the task many times (which, in the case of a server route, can be many thousands of times per second), the overhead of creating a new worker on each call will slow the application down to the point of uselessness.
So what's the solution? Use raw events instead? We could try, but I cannot think of a clean way to integrate that with other event-based code (a server!).
Fortunately, there is a fairly easy solution that takes advantage of two things:
- promises can be resolved from the outside if we assign references to resolve and reject to variables in the outer scope,
- each message can be traced back to its corresponding promise if we add a unique identifier to each message.
We can also do this with many workers in a pool by adding some scheduling strategy. And the best part - we can abstract away all that complexity and present a clean, simple API.
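Here is a minimal sketch of that pattern (a single reused worker, no pool, simplistic error handling - not the library's actual source):
/* sketch.js */
const { Worker, isMainThread, parentPort } = require("worker_threads")
if (isMainThread) {
  const worker = new Worker(__filename)
  const pending = new Map() // message id -> { resolve, reject }
  let nextId = 0
  // A single listener dispatches every response to its matching promise.
  worker.on("message", ({ id, result }) => {
    pending.get(id).resolve(result)
    pending.delete(id)
  })
  // On a worker error, reject everything that is still in flight.
  worker.on("error", (error) => {
    for (const { reject } of pending.values()) reject(error)
    pending.clear()
  })
  module.exports = (payload) =>
    new Promise((resolve, reject) => {
      const id = nextId++
      pending.set(id, { resolve, reject }) // keep references in the outer scope
      worker.postMessage({ id, payload })
    })
} else {
  parentPort.on("message", ({ id, payload }) => {
    parentPort.postMessage({ id, result: payload * 2 })
  })
}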
That's the way this library is implemented. We have:
- thread pools with workers selected by a simple round-robin algorithm,
- resolve and reject references stored in a Map with unique message identifiers as keys,
- all of the above (and a couple of other things) completely opaque to the user.
Additionally, the goal of the library is to make simple things truly simple, and complex stuff bearable.
Usage
Single-file worker
A single-file worker is created by the worker function. As the name suggests, it should be defined in a separate file, with only the code that is necessary for it to do its job. It should be your default choice (over the work + spawn pair), as it has a couple of advantages:
- it automatically handles the isMainThread logic,
- it automatically sets the script path for the worker,
- it has the best IDE support - its generic type definition offers good inference, thanks to having all the logic in one place.
One disadvantage is that we cannot easily create ad-hoc instances with different configs (different pool sizes or different shared data) - everything is set up at once. In that case, you can use the instructions from the next section.
Examples:
const { worker } = require("@arrows/worker")
const { writeFile } = require("fs")
/**
* Fire-and-forget handler that does not return anything.
* Could be used for processing and saving/sending non-critical data, etc.
*
* @param {number[]} payload - some data series
*/
const handler = (payload) => {
const stats = {
average: payload.reduce((a, b) => a + b) / payload.length,
min: Math.min(...payload),
max: Math.max(...payload),
// ...other stats
}
const fileName = `${Date.now()}.json`
const data = JSON.stringify(stats, null, 2)
writeFile(fileName, data, (error) => {
if (!error) {
console.log("Done!")
}
})
}
module.exports = worker(handler)
const myWorker = require("./myWorker")
const main = async () => {
myWorker([1, 2, 3, 4, 5, 6, 7, 8, 9])
}
main() // -> "Done!"
/*
Generated file:
{
"average": 5,
"min": 1,
"max": 9
}
*/
const { worker } = require("@arrows/worker")
const seedrandom = require("seedrandom")
/**
* A good example of utilizing worker threads
* - generating pseudo-random data.
*
* @param {Object} payload
* @param {string} payload.seed
* @param {number} payload.width
* @param {number} payload.height
*/
const generateWorld = ({ seed, width, height }) => {
const rng = seedrandom(seed)
const map = []
for (let i = 0; i < height; i++) {
const row = []
for (let j = 0; j < width; j++) {
const tile = rng() > 0.7 ? "💣 " : "🌳 "
row.push(tile)
}
map.push(row)
}
return map
}
module.exports = worker(generateWorld)
const myWorker = require("./myWorker")
const main = async () => {
const world = await myWorker({ seed: "hello", width: 15, height: 10 })
const render = world.map((row) => row.join("")).join("\n")
console.log(render)
}
main()
/*
Result:
🌳 🌳 🌳 💣 🌳 💣 💣 🌳 💣 🌳 🌳 🌳 🌳 💣 🌳
💣 🌳 💣 🌳 🌳 🌳 🌳 🌳 🌳 🌳 🌳 🌳 🌳 💣 🌳
🌳 🌳 💣 🌳 🌳 💣 💣 🌳 🌳 🌳 💣 🌳 🌳 🌳 💣
🌳 🌳 🌳 🌳 💣 🌳 🌳 🌳 💣 🌳 💣 🌳 🌳 🌳 💣
🌳 💣 🌳 🌳 🌳 💣 💣 🌳 🌳 🌳 💣 💣 🌳 🌳 🌳
💣 💣 🌳 🌳 🌳 🌳 💣 🌳 🌳 🌳 🌳 🌳 🌳 🌳 🌳
🌳 🌳 💣 🌳 🌳 🌳 💣 🌳 🌳 🌳 🌳 🌳 🌳 🌳 🌳
🌳 🌳 🌳 💣 🌳 🌳 🌳 🌳 💣 💣 🌳 🌳 💣 🌳 🌳
🌳 💣 💣 🌳 💣 💣 💣 💣 💣 🌳 🌳 💣 🌳 🌳 🌳
💣 🌳 🌳 💣 🌳 🌳 🌳 💣 🌳 🌳 💣 💣 🌳 🌳 🌳
*/
const { worker } = require("@arrows/worker")
/**
* Another example - complex mathematical operations.
*
* @param {number} iterations
* @returns {number} PI approximation
*/
const calculatePI = (iterations) => {
let pi = 0
for (let i = 0; i < iterations; i++) {
let temp = 4 / (i * 2 + 1)
pi += i % 2 === 0 ? temp : -temp
}
return pi
}
/**
* Note that in some cases you may want to decrease
* the number of workers in the pool.
* For example, if a system runs many applications, a full CPU load
* can be less performant (due to the cost of switching between applications).
*/
module.exports = worker(calculatePI, { poolSize: 2 })
const myWorker = require("./myWorker")
const main = async () => {
const result = await myWorker(1000000)
console.log(result) // -> 3.1415916535897743
}
main()
Separating worker declaration and spawning
In some cases, you may want to prepare a separate worker definition (via the work function) and spawn it multiple times with different shared data and pool sizes (via the spawn function).
This allows you to optimize execution time (shared data is serialized and deserialized once per pool).
Example:
/* myWorkerDefinition.js */
const { work } = require("@arrows/worker")
const { validate } = require("json-schema")
/**
* Worker definition only, no export required.
*
* @param {Object} payload
* @param {Object} workerData
*/
const validateWithSchema = (payload, workerData) => {
return validate(payload, workerData).valid
}
work(validateWithSchema)
/* myWorkers.js */
const { spawn } = require("@arrows/worker")
const userSchema = {
$schema: "http://json-schema.org/draft-04/schema#",
type: "object",
properties: {
firstName: {
type: "string",
},
lastName: {
type: "string",
},
},
}
const postSchema = {
$schema: "http://json-schema.org/draft-04/schema#",
type: "object",
properties: {
title: {
type: "string",
},
content: {
type: "string",
},
},
}
/**
* Spawn separate worker pools for validating users and posts,
* with their own schemas as shared `workerData`.
*/
exports.validateUser = spawn("./myWorkerDefinition.js", {
workerData: userSchema,
poolSize: 4,
})
exports.validatePost = spawn("./myWorkerDefinition.js", {
workerData: postSchema,
poolSize: 3,
})
const { validateUser, validatePost } = require("./myWorkers")
const main = async () => {
const userValidationResult = await validateUser({
firstName: "John",
lastName: "Doe",
})
const postValidationResult = await validatePost({
title: "Worker Threads",
content: Date.now(),
})
console.log({ userValidationResult, postValidationResult })
// -> { userValidationResult: true, postValidationResult: false }
}
main()
Note: You can find and run all the examples listed above inside the /examples folder.
API reference
Index
Functions: worker, work, spawn, transfer
Arguments: task, handler, config
worker
Spawns workers and returns a function that handles messaging and returns responses as promises.
Combines the spawn and work functions in a single file.
Parameters
- handler - Function that performs calculations inside worker threads (see more).
- config - Configuration options (see more).
Returns: Async function that communicates with worker threads (see more).
Example
const { worker } = require("@arrows/worker")
const handler = (payload, workerData) => {
return /* Do some CPU-intensive task */
}
const config = {
poolSize: 2,
}
module.exports = worker(handler, config)
work
Defines a worker that can later be used with the spawn function.
Use when you want to separate worker definition from spawning a thread pool.
Parameters
- handler - Function that performs calculations inside worker threads (see more).
Returns: Nothing; it just defines a worker for use with the spawn function.
Example
const { work } = require("@arrows/worker")
const handler = (payload, workerData) => {
return /* Do some CPU-intensive task */
}
work(handler)
spawn
Spawns a worker pool from a worker defined in a separate file and returns a function that handles messaging and returns responses as promises.
Use when you want to separate worker definition from spawning a thread pool.
Parameters
- fileName - Path to a worker definition file created with the work function.
- config - Configuration options (see more).
Returns: Async function that communicates with worker threads (see more).
Example
const { spawn } = require("@arrows/worker")
const fileName = "./path/to/myWorkerDefinition"
const config = {
poolSize: 3,
}
module.exports = spawn(fileName, config)
transfer
Wraps a worker handler so you can add a transfer list for the result. It is a separate function so the handler can be fully decoupled from transferring logic (that way you can use existing functions as handlers).
Works only if the result is an instance of a Buffer or a TypedArray.
Parameters
- handler - Handler function.
- mapperFn - Takes the result and should produce a transfer list.
Returns: A wrapped handler whose result will be moved rather than cloned.
Example
const { worker, transfer } = require("@arrows/worker")
const handler = (payload) => {
/* Do some CPU-intensive task */
return result // Result as a typed array
}
const wrappedHandler = transfer(handler, (result) => [result.buffer])
module.exports = worker(wrappedHandler)
task
A task is a function returned by calling the worker or spawn function;
it passes a payload to the worker pool and returns a promise with the result.
Parameters
- payload - Payload that will be passed to an automatically selected worker from the pool.
- transferList - An optional array that contains references to ArrayBuffers whose memory should be moved rather than cloned (see the sketch after the examples below).
Returns: The result of running a worker handler on the provided payload (as a promise).
Examples
Using transferList to move memory (payload) -> see example
Using transferList to move memory (payload and return value) -> see example
Using SharedArrayBuffer to share memory -> see example
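For illustration, here is a hedged sketch of passing a transfer list to a task (assuming a hypothetical ./myWorker that exports a task whose handler returns a typed array):
const myWorker = require("./myWorker")
const main = async () => {
  const payload = new Float64Array(1000000)
  // The second argument moves payload.buffer into the worker instead of cloning it -
  // after this call the typed array is detached and no longer usable in this thread.
  const result = await myWorker(payload, [payload.buffer])
  console.log(result.length)
}
main()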
handler
A handler is a user-defined function passed to the worker or spawn function.
Errors thrown inside handlers will be automatically caught and converted to rejected promises.
The handler can be an async function or can return a promise, as in the sketch below.
Parameters
- payload - Data received by the worker.
- workerData - Data shared between workers, set via the config object.
Returns: Any value calculated by the user (or void).
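For example, a handler can await asynchronous work before returning (a minimal sketch; fetchStats is a hypothetical helper):
const { worker } = require("@arrows/worker")
// Hypothetical async helper - stands in for any promise-returning work.
const fetchStats = async (id) => ({ id, views: 42 })
const handler = async (payload, workerData) => {
  const stats = await fetchStats(payload)
  return { ...stats, source: workerData }
}
module.exports = worker(handler, { workerData: "stats-service" })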
config
Config is an optional object passed to the worker or spawn function
that modifies the default settings of the worker pool.
Available options
- poolSize - Size of the worker thread pool; must be greater than 0 (default: the number of CPUs).
- workerData - Data shared between workers; it will be passed as the second argument of the handler.
Additionally, you can pass any of the standard worker options, see: worker threads docs.
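A minimal sketch combining both options:
const { worker } = require("@arrows/worker")
const handler = (payload, workerData) => payload * workerData.factor
module.exports = worker(handler, {
  poolSize: 4, // four worker threads instead of the default (one per CPU)
  workerData: { factor: 2 }, // available as the handler's second argument
})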
License
The project is under the open, non-restrictive ISC license.