stream_reader

1.1.5 • Public • Published

stream_reader

stream_reader and pack_parser

Stream_reader handles continuous data blocks from sock or file. It will buffering the data blocks you pushed in. When the data reached the length you want, it contact data to a Buffer object and call back the function you registered.

Pack_parser pack or unpack nodejs Buffer with specified format.

stream_reader

Install

$ npm install stream_reader

Usage of stream_reader

stream_reader can be used to handle continuous data blocks from sock or file. It will buffering the data blocks you pushed in. When the data reached the length you want, it contact data to a Buffer object and call back the function you registered.

Create a stream rader

var StreamReader = require('stream_reader');
var sockStreamReader = new StreamReader();

Push data into reader

When data block come in from socket, file, etc., push into StreamReader

sock.on("data", function(data){ 
               //When socket data come in, push into StreamReader
        sockStreamReader.push(data);
    });

Register data reading function

You can register how long data you want, and when the buffering data reached the length, the callback function will bo called:

   //Read pack head with fixed length 9 bytes
   sockStreamReader.read(9, function(headData){
            console.info("Pack head received");
    });

and you can register multi functions as the following:

    var flag, type;
    sockStreamReader.read(4, function(flagData){
        flag = flagData;
        console.info("Pack flag received");
    }).read(1, function(typeData){
        type = typeData;
        console.info("Pack type received");
    })

Each reading function is called a reading task.

Check the StreamReader has reading task

After a data reading function be called, it will be erased from StreamReader. So, we maybe need check if the StreamReader has reading task by calling function taskEmpty().

