201611 SICP系列 第7讲 流计算模型 学习笔记 (三) - xiaoxianfaye/Learning GitHub Wiki

7 Stream

7.11 Power is exactly weakness

既然从“流”的视角看待问题的解决方案这么好,概念层次这么清晰,又是那么多程序的生成范式,有那么多算子:map、filter、accumulate、enumerate等,这些能帮助我们组合出很多程序。比如,八皇后问题本来以为是一个控制结构的问题,其实可以用“流”的方式表现出来。那为什么还要有传统的编程方式呢?原因是什么呢?先来看下面这个例子。

Second prime

用“流”的方式实现如下:

second_prime() ->
    head(
        tail(
            filter_stream(
                fun is_prime/1,
                enumerate_interval(10000, 1000000)))).

我可能只想要这个区间的第二个质数,但至少在enum_interval这一步要有足够的内存把这些数先存下来,才可能有机会再往下做。即便内存不是问题,计算消耗也很大,我本来只想要这个区间的第二个质数,但是上面的实现其实把这个区间的所有质数都算出来了。第一内存可能有问题,第二性能也有问题。

传统编程把enumerate、map、filter、accumulate和控制过程全部都混杂在一起的那样一种概念不清晰的丑陋之处其实正是传统编程能够高效的关键,它的缺点正是它的强大之处。

Stream能够让程序的概念层次很清晰、程序组织更灵活、组织复杂性很高的程序,但却不实用。

有没有一种方式既能够在流的概念层次上清晰,能够看到map、filter、accumulate,这样就能随意组装,还能够保持传统编程的高效?鱼和熊掌兼得。

当然是可以的。关键在于Stream不是列表。

7.12 Streams are not lists

Streams are not list

前面的Stream使用List来表现的,但其实Stream本身跟List有着完全不一样的特性。Stream不是列表,它需要自己的数据抽象。它是一个自己能够不断地计算出自身的数据结构。

按需产生数据,按需的意思就是要“tug(拽)一下”,不拽不产生数据。现在要第一个质数,拽一下“filter is_prime”右边的箭头,“filter is_prime”的元素来自于“enumerate_interval”,于是就拽一下“enumerate_interval”,直到一个质数生成出来。

Stream是一个按需来计算出自己的数据结构。Stream不是列表。如果没需要,它根本不会产生任何计算。

7.13 On-demand data structure

数据结构感觉是一个静态的东西。按需的数据结构能够根据需要去计算,产生出你需要的东西,所以感觉更像是一个过程。因为只有过程才会计算自己,而目前接触到的数据结构感觉都是静态的。

按需的数据结构,你要的时候,我就去计算出来给你,你不要的时候,我不会产生多余的计算。这样才能达到,我能按照这样的方式编写程序,我能达到概念上的清晰,同时又能得到传统程序能够给我带来的在内存上的精简和在性能上的提高。鱼和熊掌都能兼得。

On-demand data structure

说明:

  1. cons_stream把X和delay(Y)合到一起,把X跟一个能够在需要的时候马上把Y计算出来的这么一个承诺粘合在一起。delay是指我给你一个承诺,我现在先不去计算,你需要的时候我就去计算。
  2. 当你需要的时候,即访问tail的时候,把承诺取出来,强制(force)产生一个计算。
  3. head和之前是一样的。

7.14 The stream implementation in action

为了便于理解,贴上enumerate_interval、filter_stream和second_prime的相关代码。

enumerate_interval(Low, High) when Low > High ->
    the_empty_stream();
enumerate_interval(Low, High) ->
    cons_stream(Low, enumerate_interval(Low + 1, High)).

filter_stream(F, S) ->
    case is_empty_stream(S) of
        true ->
            the_empty_stream();
        false ->
            case F(head(S)) of
                true ->
                    cons_stream(head(S), filter_stream(F, tail(S)));
                false ->
                    filter_stream(F, tail(S))
            end
    end.

