Zookeeper 服务器 - litter-fish/ReadSource GitHub Wiki


启动流程图:

预启动
- 预启动, QuorumPeerMain 启动入口
// --QuorumPeerMain
/**
 * Process entry point (excerpt from QuorumPeerMain).
 * Creates a QuorumPeerMain and delegates to {@code initializeAndRun(args)}.
 * The catch clauses of the original method are elided in this excerpt.
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // Hand the command-line arguments over to the startup routine.
        main.initializeAndRun(args);
    } 
    // ...... (exception handling omitted in this excerpt)
    LOG.info("Exiting normally");
    System.exit(0);
}
- 解析配置文件
/**
 * Reads the properties file at {@code path} and passes the loaded
 * properties to {@code parseProperties}.
 *
 * @param path location of the configuration file
 * @throws ConfigException if the file does not exist, an I/O error occurs,
 *         or an IllegalArgumentException is raised while processing it
 */
public void parse(String path) throws ConfigException {
    final File configFile = new File(path);
    LOG.info("Reading configuration from: " + configFile);
    // Both failure paths report the same message, so build it once.
    final String failure = "Error processing " + path;
    try {
        if (!configFile.exists()) {
            throw new IllegalArgumentException(configFile.toString()
                    + " file is missing");
        }
        // Load the given configuration file.
        final Properties loaded = new Properties();
        final FileInputStream stream = new FileInputStream(configFile);
        try {
            loaded.load(stream);
        } finally {
            // Always release the file handle, even if load() fails.
            stream.close();
        }
        // Interpret the loaded key/value pairs.
        parseProperties(loaded);
    } catch (IOException ioFailure) {
        throw new ConfigException(failure, ioFailure);
    } catch (IllegalArgumentException badArgument) {
        throw new ConfigException(failure, badArgument);
    }
}
- 创建历史文件清理器
// --DatadirCleanupManager
/**
 * Schedules the periodic purge task, unless it has already been started
 * or the purge interval is zero or negative.
 */
