201611 SICP系列 第7讲 流计算模型 学习笔记 (五) - xiaoxianfaye/Learning GitHub Wiki

7 Stream

7.26 Formulating sqrt iteration as stream process

迭代过程是最早介绍的,它把所有的状态都保存在迭代过程的参数中,是显式化的。现在来看一下,怎么用流的思想把原来的迭代过程表示出来。

用Alexander方法求平方根。不把它看作随时间相关的不断更新的变量,把它表示成一个无限的流,这个流上的每一个元素都是对平方根的不断逼近。

sq_stream(N) ->
    cons_stream(1.0,
                fun() ->
                    map_stream(
                        fun(E) ->
                            (E + N / E) / 2
                        end,
                        sq_stream(N))
                end).

定义一个sq_stream。用Alexander方法求平方根从1.0开始,所以把1.0作为流的初始值。仍然是在流上操作,对流上的每一个元素做一个映射转换操作,当前猜测值和N除以猜测值取平均作为下一个猜测值。

sqrt(N, Toler) ->
    stream_limit(sq_stream(N), Toler).

当流中两个元素差值的绝对值小于Tolerance时,就把当前值作为N的平方根返回。这是一个无限流,怎么取出一个满足要求元素的呢?stream_limit。从流中取元素,两个两个的比较。

stream_limit(S, Toler) ->
    stream_limit(head(S), tail(S), Toler).

stream_limit(Pre, S, Toler) ->
    Next = head(S),
    if
        abs(Pre - Next) =< Toler ->
            Next;
        true ->
            stream_limit(Next, tail(S), Toler)
    end.

测试代码:

test_sqrt() ->
    true = abs(1.414 - sqrt(2, 0.001)) < 0.001,
    true = abs(3 - sqrt(9, 0.001)) < 0.001,

    test_sqrt_ok.

构建的是一个无限流,只是存在,真正需要它计算的时候,才会去计算。

7.27 Stream of pairs

在跟时间相关的过程中,我们要考虑时间、事件的顺序、同步、如何改变状态。在流中,我们也有需要去考虑的问题。

7.27.1 Stream of pairs constructed from an infinite stream

下面来看一个例子。

Stream of pairs

给定一个无穷的正整数流,产生出整数对(i, j)流,整数对满足i <= j。

%% Stream of pairs from an infinite stream
pair(S) ->
    flatmap(
        fun(J) ->
            map_stream(
                fun(I) ->
                    {I, J}
                end,
                enumerate_interval(1, J))
        end, S).

运行起来以后卡住了。

Stream of pairs exec result 1

为什么会卡住?看一下flatmap的代码。

flatten(StOfSt) ->
    acc_stream(fun append_stream/2, the_empty_stream(), StOfSt).

flatmap(P, S) ->
    flatten(map_stream(P, S)).

acc_stream(Proc, A, S) ->
    case is_empty_stream(S) of
        true ->
            A;
        false ->
            Proc(head(S), acc_stream(Proc, A, tail(S)))
    end.

flatmap调用了flatten,flatten中用到了acc_stream,acc_stream中用到的Proc是append_stream。如果flatten接收的StOfSt本身是一个无限流的话,就有问题了。所以针对无限流,目前的flatmap是不能满足需要的。

7.27.2 Improved version of flatten

写一个flatten的改进版本,把无限流拉平成一个流,而这个流本身也是一个无限流。

flatten(StOfSt) ->
    case is_empty_stream(StOfSt) of
        true ->
            the_empty_stream();
        false ->
            flatten(head(StOfSt), tail(StOfSt))
    end.

flatten(Hs, StOfSt) ->
    case is_empty_stream(Hs) of
        true ->
            flatten(StOfSt);
        false ->
            cons_stream(head(Hs),
                        fun() ->
                            flatten(tail(Hs), StOfSt)
                        end)
    end.

这样写出来的版本,不管flatten接收的是不是无限流,都是可以的。

7.27.3 Printing integer pairs

老师写了一个print_stream,和我之前自己写的print_stream_limit其实是一样的。

print_stream(_S, 0) -> ok;
print_stream(S, N) ->
    case is_empty_stream(S) of
        true ->
            io:format("Done~n");
        false ->
            io:format("~p ", [head(S)]),
            print_stream(tail(S), N - 1)
    end.

运行结果:

Stream of pairs exec result 2

测试代码:

test_pair() ->
    [{1, 1}, 
     {1, 2}, {2, 2}, 
     {1, 3}, {2, 3}, {3, 3}] = collect_stream_limit(6, pair(integers())),

    test_pair_ok.

7.27.4 Stream of pairs constructed from two infinite streams

将问题更推进一步。

刚才是一个无穷的流,从流中取出来的元素决定了要构建这个pair的第二个元素所在的区间。现在我们有两个无限流S和T,希望产生如图所示的pair。

Stream of pairs 2

如果已经看到某个S了,要把这个S对以后的T能够产生的结果用一种累积的方式保存起来,后面碰到T的时候,把前面已经产生的元素追加到T上,然后再处理目前的元素。按照这种方式,pair流产生的顺序应该是{S0, T0}、{S0, T1}、{S1, T1}、{S0, T2}、{S1, T2}、{S2, T2}……。

%% Stream of pairs from two infinite streams
pair(S, T, Fun) ->
    append_stream(
        Fun(head(T)),
        cons_stream(
            {head(S), head(T)},
            fun() ->
                pair(tail(S),
                     tail(T),
                     acc_fun(Fun, head(S)))
            end)).

希望有一个累积函数Fun,先不管Fun怎么实现,Wishful Thinking。每次碰到S和T的时候,先做一个累加,把前面看到过的那些S产生的结果先累加到要看的T (head(T)) 上去,然后再来做目前的S (head(S)) 和T (head(T)),剩下的就是用tail(S)和tail(T)递归来做,但还要通过acc_fun,把目前S (head(S)) 产生的结果累加到Fun上。

Fun有一个参数,返回流,流中的元素是Pair,Fun处理的是T流上的元素。acc_fun返回的还应该是一个Function,这个Function接收的还是T流上的元素,先Fun一把,才会把自己的东西追加上去,注意要保证acc_fun返回的是一个流。

