Problem: You have a program you need to shell out to, and want an Observable of its lines - deanrad/rx-helper GitHub Wiki

Get an Observable of another Process' Output lines

Because UNIX processes communicate through standard streams, any program whose stdout you can consume becomes a building block: you can use process-spawning for inter-language interop, or to parallelize your application.

The following will work in Node to:

  1. Spawn a process when subscribed,
  2. provide callbacks for each line of its output, and
  3. terminate the process when unsubscribed.
var spawn = require('child_process').spawn;
const { Observable, timer } = require('rxjs')

/**
 * Creates an Observable of the lines a child process writes to stdout.
 *
 * The process is spawned each time the Observable is subscribed to, and is
 * killed when the subscription is torn down.
 *
 * Notifications:
 *   - `next`     one value per line of stdout, without its line terminator
 *                (`\n` or `\r\n`). A final line with no trailing newline is
 *                emitted when the process closes.
 *   - `complete` the process exited with code 0.
 *   - `error`    the numeric exit code when it is non-zero; an `Error` when
 *                the process could not be spawned or was ended by a signal.
 *
 * @param {string} exe - Command to run, passed to `child_process.spawn`.
 * @param {string[]} args - Arguments for the command.
 * @returns {Observable<string>} Observable of stdout lines.
 *
 * @example
 * fromProcess('sh', ['-c', 'echo 1; sleep 1; echo 2; sleep 1; echo 3;'])
 */
function fromProcess(exe, args) {
  return new Observable(notify => {
    const child = spawn(exe, args);
    // Text received after the last newline seen so far. 'data' events carry
    // arbitrary chunks, so a line may span several of them.
    let pending = '';

    const emitLine = line => notify.next(line.replace(/\r$/, ''));

    child.stdout.setEncoding('utf8');
    child.stdout.on('data', chunk => {
      const pieces = (pending + chunk).split('\n');
      // The last piece is either '' (chunk ended on a newline) or a partial line.
      pending = pieces.pop();
      pieces.forEach(emitLine);
      // 2.1 Uncomment to see how our cancellation function is needed
      // console.log('got data')
    });

    // Without this listener a failed spawn (e.g. ENOENT) is thrown as an
    // unhandled 'error' event instead of reaching the subscriber.
    child.on('error', err => notify.error(err));

    child.on('close', (code, signal) => {
      if (pending !== '') {
        // Flush a final line that had no trailing newline.
        emitLine(pending);
        pending = '';
      }
      if (code === 0) {
        notify.complete();
        return;
      }
      // `code` is null when the process was ended by a signal; that is not success.
      notify.error(code !== null ? code : new Error(`Process terminated by signal ${signal}`));
    });

    // 2.2 Comment to fail to cancel
    return () => child.kill();
  });
}

You would use it like the following:

// Build the Observable; nothing is spawned until it is subscribed to.
let listing = fromProcess('sh', ['-c', 'echo 1; sleep 1; echo 2; sleep 1; echo 3;']);
// Subscribing starts the process; keep the Subscription so it can be cancelled.
let sub = listing.subscribe({
  next: line => console.log(line),
  error: e => console.error(e),
  complete: () => console.log('All done!'),
});
// NOTE(review): trailing `null` kept from the original snippet; presumably it
// stops a REPL from echoing the Subscription — confirm.
null;

And you could exit the process early with:

// Unsubscribe after 1.5s, while the process is still running.
// No completion or error callbacks will fire.
timer(1500)
  .toPromise()
  .then(() => {
    sub.unsubscribe();
  });