Our companies provide resourceful technical solutions, built with enthusiasm. By embracing open source technologies, we hope to contribute, or at least share the fun. At ViriCiti we process sensor data originating from electric city buses and trucks, using Node.js as a core technology. Data from our in-vehicle computers streams over the mobile network to the intake points of our server framework.
In my previous post I scratched the surface of using streams in Node.js. We implemented stream-combine, which zips multiple object streams together using a common key per object, while respecting chronological order. The module stream-combine consists of two interesting parts:
- Data outflow
This is where stream-combine implements a Readable stream, taking the data intake as an underlying source and creating an outflow stream that is easily piped to various destination streams.
- Data intake
This is basically the algorithm that sorts the various source streams, and combines them into one.
This post covers data outflow only and leaves data intake for a next post. I will describe data flow in general, and how to implement it without stagnation and data accumulation.
Push- and pull-based streams
Streams can be connected like garden hoses, even between separate processes. In that respect, stream-combine's output needs to respect a subsequent stream in terms of its maximum throughput. When one of the streams in the chain is a TCP socket (e.g. from your phone to a server), the maximum throughput can also vary and needs to be throttled dynamically.
The image above shows a set of chained streams through which data flows from left to right. The bottleneck is a TCP socket, with varying throughput that doesn't always match the speed at which data is supplied.
Node.js provides flow control in its stream API, with two possible flow modes:
- push-based a.k.a. flowing mode
- pull-based a.k.a. paused mode
In push-based data flow, a stream tries to exhaust its underlying resource as fast as it can. So if you want to parse the contents of a file or an HTTP request as fast as possible, this is the mode you'll want to use. Setting a data listener like
stream.on("data", (data) -> ...)
will activate flowing mode. Now it is your responsibility to manage the memory usage of the total data inflow. This could become nasty:
A push-based stream going wrong…
In pull-based data flow, every time data is pushed down the line, each stream signals one node upstream whether it can handle more data. This means that when implementing streams, it is strongly advised to really stop pushing more data downstream when the subsequent stream tells you so. Otherwise, the stream's internal buffer will fill up your heap memory, until something bad happens… So, let's try not to disregard this advice in our implementation!
Consider the two flow modes in the image above, displaying chained streams with data flowing from left to right. In both cases, it is clear that the input of data can be larger than the bottleneck (middle piece) can handle.
- In the first case, data is pushed without any regard for subsequent stream throughput. Internal stream buffers therefore fill up quickly, and RAM is used to account for the overflow. This becomes a problem when the supply of input data remains high.
- In the second case, the reduced throughput of the bottleneck is communicated upstream, all the way up to the source. This results in a pull-based flow where RAM fill-up is eliminated. The upstream feedback is provided by the result of each push of data, signalling back-pressure when internal buffers are filled up.
Stream-combine as a data source
The module stream-combine was basically created to merge time-based data from multiple sources. By looking at a common attribute, it can pause/play its sources in order to create one unified chronological stream. The algorithm that does all the number crunching and creates the objects to be pushed down the line is like a conveyor belt that can be started and stopped at will. The algorithm will be discussed in a follow-up post; for now we will focus on stream-combine as a data source, thus implementing a Readable stream.
As you can read in the API for stream implementers, when extending a Readable stream, only the _read() function has to be implemented. When the _read() function is called by the subsequent Writable stream, you know that it is ready to receive some more data for processing. From that moment on, we will turn our conveyor belt on and use push() to push our available data until the subsequent Writable stream signals it has had enough. In the figure above, this signaling is designated as upstream feedback. Simply put, when push() returns false, you are really helping by stopping the conveyor belt process and preventing buffering on the heap memory. If your implementation does not comply, data will accumulate in RAM, basically defying the whole purpose of pull-based data flow in the first place. This concept is illustrated in the image below:
When the writable stream pulls data from the internal buffer fast enough, there is no problem. When the speed of the writable stream stagnates, the internal buffer will alternately fill up and deplete, switching the conveyor belt off and on accordingly.
The implementation of a Readable stream for stream-combine, as described in the API, is shown in the code example below:
Now, let’s go over the code:
- class StreamCombine extends Readable: StreamCombine extends Readable, and the _busy property is set. This flag tells whether the conveyor belt is already running.
- This is the obliged _read() implementation. The _busy switch bounces continuous _read() calls from the writable stream while the conveyor belt (processing) is busy. Unless already running, the conveyor belt is started again.
- This function is basically a wrapper to start the recursive and asynchronous doSomeWork() function (start the conveyor belt). Once newData is available, it is pushed into the stream's internal buffer by push(). This returns a pushMore flag that tells us whether the internal buffer is full (the highWaterMark is reached) or not. If true → we can simply continue doing more work by calling doSomeWork() again. If false → the subsequent stream has not yet requested more data. We have to stop retrieving more data, unset the _busy state, and wait for the next _read() to be called.
Due to the might and magic of the Node.js stream API, we have created a source stream that respects data throughput down the line by measuring back-pressure. Implementing the rules for pull-based data flow leaves us with simple chaining of subsequent streams and provides us with a highly efficient data processing pipeline!