second_prime() ->
    head(
        tail(
            filter_stream(
                fun is_prime/1,
                enumerate_interval(10000, 1000000)))).

Stream implementation action 1

第一步从enum_interval开始。X是10000,delay(Y)是delay(enum_interval(10000+1, 1000000))。把X和delay(Y)粘合起来就是cons_stream(10000, delay(enum_interval(10000+1, 1000000)))。

对filter_stream来说,肯定要访问流中的第一个元素,用断言去判断第一个元素是真还是假。10000不是质数,根据filter_stream的实现机制(不满足过滤条件的false分支),filter_stream会继续取后面的流cons_stream(10000, delay(enum_interval(10000+1, 1000000)))的tail。之前放了一个promise,一调流的tail,就会force这个promise,即force(delay(enum_interval(10000+1, 1000000))),就会执行enum_interval(10000+1, 1000000),返回cons_stream(10001, delay(enum_interval(10001+1, 1000000)))。

Stream implementation action 2

10001是质数,根据filter_stream的实现机制(满足过滤条件的true分支),filter_stream会继续取后面的流cons_stream(10001+1, delay(enum_interval(10001+1, 1000000)))的tail。因为要的是第二个质数,还会继续往下,直到cons_stream(10009, …)。cons_stream(10009, …)的后面一部分的filter_stream也是整个被promise delay住的。

强制执行enum_interval(10000+1, 1000000),返回cons_stream(10001, delay(enum_interval(10001+1, 1000000))),交给filter_stream。10001已经是第一个质数了。按此步骤一直往下,直到cons_stream(10009,…)。因为10009是质数,根据filter_stream的实现机制(满足过滤条件的true分支),应该是cons_stream(10009, delay(filter_stream(fun is_prime/1, tail(cons_stream(10009, delay(enum_interval(10009+1, 1000000)))))))。可以看到,filter_stream是一个流,enum_interval是一个流,不拽前面的(filter_stream)就不会往后面(enum_interval)要。

按“On-demand data structure”中的语义理解delay和force以后,比较一下这个执行过程跟程序有什么差别?

写程序的时候,enum_interval在下面,filter_stream在上面。但是一旦执行起来以后,会发现enum_interval和filter_stream在整个执行序上是交叉在执行。

Stream之所以能达到高效的目的,就在于你看到的执行顺序跟你写程序的顺序是不一样的。这样的效果是由delay和force达成的。真正的执行顺序和传统编程方式的执行顺序是一样的。而写程序的时候完全就是按照概念非常清晰地写的。

为什么我们可以用流这样来解决问题,既可以概念清晰,又可以做到高效?是因为解耦了两个顺序,一个是程序的组织顺序,一个是程序的执行顺序。之所以能解耦,就是delay和force带来的。

我们往往直接用计算过程随着时间变化来为程序建模,写出来的程序就是程序的执行过程。但如果我们换一个角度来思考问题的话,表面上还可以非常清晰地写程序,实际执行的时候是非常高效的。

7.15 Implementing delay and force

Implementing delay and force

原书中是用Lisp或Scheme实现的,delay(Exp)不是一个函数,Exp是不被求值的。对于大多数语言来说,会认为delay(Exp)是一个函数,函数要能被调用的话,函数参数要被求值。Lisp和Scheme是非常柔性的语言,可以自己定义语法,Exp不被求值。

我们现在用到的绝大多数语言都是应用序。在替换模型中,应用序求值的过程是:函数的所有参数先全部求值,再把Body拿出来,把参数换进去,然后再对Body求值。但是在LISP或者Scheme中,delay和force不是函数,是内置的一个语法的东西,它提供了很多原语能够让你直接定义出delay和force,Exp会直接当原始的Exp来看待。

在Erlang或函数式编程中,用function来模拟,也能达成目的——给你一个承诺但我不去计算,从而分离程序的组织顺序和执行顺序。