acc_fun(Fun, Hs) ->
    fun(E) ->
        append_stream(
            Fun(E),
            cons_stream(
                {Hs, E},
                fun the_empty_stream/0))
    end.

最一开始的Fun肯定也要求是一个Function,只不过没有过掉任何S流中的元素,但要返回一个stream,所以一定是empty stream。

pair(S, T) ->
    pair(S, T,
         fun(_E) ->
            the_empty_stream()
         end).

根据程序,我尝试推导了一下计算过程。

S0...   T0...   Fun <=> fun(E) -> the_empty_stream() end

Fun(head(T)) <=> (fun(E) -> the_empty_stream())(T0)
             <=> the_empty_stream()
             
append_stream(
    the_empty_stream(),
    stream({S0, T0})
)

stream({S0, T0})
S1...   T1...   Fun <=> acc_fun(fun(E) -> the_empty_stream() end, S0))

Fun(head(T)) <=> (acc_fun(fun(E) -> the_empty_stream() end, S0))(T1)
             <=> append_stream(
                    (fun(E) -> the_empty_stream() end)(T1),
                    stream({S0, T1})
                 )
             <=> append_stream(
                    the_empty_stream(),
                    stream({S0, T1})
                 )
             <=> stream({S0, T1})
             
append_stream(
    stream({S0, T1})
    stream({S1, T1})
)            

stream({S0, T0}, {S0, T1}, {S1, T1})
S2...   T2...   Fun <=> acc_fun(acc_fun(fun(E) -> the_empty_stream() end, S0), S1)

Fun(head(T)) <=> (acc_fun(acc_fun(fun(E) -> the_empty_stream() end, S0), S1))(T2)
             <=> append_stream(
                    (acc_fun(fun(E) -> the_empty_stream() end, S0))(T2),
                    stream(S1, T2)
                 )
             <=> append_stream(
                    append_stream(
                        (fun(E) -> the_empty_stream())(T2),
                        stream(S0, T2)
                    )
                    stream(S1, T2)
                 )
             <=> stream({S0, T2}, {S1, T2})
             
append_stream(
    stream({S0, T2}, {S1, T2})
    stream({S2, T2})
)

stream({S0, T0}, {S0, T1}, {S1, T1}, {S0, T2}, {S1, T2}, {S2, T2})

Faye:关键在于,随着流往后走,acc_fun是递归嵌套的,用“过程”把S流前面的元素都累积起来,不同于一般的用数据结构累积的方式,所以不太容易想清楚。所以什么是过程、什么是数据,界限越来越模糊。

7.27.5 Printing pair stream

运行结果:

Stream of pairs 2 exec result 1

Stream of pairs 2 exec result 2

测试代码:

test_pair_2() ->
    [{1, 1},
     {1, 2}, {2, 2},
     {1, 3}, {2, 3}, {3, 3},
     {1, 4}, {2, 4}, {3, 4}, {4, 4}] 
        = collect_stream_limit(10, pair(integers(), integers())),

    test_pair_2_ok.

7.27.6 If I want all the pairs

Stream of pairs all pairs

现在想要所有的pairs,S和T依然是两个无限流。

老师展示了生成的顺序,先累积S的,再累积T的,然后做当前的,最后做tail的。

  1. {S0, T0}
  2. {S0, T1}
  3. {S1, T0}
  4. {S1, T1}
  5. {S0, T2} {S1, T2}
  6. {S2, T0} {S2, T1}
  7. {S2, T2}
  8. {S3, T0} {S3, T1} {S3, T0}
%% Stream of all pairs from two infinite streams
all_pair_acc(S, T, SFun, TFun) ->
    append_stream(
        SFun(head(T)),
        append_stream(
            TFun(head(S)),
            cons_stream(
                {head(S), head(T)},
                fun() ->
                    all_pair_acc(tail(S), tail(T),
                                 acc_first_fun(SFun, head(S)),
                                 acc_sec_fun(TFun, head(T)))
                end))).

先做S的累积,再做T的累积,然后做当前的,然后是tail(S)和tail(T),最后把head(S)和head(T)累积起来。

acc_first_fun(Fun, Hs) ->
    fun(E) ->
        append_stream(
            Fun(E),
            cons_stream(
                {Hs, E},
                fun the_empty_stream/0))
    end.

acc_sec_fun(Fun, Ht) ->
    fun(E) ->
        append_stream(
            Fun(E),
            cons_stream(
                {E, Ht},
                fun the_empty_stream/0))
    end.

acc_first_fun累积S、接收的是T的元素,acc_sec_fun累积T、接收的是S的元素。

all_pair_acc(S, T) ->
    all_pair_acc(S, T,
        fun(_E) ->
            the_empty_stream()
        end,
        fun(_E) ->
            the_empty_stream()
        end).

SFun和TFun起始返回都是empty stream。

7.27.7 Printing all pairs stream

运行结果:

Stream of pairs all pairs exec result

测试代码:

test_all_pair() ->
    [{1, 1},
     {1, 2}, 
     {2, 1},
     {2, 2}, 
     {1, 3}, {2, 3},
     {3, 1}, {3, 2}, 
     {3, 3},
     {1, 4}, {2, 4}, {3, 4},
     {4, 1}, {4, 2}, {4, 3},
     {4, 4}] 
        = collect_stream_limit(16, all_pair_acc(integers(), integers())),

    test_all_pair_ok.

这个流是由很多无限流共同组成的,每一行和每一列都是无穷流,我们人为指定了交织(Interleave)的顺序把这个流表现出来。

7.27.8 Recursive Thinking

Stream of pairs recursive thinking

换一个方式思考。

生成的顺序如下:

  1. {S0, T0}
  2. {S0, T1}, {S0, T2}, ……
  3. {S1, T0}, {S1, T1}, {S1, T2}, …… {S2, T0}, {S2, T1}, {S2, T2}, …… {S3, T0}, {S3, T1}, {S3, T0}, ……