public void start() {
    if (purgeTaskStatus == PurgeTaskStatus.STARTED) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
    // Daemon timer: first run happens immediately, then once every
    // purgeInterval hours.
    timer = new Timer("PurgeTask", true);
    TimerTask purgeJob = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    final long periodMillis = TimeUnit.HOURS.toMillis(purgeInterval);
    timer.scheduleAtFixedRate(purgeJob, 0, periodMillis);
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
创建一个独立线程执行清理历史文件记录
// --PurgeTask
static class PurgeTask extends TimerTask {
    private String logsDir;
    private String snapsDir;
    private int snapRetainCount;
    public PurgeTask(String dataDir, String snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }
    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            // 清理事务日志文件和快照文件
            PurgeTxnLog.purge(new File(logsDir), new File(snapsDir), snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occurred while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}
清理文件
/**
 * Purges snapshots older than the {@code num} most recent ones.
 *
 * @param dataDir transaction-log directory
 * @param snapDir snapshot directory
 * @param num     number of recent snapshots to look up; must be at least 3
 * @throws IllegalArgumentException if {@code num} is less than 3
 * @throws IOException declared by this method; raised by the underlying file operations
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }
    // Access layer over the on-disk data files.
    FileTxnSnapLog dataFiles = new FileTxnSnapLog(dataDir, snapDir);
    // Look up the num most recent snapshot files.
    List<File> recentSnapshots = dataFiles.findNRecentSnapshots(num);
    if (recentSnapshots.isEmpty()) {
        // Nothing on disk yet, so there is nothing to purge.
        return;
    }
    // The last list entry serves as the cut-off for removing older snapshots
    // (presumably the oldest retained one — confirm ordering of findNRecentSnapshots).
    File cutOff = recentSnapshots.get(recentSnapshots.size() - 1);
    purgeOlderSnapshots(dataFiles, cutOff);
}
- 判断当前是集群模式还是单机模式 根据服务器地址列表判断是否是单机还是集群模式
// --QuorumPeerMain
/**
 * Decides between quorum (cluster) mode and standalone mode
 * (excerpt from QuorumPeerMain; the code that builds {@code config} is omitted).
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
{
    // (other code omitted)
    if (args.length == 1 && config.servers.size() > 0) {
        // Cluster mode: one argument was given and the config lists servers.
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        // Standalone mode: startup is delegated to ZooKeeperServerMain.
        ZooKeeperServerMain.main(args);
    }
}
- 再次解析配置文件
// --ZooKeeperServerMain
/**
 * Standalone startup (excerpt from ZooKeeperServerMain): registers the log4j
 * MBeans, parses the configuration again into a ServerConfig and runs the
 * server from it.
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        // JMX registration failure is only logged; startup continues.
        LOG.warn("Unable to register log4j JMX control", e);
    }
    // Parse the configuration a second time, now as a ServerConfig.
    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        // A single argument is passed to the String overload of parse.
        config.parse(args[0]);
    } else {
        // Otherwise the whole argument array is passed to parse.
        config.parse(args);
    }
    // Run the server according to the parsed configuration.
    runFromConfig(config);
}
- 创建服务器实例 ZooKeeperServer
// --ZooKeeperServerMain
/**
 * Creates and starts a standalone ZooKeeperServer from {@code config}, then
 * blocks until the shutdown latch is released and tears the server down
 * (excerpt from ZooKeeperServerMain).
 *
 * @param config parsed standalone server configuration
 * @throws IOException declared by this method; the body performs file and network setup
 */
public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Create the server instance (its constructor creates the server statistics).
        final ZooKeeperServer zkServer = new ZooKeeperServer();
       
        // Latch handed to the shutdown handler; this thread waits on it below.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));
        // Create the ZK data manager FileTxnSnapLog.
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        // Set the server tickTime and the session timeout limits.
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        // Create the server connection factory.
        cnxnFactory = ServerCnxnFactory.createFactory();
        // Configure the connection factory with the client address and connection limit.
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        // Start the connection factory with this server.
        cnxnFactory.startup(zkServer);
        // Block here until the latch is counted down.
        shutdownLatch.await();
        shutdown();
        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        // Close the data manager whether startup succeeded or not.
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
初始化
- 创建服务器统计器 ServerStats
/**
 * Creates the server together with its statistics holder (ServerStats)
 * and its listener implementation.
 */
public ZooKeeperServer() {
    serverStats = new ServerStats(this);
    listener = new ZooKeeperServerListenerImpl(this);
}
ServerStats 中的一些计数器,统计区间都是从 Zookeeper 启动开始,或者从最近一次重置服务端统计信息开始
/**
 * Server-side counters (excerpt showing only the fields).
 */
public class ServerStats {
    // Number of response packets the server has sent to clients.
    private long packetsSent;
    // Number of request packets the server has received from clients.
    private long packetsReceived;
    // Maximum latency of request processing on the server.
    private long maxLatency;
    // Minimum latency of request processing on the server.
    private long minLatency = Long.MAX_VALUE;
    // Total latency of request processing on the server.
    private long totalLatency = 0;
    // Total number of client requests processed by the server.
    private long count = 0;
}
- 创建Zookeeper数据管理器 FileTxnSnapLog 是上层服务器和底层数据存储之间的对接层,主要包括事务日志文件和快照数据文件。通过配置文件中的 dataDir 指定快照文件的存储目录,dataLogDir 指定事务日志文件的存储目录
/**
 * Builds the data manager over a transaction-log directory and a snapshot
 * directory. Both are resolved to a {@code version + VERSION} subdirectory
 * (excerpt: directory read/write checks are omitted).
 *
 * @param dataDir parent directory of the transaction logs
 * @param snapDir parent directory of the snapshots
 * @throws IOException declared by this constructor
 */
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
    LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);
    this.dataDir = new File(dataDir, version + VERSION);
    this.snapDir = new File(snapDir, version + VERSION);
    // (directory read/write checks omitted)
    // check content of transaction log and snapshot dirs if they are two different directories
    // See ZOOKEEPER-2967 for more details
    if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
        checkLogDir();
        checkSnapDir();
    }
    // Transaction log file manager.
    txnLog = new FileTxnLog(this.dataDir);
    // Snapshot file manager.
    snapLog = new FileSnap(this.snapDir);
}
- 设置服务器tickTime和会话超时时间限制
// Set the server tickTime.
zkServer.setTickTime(config.tickTime);
// Set the lower and upper limits of the session timeout.
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
- 创建 ServerCnxnFactory
/**
 * Instantiates the server connection factory reflectively.
 * The class name is read from the system property named by
 * {@code ZOOKEEPER_SERVER_CNXN_FACTORY} (zookeeper.serverCnxnFactory, per the
 * original note); when the property is unset, NIOServerCnxnFactory is used.
 *
 * @return a new factory instance
 * @throws IOException if the class cannot be loaded or instantiated; the
 *         underlying failure is attached as the cause
 */
