zookeeper 研究 - 823126028/book_reader GitHub Wiki

ZAB 协议:

重要组成部分:

  • logic clock 逻辑时钟 用来记录选举周期。
  • epoch,加最终日志id 组成zid 判断大小。

选举过程:

  • looking 状态搜寻选票,增加logic clock , 如果有选票,判断票箱中的数据。
  • 判断收到的选票,大于自己丢弃所有选票项,如果小于自己忽略。 等于自己根据zid判断是否应该把票投给发送方。
  • 根据选票进行判断大于自己,记录下来,重新广播自己的投票。否则,发送自己选票出去。
  • 大于 1/2 选票数目进行退出,等待指定时间进入选举同步阶段。
  • 如果是其他状态,看是否有1/2投票出来是主,退出,认为主

Zookeeper 服务发现功能

  • zookeeper 是提供一个基于k,v 结构的分布式存储系统
  • 附带提供一个实现相对较好的服务发现系统。

服务发现的重要支撑

  • 服务端和客户端的事件通知机制 :

    1.watcher 事件触发器。

  • 服务注册功能的实现:

    1.客户端的心跳touch,failover机制。

    2.创建临时节点EPHEMERAL。

    3.zookeeper server, time wheel 定时器来监控客户端session timeout。

zookeeper server 数据在内存中基本数据结构:

1. DataNode: 基本数据节点。
    public class DataNode implements Record {
        DataNode parent;    //父节点,当数据被删除时候,如果父节点关注了,会通知
        byte data[];        //存放的数据
        Long acl;           //acl
        StatPersisted stat; //如果在DB中所需要记录的一些信息,例如数据修改version,修改时间,创建时间,zxid事务id等,///如果是临时节点还会记录拥有着的sessionId
        private Set<String> children = null;  //孩子节点的key   
    }
    
2. DataTree: 存放所有数据的数据结构

    public class DataTree {
        /**
        * This hashtable provides a fast lookup to the datanodes. The tree is the
        * source of truth and is where all the locking occurs
        */
        private final ConcurrentHashMap<String, DataNode> nodes =
            new ConcurrentHashMap<String, DataNode>();

        /**
         * 
         * 对于数据修改关注的watcher的管理类
         */
        private final WatchManager dataWatches = new WatchManager();
    
        /**
         * 
         * 对于子节点增加删除感兴趣的watcher的管理类
         */
        private final WatchManager childWatches = new WatchManager();
    }
    

zookeeper如何感知到有服务注册和服务取消


  • 初始化数据并创建与服务端连接。:调用client 包中的API。
    /** connectString: 选择连接的服务端列表,尝试连上任意一个服务器  */
    /** sessionTimeout command timeout 到了会断开连接进入重新连接服务器阶段 */
    /** watcher 默认的监视器,默认是解析connectString/xxx 后面的XXX*/
    /** 连接的server 在 split-brain的情况下是否提供只读服务,还是选择断开*/
    
	public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        //1.赋值watcher.
        
        /** 2.初始化server端的列表,用于重连用
         HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        */
       
        //3 . 创建一个客户端连接的session
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }
    
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        /**
            省去一段初始化一些基本配置的代码
        */
        readOnly = canBeReadOnly;
        //通讯包发送类。
        sendThread = new SendThread(clientCnxnSocket);
        //回调消息处理类
        eventThread = new EventThread();

    }
} 
    
  • 客户端和服务端的消息通讯。

    客户端的连接始终保持着connecting 和 connected两种状态。永远在尝试着去连到服务端。

    
    SendThread 消息发送线程逻辑。
    
    @Override
        public void run() {
           ....
           ....
           
            while (state.isAlive()) {
                try {
                    if (!clientCnxnSocket.isConnected()) {
                       ....
                       ....
                       startConnect();
                    }

                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        .......
                    }
                    //判断是否需要发送心跳包,防止服务端清掉客户端的session
                    if (state.isConnected()) {
                        ...
                        int timeToNextPing = readTimeout / 2
                                - clientCnxnSocket.getIdleSend();
                        if (timeToNextPing <= 0) {
                            sendPing();
                       
                       }
                       ...
                    }
            
                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                            //计算访问时间,定时尝试访问
                            pingRwServer();
                     
                    }
                    //进入NIO 的 消息任务发送处理类(该thread 内部有一个nio 的类,做消息处理和消息接收)
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                        //发生任何IO错误,关闭连接,向自己的eventThread发送disConnected消息
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                       ....
                    }
                }
            }
            /**   服务关闭逻辑处理清理=======*/
        }
        
    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
    /**
        处理NIO 消息的方法。outgoingQueue,所有的向外发出的消息包都存放在outgoingQueue中。
        除心跳,服务端wacther触发命令(包体的头命令号来标志)等一些包除外的其他客户端请求都是同步的,都是用pendingQueue来一一对应发回的包。在这里也会把一些服务端发回来的wacther触发命令解析出来分发给wacther。
    */
         
    eventThread是消息处理类,用于处理服务端watcher的回调。

  • 客户端和服务端的通信,以及创建临时节点。

    1. 服务端所有与客户端通信,都是封装成包体格式,serialize后,发送到服务端进行通信(jute格式 类似于java serialize 跨语言)。

