Our companies provide resourceful technical solutions, built with great enthusiasm. By embracing open source technologies, we hope to contribute, or at least share the fun. At ViriCiti we process sensor data originating from electric city buses and trucks, with Node.js as a core technology. Data flows from our in-vehicle computers over the mobile network to the intake points of our server framework.
In my previous post I scratched the surface of using streams in Node.js. We’ve implemented stream-combine, which zips multiple object streams together using a common key per object (e.g. time), while respecting chronological order. The module stream-combine consists of two interesting parts:
  1. Data outflow This is where stream-combine implements the Readable stream interface, taking the data intake as its underlying source and creating an outflow stream that can easily be piped to various destination streams.
  2. Data intake This is basically the algorithm that sorts the various source streams and combines them into one.
This post covers data outflow only and leaves data intake for a next post. I will describe data flow in general, and how to implement it without stagnation or data accumulation.

Push- and pull-based streams

Streams can be connected like garden hoses, even between separate processes (a minimal pipe chain is sketched after the list below). In that respect, stream-combine’s output needs to respect the maximum throughput of the subsequent stream. When one of the streams in the chain is a TCP socket (e.g. from your phone to a server), that maximum throughput can also vary and needs to be throttled dynamically.

Varying data throughput

The image above shows a set of chained streams (via a.pipe(b).pipe(c)) through which data flows from left to right. The bottleneck is a TCP socket with varying throughput that doesn’t always match the speed at which data is supplied. Node.js provides flow control in its stream API with two possible flow modes:
  1. push-based a.k.a. flowing mode
  2. pull-based a.k.a. paused mode
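As referenced above, the chaining itself is just a matter of pipe() calls. A minimal sketch, where the file name, host and port are assumptions:

fs   = require "fs"
zlib = require "zlib"
net  = require "net"

# Assumed destination: a TCP server listening on example.com:9000
socket = net.connect 9000, "example.com"

# Assumed source file; data flows file -> gzip -> socket, and every
# pipe() respects the throughput of the next stream in the chain
source = fs.createReadStream "sensor-data.log"
source.pipe(zlib.createGzip()).pipe(socket)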
In push-based data flow, a stream tries to exhaust its underlying resource as fast as it can. So if you want to parse the contents of a file or an HTTP request as fast as possible, this is the mode you’ll want to use. Setting a data listener like stream.on("data", (data) -> ...) or calling stream.resume() will activate flowing mode (a short sketch follows below the image). From then on, it is your responsibility to manage the memory usage of the total data inflow. This could become nasty:
A push-based stream, overflowing a coffee cup

A push-based stream going wrong…
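For reference, this is roughly what activating flowing mode looks like; the file name here is an assumption, and keeping memory usage in check is entirely up to you:

fs = require "fs"

# Assumed input file; attaching a "data" listener switches the stream
# into flowing mode and data will be supplied as fast as possible
stream = fs.createReadStream "big-file.log"

stream.on "data", (chunk) ->
  console.log "received #{chunk.length} bytes"

stream.on "end", ->
  console.log "done"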

With pull-based data flow, every time data is pushed down the line, each stream signals one node upstream whether it can handle more data. This means that, when implementing streams, you are strongly advised to really stop pushing data downstream when the subsequent stream tells you so. Otherwise, the stream’s internal buffer will fill up your heap memory until something bad happens… So, let’s try not to disregard this advice in our implementation!

Varying flow - pull and push

Consider the two flow modes in the image above, showing chained streams and data flowing from left to right. In both cases, it is clear that the input of data can be larger than the bottleneck (the middle piece) can handle:
  1. In the first case, data is pushed without any regard for the throughput of subsequent streams. The internal stream buffers therefore fill up quickly, and more and more RAM is used to absorb the overflow. This becomes a problem when the supply of input data remains high.
  2. In the second case, the reduced throughput of the bottleneck is communicated upstream, all the way up to the source. This results in a pull-based flow in which the risk of filling up RAM is eliminated. The upstream feedback is provided by the return value of each push of data, signalling back-pressure: the internal buffers are full. A sketch of this feedback mechanism follows below.
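To make that upstream feedback concrete, here is a sketch of respecting back-pressure by hand when feeding a Writable stream; the pump helper and the streams passed to it are illustrative, and pipe() essentially does the same thing for you:

# Illustrative manual pump between two existing streams,
# e.g. pump process.stdin, process.stdout
pump = (readable, writable) ->
  readable.on "data", (chunk) ->
    ok = writable.write chunk
    unless ok
      # the writable's internal buffer is full:
      # pause the source until the writable has drained
      readable.pause()
      writable.once "drain", -> readable.resume()

  readable.on "end", -> writable.end()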

Stream-combine as a data source

The module stream-combine was basically created to merge time-based data from multiple sources. By looking at a common attribute (e.g. time), it can pause/play its sources in order to create a unified chronological stream. The algorithm that does all the number crunching and creates the objects to be pushed down the line is like a conveyor belt that can be started and stopped at will. The algorithm will be discussed in a follow-up post; for now we will focus on stream-combine as a data source, thus implementing a Readable stream.

As you can read in the API for stream implementers, when extending a Readable stream, only the _read() function has to be implemented. When the _read() function is called, you know that the subsequent Writable stream is ready to receive some more data for processing. From that moment on, we turn our conveyor belt on and use push() to push our available data until the subsequent writable stream signals it has had enough. In the figure above, this signalling is designated as upstream feedback. Simply put, when the push() function returns false, you are really helping by stopping the conveyor belt and preventing buffering in heap memory. If your implementation does not comply, data will accumulate in RAM, basically defying the whole purpose of pull-based data flow in the first place. This concept is illustrated in the image below:

Readable stream illustrated as a conveyor belt

When the writable stream pulls data from the internal buffer fast enough, there is no problem. When the speed of the writable stream stagnates, the internal buffer will alternately fill up and deplete, switching the conveyor belt on and off accordingly.

Code example

The implementation of a Readable stream for stream-combine, as described in the API, is shown in the code example below:
{ Readable } = require "stream"

class StreamCombine extends Readable
  constructor: ->
    super()   # initialize the underlying Readable state

    @_busy = false   # tells whether the conveyor belt is already running

    # ... more instantiation logic

  _read: ->
    return if @_busy

    @_busy = true
    @retrieveMoreData()

  retrieveMoreData: ->
    @doSomeWork (error, newData) =>
      @emit "error", error if error

      # push it down the line!
      pushMore = @push(newData)

      # check received upstream signal
      if pushMore
        @retrieveMoreData()
      else
        @_busy = false

  doSomeWork: (cb) ->
    # ... algorithm goes here

module.exports = StreamCombine
Now, let’s go over the code:
  • class StreamCombine extends Readable The class StreamCombine is extended from stream.Readable. Easy.
  • constructor: -> Here, the underlying Readable is initialized via super() and the _busy property is set. This property tells us whether the conveyor belt is already running.
  • _read: -> This is the required _read() implementation. The _busy switch bounces repeated _read() calls from the writable stream while the conveyor belt (the processing) is busy. Unless it is already running, the conveyor belt is started again.
  • retrieveMoreData: -> This function is basically a wrapper that starts the recursive and asynchronous doSomeWork() function (i.e. starts the conveyor belt). Once newData is available, it is pushed to the stream’s internal buffer by push(). This returns a pushMore flag that tells us whether the internal buffer is full (the highWaterMark is reached) or not. If pushMore is
    • true → we can simply continue doing more work by calling retrieveMoreData() again.
    • false → the subsequent stream did not yet request more data. We have to stop retrieving data, unset the _busy state and wait for the next _read() to be called.
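To round things off, here is a hedged usage sketch of how a StreamCombine instance could sit in a pipeline. The constructor arguments, the arrayStream helper and the slow writable are illustrative assumptions, and it presumes the intake algorithm of the follow-up post is filled in and the stream runs in object mode; this is not the module’s documented API.

{ Readable, Writable } = require "stream"
StreamCombine = require "./stream-combine"   # assumed local path to the class above

# illustrative helper: wrap an array of objects in an object-mode Readable
arrayStream = (items) ->
  readable = new Readable objectMode: true
  readable._read = -> readable.push items.shift() or null
  readable

sourceA = arrayStream [{ time: 1, speed: 10 }, { time: 3, speed: 12 }]
sourceB = arrayStream [{ time: 2, soc: 80 }]

# a deliberately slow writable, so back-pressure actually kicks in
slowLogger = new Writable
  objectMode: true
  write: (item, encoding, done) ->
    console.log item
    setTimeout done, 100

# assumed constructor signature: an array of source streams plus the common key
combined = new StreamCombine [sourceA, sourceB], "time"

# pipe() calls _read() when needed and handles back-pressure for us
combined.pipe slowLogger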

Winning again!

Thanks to the might and magic of the Node.js stream API, we have created a source stream that respects data throughput further down the line by measuring back-pressure. Implementing the rules for pull-based data flow leaves us with simple chaining of subsequent streams and gives us a highly efficient data processing pipeline!