pipage

1.0.2 • Public • Published

The Pipage

npm npm license npm downloads build status build status

pip•age

  • n. plumbing, a system of pipes
  • n. node module, a splice-able stream pipeline

Install via npm

$ npm install --save pipage

Usage

For detailed API documentation, see doc/

var Pipage = require('pipage')

Pipage is a duplex stream: it inherits from Node's stream.Duplex and behaves the same way, while also containing an internal pipeline of streams that can be added to, removed from, and spliced at runtime:

readable -> pipage[ transform, transform, ... ] -> writable

Creating a pipeline

// A blank pass-through pipeline:
var pipeline = new Pipage()
 
// Initialized with already existing streams:
var pipeline = new Pipage([ transform1, transform2, ... ])

Adding streams

// Streams can be appended,
pipeline.append( stream )
pipeline.append( stream1, stream2, ..., streamN )
 
// prepended,
pipeline.prepend( stream1, stream2, ..., streamN )
 
// inserted at a specific index,
pipeline.insert( 3, stream1, ..., streamN )
 
// which is the same as splicing in streams (add N streams at index 3):
pipeline.splice( 3, 0, stream1, ..., streamN )

Removing streams

// Streams can be shifted off the beginning,
var firstStream = pipeline.shift()
 
// or popped off the end,
var lastStream = pipeline.pop()
 
// spliced out at a specific index (remove 3 from index 2),
var removedStreams = pipeline.splice( 2, 3 )
 
// or removed by reference:
pipeline.remove( stream )

Selecting streams

// Get a stream at a specific index in the pipeline:
var stream = pipeline.get( 2 )
var lastStream = pipeline.get( -1 )
 
// Find a stream in the pipeline:
var index = pipeline.indexOf( stream )
var lastIndex = pipeline.lastIndexOf( stream )

Example

var Pipage = require('pipage')
var path = require('path')
var zlib = require('zlib')
var unbzip2 = require('unbzip2-stream')
var xz = require('xz')
 
var pipeline = new Pipage()
 
switch( path.extname( filename ) ) {
  case '.gz':
    pipeline.prepend( zlib.createUnzip() );
    break
  case '.bz':
  case '.bz2':
    pipeline.prepend( unbzip2() );
    break
  case '.xz':
    pipeline.prepend( new xz.Decompressor() )
    break
}
 
fs.createReadStream( filename )
  .pipe( pipeline )
  .pipe( fs.createWriteStream( destination ) )

Events

Error handling

The pipeline listens for errors from the streams it contains and re-emits them on the pipeline itself. Each re-emitted error object gets an additional .stream property, which references the stream that originally emitted the error:

pipeline.on( 'error', function( error ) {
  // This error originated from one of the pipeline's internal streams
  // (available as `error.stream`), not from the pipeline itself
  if( error.stream ) {
    // ...
  } else {
    // This error came from the pipeline itself
  }
})

Binding to a contained stream's events

var stream = pipeline.get(-1)
 
// Re-emit a stream's events on the pipeline:
pipeline.bind( stream, 'eventname' )
pipeline.bind( stream, [ 'someevent', 'otherevent' ])
 
// Remove re-emission of a stream's event:
pipeline.unbind( stream, 'eventname' )
 
// Stop re-emission of all of stream's events on the pipeline:
pipeline.unbindAll( stream )

Example

var Pipage = require('pipage')
 
// Let's say we have a stream which emits an event
// we want to capture without having to get a reference to
// that particular stream:
module.exports = function createPipeline() {
 
  var checksumStream = createChecksumStream( 'sha256', 'md5' )
  var pipeline = new Pipage([ checksumStream ])
 
  // This will cause the pipeline to re-emit
  // the 'checksums' event from the `checksumStream`
  pipeline.bind( checksumStream, 'checksums' )
 
  // Add some more fancy things to the pipeline...
 
  return pipeline
 
}
var createPipeline = require('./create-pipeline')
var pipeline = createPipeline()
 
// Now we can listen for the bound event directly on the pipeline
pipeline.on( 'checksums', function( checksums ) {
  // Validate the checksums, etc...
})
 
fs.createReadStream( filename )
  .pipe( pipeline )
  .resume()

Nested Pipelines

Since pipelines are duplex streams, and contain duplex streams, they can be nested arbitrarily:

var pipeline = new Pipage([
  new Pipage(),
  new Pipage([
    new Pipage()
  ])
])

Package Sidebar

Install

npm i pipage

Weekly Downloads

1

Version

1.0.2

License

MIT

Last publish

Collaborators

  • jhermsmeier
  • jviotti
  • balena.io