没有考虑那么多的流不断往上叠加的因素。

当前的是第1步的{S0, T0},第2步的“{S0, T1}, {S0, T2}, ……”是一个无穷的流,第3步的流也是一个无穷的流。第2步可以看作把S0应用到所有的tail(T)上。剩下的就是把tail(S)跟T做,用递归思路解决。

那么,怎么把第2步的流和第3步的流合起来呢?

7.27.9 How to combine two inner streams?

%% Stream of all pairs, recursive thinking
all_pair(S, T) ->
    cons_stream(
        {head(S), head(T)},
        fun() ->
            ???(
                map_stream(
                    fun(E) -> {head(S), E} end,
                    tail(T)),
                all_pair(tail(S), T))
        end).

如果我们用append_stream合并这两个流。

all_pair(S, T) ->
    cons_stream(
        {head(S), head(T)},
        fun() ->
            append_stream(
                map_stream(
                    fun(E) -> {head(S), E} end,
                    tail(T)),
                all_pair(tail(S), T))
        end).

运行结果如下,肯定不符合我们的需要。

All pair exec result 1

在合并两个无限流的时候,要考虑流中间的顺序。不管是在做两个无限流的up_pair,还是all_pair的时候,都是人为指定了中间的顺序。

merge两个无限流,跟时间没有关系,公平起见,我们可以“交织”来做。以interleave(S, T)为例,cons_stream,先从S取一个出来,interleave(T, tail(S)),交换一下。

all_pair(S, T) ->
    cons_stream(
        {head(S), head(T)},
        fun() ->
            interleave(
                map_stream(
                    fun(E) -> {head(S), E} end,
                    tail(T)),
                all_pair(tail(S), T))
        end).

interleave(S1, S2) ->
    case is_empty_stream(S1) of
        true ->
            S2;
        false ->
            cons_stream(
                head(S1),
                fun() ->
                    interleave(S2, tail(S1))
                end)
    end.

根据程序,以两个正整数流为例,我尝试推导了一下计算过程,并尝试用图说明,看图的时候需注意,红色字体表示当前元素,红色背景色比蓝色背景色先做、深色背景色比浅色背景色先做。

S: [1, 2, 3, 4, ...]
T: [1, 2, 3, 4, ...]

All pair deduction start

all_pair(S, T) 
<=> % 根据all_pair定义替换all_pair
cons_stream(
    {1, 1},
    interleave(
        [{1, 2}, {1, 3}, {1, 4}, ...],
        all_pair([2, 3, 4, ...], [1, 2, 3, 4, ...])
    )
)
=> stream({1, 1})

All pair deduction 1 1

% 取出上面的interleave部分继续推导
interleave(
    [{1, 2}, {1, 3}, {1, 4}, ...],
    all_pair([2, 3, 4, ...], [1, 2, 3, 4, ...])
) 
<=> % 根据interleave定义替换interleave
cons_stream(
    {1, 2},
    interleave(
        all_pair([2, 3, 4, ...], [1, 2, 3, 4, ...]),
        [{1, 3}, {1, 4}, ...]
    )
)
=> stream({1, 1}, {1, 2})

All pair deduction 1 2

% 取出上面的interleave部分继续推导
interleave(
    all_pair([2, 3, 4, ...], [1, 2, 3, 4, ...]),
    [{1, 3}, {1, 4}, ...]
) 
<=> % 根据all_pair定义替换all_pair
interleave(
    cons_stream(
        {2, 1},
        interleave(
            [{2, 2}, {2, 3}, {2, 4}, ...],
            all_pair([3, 4, ...], [1, 2, 3, 4, ...])
        )
    ),
    [{1, 3}, {1, 4}, ...]
)
<=> % 根据interleave定义替换外层interleave
cons_stream(
    {2, 1},
    interleave(
        [{1, 3}, {1, 4}, ...],
        interleave(
            [{2, 2}, {2, 3}, {2, 4}, ...],
            all_pair([3, 4, ...], [1, 2, 3, 4, ...])
        )
    )
)
=> stream({1, 1}, {1, 2}, {2, 1})

All pair deduction 2 1

% 取出上面的外层interleave部分继续推导
interleave(
    [{1, 3}, {1, 4}, ...],
    interleave(
        [{2, 2}, {2, 3}, {2, 4}, ...],
        all_pair([3, 4, ...], [1, 2, 3, 4, ...])
    )
)
<=> % 根据interleave定义替换外层interleave
cons_stream(
    {1, 3},
    interleave(
        interleave(
            [{2, 2}, {2, 3}, {2, 4}, ...],
            all_pair([3, 4, ...], [1, 2, 3, 4, ...])
        ),
        [{1, 4}, ...]
    )
)
=> stream({1, 1}, {1, 2}, {2, 1}, {1, 3})

All pair deduction 1 3

% 取出上面的外层interleave部分继续推导
interleave(
    interleave(
        [{2, 2}, {2, 3}, {2, 4}, ...],
        all_pair([3, 4, ...], [1, 2, 3, 4, ...])
    ),
    [{1, 4}, ...]
)
<=> % 根据interleave定义替换内层interleave
interleave(
    cons_stream(
        {2, 2},
        interleave(
            all_pair([3, 4, ...], [1, 2, 3, 4, ...]), 
            [{2, 3}, {2, 4}, ...]
        )
    )
    [{1, 4}, ...]
)
<=> % 根据interleave定义替换外层interleave
cons_stream(
    {2, 2},
    interleave(
        [{1, 4}, ...],
        interleave(
            all_pair([3, 4, ...], [1, 2, 3, 4, ...]), 
            [{2, 3}, {2, 4}, ...]
        )
    )
)
=> stream({1, 1}, {1, 2}, {2, 1}, {1, 3}, {2, 2})

All pair deduction 2 2

