Streaming API - JoshuaWise/jellypromise GitHub Wiki

class PromiseStream

PromiseStreams are used to handle an aggregate of values (or promises of values) concurrently. Unlike many styles of streams, a PromiseStream does not preserve the order in which its items were supplied; it outputs each item as soon as that item resolves.

PromiseStreams inherit from Promise (jellypromise). If an error occurs in a stream, the stream will be rejected, along with all streams that originate from it. If no error occurs, the stream will be fulfilled with undefined when all of its items have been passed on. However, some methods are available to change the fulfillment value of a stream (.merge(), .reduce()).

new PromiseStream(source)

The source used to construct a new PromiseStream must be some style of EventEmitter with the .addListener() and .removeListener() methods. The stream will listen for the data, end, and error events. The data event is used to push an item into the stream; this can be any type of data (or a promise of the data). The end event is used to indicate that no more data will be pushed into the stream. After the end event, the stream will continue processing the rest of its current data before fulfilling its promise interface. The error event is used to indicate an error. Errors will propagate down the pipeline to all streams that originated from this one.

var EventEmitter = require('events');
var PromiseStream = require('jellypromise').Stream;

var source = new EventEmitter;
new PromiseStream(source);

source.emit('data', 'foo');
source.emit('data', 'bar');
source.emit('end');

static PromiseStream.from(iterable) -> promiseStream

Constructs a new stream from an iterable object of promises or values (or a mix thereof).

If the stream is limited by a concurrency control, it will not pull items from the iterable until it's able to process them. This is significant because if the iterable contains rejected promises, they will be reported as unhandled rejections unless you invoke .catchLater() on them.
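The lazy pulling described above can be sketched in plain JavaScript. The stand-in iterable below counts how many times it has been advanced, showing that a consumer with a concurrency limit of 2 only pulls two items up front. makeCountingIterable and the pulling loop are illustrative helpers, not jellypromise internals.

```javascript
// Conceptual sketch (not jellypromise's implementation): an iterable
// that records how many items have been pulled from it.
function makeCountingIterable(values) {
  var pulled = 0;
  return {
    pulled: function () { return pulled; },
    next: function () {
      if (pulled === values.length) { return { done: true }; }
      return { done: false, value: values[pulled++] };
    }
  };
}

var source = makeCountingIterable(['a', 'b', 'c', 'd']);

// With a concurrency limit of 2, only two items are pulled up front;
// any rejected promises still inside the iterable have not been touched.
var inFlight = [];
while (inFlight.length < 2) {
  var item = source.next();
  if (item.done) { break; }
  inFlight.push(item.value);
}

console.log(source.pulled()); // => 2
```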

.map([concurrency], callback) -> promiseStream

Transforms the stream's data through the provided callback function, and passes the resulting data to a new stream. If the callback returns a promise, its value will be awaited before being passed on to the destination stream (which is returned by this method).

If a concurrency number is provided, only that many items will be processed at a time. The default is Infinity.

If the callback throws an exception, or returns a promise that is rejected, processing will stop and the stream will be rejected with the same error.

callback has the following signature: function callback(value, index)

The index parameter is the item's original index from when it entered the first stream in the pipeline.

PromiseStream.from(['foo.txt', 'bar.txt'])
  .map(readFile)
  .drain(console.log);
// => "this is bar!"
// => "this is foo!"
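The concurrency limit described above can be sketched with standard Promises: a fixed pool of workers, each of which pulls the next item only after its current item settles. mapWithConcurrency is an illustrative helper, not part of jellypromise's API.

```javascript
// Conceptual sketch of a concurrency-limited map, using plain Promises.
function mapWithConcurrency(items, concurrency, callback) {
  var results = new Array(items.length);
  var nextIndex = 0;

  function worker() {
    if (nextIndex >= items.length) { return Promise.resolve(); }
    var index = nextIndex++;
    return Promise.resolve(callback(items[index], index)).then(function (value) {
      results[index] = value;
      return worker(); // pull the next item only after this one settles
    });
  }

  var workers = [];
  for (var i = 0; i < Math.min(concurrency, items.length); i += 1) {
    workers.push(worker());
  }
  return Promise.all(workers).then(function () { return results; });
}

mapWithConcurrency([1, 2, 3, 4], 2, function (n) {
  return Promise.resolve(n * 10);
}).then(function (results) {
  console.log(results); // => [ 10, 20, 30, 40 ]
});
```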

.forEach([concurrency], callback) -> promiseStream

Similar to .map(), except the stream's data will not be changed. If the callback returns a promise, it will still be awaited, but it will not determine the data that is passed on to the destination stream. This method is primarily used for side effects.

callback has the following signature: function callback(value, index)
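The pass-through behavior can be sketched with standard Promises: the callback's result is awaited, then discarded, and the original value flows onward. forEachStep is an illustrative helper, not part of jellypromise's API.

```javascript
// Conceptual sketch: await the side-effect promise, but pass on the
// original value rather than the callback's return value.
function forEachStep(value, index, callback) {
  return Promise.resolve(callback(value, index)).then(function () {
    return value; // the callback's result is discarded
  });
}

forEachStep('foo', 0, function (value) {
  return Promise.resolve(42); // awaited, then ignored
}).then(function (passedOn) {
  console.log(passedOn); // => foo
});
```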

.filter([concurrency], callback) -> promiseStream

Similar to .forEach(), except the items will be filtered by the provided callback function (just like Array#filter). Each item is kept or discarded based on the truthiness of the callback's return value. If the callback returns a promise, its value will be awaited before being used in the filtering process.

callback has the following signature: function callback(value, index)
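The awaited-truthiness filtering can be sketched with standard Promises: every callback result is resolved first, then used as a flag, just like Array#filter. filterAll is an illustrative helper, not part of jellypromise's API.

```javascript
// Conceptual sketch: await each callback result, then keep the items
// whose resolved result is truthy.
function filterAll(items, callback) {
  return Promise.all(items.map(function (value, index) {
    return Promise.resolve(callback(value, index));
  })).then(function (flags) {
    return items.filter(function (value, index) { return flags[index]; });
  });
}

filterAll([1, 2, 3, 4], function (n) {
  return Promise.resolve(n % 2 === 0);
}).then(function (evens) {
  console.log(evens); // => [ 2, 4 ]
});
```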

.takeUntil([concurrency], promise) -> promiseStream

Forwards the stream's data to a new stream that is returned by this method. The new stream will continue to pull data until the given promise is fulfilled. If the promise is rejected, the stream itself will also be rejected with the same error.

console.log('You only have 5 seconds to type!');
new PromiseStream(process.stdin)
  .takeUntil(Promise.resolve().delay(5000))
  .reduce(combineInput)
  .then(processInput);

It's important to note that after the given promise is fulfilled and the stream stops, any rejected promises pushed into the stream will be ignored.

.reduce(callback, [initialValue]) -> this

Applies the callback function against an accumulator and each piece of data in the stream. This method causes the stream's fulfillment value to be the final result of the reduction. If no initialValue is provided and the stream only receives one item, that item will become the fulfillment value without invoking the callback function. If no initialValue is provided and the stream receives no items, the stream will be fulfilled with undefined.

If the initialValue is a promise, its value will be awaited before starting the reduction process. If the callback returns a promise, it will be awaited before processing the next piece of data against the accumulator. Keep in mind that the callback function will receive the data in the order that the stream receives it.

callback has the signature: function callback(accumulator, value, index, shortcut)

PromiseStream.from(['Jonathan', 'Robert', 'Jennifer'])
  .map(getNickname)
  .reduce(function (a, b) {return a + ', ' + b})
  .log();
// => "Jen, John, Rob"

By invoking shortcut(value), the reduction process will be ended early, the callback will no longer be invoked, and the stream will be fulfilled with value. If value is a promise, its value will be awaited before fulfilling the stream.

var stream = PromiseStream.from(['foo', 'bar', 'baz', 'quux']);

function hasBar(stream) {
  return stream.reduce(function (x, value, i, shortcut) {
    if (value === 'bar') {return shortcut(true)}
    return false
  }, false)
}

hasBar(stream).log();
// => true

It's important to note that after using shortcut() to stop the stream, any rejected promises pushed into the stream will be ignored.
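The sequential awaiting described above can be sketched with standard Promises: each callback result is resolved before the next item is folded into the accumulator. reduceAll is an illustrative helper, not part of jellypromise's API, and it omits the shortcut mechanism.

```javascript
// Conceptual sketch: chain the reduction so each (possibly async)
// callback result settles before the next item is processed.
function reduceAll(items, callback, initialValue) {
  return items.reduce(function (accPromise, value, index) {
    return accPromise.then(function (acc) {
      return callback(acc, value, index);
    });
  }, Promise.resolve(initialValue));
}

reduceAll([1, 2, 3], function (acc, n) {
  return Promise.resolve(acc + n);
}, 0).then(function (sum) {
  console.log(sum); // => 6
});
```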

.merge() -> this

Constructs an array containing all data from the stream. When all data has been received, the stream will fulfill with that array. The items in the array will be ordered by their original order as they were when they entered the first stream in the pipeline.

PromiseStream.from(['a', 'b', 'c'])
  .forEach(delayByRandomAmount)
  .forEach(console.log)
  .merge()
  .log();
// => b 1
// => c 2
// => a 0
// => ["a", "b", "c"]

.drain([[concurrency], sink]) -> this

PromiseStreams cannot be fulfilled until they have a destination. Some destinations are new streams (such as in .map()). Other destinations are single values made available by the stream's fulfillment (.merge(), .reduce()).

The .drain() method provides the simplest destination, either discarding each item or relinquishing them to the given sink.

new PromiseStream(infiniteSource)
  .forEach(processData)
  .drain();

If a callback function is provided as the sink, it has the following signature: function callback(value, index)

new PromiseStream(infiniteSource)
  .map(transformData)
  .forEach(processData)
  .drain(function (value, index) {console.log('processed entry #' + index);});

A sink can also be an EventEmitter (or any similar object with an .emit() method).

If an event emitter is provided, it will receive data events, as well as the end or error event depending on whether the stream becomes fulfilled or rejected.

You can use event emitters to effectively "tee" a stream into two streams.

var sink = new EventEmitter;
var sourceStream = getStream().drain(sink);
var destination1 = new PromiseStream(sink);
var destination2 = new PromiseStream(sink);

Ordered Streams

If you need streams to process their data in order, just set the concurrency control on each stream to 1.

new PromiseStream(source)
  .takeUntil(1, someEvent)
  .filter(1, sanitizeData)
  .map(1, processData)
  .forEach(1, saveData)
  .drain(1, console.log);

Some PromiseStream methods (.merge() and .reduce()) don't have concurrency control. But don't worry: these methods maintain order automatically.