bs-better-stream
Setup
npm i --save bs-better-stream
const Stream = require('bs-better-stream');
let myStream = ;myStream;myStream;myStream;
Overview
chaining
myStream ;
accumulation
You can use streams both before and after you have begun writing to them. See `.clean()`.
let myStream = ;myStream;myStream;myStream;// will print 11, 21, 31, 12, 22, 32
write (...values)
myStream;
write array (...[array of values])
myStream;
writePromise (...promises)
let promise1 = Promise;let promise2 = Promise;myStream;// myStream.outValues equals ['i hate `.then`s', {rejected: 'rejections r wrapped', isRejected: true}]
writePromiseSkipOnReject (...promises)
let promise1 = Promise;let promise2 = Promise;myStream;// myStream.outValues equals ['i hate `.then`s']
each (handler)
myStream;
map (handler)
myStream;
filter (predicateHandler)
myStream;
filterCount (integer)
let outStream = myStream;myStream;// outStream.outValues equals [first, second, third]
filterIndex ([array of indices])
let outStream = myStream;myStream;// outStream.outValues equals [first, third, fourth]
filterEach (predicateHandler, truePredicateHandler, falsePredicateHandler)
let outStream = myStream;
filterMap (predicateHandler, truePredicateHandler, falsePredicateHandler)
let outStream = myStream;myStream;// outStream.outValues equals [300, -0, -1, 301, -2, 302]
branchMap (...predicate and map handlers)
let myStream = ;let outStream = myStream; // if item starts with 'b', prepend 'Banana'myStream;// outStream.outValues equals ['Apple at', 'Banana bat', 'Apple action', 'cat', 'Apple aaa']
let myStream = ;let outStream = myStream; // else, prepend, 'Other'myStream;// outStream.outValues equals ['Apple at', 'Banana bat', 'Apple action', 'Other cat', 'Apple aaa']
switchMap (switchHandler, ...case and map handlers)
let myStream = ;let outStream = myStream;myStream;/*[ 'i have a pet elephant', 'i have a pet flamingo', 'u have 51 pencils', 'u have 1235 pencils', 'his favorite color is blue', 'his favorite color is pink', 'other: sun' ]*/
let myStream = ;let outStream = myStream;myStream;/*[ 'i have a pet elephant', 'i have a pet flamingo', 'u have 51 pencils', 'u have 1235 pencils', 'his favorite color is blue', 'his favorite color is pink', { type: 'star', value: 'sun' } ]*/
unique ()
let outStream = myStream;myStream;// outStream.outValues equals [0, 1, 2, 3]
uniqueOn (keyName)
let outStream = myStream;myStream;// outStream.outValues equals [{age: 4231, name: 'Odysseus'},// {age: 4234, name: 'Helen'}]
uniqueX (handler)
let outStream = myStream;myStream;// outStream.outValues equals [{a: 1, b: 5}]
pluck (keyName)
let outStream = myStream;myStream;// outStream.outValues equals ['value']
wrap (keyName)
let outStream = myStream;myStream;// outStream.outValues equals [{key: 'value'}]
pick (...keyNames)
let outStream = myStream;myStream;// outStream.outValues equals [{name: 'myName', age: 'myAge'}]
omit (...keyNames)
let outStream = myStream;myStream;// outStream.outValues equals [{name: 'myName', age: 'myAge'}]
set (keyName, handler)
let outStream = myStream;myStream;// outStream.outValues equals [ { number: 5, otherNumber: 10, sum: 15 } ]
repeat (handler)
let outStream = myStream;myStream;// outStream.outValues equals [2, 2, 3, 3, 3, 3, 2, 2, 2, 2]
repeatCount (integer)
let outStream = myStream;myStream;// outStream.outValues equals [2, 2, 3, 3, 2, 2]
flatten ()
let outStream = myStream;myStream;// outStream.outValues equals [2, 3, 2, 4]
flattenOn (listKeyName, newKeyName)
myStream;myStream;let outStream = myStream;// outStream.outValues equals [{key1: 'value1', number: 1}, {key1: 'value1', number: 2}, {key1: 'value1b', number: 4}, {key1: 'value1b', number: 5}]
Why is flattenOn useful?
imagine we have a set of animals grouped by species
let animalSpecies = ;animalSpecies;animalSpecies;
Without `flattenOn`, we would need to do something like the following in order to obtain a flat list of animals:
animalSpecies ;
But with `flattenOn`, we can simply do the following:
animalSpecies ;
join (...streams)
let outStream = myStream;myStream;stream1;stream2;// outStream.outValues equals [1, 2, 3, 4, 5, 6]
joinCollapse ()
myStream;outStream = myStream;stream1;stream2;stream3;// outStream.outValues equals [1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2]
product (rightStream, leftStreamIdKey, rightStreamIdKey, leftStreamSetKey)
let productStream = myStreamproductotherStream 'myId' 'otherId' 'other';myStream;myStream;myStream;otherStream;otherStream;otherStream;// productStream.outValues equals [{myId: 2, myValue: 200, other: {otherId: 2, otherValue: 20}},// {myId: 2, myValue: 201, other: {otherId: 2, otherValue: 20}},// {myId: 2, myValue: 200, other: {otherId: 2, otherValue: 21}},// {myId: 2, myValue: 201, other: {otherId: 2, otherValue: 21}}]
productX (rightStream, matchHandler, handler)
let productStream = myStream;myStream;myStream;myStream;otherStream;otherStream;otherStream;// myStream.outValues equals [{myId: 1, myValue: 100},// {myId: 2, myValue: 200, paired: true},// {myId: 2, myValue: 201, paired: true}]// otherStream.outValues equals [{otherId: 2, otherValue: 20, paired: true},// {otherId: 2, otherValue: 21, paired: true},// {otherId: 3, otherValue: 30}]// productStream.outValues equals [{sum: 220},// {sum: 221},// {sum: 221},// {sum: 222}]
Note that while `product` modifies a copy of the left stream's values, leaving the left stream unmodified, `productX` passes in the original values of the left stream, allowing the left stream to be modified by the handler, as seen in the example above.
to (stream)
myStream;myStream;outStream;// outStream.outValues equals [1, 2, 3, 4]
wait (skipOnReject)
myStream;myStream;myStream;let outStream = myStream;myStream;myStream;myStream;myStream;// outStream.outValues equals ['stream', 'async', 'data', 'without needing', 'async/await', 'or .then', {rejected: 'rejected', isRejected: true}]
waitOn (key, skipOnReject)
myStream;myStream;let outStream = myStream;// outStream.outValues equals [{key1: 'value1', key2: 'value2'},// {key1: 'value2', key2: {rejected: 'rejectValue2', isRejected: true}}]
Why is waitOn useful?
imagine we have a set of users
let users = ;users;
and this api to obtain a user's shape
let { return Promise;};
Without `waitOn`, we would need to do something like the following in order to include every user's shape:
users ;
But with `waitOn`, we can simply do the following:
users ;
waitOrdered (skipOnReject)
let resolve1 resolve2;let promise1 = resolve1 = resolve;let promise2 = resolve2 = resolve;myStream;let outStream = myStream;;;// outStream.outValues equals ['promise 1 resolved last', 'promise 2 resolved first']
waitOnOrdered (key, skipOnReject)
let resolve1 resolve2;let promise1 = resolve1 = resolve;let promise2 = resolve2 = resolve;myStream;let outStream = myStream;;;// outStream.outValues equals [{key: 'promise 1 resolved last'}, {key: 'promise 2 resolved first'}]
skipOnReject parameter
Passing `true` as the last parameter to `wait`, `waitOn`, `waitOrdered`, and `waitOnOrdered` will ignore values which are rejected, similar to `writePromiseSkipOnReject`.
myStream;myStream;let outStream = myStream;// outStream.outValues equals [{key1: 'value1', key2: 'value2'}]
Otherwise, rejected promises are wrapped in a `{rejected: <rejected value>, isRejected: true}` structure and written just like resolved promises, similar to `writePromise`.
if (predicateHandler)
myStream;let ifStreams = myStream;// ifStreams.then.outValues equals [110, 130, 150]// ifStreams.else.outValues equals [10, 30, 50] console;ifStreamsthen;console;ifStreamselse;
split (predicateHandler, truePredicateHandler, falsePredicateHandler)
myStream.write({species: 'kitten', name: 'tickleMe'});
myStream.write({species: 'kitten', name: 'pokeMe'});
myStream.write({species: 'puppy', name: 'hugMe'});
myStream.split(
animal => animal.species === 'kitten',
kittenStream => kittenStream
.set('sound', () => 'meow')
.set('image', ({name}) => getRandomLolzCatImage(name)),
puppyStream => puppyStream
.set('sound', () => 'wuff')
.set('edible', () => true)
.each(dipInChocolate));
group (handler)
myStream;let species = myStream;// species.cat.outValues equals [{species: 'cat', name: 'blue'}, {species: 'cat', name: 'green'}]// species.dog.outValues equals [{species: 'dog', name: 'orange'}] console;speciescat;console;speciesdog;
groupCount (integerGroupSize)
myStream;let groupStreams = myStream;// groupStreams.group0.outValues equals [20, 30, 40]// groupStreams.group1.outValues equals [50, 60, 70]// groupStreams.group2.outValues equals [80]
groupFirstCount (integerGroupSize)
myStream;let groupStreams = myStream;// groupStreams.first.outValues equals [10, 20, 30]// groupStreams.rest.outValues equals [50, 60, 70, 80] console;groupStreamsfirst;console;groupStreamsrest;
groupNCount (integerGroupSize, integerGroupCount)
myStream;let groupStreams = myStream;// groupStreams.group0.outValues equals [20, 30, 40]// groupStreams.group1.outValues equals [50, 60, 70]// groupStreams.rest.outValues equals [80, 90, 100, 110, 120]
groupIndex (...[lists of indices])
myStream;let groupStreams = myStream;// groupStreams[0].outValues equals [0]// groupStreams[1].outValues equals [10, 30, 50, 60]// groupStreams.rest.outValues equals [20, 40, 70, 80, 90, 100] console;groupStreams0;console;groupStreams1;console;groupStreamsrest;
batch (integerBatchSize)
myStream;let outStream = myStream;// outStream.outValues equals [[0, 10, 20, 30], [40, 50, 60, 70]]
batchFlat (integerBatchSize)
let outStream = myStream;myStream;// outStream.outValues equals []myStream;// outStream.outValues equals [0, 10, 20, 30]myStream;// outStream.outValues equals [0, 10, 20, 30, 40, 50, 60, 70]myStream;// outStream.outValues equals [0, 10, 20, 30, 40, 50, 60, 70]
generate (handler)
myStream;let outStream = myStream;// outStream.outValues equals [10, 11, 20, 40, 41, 80]
flatMap (handler)
myStream;let outStream = myStream;// outStream.outValues equals [11, 20, 41, 80]
throttle (integer)
myStream;let throttled = myStream;throttledstream ;
After calling `throttled = stream.throttle(n)`, `throttled.stream` will emit `n` values initially. It will emit 1 more value each time `throttled.next()` or `throttled.nextOne()` is invoked, and `m` more values each time `throttled.next(m)` is invoked.
myStream;let throttled = myStream;// throttled.stream.outValues equals [1, 2]throttlednext2;// throttled.stream.outValues equals [1, 2, 3, 4]throttlednext2;// throttled.stream.outValues equals [1, 2, 3, 4, 5]myStream;// throttled.stream.outValues equals [1, 2, 3, 4, 5, 6]
Calling `throttled = stream.throttle()` is short for calling `throttled = stream.throttle(0)`, which results in a lazy stream. `throttled.stream` will emit values only when `throttled.next` is invoked.
myStream;let throttled = myStream;// throttled.stream.outValues equals [1, 2]throttled;// throttled.stream.outValues equals [1, 2, 3, 4, 5]myStream;// throttled.stream.outValues equals [1, 2, 3, 4, 5, 6, 7]
Calling `throttled.unthrottle()` will allow all current and future values to pass through without throttling, rendering `throttled.next()` unnecessary.
clean()
myStream;let oneToSix = myStream;myStream;myStream;let fourToFive = myStream;// myStream.outValues equals [4, 5, 6];
disconnect()
myStream;let oneToThree = myStream;myStream;myStream;let oneToSix = myStream;
promise
`myStream.promise` returns a promise that resolves once all values already written to the stream have resolved.
myStream;myStreampromise;
length
myStream.length
outValues
myStream.outValues