// API Ideas - Gozala/streamer GitHub Wiki


/*jshint asi: true */

// Sketch (method-chaining flavour): a stream that starts with 0 and 1 and
// whose remainder is produced by `rest`, which pairs `fibs` with
// `fibs.drop(1)` and adds up each pair.
// NOTE(review): `Stream` is not declared anywhere in the visible source.
var fibs = Stream(0, Stream(1, function rest() {
  var pairs = fibs.zip(fibs.drop(1))
  return pairs.map(function(pair) {
    return pair[0] + pair[1]
  })
})).lazy()


// Sketch (call-chain flavour): the same 0, 1, pairwise-sum definition spelled
// as `(streamer.begin)(fn, args...)...(streamer.end)`. Every parenthesised
// group is a call on whatever the previous group returned.
// NOTE(review): the meaning of `begin`/`end`/`lazy` is not visible here.
var fibs =
  (streamer.begin)
  (streamer.lazy, 0, 1, function rest(fibs) {
    return (streamer.begin)
      (streamer.zip, fibs, streamer.tail(fibs))
      (streamer.map, function sum(left, right) {
        return left + right
      })
      (streamer.end)
  })
  (streamer.end)

// Sketch (`streamer.chain` flavour): wrap `fibs` with `streamer.chain`, take
// its tail, zip that with `fibs`, add up each pair, then unwrap the result
// with `.value()`.
// NOTE(review): `Stream` is not declared anywhere in the visible source.
var fibs = Stream(0, Stream(1, function rest() {
  var pairs = streamer.chain(fibs).tail().zip(fibs)
  var sums = pairs.map(function(pair) {
    return pair[0] + pair[1]
  })
  return sums.value()
})).lazy()

// # FS wrapper for streamer

var binding = process.binding('fs')
var streamer = require('streamer/core')

/**
 * Wraps opening `path` in a stream.
 *
 * The `fs.open` call sits inside the function handed to `Stream`, so when
 * the file is actually opened is decided by `Stream`, not by this function.
 *
 * NOTE(review): neither `fs` nor `Stream` is declared in the visible source.
 *
 * @param {string} path - Passed through to `fs.open` unchanged.
 * @param {Object} [options]
 * @param {string} [options.flags='r'] - Falsy values fall back to 'r'.
 * @param {string} [options.mode='0666'] - Falsy values fall back to '0666'.
 * @returns Whatever `Stream` returns for the generator function; that
 *   function yields `Stream.defer().promise`, settled via the deferred's
 *   `callback` (which is handed to `fs.open`).
 */
function opener(path, options) {
  var settings = options || {}
  var flags = settings.flags || 'r'
  var mode = settings.mode || '0666'

  return Stream(function() {
    var pending = Stream.defer()
    fs.open(path, flags, mode, pending.callback)
    return pending.promise
  })
}

/**
 * Streams the contents of an open file descriptor as a chain of buffers.
 *
 * Each link is `Stream(chunk, rest)`, where `rest` is another `reader` for
 * the same descriptor positioned right after the bytes just read.
 *
 * The chain ends in `streamer.empty` in two cases:
 *   - the position has reached `end` (`end <= start`), or
 *   - a read reports zero bytes (end of file). Without this check a
 *     zero-byte read would schedule another read at the very same position,
 *     and with the default `end` of Infinity the stream would never finish.
 *
 * NOTE(review): `Stream` is not declared in the visible source; resolving the
 * deferred with `streamer.empty` assumes it accepts any stream value — confirm.
 *
 * @param fd - File descriptor handed to `binding.read`.
 * @param {Object} [options]
 * @param {number} [options.size=65536] - Maximum bytes per chunk.
 * @param {number} [options.start=0] - Position of the first byte to read.
 * @param {number} [options.end=Infinity] - Position to stop before (exclusive).
 * @returns `streamer.empty`, or whatever `Stream` returns for the generator
 *   function; that function yields a deferred promise which is rejected with
 *   the read error or resolved with the next link of the chain.
 */
function reader(fd, options) {
  options = options || {}
  var size = options.size || 64 * 1024
  var start = options.start || 0
  var end = options.end || Infinity
  if (end <= start) return streamer.empty

  // Never request bytes beyond `end`, so the last chunk cannot overshoot it.
  var length = Math.min(size, end - start)

  return Stream(function() {
    var buffer = new Buffer(length)
    var deferred = Stream.defer()
    // Presumably (fd, buffer, offset, length, position, callback), mirroring
    // Node's `fs.read` — confirm against the binding in use.
    binding.read(fd, buffer, 0, length, start, function(error, count) {
      if (error) return deferred.reject(error)
      // Zero bytes read means end of file: terminate instead of recursing.
      if (!count) return deferred.resolve(streamer.empty)
      deferred.resolve(Stream(buffer.slice(0, count), reader(fd, {
        size: size, start: start + count, end: end
      })))
    })
    return deferred.promise
  })
}

/**
 * Sketch of a stream-based file read built from `opener` and `reader`.
 *
 * Pipeline, as written with the `(streamer.run)(fn, args...)...` notation:
 *   1. `opener(path, options)` supplies file descriptors.
 *   2. Each descriptor is mapped to `reader(fd, options)`, whose chunks are
 *      mapped through `decoder(encoding)`.
 *   3. `streamer.correct` installs an error handler that runs
 *      `streamer.closer` on the descriptor and then appends
 *      `streamer.raise(error)`.
 *   4. `streamer.flatten` is applied to the outer stream.
 *
 * `options` is passed through as-is and is never written to: `opener`
 * already falls back to the 'r' flag on its own, so no default needs to be
 * stored on the caller's object.
 *
 * NOTE(review): `decoder` is not declared in the visible source, and `fs` is
 * never declared or required here.
 * NOTE(review): the descriptor is only closed on the error path shown here —
 * confirm whether the success path closes it somewhere else.
 *
 * @param {string} path - Forwarded to `opener`.
 * @param {Object} [options] - Forwarded to `opener` and `reader`.
 * @param {string} [options.encoding='raw'] - Handed to `decoder`.
 * @returns Whatever the final `(streamer.flatten)` call in the chain returns.
 */
fs.read = (function read(path, options) {
  options = options || {}
  var encoding = options.encoding || 'raw'
  return ((streamer.run)
  (opener, path, options)
  (streamer.map, function(fd) {
    return ((streamer.run)
    (reader, fd, options)
    (streamer.map, decoder(encoding))
    // On failure: run `closer` on the descriptor, then re-raise the error.
    (streamer.correct, function(error) {
      return ((streamer.run)
        (streamer.closer, fd)
        (streamer.append, streamer.raise(error)))
    }))
  })
  (streamer.flatten))
})