% 取出上面的外层interleave部分继续推导
interleave(
    [{1, 4}, ...],
    interleave(
        all_pair([3, 4, ...], [1, 2, 3, 4, ...]), 
        [{2, 3}, {2, 4}, ...]
    )
)
<=> % 根据interleave定义替换外层interleave
cons_stream(
    {1, 4},
    interleave(
        interleave(
            all_pair([3, 4, ...], [1, 2, 3, 4, ...]), 
            [{2, 3}, {2, 4}, ...]
        ),
        [{1, 5}, ...]
    )
)
=> stream({1, 1}, {1, 2}, {2, 1}, {1, 3}, {2, 2}, {1, 4})

All pair deduction 1 4

% 取出上面的外层interleave部分继续推导
interleave(
    interleave(
        all_pair([3, 4, ...], [1, 2, 3, 4, ...]), 
        [{2, 3}, {2, 4}, ...]
    ),
    [{1, 5}, ...]
)
<=> % 根据all_pair定义替换all_pair
interleave(
    interleave(
        cons_stream(
            {3, 1},
            interleave(
                [{3, 2}, {3, 3}, ...],
                all_pair([4, ...], [1, 2, 3, 4, ...])
            )
        ),
        [{2, 3}, {2, 4}, ...]
    ),
    [{1, 5}, ...]
)
<=> % 根据interleave定义替换第二层interleave
interleave(
    cons_stream(
        {3, 1},
        interleave(
            [{2, 3}, {2, 4}, ...],
            interleave(
                [{3, 2}, {3, 3}, ...],
                all_pair([4, ...], [1, 2, 3, 4, ...])
            )
        )
    ),
    [{1, 5}, ...]
)
<=> % 根据interleave定义替换第一层interleave
cons_stream(
    {3, 1},
    interleave(
        [{1, 5}, ...],
        interleave(
            [{2, 3}, {2, 4}, ...],
            interleave(
                [{3, 2}, {3, 3}, ...],
                all_pair([4, ...], [1, 2, 3, 4, ...])
            )
        )
    )
)
=> stream({1, 1}, {1, 2}, {2, 1}, {1, 3}, {2, 2}, {1, 4}, {3, 1})

All pair deduction 3 1

% 取出上面的外层interleave部分继续推导
interleave(
    [{1, 5}, ...],
    interleave(
        [{2, 3}, {2, 4}, ...],
        interleave(
            [{3, 2}, {3, 3}, ...],
            all_pair([4, ...], [1, 2, 3, 4, ...])
        )
    )
)
...
=> stream({1, 1}, {1, 2}, {2, 1}, {1, 3}, {2, 2}, {1, 4}, {3, 1}, {1, 5})

...

运行结果:

All pair exec result 2

测试代码:

test_all_pair_2() ->
    [{1, 1}, {1, 2}, {2, 1}, {1, 3}, {2, 2}, {1, 4}, {3, 1}, {1, 5}] 
        = collect_stream_limit(8, all_pair(integers(), integers())),

    test_all_pair_2_ok.

7.27.10 Rewriting up pairs using stream interleaving

同理,我们可以换种方式重新思考up_pair。

%% Rewriting up pairs using stream interleaving
up_pair(S, T) ->
    cons_stream(
        {head(S), head(T)},
        fun() ->
            interleave(
                map_stream(
                    fun(E) -> {head(S), E} end,
                    tail(T)),
                up_pair(tail(S), tail(T)))
        end).

运行结果:

Up pair exec result

测试代码:

test_up_pair() ->
    [{1, 1}, {1, 2}, {2, 2}, {1, 3}, {2, 3}, {1, 4}, {3, 3}, {1, 5}]
        = collect_stream_limit(8, up_pair(integers(), integers())),

    test_up_pair_ok.   

7.27.11 Exercise

Stream of pairs exercise

可以用interleave的方式去做,未做。

7.28 Memorization

7.28.1 mem_proc

delay放的是对这个表达式在需要的时候才会被求值的Promise。对流的访问没有索引,跟列表或数组是完全不一样的,只能不停地tail,nth_stream也是不停地调用tail。哪怕是之前访问过的元素,再访问还是要不停地tail。

有没有一种方式,如果计算过了,能够把它存住,下次再访问的时候,虽然还是不停的tail,但其实没有产生真实的计算量。

mem_proc

改起来也很方便,不需要改上面所有的程序。原来的delay是对表达式延迟计算的承诺,现在变成对表达式延迟计算承诺完了、一旦计算完了、做一个memorize的操作,把它缓存住。

mem_proc的函数签名必须跟“fun() -> Exp end”的函数签名一样,应该是一个无参的函数。这个函数有个小技巧,如果已经被运行过了,它应该能够知道自己的结果,下次再调用它的时候,它应该不用再去做实际的计算了。这就是mem_proc要完成的。

cons_stream(H, T) ->
    Ref = make_ref(),
    F = fun() ->
            case get(Ref) of
                undefined ->
                    Res = T(),
                    put(Ref, Res),
                    Res;
                R ->
                    R
            end
        end,
    [H, F].

这是一个最简单的实现方式,用进程字典。cons_stream对外并没有变化,依然是head跟Function合在一起的一个数据结构。在不断调tail(T)的时候,会在流中做一些重复运算,这样做可以减少计算量。

一开始看这段代码,我对Ref有点疑问,每次调用cons_stream的时候都会make出新的Ref,这样的话,每次get(Ref)肯定是undefined。但很明显运行结果不会是我想的这样。所以加了打印语句,并结合nth_stream和integers_from,详细分析了计算过程。

cons_stream(H, T) ->
    Ref = make_ref(),
    io:format("make_ref ~p~n", [Ref]),
    F = fun() ->
            io:format("Ref ~p~n", [Ref]),
            case get(Ref) of
                undefined ->
                    Res = T(),
                    put(Ref, Res),
                    io:format("put {~p, ~p}~n", [Ref, Res]),
                    Res;
                R ->
                    io:format("get {~p, ~p}~n", [Ref, R]),
                    R
            end
        end,
    [H, F].

head([H, _]) -> H.
tail([_, T]) -> T().

the_empty_stream() -> [].

is_empty_stream([]) -> true;
is_empty_stream(_) -> false.

