A lightweight streaming operations library for JS that provides a flexible pipeline-based approach to data processing. StreamOps leverages generators and async generators to create efficient data processing pipelines with built-in support for parallel processing, error handling, and state management.
```bash
npm install streamops
```
- Pipeline-based streaming operations: Build complex data processing pipelines with ease
- Async/sync generator support: Seamlessly mix sync and async operations
- Parallel processing: Process data concurrently with parallel branches
- State management: Share state across pipeline steps
- Configurable error handling: Robust error handling with timeouts
- Rich operator set: Comprehensive set of built-in operators
```javascript
const createStreamOps = require('streamops');

const stream = createStreamOps();

// Array-style pipeline
const pipeline = [
  function* () {
    yield 1;
    yield 2;
    yield 3;
  },
  stream.map(x => x * 2),
  stream.filter(x => x > 4)
];

// Process the stream
for await (const item of stream(pipeline)) {
  console.log(item); // Outputs: 4, 6
}
```
Or chain operators directly:

```javascript
const result = stream(function* () {
  yield 1;
  yield 2;
  yield 3;
})
  .map(x => x * 2)
  .filter(x => x > 4);

for await (const item of result) {
  console.log(item); // Outputs: 4, 6
}
```
A more complete pipeline that fetches, transforms, filters, and batches data:

```javascript
const pipeline = [
  // Fetch and yield data
  async function* () {
    const response = await fetch('https://api.example.com/users');
    const users = await response.json();
    for (const user of users) {
      yield user;
    }
  },

  // Transform data
  stream.map(user => ({
    id: user.id,
    name: user.name,
    isActive: user.status === 'active'
  })),

  // Filter active users
  stream.filter(user => user.isActive),

  // Process in batches
  stream.batch(10)
];

for await (const userBatch of stream(pipeline)) {
  await processUserBatch(userBatch);
}
```
```javascript
const stream = createStreamOps({
  timeout: 30000,           // Overall pipeline timeout
  logLevel: 'info',         // 'error' | 'warn' | 'info' | 'debug'
  yieldTimeout: 20000,      // Max time between yields
  downstreamTimeout: 30000  // Max time without downstream consumption
});
```
- `yieldTimeoutBehavior`: Controls how yield timeouts are handled:
  - `'warn'`: Log a warning and continue (default)
  - `'yield-null'`: Yield a null value and continue
  - `'cancel'`: Cancel the pipeline
  - `'block'`: Stop yielding from the timed-out step
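A minimal sketch of putting these together (assuming `yieldTimeoutBehavior` is passed in the same configuration object as the options above):

```javascript
const stream = createStreamOps({
  yieldTimeout: 20000,
  // Assumed placement: passed alongside the other configuration options.
  // A step that stalls past yieldTimeout then yields null instead of failing.
  yieldTimeoutBehavior: 'yield-null'
});
```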
Handle errors from a risky step with `catchError`:

```javascript
const pipeline = [
  riskyOperation,
  stream.catchError(error => {
    console.error('Operation failed:', error);
    // Handle error appropriately
  }),
  nextStep
];
```
Combine `timeout` with `catchError` to catch slow steps:

```javascript
const pipeline = [
  longRunningOperation,
  stream.timeout(5000), // Fails if a step takes > 5s
  stream.catchError(error => {
    if (error.name === 'TimeoutError') {
      // Handle timeout
    }
  })
];
```
Steps wrapped with `withEndSignal` receive an end-of-stream signal, which is useful for cleanup:

```javascript
const { END_SIGNAL } = require('streamops');

const pipeline = [
  sourceStream,
  stream.withEndSignal(function* (input) {
    if (input === END_SIGNAL) {
      yield* cleanup();
      return;
    }
    yield processInput(input);
  })
];
```
The `accrue` operator collects all items before continuing:
```javascript
const pipeline = [
  source,
  stream.accrue(), // Collect all items
  stream.map(items => processItems(items))
];
```
Nest arrays within a pipeline to create parallel branches:

```javascript
const pipeline = [
  source,
  [ // Parallel branches
    [ // Nested parallel
      stream.map(x => x * 2),
      stream.map(x => x + 1)
    ],
    stream.filter(x => x > 10)
  ]
];
```
Results from parallel branches are merged in order.
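A minimal sketch of consuming a parallel pipeline (the fan-out and merge granularity are assumptions here, not documented above):

```javascript
const parallel = [
  function* () { yield 1; yield 2; yield 3; },
  [
    stream.map(x => x * 2), // branch A: 2, 4, 6
    stream.map(x => x + 1)  // branch B: 2, 3, 4
  ]
];

for await (const item of stream(parallel)) {
  // Branch results arrive merged in order (branch A before branch B);
  // the exact interleaving is an assumption in this sketch.
  console.log(item);
}
```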
Maintain state via the `this` context:
```javascript
const pipeline = [
  source,
  function* (input) {
    this.count = (this.count || 0) + 1;
    yield `${this.count}: ${input}`;
  }
];
```
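Because state is shared across pipeline steps, a later step can read what an earlier step wrote. A sketch, assuming every step in a pipeline is bound to the same `this` context:

```javascript
const pipeline = [
  function* () { yield 'a'; yield 'b'; },
  function* (input) {
    // Write shared state
    this.seen = (this.seen || 0) + 1;
    yield input;
  },
  function* (input) {
    // Read the counter written by the previous step
    // (assumes both steps see the same `this` context)
    yield `${input} (item ${this.seen})`;
  }
];
```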
- `map(fn)`: Transform each item using the provided function.
  `stream.map(x => x * 2)`
- `filter(predicate)`: Only allow items that match the predicate.
  `stream.filter(x => x > 5)`
- `reduce(reducer, initialValue)`: Accumulate values, yielding intermediate results.
  `stream.reduce((sum, x) => sum + x, 0)`
- `flatMap(fn)`: Map each item to multiple items.
  `stream.flatMap(x => [x, x * 2])`
- `take(n)`: Limit the stream to the first n items.
  `stream.take(5) // Only first 5 items`
- `skip(n)`: Skip the first n items.
  `stream.skip(2) // Skip first 2 items`
- `batch(size, options)`: Group items into arrays of the specified size.
  `stream.batch(3, { yieldIncomplete: true })`
  Options:
  - `yieldIncomplete`: Whether to yield incomplete batches (default: true; see the sketch after this list)
- `distinct(equalityFn)`: Remove duplicates using an optional equality function.
  `stream.distinct((a, b) => a.id === b.id)`
- `mergeAggregate(options)`: Merge objects into arrays by key.
  `stream.mergeAggregate({ removeDuplicates: true, alwaysArray: true })`
- `waitUntil(condition)`: Buffer items until the condition is met.
  ```javascript
  // Wait for specific fields
  stream.waitUntil(['price', 'volume'])
  // Or a custom condition
  stream.waitUntil(buffer => buffer.length >= 3)
  ```
- `bufferBetween(startToken, endToken, mapFn)`: Capture content between tokens.
  `stream.bufferBetween('<tag>', '</tag>', content => parse(content))` (the tokens here are illustrative placeholders)
- `catchError(handler)`: Handle errors in the pipeline.
  `stream.catchError(err => console.error(err))`
- `timeout(ms)`: Fail if processing takes too long.
  `stream.timeout(5000) // 5 second timeout`
- `tap(fn)`: Execute side effects without modifying the stream.
  `stream.tap(x => console.log('Saw:', x))`
- `accrue()`: Collect all items before proceeding.
  `stream.accrue()`
- `dam()`: Alias for `accrue()`
- `withEndSignal(fn)`: Mark a function/generator to receive end signals.
  ```javascript
  stream.withEndSignal(function* (input) {
    if (input === END_SIGNAL) {
      // Handle end of stream
      yield* cleanup();
      return;
    }
    yield process(input);
  })
  ```
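As referenced in the `batch` entry above, a sketch of `batch` with the default `yieldIncomplete: true`, showing that a trailing short batch is still emitted:

```javascript
const pipeline = [
  function* () { for (let i = 1; i <= 7; i++) yield i; },
  stream.batch(3)
];

for await (const group of stream(pipeline)) {
  console.log(group);
}
// [1, 2, 3]
// [4, 5, 6]
// [7]  <- incomplete batch, emitted because yieldIncomplete defaults to true
```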
StreamOps also provides a simplified interface for creating pipelines:
```javascript
const { simple } = require('streamops');

// Create a pipeline with injected operators
const stream = simple(
  ({ map, filter }) => [
    [1, 2, 3, 4],
    map(x => x * 2),
    filter(x => x > 5)
  ]
);

for await (const item of stream) {
  console.log(item); // Outputs: 6, 8
}
```
The `simple` interface automatically injects operators and handles pipeline creation.
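For instance, a sketch that injects other documented operators the same way (whether every operator is injectable is an assumption here):

```javascript
const { simple } = require('streamops');

const batched = simple(
  ({ map, batch }) => [
    [1, 2, 3, 4, 5],
    map(x => x * 10),
    batch(2) // assumed to be injectable like map and filter
  ]
);

for await (const group of batched) {
  console.log(group); // [10, 20], [30, 40], [50]
}
```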
Set `logLevel` in the configuration:
```javascript
const stream = createStreamOps({
  logLevel: 'debug' // See all pipeline operations
});
```
Use the `tap` operator for debugging:
```javascript
stream.tap(x => console.log('Value:', x))
```
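For example, placing `tap` between steps logs intermediate values without altering the stream:

```javascript
const pipeline = [
  function* () { yield 1; yield 2; },
  stream.tap(x => console.log('before map:', x)),
  stream.map(x => x * 2),
  stream.tap(x => console.log('after map:', x))
];
```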
- Memory Leaks
  - Use the `batch` operator for large streams
  - Consider `accrue` carefully, since it buffers the entire stream
- Timeouts
  - Adjust the timeout configuration
  - Use an appropriate `yieldTimeoutBehavior`
- Backpressure
  - Monitor `downstreamTimeout` warnings
  - Use the `batch` operator to control flow (see the sketch below)
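For instance, a sketch of using `batch` to bound how many items flow downstream at once (`hugeSource` and `persistChunk` are hypothetical):

```javascript
const pipeline = [
  hugeSource,        // any generator yielding many items (hypothetical)
  stream.batch(100), // group into arrays of 100 to control flow
  stream.map(chunk => persistChunk(chunk)) // hypothetical downstream consumer
];
```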
MIT