以创建节点为例:


  public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException{
       
        RequestHeader h = new RequestHeader();
        /**
                利用参数组合RequestHeader
        */
        
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
    }
    
    
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        //将封装的包体数据放到outgoing队列中准备发送
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        //packet 会被存放在pendingQueue 和 outgoingQueue 中,当消息正常从服务器中返回时候,会signal 指定任务
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
    
    

服务端的启动,信息处理,session管理(服务端分为单机版和分布式版):

单击版服务端启动:ZooKeeperServerMain:

1.读取配置文件

2.恢复文件DB的数据(增量log + 全量dump snapshot)

3.将数据载入内存

4.初始化服务器端口号。

5.建立sessionTracker session管理器。// startSessionTracker();

6.初始化责任链处理器。setupRequestProcessors();


###SessionTracker的主要功能:

1.创建session: 生成sessionid。

2.利用time wheel算法清理过期session

时间轮

定时轮的工作原理可以类比于始终,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。 这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间) 以及 timeUnit(时间单位)。主要解决了当连接数很多的时候,每次tick(往往服务发现tick的延迟不能太长)检查心跳会很耗时。

    sessionTracker的时间轮实现算法
    
    1 .不断循环检查当前时间轮,如果到期那么直接清除该轮的所有session。
    2. 如果有心跳过来,移动session去其他的轮子里。
    
    @Override
    synchronized public void run() {
        try {
            while (running) {
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                        
                        /**
                            最终调用的逻辑是expire
                            private void close(long sessionId) {
                                    submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
                            }
                        */
    
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        } 
    }

    synchronized public boolean touchSession(long sessionId, int timeout) {
    
        SessionImpl s = sessionsById.get(sessionId);
       
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
        
        if (s.tickTime >= expireTime) {
            return true;
        }
        
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
        s.tickTime = expireTime;
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }

###责任链的处理流程

由NIO Thread 解析网络中传来的包体,解析成Requset类调用责任链来解析包。

单机版责任链代码:

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        ((SyncRequestProcessor)syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }
    

    

单机版时序

  • PrepRequestProcessor做一些预处理工作,例如临时节点不能有孩子的检查,如果是更新操作,产生一个事务作为处理结果。执行一个操作并产生一条事务的数据放在Request的数据结构中,只有会对node做update or create 才会产生一个事务,读操作不会产生事务,也不会记日志。

  • SyncRequestProcessor。它负责持久化事务到磁盘上。本质上是把事务按照顺序append到事务日志上,并每隔一段时间产生快照。

-FinalRequestProcessor。真正的处理类,如果是读数据正常返回数据(当然也会注册watcher),如果是写数据也会操作DataTree

preRequest 中的刚才closeSession的相关代码:

  主要流程:
        1.获取被删除的session 所拥有的临时节点。
        2.产生删除临时节点事件,放入到finalRequest 等待删除
        3.清理session
        
  case OpCode.closeSession:
                
                HashSet<String> es = zks.getZKDatabase()
                        .getEphemerals(request.sessionId);
                synchronized (zks.outstandingChanges) {
                    for (ChangeRecord c : zks.outstandingChanges) {
                        if (c.stat == null) {
                            es.remove(c.path);
                        } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                            es.add(c.path);
                        }
                    }
                    for (String path2Delete : es) {
                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                path2Delete, null, 0, null));
                    }

                    zks.sessionTracker.setSessionClosing(request.sessionId);