cons_stream的第二个参数必须要传一个function,只有这样function的Body才不会被立即执行。force很简单,直接调用function。

-type stream() :: {term(), fun(() -> stream())}.

%% Stream
-spec cons_stream(term(), fun(() -> stream())) -> stream().
cons_stream(H, T) -> [H, T].
head([H, _]) -> H.
tail([_, T]) -> T().

the_empty_stream() -> [].

is_empty_stream([]) -> true;
is_empty_stream(_) -> false.

查看cons_stream的spec,cons_stream中的T本身是个函数,这个函数是无参的,产生出一个流,cons_stream最后构造出一个流。

如果没有函数包一层的话,T本身是个流,我们希望能够延迟计算,所以T是一个函数,本身产生出一个流,cons_stream最后构造出一个流。

delay真正的本质在于:解耦了程序表面上所看到的顺序跟在计算机中真正执行的顺序之间的关系。

它又改变了我们的观念。在八皇后问题中,一开始把计算过程描述成时间相关的状态,在这里不再直接把程序中所看到的顺序映射到计算机中实际执行时的顺序。换句话说,我们不再用计算机中的实际执行时的顺序来对我们的计算过程建模。原先我们对计算过程的建模,编程出来的顺序完全就是计算机中执行的顺序,等于是用计算机中实际执行时的顺序来直接映射到程序中的顺序。现在,我们完全抛弃了这样一个概念。这就是delay带给我们的自由度。

这才是delay的关键,延迟计算只是一种实现,其实delay想做的事情是两个顺序的解耦,完全颠覆对计算模型建模的角度。

7.16 Delay version of stream implementation

7.16.1 Operations on stream

把凡是调用了原cons_stream()函数的代码都改为调用新的cons_stream(),第一个参数不变,第二个参数用fun wrap一下原cons_stream()函数的第二个参数。其它没有调用原cons_stream()函数的代码不用修改。

修改了map_stream、filter_stream、append_stream、enumerate_tree和enumerate_interval。

map_stream(Proc, S) ->
    case is_empty_stream(S) of
        true ->
            the_empty_stream();
        false ->
            cons_stream(Proc(head(S)), 
                        fun() ->
                            map_stream(Proc, tail(S))
                        end)
    end.

filter_stream(F, S) ->
    case is_empty_stream(S) of
        true ->
            the_empty_stream();
        false ->
            case F(head(S)) of
                true ->
                    cons_stream(head(S), 
                                fun() ->
                                    filter_stream(F, tail(S))
                                end);
                false ->
                    filter_stream(F, tail(S))
            end
    end.

append_stream(S1, S2) ->
    case is_empty_stream(S1) of
        true ->
            S2;
        false ->
            cons_stream(head(S1), 
                        fun() ->
                            append_stream(tail(S1), S2)
                        end)
    end.

enumerate_tree({Left, Right}) ->
    append_stream(enumerate_tree(Left), enumerate_tree(Right));
enumerate_tree(N) ->
    cons_stream(N, 
                fun() ->
                    the_empty_stream()
                end).

enumerate_interval(Low, High) when Low > High ->
    the_empty_stream();
enumerate_interval(Low, High) ->
    cons_stream(Low, 
                fun() ->
                    enumerate_interval(Low + 1, High)
                end).

其他Stream相关的函数,例如acc_stream、flattern、flatmap等不用修改。

为了便于测试,在stream_delay模块中增加了几个整数流生成器函数和从列表转换为流的函数。

%% Stream Generators
integers(N) -> integers(1, N).

integers(Low, High) -> integers_inc(Low, High, 1).

integers_inc(Low, High, Inc) ->
    cons_stream(Low,
                fun() ->
                    if
                        Low < High ->
                            integers_inc(Low + Inc, High, Inc);
                        true ->
                            the_empty_stream()
                    end
                end).

list_to_stream([]) ->
    the_empty_stream();
list_to_stream([H|T]) ->
    cons_stream(H, 
                fun() ->
                    list_to_stream(T)
                end).

