201903 Distributed Fault Tolerant System Design 学习笔记 (3) Bully Algorithm - xiaoxianfaye/Learning GitHub Wiki

1 Bully Algorithm

详见 https://en.m.wikipedia.org/wiki/Bully_algorithm

Bully算法是一个Leader选举算法。在分布式计算里,Bully算法是一个很有名、经常用的算法。一个网卡有很多网口,它们之间竞争就是Bully算法选举。

在分布式系统里,Total Order很难达成,如何达成呢?可以通过一种算法选出一个Leader,所有操作都由Leader一个人来完成,就有顺序了。这是在分布式系统里非常常见的一种保证有序的方式。

1.1 What is Bully Algorithm?

Bully算法是一个Leader选举算法。

bully algorithm

Bully算法的思想很简单。有一组process,每个process有一个ID,ID之间可以比较,永远选最高的那个当Leader。

1.2 Assumptions

要理解清楚一个算法,必须要有假设,也就是算法的适用性,否则就没有完全理解,为何这样设计呢?换个地方还能不能用呢?

bully algorithm assumptions

  • 同步系统。限定了这个算法如果在真实的分布式系统里必须要改造,否则没法用。
  • 失效模型。进程任何点上都会失败。但不会篡改,要么死、要么重启。不会说,没死又发假消息,不是拜占庭问题。
  • 有一个失败检测器,检测失败进程。我有个东西,进程死了,我是知道的。
  • 消息是可靠传输的,不会丢。
  • 每个进程都知道自己的id和地址,也知道别人的,能发消息。

这些都是假设,在这些假设下,算法才有意义。

1.3 Algorithm

bully algorithm description

协议里有三种消息:

  • 选举消息(Election)
  • 应答消息(Answer)
  • 胜利消息(Victory)
  1. 一个进程从crash中重启或刚刚启动,如果它发现自己ID是最高的,因为它知道所有的ID,它就直接给其他所有进程发一个胜利消息,我就是Leader。选举结束。否则,它就给所有ID比它高的进程发一个选举消息。
  2. 如果一个进程发完了选举消息以后,在一段时间内没有收到任何应答消息,那我就胜利了,我就给所有进程发一个胜利消息,我就是Leader。可以做这样的判断,因为假设同步,认为大家都会回我,消息可靠传输,要么你死了,要么就是你活着没回我。
  3. 如果一个进程收到一个选举消息,而且发选举消息的进程ID比我低,就回一个应答消息。如果一个进程发完选举消息以后得到一个应答,ID肯定比我的大,那我就什么也不做,等胜利消息。
  4. 有两种状态,选举态和正常工作态。如果一个进程是在正常工作的情况下,我收到一个选举消息,发现ID比我的低,我就回一个应答,然后再发起选举。如果进程已经在选举态,就把消息忽略掉,不会再发起新的选举。
  5. 如果一个进程收到一个胜利消息,就把胜利消息的发送者当作Leader,就结束了。

算法本身是很简单的,但要证明它的正确性,要下点功夫的。它是不是一定能结束(Liveness属性)、是不是一定不会出现错误结果(Safety属性),可以用TLA+来写。

bully algorithm description 2

看一下示意图。假设7是Leader,4给ID比自己高的5、6、7发Election,7死了,5、6的ID比4大,都给4回Answser,5发现收到的Election的ID比我小,又开始选举,发Election给ID比自己高的6和7,6给5会Answer,7已经死了不会给5回Answer,6超时胜利。

如果7重启了,6已经是Leader了,根据协议,7发现自己最大,直接给所有人发Victory。

Bully算法的缺点就是如果在一个不稳定的网络里,Leader会不停地抖动。如果7是因为负载高失效,6负载也高,7活过来就抢6的Leader,6抢7的,互相抢。如果是这种场景就不要用这种算法。

稍微改造一下就可以用在异步环境里了,甚至可以容忍分区。现在很多商业软件都是基于这个改造的。

2 Implementation

用Erlang实现这个算法非常直白,协议理解了,表达起来就很顺畅。

Bully算法中的进程可以是一个服务,可以是一个节点,这里用Erlang进程来表达Bully算法中的进程。

-module(bully).
-compile(export_all).

start(I, N) ->
    Pid = spawn(fun() -> init(I, N) end),
    Pid ! start.

stop(I) ->
    send_to([I], stop).

send_to(ProcNos, Msg) ->
    [case whereis(proc_name(I)) of
        undefined ->
            pass;
        Pid ->
            Pid ! Msg
     end || I <- ProcNos].

proc_name(I) ->
    list_to_atom("p_" ++ integer_to_list(I)).

init(I, N) ->
    io:format("process ~p started~n", [I]),
    register(proc_name(I), self()),
    idle(I, N).

idle(I, N) ->
    receive
        start when I =:= N ->
            io:format("process ~p is leader~n", [I]),
            send_to(lists:seq(0, N - 1), {victory, self()}),
            working(I, N, self());
        start when I < N ->
            send_to(lists:seq(I + 1, N), {election, I, self()}),
            erlang:send_after(2000, self(), waiting_answers_timeout),
            election(I, N, 0)
    end.

election(I, N, Answers) ->
    receive
        waiting_answers_timeout when Answers =:= 0 ->
            io:format("process ~p is leader~n", [I]),
            send_to(lists:seq(0, N) -- [I], {victory, self()}),
            working(I, N, self());
        {answer, ProcID} when ProcID > I ->
            erlang:send_after(5000, self(), waiting_victory_timeout),
            waiting(I, N);
        {victory, NewLeader} ->
            erlang:monitor(process, NewLeader),
            working(I, N, NewLeader);
        {election, ProcNo, Pid} when ProcNo < I ->
            Pid ! {answer, I},
            election(I, N, Answers)
    end.