sock.on("data", function(data){ 
    if(sockStreamReader.taskEmpty()){
        //If the sockStreamReader has no task any more, register pack head reading function again
        sockStreamReader.read(9, function(headData){
            console.info("Pack head received");
            ... //To read pack body ...
    });
               //When socket data come in, push into StreamReader
    sockStreamReader.push(data);
});

loopRead()

The StreamReader will buffering the data blocks until they reached the length you want, so, if you want read a large data chunk, it will occupy large memory, that's not a good idea. So you maybe need the this function: loopRead(totalLen, unitLen, cb). Calling this function will create a loop task in StreamReader, it will call cb function repetitiously when buffering data reached unitLen; and until totalLen being read, the task will finish. This is example for read 2M data, the reading unit is 2K:

var total = 0;
sockStreamReader.loopRead(1024*2048, 1024, function(data){
    var dataStr = data.toString();                        
    console.info("Long data unit come in: " + dataStr);
    total += data.length;
    if(total == (1024 * 2048))
        console.info("Long data finished!");
}

pack_parser

Install

$ npm install pack_parser

Usage of pack_parser

Pack_parser pack or unpack nodejs Buffer with specified format, is used to handle data package with special fomat.

Create a writer or reader

Writer is used to packaging data, and reader unpackaging data pack.

var PackParser = require('pack_parse');
var writer = PackParser.CreateWriter();
var Reader = PackParser.CreateReader(pack);

pack()

Package data to nodeJs Buffer:

var writer = PackParser.CreateWriter();
 
//Package 1 byte with value 3 and 4 bytes with value 12 to a nodejs Buffer
var buff = writer.byte(3).UInt32(12).bigEndian().pack();
console.log(buff);

Output as following:

<  03 00 00 00 0c>

unpack()

The function unpack() will unpackage nodeJs Buffer to a javascript object with fields you gave with field functions such as byte(), UInt32(), etc.

//buff value: < Buffer 03 00 00 00 0c >
var reader = PackParser.CreateReader(buff);
var ret = reader.byte('Field0').UInt32('Field1').bigEndian().unpack();
console.log(ret);

Output as following:

{  
  Field0: 3,
  Field1: 12  
}

Field functions

Now field functions include:

Int8(), int8(),  
UInt8(), byte(), uint8(),  
UInt16(), uint16(), ushort(), 
UInt32(), uint32(),  
Int16(), int16(), short(),  
Int32(), int32(),  
Float(), float(),  
Double(), double(),  
string(),  
fstring(),
buffer() .  

In these field functions:

Int8(), int8() are equivalent.  
UInt8(), byte(), uint8() are equivalent.  
UInt16(), uint16(), ushort() are equivalent.  
UInt32(), uint32() are equivalent.  
Int16(), int16(), short() are equivalent.  
Int32(), int32() are equivalent.  
Float(), float() are equivalent.  
Double(), double() are equivalent.

In writer, call field functions with the value you want to package into. In reader, call field functions with the field name.

string()

Field function string() in writer will pack/unpack a string beginning with 4 bytes string length into/from result buffer.

var pack = writer.string('1234567890123').bigEndian().pack();
console.log(pack);

Output as following:

< Buffer 00 00 00 0d 31 32 33 34 35 36 37 38 39 30 31 32 33 >

For a reader, function string() will read 4 byte string length(in bytes) firstly, and then read string body according to this length.

var out = reader.set(pack).string('str_name').bigEndian().unpack();
console.log(out);
 

Output as following:

{ str_name: '1234567890123' }

fstring()

fstring(string, length) means fixed length string.

writer.fstring(string, [fixedLength])
 
reader.fstring(fieldName, fixedLength)

Call this function in writer will package a fixed length(in bytes) string according to argument length. If string is shorter than length, zero(s) will be padding at the end of string. If string is longer than length, it will be truncated.

var pack = writer.fstring('1234', 10).bigEndian().pack();
console.log(pack);
 
var pack = writer.fstring('1234567890123', 10).bigEndian().pack();
console.log(pack);

Output as following:

< Buffer 31 32 33 34 00 00 00 00 00 00 >
< Buffer 31 32 33 34 35 36 37 38 39 30 >

For writer, if fixedLength is undefined, fixedLength is the string's length(in bytes); and for the the reader, fixedLength must be specified when calling this function.

buffer()

buffer() function likes fstring(), it pack/unpack a nodeJs Buffer object into/from result.

writer.buffer(Buffer, [fixedLength])
 
reader.buffer(Buffer, fixedLength)

bigEndian() & littleEndian()

bigEndian() & littleEndian() set number encoding mode: big endian or little endian.

setEncoding() & getEncoding()

setEncoding() & getEncoding() set/get string encoding mode such as 'utf8', 'ascii', 'hex', 'base64', etc., the more detail can reference to nodeJs docment for Buffer: Buffers and Character Encodings.

Example

A client and server communicat with the pack format as the following format:

    |------------------------------------------------------|
    | Flag(4 bytes) | Type(1 byte) | Length(4 byte) | Data |
    |------------------------------------------------------|
Flag: pack flag, must be string "Test"
Type:  1 -- Echo, 2 -- Long string data
Length: Length for data
Data: Data

Full example as a TCP client

//This is a full example for a tcp client using stream_reader and pack_parser package.
//The pack format is: 
//
//    |------------------------------------------------------|
//    | Flag(4 bytes) | Type(1 byte) | Length(4 byte) | Data |
//    |------------------------------------------------------|
//
// Flag: pack flag, must be string "Test"
// Type:  1 -- Echo, 2 -- Long string data
// Length: Length for data
// Data: Data
 
var net = require('net');
var PackParser = require('pack_parser');
var StreamReader = require('stream_reader');
 
var PACK_TYPE_ECHO = 1;
var PACK_TYPE_STR = 2;
 
var HOST = '127.0.0.1';
var PORT = 8888;
 
var client = new net.Socket();
 
var sockStreamReader = new StreamReader();
                      
client.connect(PORT, HOST, function() {
 
    console.log('CONNECTED TO: ' + HOST + ':' + PORT);
    
    //Build a echo pack to send
    var paserWriter = PackParser.CreateWriter().bigEndian();
    var dataStr = "Hello, world!"
    var outPack = paserWriter.fstring("Test").byte(PACK_TYPE_ECHO).UInt32(dataStr.length).fstring(dataStr).pack();
    console.log('Send ' + outPack.length + ' bytes TO: ' + HOST + ':' + PORT);
    client.write(outPack);
    
    var loopStr = "This is a test loop string";
    var dataLen = Buffer.byteLength(loopStr, paserWriter.getEncoding()) * 1024;
    
    //Pack and send head
    outPack = paserWriter.fstring("Test").byte(PACK_TYPE_STR).UInt32(dataLen).pack();    
    console.log('Send ' + outPack.length + ' head bytes TO: ' + HOST + ':' + PORT);
    client.write(outPack);
    
    //Send data
    for(var i = 0; i < 1024; i++){
        var data = new Buffer(loopStr, paserWriter.getEncoding());
        client.write(data);
    }
 
    setTimeout(function(){ //Close after 10 seconds
        
        client.end();
    }, 10 * 1000);
});
 
 
client.on('data', function(data) {
    
    //If no more task in StreamReader, register protocol handle body
    if( sockStreamReader.taskEmpty() )
        registerProtocolBody();
    
     //When socket data coming, push to StreamReader
    sockStreamReader.push(data);    
});
 
client.on('close', function() {
    console.log('Connection closed');
});
    
client.on("error",function(err){
    console.error('Error occurred:', err.message);
});
 
function registerProtocolBody(){
    
    sockStreamReader.read(9, function(headData){
 
        //Parse head to object head
        var parserReader = PackParser.CreateReader(headData).bigEndian();
        var head = parserReader.fstring('flag', 4).byte('type').UInt32('length').unpack(); 
 
        //Check flag field
        if(head.flag != "Test"){ 
            console.error("Bad pack flag" + head.flag);
            return;
        }
 
        switch( head.type ){
            case PACK_TYPE_ECHO:
 
                //Read data according head.length
                sockStreamReader.read(head.length, function(data){
 
                    var dataStr = data.toString();
                    console.info("Echo pack coming with string: " + dataStr);
                });
                break;
            default:
                console.error("Unrecongized pack type: " + head.type);                                
        }
    });
}

Full example as a TCP server

//This is a full example for a tcp server using stream_reader and pack_parser package.
//The pack format is: 
//
//    |------------------------------------------------------|
//    | Flag(4 bytes) | Type(1 byte) | Length(4 byte) | Data |
//    |------------------------------------------------------|
//
// Flag: pack flag, must be string "Test"
// Type:  1 -- Echo, 2 -- Long string data
// Length: Length for data
// Data: Data
 
var server = require('net').createServer();
var StreamReader = require('stream_reader');
var PackParser = require('pack_parser');
 
var PACK_TYPE_ECHO = 1;
var PACK_TYPE_STR = 2;
 
function newConn(sock)
{
    var sockStreamReader = new StreamReader();
    
    sock.on("data", function(data){ 
        
        console.info("Data come in: " + data.length + " bytes");
        
        //If no more task in StreamReader, register protocol handle body
        if( sockStreamReader.taskEmpty() )
            registerProtocolBody();
        
        //When socket data coming, push to StreamReader
        sockStreamReader.push(data);
                
    });
  
    sock.on("error",function(err){
        console.error('Error occurred:', err.message);
    });
    
    sock.on("close",function(){
        console.error('connection closed');
    });
    
    function registerProtocolBody(){
        
        //Read pack head with fixed length 9 bytes
        sockStreamReader.read(9, function(headData){
 
            console.info("Pack head received");
 
            //Parse head to object head
            var parserReader = PackParser.CreateReader(headData).bigEndian();
            var head = parserReader.fstring('flag', 4).byte('type').UInt32('length').unpack(); 
 
            //Check flag field
            if(head.flag != "Test"){ 
                console.error("Bad pack flag" + head.flag);
                return;
            }
 
            switch( head.type ){
                case PACK_TYPE_ECHO:
 
                    //Read data according head.length
                    sockStreamReader.read(head.length, function(data){
 
                        var dataStr = data.toString();
                        console.info("Echo pack coming with string: " + dataStr);
 
                        //Build a echo pack to send
                        var paserWriter = PackParser.CreateWriter().bigEndian();
                        var outPack = paserWriter.fstring(head.flag).byte(head.type).UInt32(data.length).fstring(dataStr).pack();
                        sock.write(outPack);
                    });
                    break;
 
                case PACK_TYPE_STR:
 
                    var total = 0;
                    sockStreamReader.loopRead(head.length, 1024, function(data){
 
                        var dataStr = data.toString();                        
                        console.info("Long data pack come in: " + dataStr);
                        
                        total += data.length;
                        if(total == head.length)
                            console.info("Long data finished!");
                        //So you can handle data as your wish
                    }); 
                    break;
            }
        });
    }
   
}
 
function createSrv(port){
    
    server.on('listening', function() {
        console.info('Server is listening on port ' + port);
    });
    
    server.on('connection', function(socket) {
        newConn(socket);
        console.info('Server has a new connection');
    });
    
    server.on('close', function() {
        console.info('Server is now closed');
    });
    
    server.on('error', function(err) {
        console.error('Error occurred:', err.message);
    });
    
    server.listen(port);    
}
 
 
createSrv(8888);
 

Package Sidebar

Install

npm i stream_reader

Weekly Downloads

8

Version

1.1.5

License

GPL-3.0

Last publish

Collaborators

  • west_luo