同样也是为了方便测试,在stream_delay模块中增加了collect_stream/1函数,将delay stream转换为List。否则每次输出只会给出一个解,单步执行下去才能找到所有解,如下图所示:

Stream delay map_stream step

%% Terminals
collect_stream(S) ->
    case is_empty_stream(S) of
        true -> [];
        false -> [head(S)|collect_stream(tail(S))]
    end.

测试代码如下。需要注意的是,虽然acc_stream、flatten、flatmap等函数本身代码不用修改,但测试代码还是要改的。

test_map_stream() ->
    [2, 4] = collect_stream(
                map_stream(fun(X) -> X * 2 end, list_to_stream([1, 2]))),
    test_map_stream_ok.

test_filter_stream() ->
    [2, 4] = collect_stream(
                filter_stream(fun(X) -> X rem 2 =:= 0 end, 
                              list_to_stream([1, 2, 3, 4]))),
    test_filter_stream_ok.

test_append_stream() ->
    [1, 2, 3, 4] = collect_stream(
                        append_stream(list_to_stream([1, 2]), 
                                      list_to_stream([3, 4]))),
    test_append_stream_ok.

test_enumerate_tree() ->
    [1, 2, 7, 19, 12, 14] = collect_stream(
                                enumerate_tree({{1, {2, 7}}, {19, {12, 14}}})),
    test_enumerate_tree_ok.

test_enumerate_interval() ->
    [1, 2, 3, 4] = collect_stream(enumerate_interval(1, 4)),
    test_enumerate_interval_ok.

test_acc_stream() ->
    10 = acc_stream(fun(X, Y) -> X + Y end, 0, list_to_stream([1, 2, 3, 4])),
    test_acc_stream_ok.

test_flatten() ->
    [1, 2, 3, 4, 5] = collect_stream(
                        flatten(list_to_stream([list_to_stream([1, 2]), 
                                                list_to_stream([3, 4, 5])]))),
    test_flatten_ok.

test_flatmap() ->
    [1, 2, 1, 2, 3] = collect_stream(
                        flatmap(fun(X) -> list_to_stream(lists:seq(1, X)) end, 
                                list_to_stream([2, 3]))),
    test_flatmap_ok.

7.16.2 Delay version of eight queens problem

只列出queen_delay相对于queen修改的代码:

-module(queen_delay).
-compile(export_all).

-import(stream_delay, [cons_stream/2, head/1, tail/1,
                       the_empty_stream/0, is_empty_stream/1,
                       map_stream/2, filter_stream/2, flatmap/2, 
                       enumerate_interval/2, collect_stream/1]).

把cons_stream()函数的第二个参数用fun wrap一下:

re_queen(N) -> fill_rows(N, N).

fill_rows(_Size, 0) ->
    cons_stream(the_empty_board(), fun() -> the_empty_stream() end);
fill_rows(Size, Row) ->
    filter_stream(
        fun is_safe/1,
        flatmap(
            fun(RestPos) ->
                map_stream(
                    fun(Col) ->
                        adjoin_position(Row, Col, RestPos)
                    end, 
                    enumerate_interval(1, Size))
            end, 
            fill_rows(Size, Row - 1))).

re_queen_2(N) -> fill_rows_2(N, N).

fill_rows_2(_Size, 0) ->
    cons_stream(the_empty_board(), fun() -> the_empty_stream() end);
fill_rows_2(Size, Row) ->
    filter_stream(
        fun is_safe/1,
        flatmap(
            fun(Col) ->
                map_stream(
                    fun(RestPos) -> 
                        adjoin_position(Row, Col, RestPos)
                    end, 
                    fill_rows_2(Size, Row - 1))
            end, 
            enumerate_interval(1, Size))).

queen_delay的测试代码也需要修改一下:

