Nodes provides a framework for building type-safe data transformation graphs using Node.js streams.
Nodes provides an intuitive framework for constructing data transformation graphs using native Node.js streams. You can use the built-in library of commonly used data transformation Node
classes or implement your own.
- A type-safe graph-like API pattern for building data transformation graphs based on Node.js streams.
- Consume any native Node.js Readable, Writable, Duplex, or Transform stream and add it to your graph.
- Error handling and selective termination of inoperable graph components.
- Automatic message queueing in order to assist with handling of backpressure.
npm install @farar/nodes
A Node
is a component of a graph-like data transformation pipeline. Each Node
is responsible for transforming its input into an output that can be consumed by its connected Node
instances. By connecting Node
instances into a network, sophisticated graph-like data transformation pipelines can be constructed.
Please see the Streams Logger implementation.
new nodes.Node<InT, OutT>(stream, options)
-
<IntT>
The input into the stream. -
<OutT>
The output from the stream. -
stream
<stream.Writable | stream.Readable>
An instance of aWritable
,Readable
,Duplex
, orTransform
Node.js stream. -
options
<NodeOptions>
-
errorHandler
<(err: Error, ...params: Array<unknown>) => void>
An optional error handler that will be used in the event of an internal Error. Default:console.error
-
public node.connect(...nodes)
- nodes
<Array<T>>
An array ofNode<OutT, unknown>
to be connected to thisNode
.
Returns: <Node<InT, OutT>>
public node.disconnect(...nodes)
- nodes
<Array<T>>
An array ofNode<OutT, unknown>
to be disconnected from thisNode
.
Returns: <Node<InT, OutT>>
protected node._write(data, encoding)
- data
<InT>
Data to write to the writable side of the stream. - encoding
<NodeJS.BufferEncoding>
An optional Node.js encoding Default:utf-8
. Returns:<Promise<void>>
Config.errorHandler <ErrorHandler>
An optional error handler. Default: console.error
Config.debug <boolean>
Announce internal activities e.g., connectiing and disconnecting from Node
instances. Default: false
In order to implement a data transformation Node
, extend the Node
class and pass a Node.js stream.Writable
implementation to the super's constructor.
For example, the following StringToNumber
implementation will convert a numeric string to a number.
NB In this example,
writableObjectMode
andreadableObjectMode
are both set totrue
; hence, the Node.js stream implementation will handle the input and output as objects. It's important thatwritableObjectMode
andreadableObjectMode
accurately reflect the input and output types of yourNode
.
import * as stream from 'node:stream';
import { Config, Node } from '@farar/nodes';
export class StringToNumber extends Node<string, number> {
constructor(options: stream.TransformOptions) {
super(new stream.Transform({
...options, ...{
writableObjectMode: true,
readableObjectMode: true,
transform: (chunk: string, encoding: BufferEncoding, callback: stream.TransformCallback) => {
try {
const result = parseFloat(chunk.toString());
callback(null, result);
}
catch (err) {
if (err instanceof Error) {
callback(err);
Config.errorHandler(err);
}
}
}
}
}));
}
}
In this hypothetical example a type-safe Node
is constructed from a net.Socket
. The resulting Node
instance can be used in a data transformation graph.
import * as net from "node:net";
import { once } from "node:events";
net.createServer((socket: net.Socket) => socket.pipe(socket)).listen(3000);
const socket = net.createConnection({ port: 3000 });
await once(socket, "connect");
const socketHandler = new Node<Buffer, Buffer>(socket);
The Node
class has a _write
method that respects backpressue; when a stream is draining it will queue messages until a drain
event is emitted by the Node's stream. Your application can optionally monitor the size of the queue and respond appropriately.
If you have a stream that is backpressuring, you can increase the high water mark on the stream in order to mitigate drain events.
Reusing the same Node
instance can result in unexpected phenomena. If the same Node
instance is used in different locations in your graph, you need to think carefully about the resulting edges that are connected to both the input and the output of the Node
instance. Most of the time if you need to use the same class of Node
more than once, it's advisable to create a new instance for each use.
Nodes may be used in diverse contexts, each with unique requirements. Nodes should never throw if the API is used in accordance with the documentation. However, "phenomena happens"; hence, you may choose to handle errors accordingly!
Nodes defaults to logging its errors to process.stderr
. If your application requires that errors throw, you may set an errorHandler
on the Config
object that does that. Alternatively, your handler may consume the Error
and handle it otherwise.
import { Config } from "@farar/nodes";
Config.errorHandler = (err: Error) => {
throw err;
};
git clone https://github.com/faranalytics/nodes.git
cd nodes
npm install && npm update
npm test