static public ServerCnxnFactory createFactory() throws IOException {
    String factoryClassName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
    if (factoryClassName == null) {
        // No override configured: fall back to the NIO implementation.
        factoryClassName = NIOServerCnxnFactory.class.getName();
    }
    try {
        Object instance = Class.forName(factoryClassName)
                .getDeclaredConstructor().newInstance();
        ServerCnxnFactory factory = (ServerCnxnFactory) instance;
        LOG.info("Using {} as server connection factory", factoryClassName);
        return factory;
    } catch (Exception reflectionFailure) {
        // Surface any reflection problem as an IOException carrying the cause.
        throw new IOException("Couldn't instantiate " + factoryClassName,
                reflectionFailure);
    }
}
- 初始化 ServerCnxnFactory
/**
 * Prepares the NIO connection factory: creates its daemon thread and opens
 * a non-blocking server socket channel bound to {@code addr}.
 *
 * @param addr  address the server socket is bound to
 * @param maxcc value stored in {@code maxClientCnxns}
 * @throws IOException if opening, binding or registering the channel fails
 */
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    configureSaslLogin();
    // Create the ZooKeeperThread for this factory and mark it as a daemon.
    thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
    thread.setDaemon(true);
    maxClientCnxns = maxcc;
    // Set up the NIO server socket.
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    // Non-blocking mode is set before registering with the selector.
    ss.configureBlocking(false);
    ss.register(selector, SelectionKey.OP_ACCEPT);
}
- 启动 ServerCnxnFactory 主线程
/**
 * Starts the connection factory and then the given server: the factory is
 * started, the server is attached, its data is loaded and it is started.
 *
 * @param zks the server instance to attach and start
 */
public void startup(ZooKeeperServer zks) throws IOException,
        InterruptedException {
    // Start the factory (per the original note, this starts its ZooKeeperThread).
    start();
    setZooKeeperServer(zks);
    // Set up the server's data before starting it.
    zks.startdata();
    zks.startup();
}
- 恢复本地文件
//  -- ZooKeeperServer
/**
 * Ensures the in-memory database exists and has been loaded
 * (excerpt from ZooKeeperServer).
 */
public void startdata() throws IOException, InterruptedException {
    // check to see if zkDb is not null
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    // If the database is not initialized yet, restore the local data.
    if (!zkDb.isInitialized()) {
        loadData();
    }
}
- 创建并启动会话管理器
/**
 * Excerpt of startup() showing only the session-tracker steps.
 */
public synchronized void startup() {
    if (sessionTracker == null) {
        // Create the session tracker.
        createSessionTracker();
    }
    // Start the session tracker.
    startSessionTracker();
    // (other code omitted)
}
创建会话管理器
/**
 * Creates the SessionTrackerImpl from the database's session timeouts,
 * the tickTime, the literal id 1 and the server listener.
 */
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1, getZooKeeperServerListener());
}
启动会话管理器,用于定期清理超时会话
/**
 * Starts the session tracker (cast to SessionTrackerImpl to reach start()).
 */
protected void startSessionTracker() {
    ((SessionTrackerImpl)sessionTracker).start();
}
- 初始化Zookeeper的请求处理链
/**
 * Excerpt of startup() showing only the request-processor-chain step.
 */
public synchronized void startup() {
     // (other code omitted)
    // Set up ZooKeeper's request processor chain.
    setupRequestProcessors();
}
初始化Zookeeper的请求处理链
/**
 * Wires up the request processor chain
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
 * and starts the sync and prep processors.
 */
protected void setupRequestProcessors() {
    // Build the chain back to front: each processor is given its successor.
    FinalRequestProcessor terminal = new FinalRequestProcessor(this);

    SyncRequestProcessor sync = new SyncRequestProcessor(this, terminal);
    sync.start();

    // The prep processor is the entry point of the chain.
    PrepRequestProcessor prep = new PrepRequestProcessor(this, sync);
    firstProcessor = prep;
    prep.start();
}
处理链: PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
- 注册JMX
/**
 * Excerpt of startup() showing only the JMX registration step.
 */