test_re_queen() ->
    92 = length(collect_stream(re_queen(8))),
    2 = length(collect_stream(re_queen(4))),
    0 = length(collect_stream(re_queen(3))),

    test_re_queen_ok.

test_re_queen_2() ->
    92 = length(collect_stream(re_queen_2(8))),
    2 = length(collect_stream(re_queen_2(4))),
    0 = length(collect_stream(re_queen_2(3))),

    test_re_queen_2_ok.

test() ->
    test_bt_queen(),
    test_bt_queen_2(),
    test_re_queen(),
    % test_re_queen_2(),
    test_ok.

需要注意的是,这个版本的re_queen_2压根算不出来,内存耗尽了。

Queen delay re_queen_2 result

7.16.3 Ask for element of stream

访问流中的元素除了head和tail之外,可以按需再提供几种方式。例如,nth_stream()和print_stream()。

  • nth_stream():访问流中的第N个元素,注意从1开始,跟Erlang一样。
  • print_stream():打印流。
nth_stream(Idx, S) ->
    case is_empty_stream(S) of
        true ->
            throw(no_this_idx);
        false ->
            if
                Idx =:= 1 ->
                    head(S);
                true ->
                    nth_stream(Idx - 1, tail(S))
            end
    end.

print_stream(S) ->
    case is_empty_stream(S) of
        true ->
            io:format("Done~n");
        false ->
            io:format("~p~n", [head(S)]),
            print_stream(tail(S))
    end.

测试代码:

test_nth_stream() ->
    S = list_to_stream([1, 2]),
    1 = nth_stream(1, S),
    2 = nth_stream(2, S),
    test_nth_stream_ok.

test_print_stream() ->
    S = list_to_stream([1, 2]),
    print_stream(S).

7.16.4 Procedures with stream

这部分程序和之前的是一样的,不需要修改。

%% Procedures with Stream
sum_odds_square(T) ->
    acc_stream(fun add/2, 0, 
               map_stream(fun square/1, 
                          filter_stream(fun is_odd/1, 
                                        enumerate_tree(T)))).

odd_fibs(N) ->
    acc_stream(fun({Idx, _E}, A) -> [Idx|A] end, [], 
               filter_stream(fun is_odd_idx/1, 
                             map_stream(fun fib_idx/1, 
                                        enumerate_interval(1, N)))).

is_odd(N) ->
    N rem 2 =/= 0.

fib(N) -> fib(N, 0, 1).

fib(0, A, _) -> A;  
fib(N, A, B) -> fib(N - 1, B, A + B).

square(N) -> N * N.

add(X, Y) -> X + Y.

is_odd_idx({_Idx, E}) ->
    is_odd(E).

fib_idx(Idx) ->
    {Idx, fib(Idx)}.

%% I J Pair Prime with Stream
prime_sum_pairs(N) ->
    filter_stream(
        fun({_, _, S}) -> 
            is_prime(S)
        end, 
        flatmap(
            fun(I) -> 
                map_stream(
                    fun(J) -> 
                        {I, J, I + J} 
                    end, 
                    enumerate_interval(1, I - 1)) 
            end, 
            enumerate_interval(1, N))).

is_prime(N) -> is_prime(2, N).

is_prime(I, N) when I >= N -> 
    true;
is_prime(I, N) ->
    if
        N rem I =:= 0 ->
            false;
        true ->
            is_prime(I + 1, N)
    end.

%% Embedded Flatmap
triples(N) ->
    flatmap(
        fun(I) -> 
            flatmap(
                fun(J) -> 
                    map_stream(fun(K) -> {I, J, K} end, enumerate_interval(1, J - 1)) 
                end, enumerate_interval(1, I - 1)) 
        end, enumerate_interval(1, N)).

这次second_prime()很快就有结果了。

%% Second Prime
second_prime() ->
    head(
        tail(
            filter_stream(
                fun is_prime/1,
                enumerate_interval(10000, 1000000)))).  

test_second_prime() ->
    10009 = second_prime(),
    test_second_prime_ok.

