async-json-stream

1.0.1 • Public • Published

async-json-stream

A comfortable async/await interface for json streams.

This package consists of two utilities that make working with JSON streams easier. JsonFaucet helps to create continuous JSON streams that may be larger than would fit in memory. JsonDrain helps to parse such streams.

Both creating and parsing such streams can be done directly without these tools; however, that code may be more complicated to write and maintain.

Both utilities respect the back-pressure of streams: generation pauses while the stream buffers are full, and parsing pauses while they are empty.

JsonFaucet

Example code:

const fs = require( 'fs' );
const { JsonFaucet } = require( 'async-json-stream' );

/**
 * Writes the example document to 'mydump.json' through a JsonFaucet.
 *
 * The document consists of a 'version' attribute, a 'vocals' object holding
 * one nested object per vowel, and a 'digits' array with the numbers 0 to 9.
 * Every faucet call is awaited; per the package description this is what
 * lets generation pause while the stream buffers are full.
 *
 * @returns {Promise<undefined>} resolves once endDocument( ) has been awaited.
 */
const run =
    async function( )
{
    const faucet = new JsonFaucet( { indent : '  ' } );
    faucet.pipe( fs.createWriteStream( 'mydump.json' ) );
    await faucet.beginDocument( );
    await faucet.attribute( 'version', 1 );

    // an attribute without a value announces a nested object or array
    await faucet.attribute( 'vocals' );
    await faucet.beginObject( );
    for( const v of [ 'a', 'e', 'i', 'o', 'u' ] )
    {
        await faucet.attribute( v );
        await faucet.beginObject( );
        // the key must read 'charCode': that is what the documented output
        // shows and what the JsonDrain example checks for
        await faucet.attribute( 'charCode', v.charCodeAt( 0 ) );
        await faucet.endObject( );
    }
    await faucet.endObject( );

    await faucet.attribute( 'digits' );
    await faucet.beginArray( );
    for( let d = 0; d < 10; d++ )
        await faucet.value( d );
    await faucet.endArray( );
    await faucet.endDocument( );
};

// Rethrow any unhandled promise rejection from inside the handler so that a
// failure in run( ) surfaces as a thrown error, then start the example.
process.on( 'unhandledRejection', err => { throw err; } );
run( );

Produces:

{
  "version" : 1,
  "vocals" : {
    "a" : {
      "charCode" : 97
    },
    "e" : {
      "charCode" : 101
    },
    "i" : {
      "charCode" : 105
    },
    "o" : {
      "charCode" : 111
    },
    "u" : {
      "charCode" : 117
    }
  },
  "digits" : [
    0,
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9
  ]
}

Or without indent

{"version":1,"vocals":{"a":{"charCode":97},"e":{"charCode":101},"i":{"charCode":105},"o":{"charCode":111},"u":{"charCode":117}},"digits":[0,1,2,3,4,5,6,7,8,9]}

Interface

Full interface description for JsonFaucet

JsonDrain

JsonDrain uses stream-json to parse JSON streams but, like JsonFaucet, provides a more comfortable async/await interface. This way you can write parsers like the example below, which handles the JSON dump from the JsonFaucet example above.

Again, it would be possible to use stream-json directly; however, that would mean writing state machines, which are often difficult to maintain. In the example below, the position the program is currently at serves as the "state machine".

The example is small enough that it would also fit in memory as a whole. So here it would likely be simpler to just read in the whole JSON dump, parse it with JSON.parse( ), and then interpret the resulting object. However, it's just an example.

Also, the async/await flow should make it easier to handle the other side of huge data streams -- whatever you may want to do with the data -- while respecting back-pressure, for example by waiting until a database has finished processing the inputs you sent.

const fs = require( 'fs' );
const { JsonDrain } = require( '..' );

/**
 * Aborts parsing by throwing the generic error used for every structural
 * problem found in the dump.
 *
 * @throws {Error} always, with the message 'invalid JSON dump'.
 */
function jerror( )
{
    const complaint = new Error( 'invalid JSON dump' );
    throw complaint;
}

