mix stdlib stream - remixlabs/docs-public GitHub Wiki

Standard library - stream

The module signature (stream(S) is the type of streams whose elements have type S):

module stream
  // Basics:
  def empty : stream(S)
  def singleton : S -> stream(S)
  def isEmpty : stream(S) -> bool
  def isNotEmpty : stream(S) -> bool
  def length : stream(S) -> number
  // Mapping and filtering:
  def map : (S -> T) -> stream(S) -> stream(T)
  def map2 : (S -> T -> U) -> stream(S) -> stream(T) -> stream(U)                   // since PR#769
  def indexedMap : (number -> S -> T) -> stream(S) -> stream(T)
  def indexedMap2 : (number -> S -> T -> U) -> stream(S) -> stream(T) -> stream(U)  // since PR#769
  def indexOnlyMap : (number -> T) -> stream(S) -> stream(T)
  def flatMap : (S -> stream(T)) -> stream(S) -> stream(T)
  def zip : stream(S) -> stream(T) -> stream([S,T])                                 // since PR#769
  def filter : (S -> bool) -> stream(S) -> stream(S)
  def deduplicateKeepFirst : (S -> string) -> stream(S) -> stream(S)                // since PR#775
  def deduplicateKeepLast : (S -> string) -> stream(S) -> stream(S)                 // since PR#775
  // Reducing:
  def reduce : (S -> T -> T) -> T -> stream(S) -> T
  def indexedReduce : (number -> S -> T -> T) -> T -> stream(S) -> T
  def lreduce :  (S -> T -> T) -> (T -> stream(U)) -> (T -> stream(U)) -> T -> stream(S) -> stream(U)
  // Result chaining:
  def andThen : (T -> result(S, U)) ->   // since PR#1353
                result(S, stream(T)) ->
                result(S, stream(U))
  // Min/max reductions:
  def minIndex : stream(data) -> number                 // since PR#695
  def minValue : stream(data) -> A                      // since PR#695
  def minIndexBy : (A -> B@data) -> stream(A) -> number // since PR#695
  def minValueBy : (A -> B@data) -> stream(A) -> A      // since PR#695
  def maxIndex : stream(A) -> number                    // since PR#695
  def maxValue : stream(A) -> A                         // since PR#695
  def maxIndexBy : (A -> B@data) -> stream(A) -> number // since PR#695
  def maxValueBy : (A -> B@data) -> stream(A) -> A      // since PR#695
  // Construction:
  def lazy : (null -> stream(T)) -> stream(T)           // since PR#769
  def initialize : number -> (number -> S) -> stream(S)
  def initUnbound : (number -> T) -> stream(T)          // since PR#769
  def enumerate : number -> stream(number)
  def enumUnbound : null -> stream(number)              // since PR#769
  def rev : stream(S) -> stream(S)
  // Permutations:
  def permuteEnum : number -> stream(array(number))     // since PR#769
  def permuteArray : array(S) -> stream(array(S))       // since PR#769
  // Other iterations:
  def force : stream(S) -> stream(S)
  def iter : (S -> null) -> stream(S) -> null
  def exists : (S -> bool) -> stream(S) -> bool
  def forAll : (S -> bool) -> stream(S) -> bool         // since PR#1822
  def contains : S -> stream(S) -> bool
  def find : (S -> bool) -> stream(S) -> S                        // since PR#379
  def findMapped : (S -> T) -> stream(S) -> T                     // since PR#379
  def findMapped2 : (S -> T -> U) -> stream(S) -> stream(T) -> U  // since PR#379
  // Special elements, slicing:
  def first : stream(S) -> S
  def firstWithDefault : S -> stream(S) -> S
  def last : stream(S) -> S
  def lastWithDefault : S -> stream(S) -> S
  def skip : stream(S) -> stream(S)
  def skipn : number -> stream(S) -> stream(S)
  def firstn : number -> stream(S) -> stream(S)
  def lastn : number -> stream(S) -> stream(S)
  // Sort and merge:
  def grouped : (S -> data) -> stream(S) -> stream(array(S))
  def merge : (S -> data) -> (S -> data) -> stream(S) -> stream(S) -> stream(S)
  def sort : (S -> data) -> stream(S) -> stream(S)
  def sort_ci : (S -> string) -> stream(S) -> stream(S)                          // since PR#1821
  def dirSort : bool -> (S -> data) -> stream(S) -> stream(S)                    // since PR#769
  def dirSort_ci : bool -> (S -> string) -> stream(S) -> stream(S)               // since PR#1821
  def sortBy : (A -> A -> bool) -> stream(A) -> stream(A)                        // since PR#1821
  // Relations:
  def innerJoin : (S -> string) -> stream(S) ->
                  (T -> string) -> stream(T) -> (S -> T -> U) -> stream(U)  // since PR#769
  // Conversions:
  def toArray : stream(S) -> array(S)
  // Tee:
  type tee(S)                           // since PR#769
  def tee : stream(S) -> tee(S)         // since PR#769
  def leftOut : tee(S) -> stream(S)     // since PR#769
  def rightOut : tee(S) -> stream(S)    // since PR#769
module end

Basics

  def empty : stream(S)
  def singleton : S -> stream(S)
  def isEmpty : stream(S) -> bool
  def isNotEmpty : stream(S) -> bool
  def length : stream(S) -> number

empty is the empty stream.

singleton(x) returns a stream with one element, x.

isEmpty tests whether the stream is empty, i.e. has no more elements. The stream is not mutated (no element is consumed). isNotEmpty is the negated test.

length counts the number of elements in the stream. This is destructive: the stream is completely consumed.

You can convert any array into a stream with the a[] operator, e.g. [1,2,3][] is the stream returning 1, 2, and 3. You can also take a slice of the array:

  • a[p..]: the stream of the elements with index p and up
  • a[..q]: the stream of the elements up to index q
  • a[p..q]: the stream of the elements from p to q (inclusively)

Streams can be concatenated with the ++ operator: s ++ t is a stream first consuming from s until the end, and then consuming from t until the end.

It is not permitted to add undefined to a stream.

Mapping and filtering

Like all stream operations, mapping and filtering consume the input streams!

  def map : (S -> T) -> stream(S) -> stream(T)
  def map2 : (S -> T -> U) -> stream(S) -> stream(T) -> stream(U)                   // since PR#769
  def indexedMap : (number -> S -> T) -> stream(S) -> stream(T)
  def indexedMap2 : (number -> S -> T -> U) -> stream(S) -> stream(T) -> stream(U)  // since PR#769
  def indexOnlyMap : (number -> T) -> stream(S) -> stream(T)
  def flatMap : (S -> stream(T)) -> stream(S) -> stream(T)
  def zip : stream(S) -> stream(T) -> stream([S,T])                                 // since PR#769
  def filter : (S -> bool) -> stream(S) -> stream(S)
  def deduplicateKeepFirst : (S -> string) -> stream(S) -> stream(S)                // since PR#775
  def deduplicateKeepLast : (S -> string) -> stream(S) -> stream(S)                 // since PR#775

map returns a new stream where the elements are mapped by calling the argument function. The mapped stream is lazily constructed as far as needed, and the input stream is only consumed as far as needed. For example:

let s1 = [1,2,3,4][];
let s2 = stream.map(k -> k+1, s1);
let x1 = stream.first(s2);
// now x1==2, and s1 still contains 2,3,4.

map2 maps two streams of the same length by applying a function to their elements, and returns a new stream. The mapped stream is lazily constructed as far as needed, and the input streams are only consumed as far as needed. It is an error if the two inputs do not reach their end at the same time. For example:

let s1 = [1,2,3,4][];
let s2 = [10,20,30,40][];
let s3 = stream.map2((x,y) -> 2*(x+y), s1, s2);
let x1 = stream.first(s3);
// now x1==22, and s1 still contains 2,3,4, and s2 still contains 20,30,40.

indexedMap maps one input like map, but additionally passes the index k of each element (0 <= k < length(s)) as the first argument of the function.

indexedMap2 maps two inputs like map2, but additionally passes the index k of each element (0 <= k < length(s)) as the first argument of the function.

indexOnlyMap only passes the index but not the value of the element.

flatMap concatenates the streams returned by the argument function, e.g.

def sample =
  (s -> stream.flatMap(x -> [x,x][], s))

Now, sample([1,2][]) returns the stream 1, 1, 2, 2.

zip takes two input streams of the same length and constructs an output stream of pairs, where the left element of each pair is taken from the first input and the right element from the second input. The input streams are only consumed as far as needed for the output. It is an error if the two inputs do not reach their end at the same time. Example:

let s1 = [1,2,3,4][];
let s2 = [10,20,30,40][];
let s3 = stream.zip(s1, s2);
let x1 = stream.first(s3);
// now x1==[1,10], and s1 still contains 2,3,4, and s2 still contains 20,30,40.

filter returns a new stream that only contains the elements of the input stream for which the argument function returns true. The input stream is only consumed as far as needed for constructing the output, for example:

let s1 = [ "a", "abc", "def", "ghij" ][];
let s2 = stream.filter(x -> string.length(x) == 3, s1);
stream.first(s2)
  // returns "abc". s1 still contains "def" and "ghij".

deduplicateKeepFirst and deduplicateKeepLast remove duplicate elements from a stream. Here, two elements are duplicates when the projection function returns the same string for both of them. deduplicateKeepFirst keeps the first element of each group of duplicates and removes the later ones; it consumes the input stream only as far as needed. In contrast, deduplicateKeepLast keeps the last element of each group of duplicates and removes the preceding ones. Because it cannot know whether a later duplicate will follow, it consumes the whole input stream. For example:

let obj1 = { name:"Paul", age: 34 };
let obj2 = { name:"Ethan", age: 67 };
let obj3 = { name:"Tom", age: 34 };
let s1 = [ obj1, obj2, obj3 ][];
let s2 = stream.deduplicateKeepLast(obj -> "" + obj.age, s1);
  // returns a stream with [ obj2, obj3 ]. s1 is immediately consumed.

Reducing

Like all stream operations, reductions consume the input streams!

  def reduce : (S -> T -> T) -> T -> stream(S) -> T
  def indexedReduce : (number -> S -> T -> T) -> T -> stream(S) -> T
  def lreduce :  (S -> T -> T) -> (T -> stream(U)) -> (T -> stream(U)) -> T -> stream(S) -> stream(U)

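reduce iterates over the stream and computes the following (f receives the stream element as its first argument and the accumulated value as its second argument):

reduce(f,acc0,[x0,x1,x2,...,xLast][]) =
  f(xLast, ... f(x2, f(x1, f(x0, acc0))) ...)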

indexedReduce additionally passes the index of the stream element (starting with 0) to f, as its first argument.

lreduce is a lazy version of reduce that outputs a stream and only runs as far as needed. lreduce(f,g,h,acc0,s) reduces the stream s by repeatedly calling f on the next element of the stream and the value accumulated so far (which is initialized to acc0). After every f invocation, the function g is called with the current value of the accumulator, and the stream returned by g is emitted. At the very end, the function h is called with the final value of the accumulator, and the stream returned by h is also emitted.

Result chaining

  def andThen : (T -> result(S, U)) ->   // since PR#1353
                result(S, stream(T)) ->
                result(S, stream(U))

stream.andThen(f,res)

  • returns res unchanged when it is already an error
  • otherwise, maps f over the elements of the input stream. If f returns an error, this error is the final result; if f returns an ok value, that value is put into the output stream

Note that stream.andThen consumes the input stream until it either encounters an error or reaches the end of the stream.

Min/max reductions

  def minIndex : stream(data) -> number                 // since PR#695
  def minValue : stream(data) -> A                      // since PR#695
  def minIndexBy : (A -> B@data) -> stream(A) -> number // since PR#695
  def minValueBy : (A -> B@data) -> stream(A) -> A      // since PR#695
  def maxIndex : stream(A) -> number                    // since PR#695
  def maxValue : stream(A) -> A                         // since PR#695
  def maxIndexBy : (A -> B@data) -> stream(A) -> number // since PR#695
  def maxValueBy : (A -> B@data) -> stream(A) -> A      // since PR#695

These functions iterate over a stream, and return either the index of the minimum/maximum, or return the value of the minimum/maximum:

  • minIndex: returns the index of the minimum
  • minValue: returns the value of the minimum
  • maxIndex: returns the index of the maximum
  • maxValue: returns the value of the maximum

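The variants with a name ending in "By" do not compare the stream elements directly. Instead, they compare the values obtained by mapping each element with the given function. minValueBy and maxValueBy still return the original stream element, not the mapped value.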

If several elements have the desired minimum/maximum, the functions returning an index will return the index of the first occurrence.

Construction

  def lazy : (null -> stream(T)) -> stream(T)           // since PR#769
  def initialize : number -> (number -> S) -> stream(S)
  def initUnbound : (number -> T) -> stream(T)          // since PR#769
  def enumerate : number -> stream(number)
  def enumUnbound : null -> stream(number)              // since PR#769
  def rev : stream(S) -> stream(S)

lazy(f) is a stream that is only constructed when it is first needed. When an element is first consumed from the lazy stream, the function f is called to construct the stream. This is useful for unbounded recursions, e.g.

def gen : number -> stream(number)
def gen(k) = stream.singleton(k) ++ stream.lazy(_ -> gen(k+1))

Here, without lazy the recursion would immediately run forever. With lazy, however, the recursion only advances as far as stream elements are consumed from the output.

initialize(n, f) creates a stream with n elements, where the element at index k is f(k). For example, initialize(5, k -> k*k) returns the stream 0, 1, 4, 9, 16. initialize constructs the stream only as far as needed by the consumer.

initUnbound is like initialize but without upper bound on the number of elements. The stream has infinite length.

enumerate(n) creates a stream with n elements, where the element at index k is k. For example, enumerate(5) returns the stream 0, 1, 2, 3, 4. enumerate constructs the stream only as far as needed by the consumer.

enumUnbound is like enumerate but without upper bound on the number of elements. The stream has infinite length.

rev reverses the order of the elements (and consumes the input).

Permutations

  def permuteEnum : number -> stream(array(number))     // since PR#769
  def permuteArray : array(S) -> stream(array(S))       // since PR#769

permuteEnum(n) returns the stream of all permutations of the array [0, 1, ..., n-1]. The stream is only constructed as far as needed. Warning: The number of permutations grows very quickly!

permuteArray(a) returns the stream of all permutations of the array a.

Example:

stream.permuteEnum(3) |> stream.toArray
// returns [[0,1,2],[1,0,2],[0,2,1],[1,2,0],[2,0,1],[2,1,0]]

Other iterations

Like all stream operations, these iterations consume the input streams!

  def force : stream(S) -> stream(S)
  def iter : (S -> null) -> stream(S) -> null
  def exists : (S -> bool) -> stream(S) -> bool
  def forAll : (S -> bool) -> stream(S) -> bool         // since PR#1822
  def contains : S -> stream(S) -> bool
  def find : (S -> bool) -> stream(S) -> S                        // since PR#379
  def findMapped : (S -> T) -> stream(S) -> T                     // since PR#379
  def findMapped2 : (S -> T -> U) -> stream(S) -> stream(T) -> U  // since PR#379

force ensures that a stream is completely pulled from its source, and that all elements are put into the internal buffer.

iter iterates over the stream, and calls the function for each element. The result is discarded.

exists returns true when the function returns true for any of the stream elements. exists stops consuming the input as soon as it finds an element for which the function returns true. It returns false when the whole stream was consumed and the function always returned false or undefined.

forAll returns true when the function returns true for all of the stream elements. forAll stops consuming the input as soon as the function returns false or undefined for an element, and then returns false. Otherwise, once it has consumed the whole stream and the function always returned true, the return value is true.

contains returns true when the element is contained in the stream. contains stops consuming the input when the searched element is found.

find(f,s) iterates over the stream, and returns the first element x where f(x) is true. If there is no such element, undefined is returned instead.

findMapped(f,s) returns the value y = f(x) for the first element x of the stream where y is defined. If there is no such value, undefined is returned instead.

findMapped2(f,s1,s2) returns the value y = f(x1,x2) for the first pair of elements x1 of the stream s1 and x2 of the stream s2 where y is defined. x1 and x2 are taken from the same position relative to the beginning of their streams (e.g. both are the 8th element of their respective stream). If there is no such value, undefined is returned instead.

Special elements, slicing

  def first : stream(S) -> S
  def firstWithDefault : S -> stream(S) -> S
  def last : stream(S) -> S
  def lastWithDefault : S -> stream(S) -> S
  def skip : stream(S) -> stream(S)
  def skipn : number -> stream(S) -> stream(S)
  def firstn : number -> stream(S) -> stream(S)
  def lastn : number -> stream(S) -> stream(S)

first is the first element of the stream (or a runtime error for the empty stream). The first element is consumed.

firstWithDefault returns the first element, or if the stream is empty, the substitute. The first element, if any, is consumed.

last is the last element of the stream (or a runtime error for the empty stream). The input stream is completely consumed.

lastWithDefault returns the last element, or if the stream is empty, the substitute. The input stream is completely consumed.

skip consumes one element (if there is one) from the stream, and returns the mutated input stream.

skipn consumes the first n elements (or fewer, if the stream is shorter) from the stream, and returns the mutated input stream.

firstn consumes the first n elements (or fewer, if the stream is shorter) from the input, and returns a new stream containing only these elements.

lastn consumes the complete input stream, and returns a new stream that contains the last n elements of the input (or all of them, if the input has fewer than n elements).

Sort and merge

Like all stream operations, these functions consume the input streams!

  def grouped : (S -> data) -> stream(S) -> stream(array(S))
  def merge : (S -> data) -> (S -> data) -> stream(S) -> stream(S) -> stream(S)
  def sort : (S -> data) -> stream(S) -> stream(S)
  def sort_ci : (S -> string) -> stream(S) -> stream(S)                          // since PR#1821
  def dirSort : bool -> (S -> data) -> stream(S) -> stream(S)                    // since PR#769
  def dirSort_ci : bool -> (S -> string) -> stream(S) -> stream(S)               // since PR#1821
  def sortBy : (A -> A -> bool) -> stream(A) -> stream(A)                        // since PR#1821

grouped(f,s) splits s into groups of adjacent similar elements. x and y are similar when f(x) == f(y). For example, grouped(string.length, ["a", "b", "ab1", "abcd", "ghfd"][]) returns the stream ["a","b"], ["ab1"], ["abcd","ghfd"].

merge(pleft,pright,left,right) merges two streams left and right so that when both streams are sorted, the result is the sorted merged stream. Whether the next element is taken from left or right depends on the result of pleft(x_left) and pright(x_right). If pleft(x_left) < pright(x_right), the element from the left x_left is taken. If pright(x_right) < pleft(x_left), the element from the right x_right is taken. In case of equality, both elements are taken (in any order). If one of pleft(x_left) or pright(x_right) is undefined, the corresponding element x_left or x_right, resp., counts as smaller (unless both are undefined).

sort(f,a) sorts the stream by f and returns the sorted stream.

sort_ci(f,a) sorts the stream by the string result of f in a case-insensitive way, and returns the sorted stream.

dirSort(r,f,a) sorts the stream by f in ascending or descending direction and returns the sorted stream. If r is true, it is descending, otherwise ascending.

dirSort_ci is the case-insensitive variant.

sortBy(f,a) sorts the stream a by the greater-than relation f, and returns a new sorted stream.

Relations

  def innerJoin : (S -> string) -> stream(S) ->
                  (T -> string) -> stream(T) -> (S -> T -> U) -> stream(U)  // since PR#769

The function innerJoin(kl,sl,kr,sr,proj) computes the inner join between the stream of records sl and the stream of records sr, and returns the projected result. In particular, it relates the rows xl from sl with the rows xr from sr where kl(xl) == kr(xr) and returns the stream of the elements proj(xl,xr).

The functions kl and kr must return strings or undefined. If a key is undefined, the corresponding row is ignored.

Example: Let's assume we have data about persons and rooms, and want to derive data about which phone extension a person has (which depends on the office):

def persons =
  [ { name:"John", room:"101" },
    { name:"Greg", room:"102" },
    { name:"Brenda", room:"102" },    // shares room with Greg
    { name:"Betty", room:"201" }
  ]
def rooms =
  [ { room:"101", extension:"3453" },
    { room:"102", extension:"5612" },
    { room:"103", extension:"0192" }
    // room 201 doesn't have a phone
  ]

Now:

def person_extension =
  stream.innerJoin(person -> person.room, persons[],
                   room -> room.room, rooms[],
                   (person,room) -> { name:person.name, extension:room.extension })[]

This would return:

[ { name:"John", extension:"3453" },
  { name:"Greg", extension:"5612" },
  { name:"Brenda", extension:"5612" }
]

Betty doesn't occur because an inner join implicitly deletes rows without match (we'd need a left outer join for this - TODO).

Performance: The function innerJoin memoizes the contents of the left stream completely. The right stream is processed element by element, and for every match outputs are emitted.

If the left stream is joined several times with different streams, it is also possible to partially evaluate the function:

let join = stream.innerJoin(kl,sl);
let r1 = join(kr1,sr1,p1);
let r2 = join(kr2,sr2,p2);

This way, the memoization of sl is only done once.

Conversions

  def toArray : stream(S) -> array(S)

toArray consumes the input and returns an array with the elements of the input stream.

Tee

  type tee(S)                           // since PR#769
  def tee : stream(S) -> tee(S)         // since PR#769
  def leftOut : tee(S) -> stream(S)     // since PR#769
  def rightOut : tee(S) -> stream(S)    // since PR#769

A tee creates two output streams - called the left and the right stream - from a single input stream. While the outputs are consumed, the input is consumed only as far as needed. The tee provides an internal buffer because the two outputs may be consumed at different speeds.

Example:

let s1 = [1,2,3,4,5][];
let t = stream.tee(s1);
let l = stream.leftOut(t);
let r = stream.rightOut(t);
let x1 = stream.first(l);  // x1 == 1
let x2 = stream.first(l);  // x2 == 2
let y1 = stream.first(r);  // y1 == 1
// now s1 still contains 3,4,5.

Since PR#1539, the consumers of the two output streams may run in different coroutines.