public synchronized void startup() {
    // (other code omitted)
    
    // Register with JMX.
    registerJMX();
    
    // (other code omitted)
}
注册JMX
/**
 * Registers the server bean and, beneath it, the data-tree bean with the
 * MBean registry. Failures are logged and the affected bean field is reset
 * to null; no exception propagates to the caller.
 */
protected void registerJMX() {
    // register with JMX
    try {
        jmxServerBean = new ZooKeeperServerBean(this);
        MBeanRegistry.getInstance().register(jmxServerBean, null);
        try {
            // The data-tree bean is registered with the server bean as its parent.
            jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxDataTreeBean = null;
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxServerBean = null;
    }
}
- 注册Zookeeper服务器实例
/**
 * Starts the server: session tracker, request processor chain and JMX
 * registration, then switches the state to RUNNING and wakes waiting threads.
 */
public synchronized void startup() {
    if (sessionTracker == null) {
        // Create the session tracker.
        createSessionTracker();
    }
    // Start the session tracker.
    startSessionTracker();
    // Set up ZooKeeper's request processor chain.
    setupRequestProcessors();
    // Register with JMX.
    registerJMX();
    // Mark the server as RUNNING and notify threads waiting on this monitor.
    setState(State.RUNNING);
    notifyAll();
}
启动流程图:

预启动
- QuorumPeerMain 启动入口
- 解析配置文件
- 创建历史文件清理器
- 判断当前是集群模式还是单机模式
初始化
- 创建 ServerCnxnFactory
- 初始化 ServerCnxnFactory
- 创建Zookeeper数据管理器 FileTxnSnapLog
- 创建 QuorumPeer 实例 QuorumPeer代表Zookeeper服务器实例(ZookeeperServer)的托管者,从集群角度看,代表着一台机器,QuorumPeer 会不断的检测服务器运行状态,同时根据情况触发Leader选举
/**
 * Factory method returning a new QuorumPeer.
 */
protected QuorumPeer getQuorumPeer() throws SaslException {
    return new QuorumPeer();
}
/**
 * Creates the peer under the name "QuorumPeer", sets up its QuorumStats
 * and calls initialize().
 */
protected QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    initialize();
}
- 创建内存数据库ZKDatabase ZKDatabase是Zookeeper的内存数据库,负责管理Zookeeper的所有会话记录以及DataTree和事务日志的存储
/**
 * Creates the in-memory database with an empty DataTree and an empty
 * session-timeout map, backed by the given FileTxnSnapLog.
 *
 * @param snapLog the data manager stored in this database
 */
public ZKDatabase(FileTxnSnapLog snapLog) {
    // Initialize the DataTree.
    dataTree = new DataTree();
    // Initialize sessionsWithTimeouts.
    sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
    this.snapLog = snapLog;
}
初始化DataTree,创建一些默认的节点
/**
 * Creates the tree and registers its default nodes: the root (under two
 * keys), the proc node and the quota node.
 */
public DataTree() {
    /* Rather than fight it, let root have an alias */
    nodes.put("", root);
    nodes.put(rootZookeeper, root);
    /** add the proc node and quota node */
    root.addChild(procChildZookeeper);
    // Create /zookeeper
    nodes.put(procZookeeper, procDataNode);
    procDataNode.addChild(quotaChildZookeeper);
    // Create /zookeeper/quota
    nodes.put(quotaZookeeper, quotaDataNode);
}
- 初始化 QuorumPeer
/**
 * Cluster-mode startup: creates the connection factory and the QuorumPeer,
 * copies the configuration into the peer, then starts it and waits for it
 * to finish.
 *
 * @param config parsed quorum configuration
 * @throws IOException declared by this method; the body performs file and network setup
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException {
  try {
      ManagedUtil.registerLog4jMBeans();
  } catch (JMException e) {
      // JMX registration failure is only logged; startup continues.
      LOG.warn("Unable to register log4j JMX control", e);
  }
  LOG.info("Starting quorum peer");
  try {
      ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
      cnxnFactory.configure(config.getClientPortAddress(),
                            config.getMaxClientCnxns());
      // Create the QuorumPeer, the host of the ZooKeeper server instance (ZooKeeperServer).
      // From the cluster's point of view it represents one machine; it keeps checking the
      // server state and triggers leader election when needed.
      quorumPeer = getQuorumPeer();
      // Initialize the QuorumPeer from the configuration.
      quorumPeer.setQuorumPeers(config.getServers());
      // Create the FileTxnSnapLog.
      quorumPeer.setTxnFactory(new FileTxnSnapLog(
              new File(config.getDataLogDir()),
              new File(config.getDataDir())));
      quorumPeer.setElectionType(config.getElectionAlg());
      quorumPeer.setMyid(config.getServerId());
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setInitLimit(config.getInitLimit());
      quorumPeer.setSyncLimit(config.getSyncLimit());
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      quorumPeer.setCnxnFactory(cnxnFactory);
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
      quorumPeer.setClientPortAddress(config.getClientPortAddress());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      // Create the ZKDatabase on top of the peer's transaction factory.
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      quorumPeer.setLearnerType(config.getPeerType());
      quorumPeer.setSyncEnabled(config.getSyncEnabled());
      // sets quorum sasl authentication configurations
      quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
      if(quorumPeer.isQuorumSaslAuthEnabled()){
          quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
          quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
          quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
          quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
          quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
      }
      quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
      quorumPeer.initialize();
      // Start the peer and wait for it to terminate.
      quorumPeer.start();
      quorumPeer.join();
  } catch (InterruptedException e) {
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}
- 恢复本地数据
/**
 * Starts the quorum peer: loads the local database, starts the connection
 * factory, begins leader election and finally calls super.start().
 */