可以运行一下stream_delay:prime_sume_pairs/1、queen_delay:re_queen/1:

Exec prime sum pairs result

Exec re_queen result

需要注意的是,stream_delay:print_stream(Q)只截取了部分结果。

7.17 Infinite Stream

到目前为止看到的都是有限流,流的元素个数是有限的。但既然是流,流中的元素也可能是无限的。用List是无法定义出无限流的。正因为流跟List不一样,是按需的,我们才能定义出无限流。

%% Infinite Stream
integers_from(N) ->
    cons_stream(N, fun() -> integers_from(N + 1) end). 

-define(INT, integers_from(1)).    

no_sevens() ->
    filter_stream(
        fun(N) ->
            N rem 7 =/= 0
        end, ?INT).

test_integers_from() ->
    S = integers_from(2),
    2 = nth_stream(1, S),
    3 = nth_stream(2, S),

    1 = nth_stream(1, ?INT),
    2 = nth_stream(2, ?INT),

    test_integers_from_ok.

test_no_sevens() ->
    S = no_sevens(),
    1 = nth_stream(1, S),
    8 = nth_stream(7, S),
    16 = nth_stream(14, S),

    test_no_sevens_ok.

像这样的流的构造方式,也可以用下面这幅图来表示。

integers_from

半圆形的是流的构造子cons_stream。虚线表示流中起始的一些固定的元素,实线表示产生的流。integers_from定义了一个递归的数据结构,像递归过程一样含着自身。从黑盒(图中外层的integers_from)中出来的头元素是N,同时又对N做了“1+”操作,放到另外一个黑盒 (图中内层的integers_from)里,这个黑盒会输出一个流,把头元素和流组合起来,产生整个流。

最早画出这幅图的人是Peter Handson,就是Picture Language的发明人。这幅图可以帮助我们加深对流的理解。

7.18 Sieve of Eratosthenes

7.18.1 Problem

素数筛 —— 一个非常古老、朴素的算法。

给定所有从2开始的正整数(素数从2开始): 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 …

“筛”的步骤如下:

  1. 流的第一个元素是2,是一个素数,在2后面所有的正整数中去除2的倍数,产生一个新的流,下划线表示去除,如下:

    2 3 _4 5 _6 7 _8 9 _10 11 _12 13 _14 15 _16 17 _18 19 _20 21 _22 23 _24 25 _26 27 _28 29 …

  2. 在前一步产生的流的基础上继续,流的头元素是3,是一个素数,在3后面所有的正整数中去除3的倍数,产生一个新的流,如下:

    2

    3 5 7 _9 11 13 _15 17 19 _21 23 25 _27 29 …

  3. 在前一步产生的流的基础上继续,流的头元素是5,是一个素数,在5后面所有的正整数中去除5的倍数,产生一个新的流,如下:

    2 3

    5 7 11 13 17 19 23 _25 29 …

  4. 在前一步产生的流的基础上继续,流的第四个元素是7,是一个素数,在7后面所有的正整数中去除7的倍数,产生一个新的流,如下:

    2 3 5

    7 11 13 17 19 23 29 … …… 这样最终产生的是一个素数流。

7.18.2 Implementation

%% Sieve of Eratosthenes
primes() ->
    sieve(integers_from(2)).

sieve(S) ->
    cons_stream(head(S),
                fun() ->
                    sieve(
                        filter_stream(
                            fun(N) ->
                                N rem head(S) =/= 0
                            end,
                            tail(S)))
                end).

test_primes() ->
    Ps = primes(),
    2 = nth_stream(1, Ps),
    3 = nth_stream(2, Ps),
    5 = nth_stream(3, Ps),
    23 = nth_stream(9, Ps),
    29 = nth_stream(10, Ps),
    71 = nth_stream(20, Ps),

    test_primes_ok.