waiting(I, N) ->
    receive
        {victory, NewLeader} ->
            erlang:monitor(process, NewLeader),
            working(I, N, NewLeader);
        waiting_victory_timeout ->
            self() ! start,
            idle(I, N);
        {election, ProcNo, Pid} when ProcNo < I ->
            Pid ! {answer, I},
            waiting(I, N);
        {answer, ProcID} when ProcID > I ->
            waiting(I, N)
    end.

working(I, N, Leader) ->
    receive
        {election, _ProcNo, Pid} when I =:= N ->
            Pid ! {answer, I},
            working(I, N, Leader);
        {election, ProcNo, Pid} when ProcNo < I ->
            Pid ! {answer, I},
            self() ! start,
            idle(I, N);
        {victory, NewLeader} when NewLeader =/= Leader ->
            erlang:monitor(process, NewLeader),
            working(I, N, NewLeader);
        {'DOWN', _, process, Leader, _} ->
            io:format("process ~p found leader down~n", [I]),
            self() ! start,
            idle(I, N);
        stop ->
            io:format("process ~p stopped~n", [I]),
            stop;
        _ ->
            working(I, N, Leader)
    end.

test(N) ->
    [start(I, N) || I <- lists:seq(0, N)],
    ok.
  • start(I, N):start一个进程,I是第几号进程,N是总共几个进程。
  • stop(I):给某一个进程发stop消息。
  • send_to(ProcNos, Msg):给一组进程发一个消息。whereis是根据进程名找Pid,进程死了注册就没有了,如果找不到发消息会挂,从安全性考虑。
  • proc_name(I):把进程号变成atom,才能注册。
  • init(I, N):打印第几个进程启动起来了,接着注册自己的名字,注册名字的目的在于别人可以给它发消息,不用传Pid了,然后进入idle状态。

现在开始是针对每一个进程:

  • idle(I, N):一个进程在idle状态收到start消息,可以认为进程重启了。进程刚刚启动或者Crash之后重启,收到start消息。

    • I =:= N:如果我是最大的,直接给除我之外的其他进程(0~N-1)发victory消息,把我的Pid带上,别人知道我是Leader。这是协议的一部分。然后进入working状态,就不用再选了。
    • I < N:如果I小于N,证明我不是最大的,根据协议,给所有比我大的进程(I+1~N)发election消息,带上我的号和Pid。然后起一个定时器,等answer。send_after()第一个参数是等多长时间,第二个参数是消息发给谁,第三个参数是发什么消息。2秒钟以后给我发一个waiting_answers_timeout消息。然后进入election状态。election()的第三个参数是收到answer的计数。
  • election(I, N, Answers):发送election消息以后进入election状态。

    • waiting_answers_timeout:超时了,判断一下Answers等于0,证明没有一个人回answer,那我就是Leader,给除我之外的所有其他进程发victory消息。然后进入working状态。
    • answer:如果有人给我回answer,我就啥也不干,等victory消息。如果victory消息等不到,就要重新发起,不然会死锁。如果没有重新发起的过程,死锁的情况经常会发生。假设0、1、2三个进程,2是Leader,2死了之后1当选,1已经给0回了answer,但还没有回victory的时候,1死了,0就只能永远等着了,所以这里需要保证Liveness。如果answer来了,我啥也不干,等victory消息,超时等不到就重新来,保证不会在这里死等。
    • victory:收到victory结束,监控新Leader,选举结束进入working状态。
    • election:如果有人给我发了election,号比我小,你就歇着吧,你肯定当不了Leader。
  • waiting(I, N):

    • victory:waiting状态就是等victory消息,等到了就monitor新Leader,然后进入working状态。
    • waiting_victory_timeout:如果超时等不到victory消息,就重来,重新发起选举。等同于把进程杀了重启,因为进程也没什么数据。
    • election:如果有人给我发了election,号比我小,回answer。
    • answer:其他情况下不动,因为answer我已经收到过了,再收到answer不理就行了。
  • working(I, N, Leader):

    • election when I =:= N:working状态下收到election,发现我就是最大的,不用折腾,回一个answer选举就结束了。
    • election when ProcNo < I:working状态下收到election,我不是最大,要回一个answer,因为它比我小,然后发起选举。
    • victory:收到victory消息,发现Leader变化了,monitor新的Leader。
    • 'DOWN':如果Leader down了,重新选举。发start消息进入idle状态就是重新发起选举。
    • stop:为了实验,杀死看看会不会生成新的Leader。
    • _:其他消息不处理,可能是非选举协议的消息,working即可。
  • test(N):起N个process,起来之后就可以做实验了。

1> c(bully).
{ok,bully}
2> bully:test(5).
process 0 started
process 1 started
process 2 started
process 3 started
process 4 started
process 5 started
process 5 is leader
ok
3> bully:stop(5).
process 5 stopped
process 0 found leader down
process 2 found leader down
process 1 found leader down
process 4 found leader down
process 3 found leader down
[stop]
4> process 4 is leader
4> bully:stop(4).
process 4 stopped
process 0 found leader down
process 2 found leader down
process 3 found leader down
process 1 found leader down
[stop]
5> process 3 is leader
5> bully:start(5, 5).
process 5 started
process 5 is leader
start
6>
  • 起5个进程,跑完之后选出一个Leader 5。
  • stop 5,选出新Leader 4。
  • stop 4,选出新Leader 3。
  • 重新启动5,5成为Leader。
⚠️ **GitHub.com Fallback** ⚠️