finalRequstProcessor的部分代码

    public ProcessTxnResult processTxn(TxnHeader header, Record txn)
    {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
                case OpCode.create:
                    CreateTxn createTxn = (CreateTxn) txn;
                    rc.path = createTxn.getPath();
                    createNode(
                            createTxn.getPath(),
                            createTxn.getData(),
                            createTxn.getAcl(),
                            createTxn.getEphemeral() ? header.getClientId() : 0,
                            createTxn.getParentCVersion(),
                            header.getZxid(), header.getTime());
                    break;
                case OpCode.delete:
                    DeleteTxn deleteTxn = (DeleteTxn) txn;
                    rc.path = deleteTxn.getPath();
                    deleteNode(deleteTxn.getPath(), header.getZxid());
                    break;
                ....
                ....
                ....
                
    1.初始化节点信息。
    2.修改 TreeData 。
    3.建立与父节点的映射关系 。
    4.如果是临时节点建立与session的绑定关系。
    5.触发触发器,通知客户端。
    
    public String createNode(String path, byte data[], List<ACL> acl,
            long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            Set<String> children = parent.getChildren();
            if (children != null) {
                if (children.contains(childName)) {
                    throw new KeeperException.NodeExistsException();
                }
            }
            
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }    
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            Long longval = convertAcls(acl);
            DataNode child = new DataNode(parent, data, longval, stat);
            parent.addChild(childName);
            nodes.put(path, child);
            if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
        }
 
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }

                
                

Watcher 监视器实现机制

  • ZooKeeper通过调用指定函数(getData,exsist,getChilden) 向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件,由客户端做一些回调处理。

监视器的组成部分

  1. 客户端 eventThread 不断处理自身(node ACL ERROR,node disconnect, connecting) 和 服务端发回由sendThread(nio解析的任务) queueEvent。

  2. 客户端 WatchManager:客户端会根据不同的方法,初始化不同的watcher,到wactherManager中.

  3. 服务端 ZooKeeper WatcherManager,服务端根据TreeNode 的变化触发消息,来回调客户端的watcher请求。


主要流程:

  1. 客户端在向 ZooKeeper 服务器注册 Watcher

  2. 成功之后会将 Watcher 对象存储在客户端的 WatchManager 中。

  3. 当 ZooKeeper 服务器端触发 Watcher 事件后,会向客户端发送通知

  4. 客户端IO读写线程解析命令生成event从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。

服务端注册wacher:

    服务端wactherManager:
    
    // 当使用方法 exsist 和 getData 注册时候会放入 dataWactcherManager 中。
    private final WatchManager dataWatches = new WatchManager();
    
    // 当使用 getChildren 注册时候 会放入 childWachersManager中
    private final WatchManager childWatches = new WatchManager();
    
    public class WatchManager{
    
            private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

            private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();
    }

客户端注册wacher

客户端wacherManager 中 包含 exsitWacherMap, getDataWacherMap, getChildrenWacherMap 根据服务端不同事件进行触发

public void register(int rc) {
    if (shouldAddWatch(rc)) {
            Map<String, Set<Watcher>> watches = getWatches(rc);
            synchronized(watches) {
                Set<Watcher> watchers = watches.get(clientPath);
                if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                }
                watchers.add(watcher);
            }
    }
 }

当服务端删除或者创建node的时候调用triggerWatcher

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
            
        //将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个 WatchedEvent 对象
        HashSet<Watcher> watchers;
        
        synchronized (this) {
        // 根据数据节点的节点路径从 watchTable 里面取出对应的 Watcher, 同时会直接从 watchTable 和 watch2Paths 里删除 // Watcher,即 Watcher 是一次性的,触发一次就失效了。
        watchers = watchTable.remove(path);
        
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
        
            ..........
        }
         
        // 所以这里调用的 process 方法实质上是 ServerCnxn 的对应方法
        
        w.process(e);
    }
    return watchers;
}

客户端处理wacher消息

class EventThread extends ZooKeeperThread {