public synchronized void start() {
    // Load the local data.
    loadDataBase();
    // Start the ServerCnxnFactory main thread.
    cnxnFactory.start();
    // Leader election.
    startLeaderElection();
    super.start();
}
加载本地数据
- 启动ServerCnxnFactory 主线程
Leader选举
- 初始化Leader选举
/**
 * Prepares leader election: builds this peer's initial vote and creates the
 * election algorithm selected by {@code electionType}
 * (excerpt: unrelated code between the two steps is omitted).
 *
 * @throws RuntimeException if an IOException is raised while building the
 *         initial vote; the IOException is attached as the cause
 */
synchronized public void startLeaderElection() {
    try {
        // Build the initial vote from this peer's id, last logged zxid and current epoch.
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
        // Chain the IOException as the cause so its type is not lost;
        // the original stack trace is still copied onto the wrapper.
        RuntimeException re = new RuntimeException(e.getMessage(), e);
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    
    // (unrelated code omitted)
    
    // Create the election algorithm.
    this.electionAlg = createElectionAlgorithm(electionType);
}
创建选举算法
/**
 * Creates the election implementation for the given algorithm id
 * (excerpt: only case 3 is shown).
 *
 * @param electionAlgorithm numeric id of the election algorithm
 * @return the election instance, or null when no listener is available
 */
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    // (unsupported election algorithms omitted)
    case 3:
        // Create QuorumCnxManager, the network I/O layer used by leader election.
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            // Start listening on the election port and wait for the other servers to connect.
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
开启对Leader选举端口的监听,等待集群中的其他从服务器创建连接
/**
 * Thread that listens on this server's election address and accepts
 * connections from other servers (inner class of QuorumCnxManager;
 * excerpt: the IOException handler body is omitted).
 */
public class Listener extends ZooKeeperThread {
    volatile ServerSocket ss = null;
    public Listener() {
        // During startup of thread, thread name will be overridden to
        // specific election address
        super("ListenerThread");
    }
    /**
     * Sleeps on accept().
     */
    @Override
    public void run() {
        int numRetries = 0;
        InetSocketAddress addr;
        // Keep (re)binding until shutdown is requested or three attempts have failed.
        while((!shutdown) && (numRetries < 3)){
            try {
                ss = new ServerSocket();
                ss.setReuseAddress(true);
                if (listenOnAllIPs) {
                    // Bind to the election port only (no specific address).
                    int port = view.get(QuorumCnxManager.this.mySid)
                        .electionAddr.getPort();
                    addr = new InetSocketAddress(port);
                } else {
                    // Bind to the configured election address.
                    addr = view.get(QuorumCnxManager.this.mySid)
                        .electionAddr;
                }
                LOG.info("My election bind port: " + addr.toString());
                setName(view.get(QuorumCnxManager.this.mySid)
                        .electionAddr.toString());
                ss.bind(addr);
                while (!shutdown) {
                    // Wait for another server to connect.
                    Socket client = ss.accept();
                    setSockOpts(client);
                    LOG.info("Received connection request "
                            + client.getRemoteSocketAddress());
                    // Receive and handle the connection request
                    // asynchronously if the quorum sasl authentication is
                    // enabled. This is required because sasl server
                    // authentication process may take few seconds to finish,
                    // this may delay next peer connection requests.
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        // Handle the other server's connection request on this thread.
                        receiveConnection(client);
                    }
                    // A handled connection resets the retry counter.
                    numRetries = 0;
                }
            } catch (IOException e) {
                // (other code omitted)
            }
        }
        LOG.info("Leaving listener");
        if (!shutdown) {
            LOG.error("As I'm leaving the listener thread, "
                    + "I won't be able to participate in leader "
                    + "election any longer: "
                    + view.get(QuorumCnxManager.this.mySid).electionAddr);
        }
    }
    /**
     * Halts this listener thread.
     */
    void halt(){
        try{
            LOG.debug("Trying to close listener: " + ss);
            if(ss != null) {
                LOG.debug("Closing listener: "
                          + QuorumCnxManager.this.mySid);
                ss.close();
            }
        } catch (IOException e){
            LOG.warn("Exception when shutting down listener: " + e);
        }
    }
}
其他内容请查看选举章节
- 注册JMX服务
- 检测服务器状态
- Leader选举 以上三个步骤主要在QuorumPeer的主线程中进行
/**
 * Main loop of the QuorumPeer thread: registers the JMX beans, then, while
 * {@code running}, acts on the current peer state (LOOKING, OBSERVING,
 * FOLLOWING or LEADING). After a non-LOOKING role ends, the state is reset
 * to LOOKING. On exit all MBeans are unregistered.
 */
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +
            cnxnFactory.getLocalAddress());
    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        // Register the JMX beans.
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for(QuorumServer s: getView().values()){
            ZKMBeanInfo p;
            if (getId() == s.id) {
                // This server: register a local peer bean.
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                // Any other server: register a remote peer bean.
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }
    try {
        /*
         * Main loop
         */
        while (running) {
            // Check the server state.
            switch (getPeerState()) {
            case LOOKING: // Leader election
                LOG.info("LOOKING");
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);
                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        setBCVote(null);
                        // Run the election and record the resulting vote.
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        setBCVote(null);
                        // Run the election and record the resulting vote.
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );                        
                } finally {
                    // NOTE(review): if makeObserver(...) throws before setObserver runs,
                    // observer may still be null here — confirm.
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    // NOTE(review): if makeFollower(...) throws before setFollower runs,
                    // follower may still be null here — confirm.
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING: // Startup of the leading server
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    // leader is still non-null only if the try block did not reach setLeader(null).
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
Leader服务器与Learner服务器启动的交互 具体参考“Learner与Leader启动期的交互过程”章节内容 Leader和 Follower服务器启动
- 创建并启动会话管理器
/**
 * Starts the server: session tracker, request processor chain and JMX
 * registration, then switches the state to RUNNING and wakes waiting threads.
 */
public synchronized void startup() {
    if (sessionTracker == null) {
        // Create the session tracker.
        createSessionTracker();
    }
    // Start the session tracker.
    startSessionTracker();
    // Set up ZooKeeper's request processor chain.
    setupRequestProcessors();
    // Register with JMX.
    registerJMX();
    // Mark the server as RUNNING and notify threads waiting on this monitor.
    setState(State.RUNNING);
    notifyAll();
}
对于Leader Session的创建和启动:
// Leader: creates the session tracker as a SessionTrackerImpl, using the
// database's session timeouts, the tickTime and this peer's id.
public void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, getZKDatabase()
            .getSessionWithTimeOuts(), tickTime, self.getId(),
            getZooKeeperServerListener());
}
// Leader: starts the session tracker.
@Override
protected void startSessionTracker() {
    ((SessionTrackerImpl)sessionTracker).start();
}
对于Learner Session的创建和启动:
// Learner: creates the session tracker as a LearnerSessionTracker, using the
// database's session timeouts and this peer's id.
public void createSessionTracker() {
    sessionTracker = new LearnerSessionTracker(this, getZKDatabase()
            .getSessionWithTimeOuts(), self.getId(),
            getZooKeeperServerListener());
}
// Learner: intentionally a no-op; per the original note, a learner's sessions
// are managed by the leader.
@Override
protected void startSessionTracker() {}
- 
Zookeeper 处理器链的初始化 
- 
注册JMX 服务