写素数筛,对流有什么要求?head(S)一定是一个素数,也就是说,下面再去调用的时候要保证返回来的流的第一个元素也一定是素数。这样才能满足,把第一个元素取出来以后,把该元素的倍数划掉以后,剩下的流的第一个元素肯定是一个素数。

从2开始,S的第一个元素是素数,const_stream的一个参数是head(S),第二个参数中的fun()/end是一个delay运算,filter_stream把刚才tail(S)中head(S)的倍数去掉,产生从3开始的流,它只能表明从3开始的流中3是一个在2后面的素数,不能表示从3开始都是素数,所以还要再sieve一下。

运行结果如下:

Infinite stream primes result 1

如果用print_stream去打印素数流,因为流是无限的,所以如果不强行终止,会一直打印下去。因此写了一个print_stream_limit,可以限制一下打印的元素个数。

print_stream_limit(0, _S) ->
    io:format("Done limited~n");
print_stream_limit(N, S) ->
    case is_empty_stream(S) of
        true ->
            io:format("Done completely~n");
        false ->
            io:format("~p~n", [head(S)]),
            print_stream_limit(N - 1, tail(S))
    end.

运行结果如下:

Infinite stream primes result 2

print_stream_limit还是不方便写单元测试用例,写了一个collect_stream_limit,可以从无限流中输出有限的元素到列表中。

collect_stream_limit(0, _S) -> [];
collect_stream_limit(Num, S) ->
    [head(S)|collect_stream_limit(Num - 1, tail(S))].

test_primes() ->
    Ps = primes(),
    2 = nth_stream(1, Ps),
    3 = nth_stream(2, Ps),
    5 = nth_stream(3, Ps),
    23 = nth_stream(9, Ps),
    29 = nth_stream(10, Ps),
    71 = nth_stream(20, Ps),

    Ps2 = primes(),
    [2, 3, 5, 7, 11, 13, 17, 19, 23, 29] = collect_stream_limit(10, Ps2),
    
    test_primes_ok.

7.18.3 Picture of sieve

Picture of sieve

我们是在定义一个素数流,它应该像是一个数据结构,但没想到在这样的数据结构中还可以递归地定义数据结构,我们只想过递归的过程、递归函数。这个素数流不断地递归的定义自己产生自己。

左边半圆形的是反方向的cons_stream,它把流S中的两个部分head和tail取出来。Head就是最终输出流的头元素,用虚线表示。虚线表示单个的元素,实线表示流。tail是一个流,用实线表示,head要进入filter_stream,tail也要进入filter_stream,tail经过filter_stream过滤一下,过滤条件用到了head元素。过滤后的流再经过sieve筛一下,这个sieve中又是图中这样的一个过程。

7.18.4 Execution of sieve

看一下sieve的执行过程,体会一下表面上程序的组织顺序跟在计算机中执行顺序的差别。下面的代码不是能执行的程序,只是用来描述执行顺序。为了描述简洁,去掉了“fun() -> end”之类的字符。

最开始,如下图所示。

% 2 3 4 5 6 7 8 9 10 
% 11 12 13 14 15 16 17 18 19 20 
% 21 22 23 24 25 26 27 28 29 …
sieve(integers_from(2))

调用sieve(),因此用sieve()的函数体整体替换,其中tail(S)就是integers_from(3),因此用cons_stream(3, integers_from(4))替换。

cons_stream(
    2,
    sieve(
        filter_stream(
            N rem 2 =/= 0,
            cons_stream(
                3,
                integers_from(4)))))