       public void queueEvent(WatchedEvent event) {
               if (event.getType() == EventType.None
                       && sessionState == event.getState()) {
               return;
           }
            sessionState = event.getState();
           
            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),
            event.getPath()),
            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
           }
       }
       
       public void run() {
               try {
                   isRunning = true;
                   while (true) {
                           Object event = waitingEvents.take();
                           if (event == eventOfDeath) {
                                   wasKilled = true;
                           } else {
                                   processEvent(event);
                           }
                       if (wasKilled){
                               synchronized (waitingEvents) {
                               if (waitingEvents.isEmpty()) {
                               isRunning = false;
                               break;
                       }
                   }
       }
       

客户端在识别出事件类型 EventType 之后,会从相应的 Watcher 存储中删除对应的 Watcher,获取到相关的 Watcher 之后,会将其放入 waitingEvents 队列,eventThread会处理waitingEvents里的内容。


作为服务发现 与 Tair Config Server的对比:

  • 相同点:
  1. 都是以心跳来完成对服务是否下线的监控。

  2. 版本号判断数据是否修改。

  • 不同点:
  1. 可用性的对比,config server 采用主备切换,而zookeeper 必须经过类似paxos算法重新选出主来提供服务。在服务发现这种宁可数据错误而不能不提供服务的场景,zookeeper保证一致性的行为降低了扩展性。(当没有主存在的情况下 服务只能提供读请求)

作为配置中心与 diamond的区别。

  1. 连接方式 HTTP 长轮训 和 TCP方式。

  2. 中心化 与 去中心化。

  3. (diamond 在各种情况下都能有最低降级策略),zookeeper 选不出主基本不能提供服务。

ZK 启动

##zookeeper 单机数据库启动:

  • zookeeper DB 文件分为: snapshot 文件 和 增量日志文件。snapshot 文件的文件名是以tran_id组成的。
  • loadDB过程:搜寻最大的snapshot_id,并且crc校验字符是否完成。然后搜取对应的增量文件对应最大小于snapshot_id的记录id,来恢复数据。
  • 同步过程: 初始化DB的时候,将snapshot 数据放入 commitLog 内存,commit Log 参数,low_index,high_index。如果在low和high之前的从机的同步应该使用diff方式。如果不在这之间的就要使用全量复制。

Zookeeper Session 管理 和 临时节点处理


上周五,在部门内部交流各种分布式锁的优缺点时,有同学提出zookeeper在发生集群内部网络分裂时会发生一致性错误。由于集群脑裂,出现了对共享资源的争抢,导致分布式锁并没有起到作用。 虽然大家都知道复杂的ZK是一致性存储方面不会出现低级错误,即使是一瞬间,旧的leader并没有意识到自己不再是leader的情况下也不应该出现DataNode(EphemeralNode)不一致的情况。 那么为什么有些同学会认为master失联会引发数据不一致呢? 我猜测应该是有些同学认为 client 连接的 follower 与 master 失去连接之后,new master会去除所有相关client 创建的Ephemeral Node, 导致new master可以创建 相关的EphemeralNode 而出现不一致? 虽然情况不能一一列举,但是我们捋一捋 session 和 ephemeralNode 建立时的流程,应该能举一反三去解决疑惑。

脑裂

既然提出了网络分裂,那我们就要讲讲脑裂的定义和影响。

  • 脑裂:由于某些节点的失效,部分节点的网络连接会断开,并形成一个与原集群一样名字的集群,这种情况称为集群脑裂(split-brain)现象。这个问题非常危险,因为两个新形成的集群会同时索引和修改集群的数据。

现在的各个开源分布式系统相对比较成熟,由于笔者经验欠缺,没有在真实系统中遇见脑裂的现象,所以只能自己编造一个可能产生脑裂的系统: Alt text

上面是笔者编造的一个HA的文件修改系统,master 负责定时向file.a写信息,slave 监控master状态,如果master失败,slave接过master的枪。这个时候如果master 和 slave 之间的网路出现故障,而master并没有宕机,slave也开始对file.a进行写操作,这个时候就会出现文件的错乱。

zk 集群网络故障会发生什么?

由于 zookeeper 由 quorum 机制进行选主,所以即使原来的 group 分裂成两部分,leader 也有且最多一个。 那会不会有一些临界情况呢? 这个时候我们就要看看leader 是如何自己放弃 leader位置的,而follower是如何发现leader 已经易主的呢,在这之间是否有临界情况呢?

  • leader自检测:leader会在成为主之后不断检测自己是否符合quorum情况(follower 数目大于 1/2 ),如果已经不符合就进入lookingfor状态, (因为网络故障,暂不考虑其他节点,通知他已经不是leader的情况)。
 代码位置: org.apache.zookeeper.server.quorum.Leader.lead()
 while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick++;
                }
                HashSet<Long> syncedSet = new HashSet<Long>();
                // lock on the followers when we use it.
                syncedSet.add(self.getId());
                for (LearnerHandler f : getLearners()) {
                    // Synced set is used to check we have a supporting quorum, so only
                    // PARTICIPANT, not OBSERVER, learners should be used
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }
              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                    // Lost quorum, shutdown
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    // make sure the order is the same!
                    // the leader goes to looking
                    return;
              } 
              tickSkip = !tickSkip;
          }
  • follwer自检测 : 在follower.followLeader() 内部socket close 后,进入looking 模式,进行重新选主。新选出来的主有所有的数据,因为新的主上一定有最新的txid和session 信息。
        case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;