/**
 * Reads 'mydump.json' through a JsonDrain and checks its top-level layout.
 *
 * The document must open as an object and contain exactly the attributes
 * 'version' (a truthy value), 'vocals' (an object, consumed by
 * handleVocals) and 'digits' (an array, consumed by handleDigits), each at
 * most once. Anything else is rejected with the error 'invalid JSON dump'.
 * Logs '* done' after the closing chunk when all three were seen.
 *
 * @returns {Promise<undefined>} resolves after the whole dump was accepted.
 * @throws {Error} 'invalid JSON dump' on any structural violation.
 */
async function run( )
{
    const drain = new JsonDrain( fs.createReadStream( 'mydump.json' ) );
    // what has been collected so far for each expected attribute
    const seen = { version : undefined, vocals : undefined, digits : undefined };

    const opening = await drain.next( );
    if( opening.object !== 'start' ) throw new Error( 'invalid JSON dump' );

    for( ;; )
    {
        const token = await drain.next( );
        // closing brace of the root object
        if( token.object === 'end' ) break;

        const name = token.attribute;
        if( name === 'version' )
        {
            if( seen.version ) jerror( );
            seen.version = token.value;
            if( !seen.version ) jerror( );
        }
        else if( name === 'vocals' )
        {
            if( seen.vocals ) jerror( );
            if( token.object !== 'start' ) jerror( );
            seen.vocals = await handleVocals( drain );
        }
        else if( name === 'digits' )
        {
            if( seen.digits ) jerror( );
            if( token.array !== 'start' ) jerror( );
            seen.digits = await handleDigits( drain );
        }
        else
        {
            jerror( );
        }
    }

    // all three attributes have to be present
    if( !( seen.version && seen.vocals && seen.digits ) ) jerror( );
    console.log( '* done' );
}

/**
 * Consumes the entries of the 'vocals' object from the drain.
 *
 * Each entry is expected as three chunks: the start of a nested object
 * (whose attribute is the letter), a 'charCode' attribute, and the end of
 * that nested object. Every entry is logged. Stops at the chunk that ends
 * the enclosing object.
 *
 * @param drain - source of chunks; only its next( ) method is used.
 * @returns {Promise<boolean>} always true once the object end was reached.
 * @throws {Error} 'invalid JSON dump' (via jerror) on an unexpected chunk.
 */
async function handleVocals( drain )
{
    for(
        let opener = await drain.next( );
        opener.object !== 'end';
        opener = await drain.next( )
    )
    {
        if( opener.object !== 'start' ) jerror( );
        const { attribute : letter } = opener;

        const payload = await drain.next( );
        if( payload.attribute !== 'charCode' ) jerror( );
        console.log( `* got vocal "${ letter }" having charCode: ${ payload.value }` );

        // the nested object must be closed right after its single attribute
        const closer = await drain.next( );
        if( closer.object !== 'end' ) jerror( );
    }

    return true;
}

/**
 * Consumes the elements of the 'digits' array from the drain.
 *
 * Every chunk up to the one that ends the array must carry a numeric
 * value; each value is logged.
 *
 * @param drain - source of chunks; only its next( ) method is used.
 * @returns {Promise<boolean>} always true once the array end was reached.
 * @throws {Error} 'invalid JSON dump' (via jerror) on a non-number value.
 */
async function handleDigits( drain )
{
    for( ;; )
    {
        const { array, value } = await drain.next( );
        if( array === 'end' ) break;
        if( typeof value !== 'number' ) jerror( );
        console.log( `* got digit: ${ value }` );
    }

    return true;
}

// Rethrow any unhandled promise rejection from inside the handler so that a
// parse failure in run( ) surfaces as a thrown error, then start the example.
process.on( 'unhandledRejection', err => { throw err; } );
run( );

Interface

Full interface description for JsonDrain

Readme

Keywords

none

Package Sidebar

Install

npm i async-json-stream

Weekly Downloads

1

Version

1.0.1

License

MIT

Unpacked Size

29 kB

Total Files

13

Last publish

Collaborators

  • axkibe