%% Terminals
nth_stream(Idx, S) ->
    case is_empty_stream(S) of
        true ->
            throw(no_this_idx);
        false ->
            if
                Idx =:= 1 ->
                    head(S);
                true ->
                    nth_stream(Idx - 1, tail(S))
            end
    end.

%% Infinite Stream
integers_from(N) ->
    cons_stream(N, fun() -> integers_from(N + 1) end). 

先调用integers_from(1)生成从1开始的正整数流。

stream_cache nth_stream exec result 1

I = integers_from(1)
<=> % 根据integers_from定义替换integers_from
cons_stream(1, fun() -> integers_from(2) end)
{
    Ref = make_ref() => #Ref<0.0.0.93>,
    F => 
        fun() ->
            case get(Ref) of % closure Ref #Ref<0.0.0.93>
                undefined ->
                    Res = (fun() -> integers_from(2) end)()
                        => integers_from(2),
                    put(Ref, Res),
                    Res;
                R ->
                    R
            end
        end
}

构建integers_from(1)流时,生成#Ref<0.0.0.93>,被闭包在了F中。

从流I中获取第1个元素。

nth_stream(1, I) <=> head(I) => 1

stream_cache nth_stream exec result 2

% nth_stream(2, I) first
nth_stream(2, I) <=> nth_stream(1, tail(I))

tail(I) <=> F()
<=>
(
    fun() ->
        case get(Ref) of % closure Ref #Ref<0.0.0.93>
            undefined ->
                Res = integers_from(2),
                put(Ref, Res),
                Res;
            R ->
                R
        end
    end
)()
<=>
case get(Ref) of % closure Ref #Ref<0.0.0.93>
    undefined ->
        Res = integers_from(2),
        put(Ref, Res),
        Res;
    R ->
        R