以上论证了 zk 集群在网络故障时不会长时间出现 两个master 同时错在的情况,然而在其他follower已经选出新主,旧的leader 并没有放弃主的情况下。这个时候临时节点有没有可能出现不一致呢? 这就要看临时节点的存放位置是和如何移除的逻辑:

  • 情况 1: client 连接的 follower 与新leader断了,新leader上这个follower上的session 和 DataNode 都消失了,如果ZK 的设计是这样的那么 的确会出现 旧master上的ephemeralNode 依然存在而 新master上可以建立的ephemeralNode节点的情况(分布式锁并没有起作用)

  • 情况 2: session 的管理 和 DataNode 一样,创建和删除都是先经过master进行 quorum proposal,并在大于 1/2的follower上都存有相同副本的情况,在这种设计的情况,session在新的master必然是存在的,这样必然不会出现其他client能再次创建ephemeralNode的情况(分布式锁没有失效),然而这个session如何迁移,由谁来失效呢?

集群下 zk 如何管理 session呢?

其实关注 ephemeralNode 如何自动消除就是关注session的管理,因为ephemeralNode的监控功能是服务端对 zk client的session 的心跳移除来实现的。

1. 创建session:

client和server建立连接,会触发session的创建,follower的责任链会将createSession请求交付给leader,leader收到请求后,会发起一个createSession的Proposal,如果表决成功(quorm),最终所有的Server都会在其内存中建立同样的Session。等表决通过后,与客户端建立连接的Server为这个session生成一个password,连同 sessionId,sessionTimeOut一起返回给客户端(ConnectResponse)。任何一个Session只能被一个Server所服务,Leader会保留每个Session被哪个Server所持有。 Alt text

2. session 同步

client每次向follower发送消息或者发送心跳消息时,都会把timeout 存在 follower里。而follower 会定时 执行ping 命令,把session的信息同步给master,(LearnerSessionTracker中的touchTable)发送给Leader,在LearnerHandler线程中run方法轮询处理Follower发送给Leader的消息,执行touch方法(即sessionTrackerImpl的touch方法,目的是“延续”session过期时间)。

   类名: org.apache.zookeeper.server.quorum.Learner
   
   //key session id, value 超时时间,follower 定时把这个hashmap 传给 master
   
   HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
   
   private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
    // follower ping master
    protected void ping(QuorumPacket qp) throws IOException {
       // Send back the ping with our session data
       ByteArrayOutputStream bos = new ByteArrayOutputStream();
       DataOutputStream dos = new DataOutputStream(bos);
       HashMap<Long, Integer> touchTable = zk
               .getTouchSnapshot();
       for (Entry<Long, Integer> entry : touchTable.entrySet()) {
           dos.writeLong(entry.getKey());
           dos.writeInt(entry.getValue());
       }
       qp.setData(bos.toByteArray());
       writePacket(qp, true);
   }
   // master 处理ping
   {
          case Leader.PING:
           // Process the touches
           ByteArrayInputStream bis = new ByteArrayInputStream(qp
                   .getData());
           DataInputStream dis = new DataInputStream(bis);
           while (dis.available() > 0) {
               long sess = dis.readLong();
               int to = dis.readInt();
               leader.zk.touch(sess, to);
           }
           break;
  }
  

3. session 的 tracker

由上面的叙述大家应该猜到了其实所有session的移除和超时计算都是由leader决定的,session的状态,完全有leader保持,follower只是定期通过ping把自己维护的session信息包括过期时间发送给leader,leader决定是否过期,以及发送closeSession提议。Leader会周期性的检测全局Session列表,是否有过期的,如果有,将会向所有的Follower发送cloaseSession提议,Follower在接收到提议后,将Session删除。 Alt text 时间轮

4. 客户端重试 session 换到其他 follower的情况。

client 其实是处于一个不断重连的状态,如果没有连上,会尝试连接其他的服务器,因为各个follower (在RW 可写的机器上)都存在着session数据,如果session没有超时,客户端直接可以通过带上的sessionId 和 passwd进行重连。但是如果这个过程如果耗时过长,超过 SESSION_TIMEOUT 后还没有成功连接上服务器,那么服务器认为这个session已经结束了。客户端重新连接上了服务器,服务器会告诉客户端SESSIONEXPIRED(发生在sessionTracker.checkSession(),如果找不到session,将会导致sessionExpired)。此时客户端要做的事情就看应用的复杂情况了,总之,要重新实例zookeeper对象,重新操作所有临时数据(包括临时节点和注册Watcher),所以客户端必须要对sessionExpire进行处理。

总结

通过上面的叙述,其实大家已经很清楚由于session的管理是在master上,所以新的master拥有所有的session数据,在新master创建的node 不会有重复数据,而老的master已经不能成功proposal 新的提交,所以分布式锁在上述的极端情况下亦不会出错。

⚠️ **GitHub.com Fallback** ⚠️