
Streams and Networking

These are notes based on the Frontend Masters course by James Halliday (aka substack). Link here

Networking, servers, and clients

Networking and Packets

Protocols and Ports

Protocols are the languages that computer programs speak to each other. Examples of protocols: HTTP, SMTP, and IRC (all covered below).

Most services have one or more default ports. A computer can run many services at once; ports (range 1-65535) differentiate between the services on a system. A service can listen on any port, but there are conventional default assignments.

By default, only the root user can listen on ports below 1024.
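
A minimal sketch (assumed, not from the course notes) of a TCP service listening on an unprivileged port, using Node's core net module:

    const net = require('net');

    // Listening on port 8000 works as a normal user; port 80 would need root
    net.createServer(function (socket) {
        socket.end('hello\n');
    }).listen(8000);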

Servers and Clients

Netcat
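
Netcat (nc) opens a raw TCP connection, so you can speak text protocols like HTTP by hand (hostname is a placeholder):

    $ nc example.com 80
    GET / HTTP/1.1
    Host: example.com
    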

HTTP and Headers

HTTP Post

Take the following response:

    HTTP/1.1 200 OK
    Date: Mon, 12 Jan 2015 06:37:51 GMT
    Connection: keep-alive
    Transfer-Encoding: chunked <- The body will be sent in chunks, each prefixed with its length. The server doesn't know in advance how long the response will be.
    
    3  <-- Hex length of the next chunk in bytes
    oi <-- payload
    4  <-- Hex length of the next chunk in bytes
    ok <-- payload
    
    0 <-- Zero-length chunk: end of the body

Curl

    $ curl -s http://substack.net        <-- Do a GET and print the body
    $ curl -i http://substack.net        <-- Print headers and body
    $ curl -I http://substack.net        <-- Make a HEAD request, print only the headers
    # -s silences progress output
    # Use -X to set the HTTP verb and -d for form parameters
    $ curl -X POST http://substack.net
    $ curl -X POST http://substack.net -d title=whatever -d date=10000
    # You can set headers with the -H flag
    $ curl http://substack.net -H 'Content-Type: application/json'
    

SMTP

IRC

Another text-based protocol. HTTP, SMTP, and IRC are all plain-text protocols: each follows a certain text layout and has a default port to listen on.

Binary Protocols and Inspecting Protocols

Streams

Node.js has a handy interface for shuffling data around called streams.

Stream Origins

"We should have some ways of coupling programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also." -- Doug McIlroy, October 11, 1964

Think also of how we pipe between programs in *nix systems.

Why Streams?

Composition

Just like we can pipe commands together in unix, we can pipe streams together:

    $ cat file | jq '.age' | ...

Simple example

A Node.js equivalent of what $ cat does:

    const fs = require('fs');

    // Stream the file named by the first CLI argument straight to stdout
    fs.createReadStream(process.argv[2])
      .pipe(process.stdout);

Transform data example

    const fs = require('fs');
    const through = require('through2');

    fs.createReadStream(process.argv[2])
      .pipe(through(toUpper))
      .pipe(process.stdout);
    
    function toUpper(buf, enc, next){
        // buf is a Buffer containing the chunk's raw bytes
        // The output passed to next() should be a buffer or string
        next(null, buf.toString().toUpperCase())
    }

Read from stdin

    const through = require('through2');

    process.stdin
      .pipe(through(toUpper))
      .pipe(process.stdout);
    
    function toUpper(buf, enc, next){
        // buf is a Buffer containing the chunk's raw bytes
        // The output passed to next() should be a buffer or string
        next(null, buf.toString().toUpperCase())
    }

With Node Core

    const { Transform } = require('stream');
    const toUpper = new Transform({
        transform: function(buf, enc, next) {
          next(null, buf.toString().toUpperCase())
        }
        // ... and other hooks
    })

    process.stdin
      .pipe(toUpper)
      .pipe(process.stdout);
  

flush is a hook that runs when the stream is finishing: it fires after all input has been consumed and before 'end' is emitted, giving you a last chance to push output.
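
For instance (a minimal sketch, not from the course), a Transform with a flush hook can emit one final summary chunk:

    const { Transform } = require('stream');

    let bytes = 0;
    const count = new Transform({
        transform: function (buf, enc, next) {
            bytes += buf.length;
            next(null, buf); // pass data through unchanged
        },
        flush: function (done) {
            // Runs after all input is consumed, before 'end' is emitted
            this.push('\ntotal bytes: ' + bytes + '\n');
            done();
        }
    });

    process.stdin.pipe(count).pipe(process.stdout);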

Bit on package through2

With through there are 2 parameters: write and end. Both are optional.

through(write, end)

Call next() when you’re ready for the next chunk. If you don’t call next(), your stream will hang!

Call this.push(VALUE) inside the callback to put VALUE into the stream’s output.

Use a VALUE of null to end the stream. Buffering is a common pattern here: if you need, say, 100 bytes before you can do something, you accumulate incoming chunks and just call next() without pushing until your buffer is big enough, then push the processed result.
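
A rough sketch of that buffering pattern with through2 (the 100-byte threshold is just an example):

    const through = require('through2');

    const THRESHOLD = 100; // example threshold in bytes
    let pending = [];
    let size = 0;

    const buffered = through(function (buf, enc, next) {
        pending.push(buf);
        size += buf.length;
        if (size < THRESHOLD) return next(); // not enough yet: ask for the next chunk
        const whole = Buffer.concat(pending);
        pending = [];
        size = 0;
        next(null, whole); // push the accumulated bytes downstream
    }, function (end) {
        // flush: push whatever is left over when the input ends
        if (size > 0) this.push(Buffer.concat(pending));
        end();
    });

    process.stdin.pipe(buffered).pipe(process.stdout);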

Concat stream

npm install concat-stream

concat-stream buffers up all the data in the stream:

    const concat = require('concat-stream');
    process.stdin.pipe(concat(function (body) {
        // body is a single Buffer with everything that was written
        console.log(body.length);
    }))

You can only write to a concat-stream; you can't read from it. Keep in mind that all of the data is buffered in memory.

GOOD TO KNOW: when reading from stdin in a terminal, input keeps flowing until you send an EOF with CTRL + D.

Example of basic HTTP Server with concat stream

    const concat = require('concat-stream');
    const through = require('through2');
    const http = require('http');
    const qs = require('querystring');
    const SIZE_LIMIT = 20;

    var server = http.createServer(function (req, res) {
        req
            .pipe(counter())
            .pipe(concat({ encoding: 'string' }, onBody));

        function counter() {
            var size = 0;
            return through(function (buf, enc, next) {
                size += buf.length;
                if (size > SIZE_LIMIT) {
                    next(null, null); // stop forwarding data past the limit
                } else {
                    next(null, buf);
                }
            })
        }

        function onBody(body) {
            var params = qs.parse(body);
            console.log(params);
            res.end('ok\n');
        }
    });
    server.listen(5000);

Stream Types

The main stream types are readable, writable, transform, and duplex. The TCP streams in the VPN example below are duplex: both readable and writable.

Simple VPN with password

echo.js

    const net = require('net')
    net.createServer(function (stream) {
      stream.pipe(stream) // echo: write every chunk back to the same duplex stream
    }).listen(5000)

vpn.js

    const net = require('net')
    const crypto = require('crypto')
    const pump = require('pump')
    const pw = 'abc123'
    
    net.createServer(function (stream) {
      pump(
        stream,                               // encrypted data from the client
        crypto.createDecipher('aes192', pw),  // decrypt it
        net.connect(5000, 'localhost'),       // run the plaintext through the echo server
        crypto.createCipher('aes192', pw),    // re-encrypt the reply
        stream,                               // send it back to the client
        function (err) {
          console.error(err)
        }
      )
    }).listen(5005)

vpn-client.js

    const net = require('net')
    const crypto = require('crypto')
    const pw = 'abc123'
    
    var stream = net.connect(5005,'localhost')
    process.stdin
      .pipe(crypto.createCipher('aes192',pw))
      .pipe(stream)
      .pipe(crypto.createDecipher('aes192',pw))
      .pipe(process.stdout)

Object Streams

Normally you can only read and write buffers and strings with streams. However, if you initialize a stream in objectMode, you can use any kind of object (except for null):

    // This can also be done with core stream modules in objectMode
    const through = require('through2')
    const tr = through.obj(function (row, enc, next) {
      next(null, (row.n * 1000) + '\n')
    })
    tr.pipe(process.stdout)
    tr.write({ n: 5 })
    tr.write({ n: 10 })
    tr.write({ n: 3 })
    tr.end();

When piping an object stream, the consuming stream must also be in objectMode.
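
A minimal sketch of the consuming side (using through2's obj helper on both ends):

    const through = require('through2');

    const src = through.obj(); // objectMode pass-through source
    const sink = through.obj(function (row, enc, next) {
        // Both streams are in objectMode, so row arrives as a plain object
        next(null, JSON.stringify(row) + '\n');
    });

    src.pipe(sink).pipe(process.stdout);
    src.write({ n: 5 });
    src.end();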

Core Streams

APIs

Many of the APIs in node core provide stream interfaces.

Child processes are one example: the spawned child's stdin, stdout, and stderr are all streams.

    const { spawn } = require('child_process');
    const ps = spawn('grep', ['potato']);
    ps.stdout.pipe(process.stdout); // We pipe the output of the child process to our stdout
    ps.stdin.write('cheese\n');
    ps.stdin.write('potato\n');
    ps.stdin.end();

HTTP core streams

    const http = require('http');

    // We receive a request
    // req: readable, res: writable
    http.createServer((req, res) => {});

    // We make a request (URL is a placeholder)
    // req: writable, res: readable
    const req = http.request('http://example.com', (res) => {});
    req.end();

Crypto Streams

    const { createHash } = require('crypto');
    
    process.stdin
        .pipe(createHash('sha512', { encoding : 'hex' }))
        .pipe(process.stdout);

Don't forget, when you run this, to press CTRL + D: that sends EOF, the input ends, and the hash is flushed to stdout.

Zlib core streams
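
zlib exposes compression as transform streams; a minimal sketch gzipping stdin to stdout (the spirit of gzip -c):

    const zlib = require('zlib');

    process.stdin
        .pipe(zlib.createGzip())
        .pipe(process.stdout);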

Split2 use case

Split input on newlines

    const split = require('split2');
    const through = require('through2');
    
    let count = 0;
    process.stdin
        .pipe(split()) // This splits on new lines
        .pipe(through(write, end)); // Now we increase the count per chunk (each being a new line) and log total count
    
    function write(buf, enc, next) {
      count++;
      next();
    }
    
    function end(done) {
      console.log(count);
      done();
    }

Web Socket

Websocket streams

Streaming websockets in node and the browser.

    const http = require('http');
    const ecstatic = require('ecstatic');
    const through = require('through2');
    
    var server = http.createServer(ecstatic(__dirname + '/public'));
    server.listen(3000);
    
    const wsock = require('websocket-stream');
    wsock.createServer({server}, function (stream) {
        // stream is a duplex stream
        stream.pipe(loud()).pipe(stream);
    })
    
    function loud () {
        return through(function (buf, enc, next) {
            next(null, buf.toString().toUpperCase());
        });
    }

Websocket Node Client

    const wsock = require('websocket-stream');
    const stream = wsock('ws://localhost:3000');
    process.stdin.pipe(stream).pipe(process.stdout);

Stream Modules

Collect Stream

Collect a stream’s output into a single buffer. Useful for unit tests. For object streams, collect output into an array of objects.

    const collect = require('collect-stream');
    const split = require('split2');
    
    const sp = process.stdin.pipe(split(JSON.parse)); // parse each line as JSON
    collect(sp, function (err, rows) {
        if (err) console.error(err);
        else console.log(rows);
    })

from2

Create a readable stream with a pull function. Reminds me a bit of a generator. (Enumeration)

    const from = require('from2');
    const messages = ['hello', 'world\n', null]; // null ends the stream
    
    from(function(size, next) {
      next(null, messages.shift())
    }).pipe(process.stdout);

to2

Create a writable stream with a write and flush function.

    const to = require('to2');
    const split = require('split2');
    
    process.stdin
        .pipe(split())
        .pipe(to(function(buf, next) {
            console.log(buf.length)
            next();
        }))

Duplexify

A logger example

    const duplexify = require('duplexify');
    const mkdirp = require('mkdirp');
    const fs = require('fs');
    
    module.exports = function (name) {
        const d = duplexify();
        mkdirp('logs', function (err) {
          const w = fs.createWriteStream('logs/' + name + '.log');
          d.setWritable(w); // writes made before this point are buffered
        })
        return d;
    }
   

Usage example

    const log = require('./logger.js');

    const stream = log('myname');
    stream.write(Date.now() + '\n');
    stream.end();

Errors

Streams are also event emitters, so errors can be caught by attaching 'error' listeners.
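
For example, reading a file that doesn't exist (filename is a placeholder):

    const fs = require('fs');

    const r = fs.createReadStream('does-not-exist.txt');
    r.on('error', function (err) {
        // Without this listener the error is thrown and crashes the process
        console.error('stream error:', err.message);
    });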

Pump

Pump pipes streams into each other like .pipe() does, but it also handles errors and tears the whole pipeline down cleanly.

    const pump = require('pump');

    // The callback fires when the pipeline finishes or any stream errors
    pump(stream1, stream2, stream3, function (err) {
        if (err) console.error(err);
    });

Pumpify

Unlike pump, pumpify gives you back a single duplex stream: you write into the front of the pipeline and read from the end of it.
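
A minimal sketch, reusing split2 and through2 from earlier (the line-length transform is illustrative):

    const pumpify = require('pumpify');
    const split = require('split2');
    const through = require('through2');

    // One duplex stream: writes go into split(), reads come out of the transform
    const pipeline = pumpify(split(), through(function (line, enc, next) {
        next(null, line.length + '\n');
    }));

    process.stdin.pipe(pipeline).pipe(process.stdout);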

End-of-stream

Reliably detect when a stream is finished. This package is aware of all the obscure ways streams can end.

    const onend = require('end-of-stream');
    const net = require('net');
    
    const server = net.createServer(function(stream) {
      const iv = setInterval(() => {
          stream.write(Date.now() + '\n');
      }, 1000);
      onend(stream, function onEndedOrErrorsOut(){
          clearInterval(iv);
      })
    })
    server.listen(5000);

Remote Procedure Call and Multiplex

RPC-Stream

Call methods defined by a remote endpoint.
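
A sketch of both ends; the transports stand for any duplex stream (a TCP socket, say) and the method name is illustrative:

    const rpc = require('rpc-stream');

    // Server side: expose methods and pipe through the transport
    const server = rpc({
        hello: function (name, cb) { cb(null, 'hello ' + name); }
    });
    server.pipe(serverTransport).pipe(server);

    // Client side: wrap the remote method names, then call them
    const client = rpc();
    client.pipe(clientTransport).pipe(client);
    const remote = client.wrap(['hello']);
    remote.hello('substack', function (err, msg) {
        console.log(msg); // 'hello substack'
    });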

Multiplex

Pack multiple streams into a single stream.
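
A sketch; transport is any duplex stream and the channel names are illustrative:

    const multiplex = require('multiplex');

    // Sending side: open named substreams inside one carrier stream
    const plex = multiplex();
    plex.pipe(transport);
    plex.createStream('channel-a').end('one');
    plex.createStream('channel-b').end('two');

    // Receiving side: a callback fires for each incoming substream
    const demux = multiplex(function (stream, id) {
        stream.pipe(process.stdout); // id is 'channel-a' or 'channel-b'
    });
    transport.pipe(demux);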