把上图中sieve()中的内容做一个变换。“cons_stream(3”挪到上面了。

cons_stream(
    2,
    sieve(
        cons_stream(
            3,
            filter_stream(
                N rem 2 =/= 0,
                integers_from(4)))))

将上图中的sieve()调用,用sieve()的函数体整体替换。替换后的sieve()中,在过滤掉2的倍数基础上还要过滤掉3的倍数。

cons_stream(
    2,
    cons_stream(
        3,
        sieve(
            filter_stream(
                N rem 3 =/= 0,
                filter_stream(
                    N rem 2 =/= 0,
                    integers_from(4))))))

将上图中的“filter_stream(N rem 2 =/= 0, integers_from(4))”计算一下,就是cons_stream(5, filter_stream(N rem 2 =/= 0, intergers_from(6)))。

cons_stream(
    2,
    cons_stream(
        3,
        sieve(
            filter_stream(
                N rem 3 =/= 0,
                cons_stream(
                    5,
                    filter_stream(
                        N rem 2 =/= 0,
                        integers_from(6)))))))

把上图中sieve()中的内容做一个变换。“cons_stream(5”挪到上面了。

cons_stream(
    2,
    cons_stream(
        3,
        sieve(
            cons_stream(
                5,
                filter_stream(
                    N rem 3 =/= 0,
                        filter_stream(
                            N rem 2 =/= 0,
                            integers_from(6)))))))

将上图中的sieve()调用,用sieve()的函数体整体替换。替换后的sieve()中,在过滤掉2和3的倍数基础上还要过滤掉5的倍数。

cons_stream(
    2,
    cons_stream(
        3,
        cons_stream(
            5,
            sieve(
                filter_stream(
                    N rem 5 =/= 0,
                    filter_stream(
                        N rem 3 =/= 0,
                            filter_stream(
                                N rem 2 =/= 0,
                                integers_from(6))))))))

在这个过程中,素数筛不断往下叠加的过程中,后面的素数要求过滤掉前面所有素数的倍数。

7.19 Phase Summary

稍微回顾一下前面的内容。有两种看待世界的方式,或者建模的方式。

一个是OO的视角。我们把自己当成世界的一部分,或者说,我们把要去建模的现实世界看成是由各个独立的部分组成的,每个部分都有自己的独立可变的局部的状态,通过状态的改变、消息传递来进行交互协作。

另一个是流的视角。在OO中,我们把时间单独提出来,不从整体看,分割成一个一个部分。在流处理中,我们把它作为一个整体,放到整个空间来考虑,在空间变换、在流上的变换操作。

讨论这两种不同的建模角度,主要是引发大家的思考,探讨在OO中把对象纳入到计算机的必要性。把对象纳入计算机,让模型随着时间变化,就把时间引入进来了,但是不是一定需要这样做?探讨是不是在建模的时候一定要去写那种让状态随着时间发生变化的计算过程。可以看到,不管是做八皇后还是素数筛,都探讨了一个方式,可以将程序在表面上的组织顺序跟它实际在计算机中真正执行时的顺序分离。如果能把这两个解耦的话,可以认为时间已经完全隐含其中,对这个模型来说,时间不再是独立的、外在的、随之变化的东西。

当我们把两个顺序解耦了,在编程的时候,我们拥有了非常强大的能力。有了很多概念上清晰的算子,可以随意组合混搭,帮我们应对很多复杂性。如果现在把时间引入进来,时间刻画了一个个状态,状态会发生变化,副作用有了,简单的替换模型都失效了,要引入复杂的环境模型。把时间引入进来以后,在编程中要解决现实世界中最大的一个问题是事件顺序和同步的问题。

解决事件顺序和同步的问题会用到很多技术,例如锁。解决事件顺序和同步问题,会带来一堆啰嗦复杂的问题要解决,导致的最大的问题是程序的组合性变差了,对复杂世界建模的能力就变差了。

为什么会把时间引入进来?引入赋值的初衷是为了把状态刻画出来,把状态刻画出来是为了能做可计算对象,能够让对象之间交互,能建模。但把这些东西都引入进来后,引入了很复杂的同步问题要解决,解决同步问题使得程序的组合能力变差。本来引入它是想去建模复杂世界,但引入它带来的复杂问题又降低了应对复杂问题的能力,这是一个两难的问题。这是OO要面临的问题。

⚠️ **GitHub.com Fallback** ⚠️