Recipe: streaming basics - uhop/stream-json GitHub Wiki

Problem

You have a large JSON file and want to process it without loading everything into memory. This recipe covers the three most common shapes: a top-level array, a top-level object, and JSONL (line-delimited JSON).

Array of objects

The most common case. The file contains [obj1, obj2, ...] and you want each object one at a time.

Event-based

const fs = require('node:fs');
const chain = require('stream-chain');
const streamArray = require('stream-json/streamers/stream-array.js');

const pipeline = chain([fs.createReadStream('users.json'), streamArray.withParser()]);

pipeline.on('data', ({key, value}) => {
  // key = array index (0, 1, 2, ...)
  // value = parsed JavaScript object
  console.log(`User #${key}:`, value.name);
});

pipeline.on('end', () => console.log('done'));

Async iteration

Node.js streams are async-iterable, so you can use for await. Note that for await is only valid inside an async function or an ES module; the snippets in this recipe omit that wrapper for brevity:

const fs = require('node:fs');
const chain = require('stream-chain');
const streamArray = require('stream-json/streamers/stream-array.js');

const pipeline = chain([fs.createReadStream('users.json'), streamArray.withParser()]);

for await (const {key, value} of pipeline) {
  console.log(`User #${key}:`, value.name);
}

This is the closest equivalent to Python's ijson.items() pattern.

Picking a nested array

If the array is not at the top level — for example {"results": [...]} — add Pick to the pipeline. Filters operate on tokens, so they must sit between the parser (text → tokens) and the streamer (tokens → objects):

const fs = require('node:fs');
const chain = require('stream-chain');
const {pick} = require('stream-json/filters/pick.js');
const streamArray = require('stream-json/streamers/stream-array.js');

// text → parser (injected by pick.withParser) → pick → streamArray → objects
const pipeline = chain([
  fs.createReadStream('response.json'),
  pick.withParser({filter: 'results'}),
  streamArray(),
]);

for await (const {key, value} of pipeline) {
  console.log(value);
}

The pick.withParser() helper injects a parser before the filter, so the pipeline can consume text directly. The equivalent long form is:

const {parser} = require('stream-json/parser.js');

const pipeline = chain([fs.createReadStream('response.json'), parser(), pick({filter: 'results'}), streamArray()]);

Object with many keys

The file contains {"key1": val1, "key2": val2, ...} and you want each property one at a time.

const fs = require('node:fs');
const chain = require('stream-chain');
const streamObject = require('stream-json/streamers/stream-object.js');

const pipeline = chain([fs.createReadStream('config.json'), streamObject.withParser()]);

for await (const {key, value} of pipeline) {
  // key = property name (string)
  // value = parsed value
  console.log(`${key}:`, value);
}

JSONL (line-delimited JSON)

The file contains one JSON value per line — common for log files, database exports, and data pipelines.

const fs = require('node:fs');
const jsonlParser = require('stream-chain/jsonl/parserStream.js');

const pipeline = fs.createReadStream('events.jsonl').pipe(jsonlParser());

for await (const {key, value} of pipeline) {
  // key = line index (0, 1, 2, ...)
  // value = parsed object
  console.log(value);
}

If you need custom error handling for malformed lines, use the stream-json wrapper instead:

const jsonlParser = require('stream-json/jsonl/parser.js');

const pipeline = fs.createReadStream('events.jsonl').pipe(jsonlParser.asStream({errorIndicator: null}));

for await (const {key, value} of pipeline) {
  // malformed lines are silently skipped (errorIndicator: null)
  console.log(value);
}

Writing results back

To save processed data as JSONL:

const fs = require('node:fs');
const chain = require('stream-chain');
const streamArray = require('stream-json/streamers/stream-array.js');
const jsonlStringer = require('stream-chain/jsonl/stringerStream.js');

chain([
  fs.createReadStream('input.json'),
  streamArray.withParser(),
  ({value}) => value, // drop the numeric key, keep only the value
  jsonlStringer(),
  fs.createWriteStream('output.jsonl'),
]);
</chr_replace>

Tips

  • Error handling. Always listen for 'error' events on the pipeline, or wrap for await in a try/catch. Malformed JSON will emit an error.
  • Backpressure. chain() handles backpressure automatically. If your processing is slower than reading, the file stream will pause.
  • Filtering. Add Pick, Ignore, or Replace to the pipeline to select or transform parts of the stream before assembling objects.
  • Performance. See Performance for tuning tips. For JSONL, the dedicated parser is significantly faster than parser({jsonStreaming: true}) + streamValues().