end
<=> % undefined branch
Res = integers_from(2),
put(#Ref<0.0.0.93>, Res),
Res;

由第一条打印信息可以知道,#Ref<0.0.0.93>确实被闭包到了F中。在执行“Res = integers_from(2)”这条语句时,会生成新的从2开始的正整数流,此时会生成新的#Ref<0.0.0.104>,第二条打印信息也印证了这一点。需要注意的是,这个新的Ref (#Ref<0.0.0.104>)是属于integers_from(2)这个新流的,并不会影响到integers_from(1)流中的Ref (#Ref<0.0.0.93>)。所以在执行“put(Ref, Res)”这条语句时put的Ref还是#Ref<0.0.0.93>,第三条打印信息也印证了这一点。

integers_from(2)
<=> % 根据integers_from定义替换integers_from
cons_stream(2, fun() -> integers_from(3) end)
{
    make_ref() => Ref #Ref<0.0.0.104>
    F => 
        fun() ->
            case get(Ref) of % closure Ref #Ref<0.0.0.104>
                undefined ->
                    Res = (fun() -> integers_from(3) end)(),
                    put(Ref, Res),
                    Res;
                R ->
                    R
            end
        end
}

nth_stream(1, tail(I))
<=>
nth_stream(1, integers_from(2))
<=>
head(integers_from(2)) => 2

再次从流I中获取第2个元素。

stream_cache nth_stream exec result 3

% nth_stream(2, I) again
nth_stream(2, I) <=> nth_stream(1, tail(I))

tail(I) <=> F()
<=>
(
    fun() ->
        case get(Ref) of % closure Ref #Ref<0.0.0.93>
            undefined ->
                Res = integers_from(2),
                put(Ref, Res),
                Res;
            R ->
                R
        end
    end
)()
<=>
case get(Ref) of % closure Ref #Ref<0.0.0.93>
    undefined ->
        Res = integers_from(2),
        put(Ref, Res),
        Res;
    R ->
        R
end
<=> % R branch
integers_from(2)

不论是第一次还是第二次从流I中获取第2个元素,操作的都是同一个流I,F也是同一个,因此被闭包到F中的Ref也始终是#Ref<0.0.0.93>,第一条打印信息印证了这一点。因为已经缓存过了,所以直接从进程字典中取出缓存过的integers_from(2)流,而且也没有构造integers_from(2)流的Ref打印信息。

nth_stream(1, tail(I))
<=>
nth_stream(1, integers_from(2))
<=>
head(integers_from(2)) => 2

7.28.2 Difference on memorization

在Erlang中选择了Function来对Promise进行delay,这只是一种实现方式。在Lisp语言中是不一样的,在Lisp中定义的是一个真实的数据结构,这个数据结构的这个值放在那个地方就是真地放在那个地方。这一点会特别影响,如果流不是用生成式的方式定义出来,而是隐式定义出来的,Erlang中的memorization跟Lisp中的定义是不一样的。

在Lisp中定义一个fibs流,它是一个数据结构。当我计算出tail以后,再调这个tail下面的东西的话,fibs真地能够利用前面计算的结果来进行计算。

在Lisp中定义的fibs:

LISP fibs

“(tail fibs)”中的fibs就是“(define fibs”中的fibs。也就是说,add-stream的时候,两个fibs是同一个东西,在同一个引用上的同一块数据。

“define fibs”定义了一个fibs变量,可以直接在add-stream和tail中用它。在Erlang中不可以这样定义和使用变量,但函数可以这样定义和使用。因为函数在Erlang中是first-class的,函数只要定义了,就在名字空间中存在了,所以才可以定义递归函数。但正因为是递归函数,add_stream中的两个fibs和最开始的fibs肯定不是同一个东西。

在Erlang中定义的fibs:

% Defining streams implicitly
fibs() ->
    cons_stream(0,
                fun() ->
                    cons_stream(1,
                                fun() ->
                                    add_stream(fibs(), tail(fibs()))
                                end)
                end).

为了解决缓存的问题,我们加上了mem_proc,如果语义是Lisp写的那样,那就已经解决问题了。但mem_proc只适用于用生成式的方式定义的流,对于直接将流作为整体来操作、特别是用递归函数来模拟的流是不能解决问题的。

integers_from()用生成式的方式定义正整数流,mem_proc的处理过程如下,每个计算结果只对应一个缓存。

mem_proc integers_from exec result

fibs()用隐式的方式定义流,mem_proc的处理过程如下,每个计算结果对应不止一个缓存,也就是说,还是存在重复计算。

mem_proc fibs exec result 1

mem_proc fibs exec result 2

stream_cache:nth_stream(2, Fibs)打印信息中#Ref<0.0.0.59>就是1对应的重复计算。

mem_proc fibs exec result 3

stream_cache:nth_stream(3, Fibs)打印信息中#Ref<0.0.0.59>、#Ref<0.0.0.62>、#Ref<0.0.0.96>、#Ref<0.0.0.57>都是1对应的重复计算。

如果不改变流的定义方式,那怎么解决呢?

7.28.3 Pseudo simulation using process dictionary

我们想要的是指向同一个流。给cons_stream产生的stream起一个名字缓存起来,进程字典是最简单的方式,也可以在系统中专门起一个进程,或者放在ets中。

%% Pseudo simulation using process dictionary
naming_cons_stream(N, H, T) ->
    put(N, cons_stream(H, T)),
    get(N).

fibs() ->
    naming_cons_stream(
        fibs,
        0,
        fun() ->
            cons_stream(
                1,
                fun() ->
                    Fibs = get(fibs),
                    add_stream(Fibs, tail(Fibs))
                end)
        end
    ).

这样修改以后,虽然fibs()用隐式的方式定义流,mem_proc就能解决问题了,处理过程如下,每个计算结果只对应一个缓存。

mem_proc fibs naming exec result 1

mem_proc fibs naming exec result 2

mem_proc fibs naming exec result 3

我们还可以通过下面这种方式查看用递归函数定义流有多少计算在里面,加了mem_proc是否能减少计算量。如果cnt加1,就说明mem_proc起作用了。对于fibs来说,我们真的希望第三个元素的计算依赖于前面两个元素,可以看一下是不是真的用上了前面两个元素。

cons_stream(H, T) ->
    Ref = make_ref(),
    % io:format("make_ref ~p~n", [Ref]),
    F = fun() ->
            % io:format("Ref ~p~n", [Ref]),
            case get(Ref) of
                undefined ->
                    Res = T(),
                    put(Ref, Res),
                    % io:format("put {~p, ~p}~n", [Ref, Res]),
                    Res;
                R ->
                    % io:format("get {~p, ~p}~n", [Ref, R]),
                    put(cnt, get(cnt) + 1),
                    R
            end
        end,
    [H, F].

可以比较一下fibs和fibs_naming的运行结果,fibs_naming确实用上了缓存的前面两个元素的计算结果。

mem_proc fibs vs fibs_naming exec result

7.29 Stream implementation based on process

7.29.1 Stream implementation

前面我们在构建stream的时候选择直接用Pair把head和tail粘合(glue)在一起。在Erlang中,进程是组织程序的单位,就像在LISP和Scheme中函数或者Procedure过程是组织程序的单位一样。

如果用进程Process而不是Pair作为流的glue方式,该怎么做呢?Stream是一个数据抽象,我们想要改的是它的构造子和选择子,对上面不会有任何影响,不用修改在Stream上做的所有变换。

一个Stream就是一个Process。tail(T)出来的也应该是一个进程。不管是生成式定义的流还是隐式定义的流,要都能用。如果用Process模拟Stream,流在计算完了之后可以缓存的特性是内置的,在Process的loop带的状态就可以缓存,不再需要进程字典。一个进程有内部的状态,它表现出来应该跟真实的数据结构是一样的,而不是一个递归函数。

-module(stream_p).
-compile(export_all).

-type stream() :: pid() | [].
-spec cons_stream(term(), fun((stream()) -> stream())) -> stream().

cons_stream(H, T) ->
    spawn(fun() -> stream_idle(H, T) end).

head(S) ->
    Ref = make_ref(),
    S ! {ask_head, Ref, self()},
    receive
        {head, Ref, H} ->
            H
    end.

tail(S) ->
    Ref = make_ref(),
    S ! {ask_tail, Ref, self()},
    receive
        {tail, Ref, T} ->
            T
    end.

在cons_stream()起一个进程,进入idle状态。head和tail是选择子。

注意cons_stream的spec,原来的cons_stream的T是无参的,这里的cons_stream的T带了stream的参数,所以跟原来的cons_stream比,现在的cons_stream用起来不一样。为什么要带stream参数呢?这个stream参数就流自身。原来在隐式定义流时,调用递归函数产生的是一个新的流。如果不这么做,Process本来就是我自己,计算后的状态就在Process内部,本身已经内置了mem_proc的性质。另外,既然是Process,它有唯一标识,对我来说就是一个变量。

但正因为加了流自身这个参数,状态要分成idle、tl_waiting、ready。

stream_idle(H, TF) ->
    receive
        {ask_head, Ref, Pid} ->
            Pid ! {head, Ref, H};
            stream_idle(H, TF);
        {ask_tail, Ref, Pid} ->
            S = self(),
            spawn(
                fun() ->
                    TR = TF(S),
                    S ! {tail_ready, TR}
                end),
            stream_tl_waiting(H, [{Pid, Ref}])
    end.

在idle状态中,head还是head,但是tail,因为没有人要我去算,所以它还只是一个Promise,TF。

因为只有两个选择子,所以这里只有两个外部消息需要处理。对于ask_head消息,就把head返回给它,仍然进入stream_idle的状态。idle的意思是,我还是原先的,我没有对任何我Promise的东西进行计算。

对于ask_tail消息,又起了一个进程。如果用cons_stream构建的,TF(S)就应该是一个进程,为什么还要再起一个进程?在这个进程中只是做了一个计算,TF(S)产生了一个流,计算完了以后给自己发了一个{tail_ready, TR}消息。为什么还要起一个进程呢?

在很多隐式定义的流中,既然是隐式定义的,再加上流本身被delay的Promise,我有可能只需要知道前头,就希望这个流构建出来,然后再往后计算。也就是说,后面的这些计算很可能要基于这个流来做。现在的流是一个进程,在Erlang中的进程只能收发消息。在add_stream的那些例子中,比如integers():

integers() ->
    cons_stream(1, 
                fun() ->
                    add_stream(integers(), ones())
                end).
stream_idle(H, TF) ->
    receive
        {ask_head, Ref, Pid} ->
            Pid ! {head, Ref, H};
            stream_idle(H, TF);
        {ask_tail, Ref, Pid} ->
            S = self(),
            % spawn(
            %     fun() ->
                    TR = TF(S),
            %       S ! {tail_ready, TR}
            %     end),
            stream_tl_waiting(H, [{Pid, Ref}])
    end.

当执行add_stream(integers(), ones())时,要开始计算了,从integers()中取出head,再从ones()中取出head,把两个head相加。从integers()中取出head的操作,相当于对integers流(进程)再发一个消息。如果没有那个spawn,写在spawn()中的代码其实是在integers流(进程)中,给integers流(进程) 本身又发了一个消息(ask_head),ask_tail消息还没有处理完,新发的ask_head消息又在integers进程的mail_box中,ask_tail消息就没法处理完成了。所以要再起一个进程。

另外,如果这里的cons_stream的T不带参数的话,用进程来模拟Stream或者作为Stream的基础设施,它能给Stream带来的效果是不大的。只有cons_stream的T带参数,整个就会具备stream所有的内置的mem_proc,再加上隐式定义时的所有东西。对于生成式的流,不起进程也没问题,起进程也可以,但对于隐式的流,要起进程。

但正是因为spawn进程,加了一个异步机制,不能直接从idle变成tail_ready状态,因为在异步过程中很可能还有其他消息进来。从idle变成tail_waiting状态。

stream_tl_waiting(H, TWaits) ->
    receive
        {ask_head, Ref, Pid} ->
            Pid ! {head, Ref, H},
            stream_tl_waiting(H, TWaits);
        {ask_tail, Ref, Pid} ->
            stream_tl_waiting(H, [{Pid, Ref} | TWaits]);
        {tail_ready, TR} ->
            [Pid ! {tail, Ref, TR} || {Pid, Ref} <- TWaits],
            stream_ready(H, TR)
    end.

在stream_tl_waiting()中除了要处理ask_head和ask_tail这两个外部消息,还要处理tail_ready内部消息。

stream_ready(H, TR) ->
    receive
        {ask_head, Ref, Pid} ->
            Pid ! {head, Ref, H},
            stream_ready(H, TR);
        {ask_tail, Ref, Pid} ->
            Pid ! {tail, Ref, TR},
            stream_ready(H, TR)
    end.

7.29.2 Redefining Integer and Fibonacci Streams

为了重新定义integers和fibs,先增加以下代码:

the_empty_stream() -> [].

is_empty_stream([]) -> true;
is_empty_stream(_) -> false.

add_stream(S1, S2) ->
    case {is_empty_stream(S1), is_empty_stream(S2)} of
        {true, true} ->
            the_empty_stream();
        {true, false} ->
            S2;
        {false, true} ->
            S1;
        {false, false} ->
            cons_stream(
                head(S1) + head(S2),
                fun(_S) ->
                    add_stream(tail(S1), tail(S2))
                end)
    end. 

需要注意,add_stream()中的cons_stream()的第二个参数需要带参,就是本身正在定义的流,但因为用不到这个流本身,所以参数变量名加一个“_”。其实流操作相关的代码如果包含了cons_stream,它的第二个参数都必须是带参的函数,否则也不符合spec。

Ones = cons_stream(1, fun(S) -> S end).

之前定义ones流稍微有点问题,虽然永远都是1,但每次调用函数都是一个新的流。现在肯定能解决这个问题了。在cons_stream的时候,原来的第二个参数fun是无参的,现在是带参的,而这个参数就是你现在正在定义的流。

fun(S)的输入流S就是正在定义的流,输出还是流,对ones来说,除了1以后剩下的还是ones,所以返回S。可以看到,Ones现在是一个数据、一个变量。之前的ones是一个函数,不停地调用。

Ins = cons_stream(
        1,
        fun(I) ->
            add_stream(I, Ones)
        end).

提供一个带参的函数“fun(I)”,只要知道这个参数的含义,知道怎么结合,保证返回的是一个流就可以写出来了。

Fibs = cons_stream(
        0,
        fun(S1) ->
            cons_stream(
                1,
                fun(S2) ->
                    add_stream(S1, S2)
                end)
        end).

S1就是Fibs,S2就是tail(Fibs),我们就是要把Fibs和tail(Fibs)加起来。

测试代码:

test() ->
    Ones = cons_stream(1, fun(S) -> S end),

    Ins = cons_stream(
            1,
            fun(I) ->
                add_stream(I, Ones)
            end),

    Fibs = cons_stream(
            0,
            fun(S1) ->
                cons_stream(
                    1,
                    fun(S2) ->
                        add_stream(S1, S2)
                    end)
            end),

    1 = nth_stream(4, Ones),
    4 = nth_stream(4, Ins),
    2 = nth_stream(4, Fibs),

    test_ok.

这里用到的nth_stream跟之前的一样,不用做任何修改。

Erlang的Tuple Module能够根据你传入的不同的Primitive Module,换不同的下面的数据抽象、George的实现,上面一点都不用改。

7.30 Modularity from stream’s view –Cesaro (rand_stream)

7.30.1 random stream

从流的角度看,什么是流编程的模块化,或者说,模块化的能力。

还是Cesaro这个例子。我们希望不断地获取随机数,能不能生成一个random_stream流,这个流中每次都能保证产生随机数。

%% Cesaro
-define(RANGE, 100000).

rand_stream() ->
    {Rd, NSeed} = random:uniform_s(?RANGE, erlang:now()),
    cons_stream(
        {Rd, NSeed},
        fun(RS) ->
            map_stream(
                fun({_, Seed}) ->
                    random:uniform_s(?RANGE, Seed)
                end,
                RS)
        end).

在原来的流上做一个变换产生后面的流,本来后面的流中的元素就不需要在构建这个流的时候就产生。把流作为一个整体,在它上面做操作。random算法就是一个映射,把前一个东西做一个操作产生后一个东西。

这里用到的map_stream的cons_stream也要稍微改一下,改成带参的。

map_stream(Proc, S) ->
    case is_empty_stream(S) of
        true ->
            the_empty_stream();
        false ->
            cons_stream(
                Proc(head(S)), 
                fun(_) ->
                    map_stream(Proc, tail(S))
                end)
    end. 

7.30.2 cesaro stream

两个随机数,判断它们的gcd是不是等于1。

写一个map_successive_pairs,给一个随机数的流和Func,产生一个新流,新流里的每一个元素都是一对随机数。

map_successive_pairs(F, S) ->
    cons_stream(
        F(head(S), head(tail(S))),
        fun(_) ->
            map_successive_pairs(
                F,
                tail(tail(S)))
        end).

这是生成式的定义。在另外一个流上做map映射。

gcd(X, 0) -> X;
gcd(X, Y) -> gcd(Y, X rem Y).

cesaro_stream(Rs) ->
    map_successive_pairs(
        fun({R1, _}, {R2, _}) ->
            gcd(R1, R2) =:= 1
        end, Rs).

7.30.3 monte carlo stream

monte_carlo(S, Total, Passed) ->
    NPassed = case head(S) of
                true ->
                    Passed + 1;
                false ->
                    Passed
              end,
    cons_stream(
        NPassed / (Total + 1),
        fun(_) ->
            monte_carlo(tail(S), Total + 1, NPassed)
        end).

7.30.4 pi stream

pis() ->
    map_stream(
        fun(0.0) -> 0.0;
           (P) -> math:sqrt(6 / P)
        end,
        monte_carlo(cesaro_stream(rand_stream()), 0, 0)).

7.30.5 pi

pi(Toler) ->
    stream_limit(pis(), Toler).

stream_limit(S, Toler) ->
    stream_limit(head(S), tail(S), Toler).

stream_limit(Pre, S, Toler) ->
    Next = head(S),
    if
        abs(Pre - Next) =< Toler ->
            Next;
        true ->
            stream_limit(Next, tail(S), Toler)
    end.

跟之前求平方根一样,可以设定一个Tolerance,当流中的两个元素足够接近时,终止计算。

我自己测试了一下,感觉不容易算准。

stream_pi exec result 1

所以又用nth_stream计算了一下。

stream_pi exec result 2

可以看到,stream到stream的变换全部是纯数学函数,没有任何中间状态的变化。

用这个例子说明为什么要引入OO,希望模块化,再到Stream,再来看用stream的模块化能力,怎么把这个例子做出来。

7.31 A functional-programming view of time

从函数式编程的角度看一看时间。

赋值刻画了每个时间的状态,我们认为我们跟我们身处的世界打交道的时候,我们是它的一部分。所谓的一部分是指,我跟它隔离开来,我有我的独立变化,它有它的独立变化,时间被显式化在外面,不断在变化、在交互。

但在函数式过程中,希望有一个定义良好的数学函数能够把时间包在里面,我会觉得时间跟我是融为一体的,没有状态,没有赋值,也没有其他的。这样,程序的组织能力会比较高,比较好。

来看一个银行账户的例子。

%% A functional-programming view of time
stream_withdraw(Balance, AmountS) ->
    cons_stream(
        Balance,
        fun(_) ->
            stream_withdraw(
                Balance - head(AmountS),
                tail(AmountS))
        end).

Bank account merge

先不看Merge和Merge左边的部分。客户在银行账户(Bank Account)上可能存钱、可能取钱,经过Bank Account计算,出来的是在这个账号上剩下多少钱的流。Bank Account就是一个纯数学函数。客户的这些行为,如果从外部来看的话,没有任何变化。

从OO的角度来看,Bank Account是一个对象,它有内部状态。从流的角度来看,Bank Account左边就是不断在存取钱的流,右边是当前账户余额的流,其实就是两个流的变换,没有赋值、没有状态,是一个纯数学函数,时间合在了流上,不需要单独考虑。我们觉得非常好,也是函数式编程一贯提倡的。数学函数比较好,又容易组合。

我们在OO中引入时间以后,要考虑到事件的顺序问题,要显式地考虑时间,加入并发控制,加入锁,要解决同步的问题。如果我们用流的方式来思考的话,如果这个银行账户只属于一个人的话,就没有问题。如果银行账户是公有的,又有George的,又有Harold的,我希望Bank Account左边还是一个流,由于流的建模方式和现实世界建模方式不一样,现实世界就是要交互的、跟时间有关的,GJS’ request是一个独立的流进行存取,Harold’s request也是一个独立的流进行存取,这两个流需要merge。这个merge是跟现实世界相关的,没有办法做到纯数学函数,这里的归并是一种关系,不是一种函数,因为给你任意两个跟时间相关的流,它们的归并一定存在多种可以允许的顺序,既然是不确定的,它就不是一个纯数学函数。归并这两个流有很多顺序,比如 George上取一个,Harold上取一个,可行吗?按照之前确定性的interleave函数的实现,如果George不在,interleave又一定要从George上取一个,那就永远被挂住了。跟现实世界时间有关的问题,流编程同样会碰到。Merge不是一个纯数学函数,它只是一种关系,任何两个流的归并有无数种可能的方式。

在OO中要显式的做时间同步,在流编程或者函数式编程中,真正跟现实世界时间相关的,要考虑fair merge,必定要跟时间打交道,必定要选择一种顺序。

OO和流这两种不同的看待问题的建模视角,各有其优劣,但都会碰到一个难点。如果我们承认数学函数是美的、优雅的、能把概念分得很清晰。如果我们换种视角,把程序的组织顺序和实际执行顺序解耦,这样来编程的话,程序的组合性就很高。我们完全可以把内部的东西作为一个整体来考虑,只在有些边界、要跟实际的现实世界打交道的时候,再把类似Merge的东西引入进来。这样可以极大地增强软件的组合能力,降低它的复杂性。

时间并发性是本质的问题,是不可被逾越的。

⚠️ **GitHub.com Fallback** ⚠️