ZK Watcher - litter-fish/ReadSource GitHub Wiki

Watcher 相关对象

WatchedEvent

public class WatchedEvent {
    /** Notification state carried by the event. */
    private final KeeperState keeperState;
    /** Type of the event. */
    private final EventType eventType;
    /** Path of the node the event refers to. */
    private String path;
}

WatcherEvent 实现了序列化接口用于网络传输

public class WatcherEvent implements Record {
	// Event type, stored as an integer code
	private int type;
	// Notification state, stored as an integer code
	private int state;
	// Path of the node the event refers to
	private String path;
}

客户端注册

客户端注册Watcher时序图:

6a0945de5cebb2f3e9c6a14683a1e070.png

zookeeper 中调用 getData 时会传入一个watcher

// org/apache/zookeeper/ZooKeeper.java
 /**
  * Asynchronous {@code getData}: builds the request and queues it on the client
  * connection, optionally asking the server to set a watch on the node.
  *
  * @param path    client-side path of the node; validated before anything else
  * @param watcher watcher to register, or {@code null} for no watch
  * @param cb      callback passed through to the queued packet
  * @param ctx     caller context passed through to the queued packet
  */
 public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx)
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // The registration keeps the client-side (un-chrooted) path together with
    // the watcher until the packet is finished.
    final WatchRegistration registration =
            (watcher == null) ? null : new DataWatchRegistration(watcher, clientPath);

    // clientPath is relative to the chroot; the request carries the
    // chroot-prefixed path.
    final String serverPath = prependChroot(clientPath);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getData);

    final GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    // Only a boolean flag travels in the request, not the watcher itself.
    request.setWatch(watcher != null);

    final GetDataResponse response = new GetDataResponse();

    // Put the packet on the outgoing queue; the client sends it later.
    cnxn.queuePacket(header, new ReplyHeader(), request, response, cb,
            clientPath, serverPath, ctx, registration);
}

客户端等待服务端响应,进行watcher注册

class SendThread extends ZooKeeperThread {
	/**
     * Receives a response from the server (excerpt; the start of the method is elided).
     *
     * <p>In the part shown, the xid, error code and zxid of the reply header are copied
     * onto the packet, {@code lastZxid} is updated when the reply carries a positive
     * zxid, and the response body is deserialized only when the packet expects a
     * response and the reply reports no error. {@code finishPacket} is called from a
     * {@code finally} block, so it runs even if deserialization throws.
     */
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
    	......
		try {
		    // Copy the reply header fields onto the pending packet.
		    packet.replyHeader.setXid(replyHdr.getXid());
		    packet.replyHeader.setErr(replyHdr.getErr());
		    packet.replyHeader.setZxid(replyHdr.getZxid());
		    // Remember the zxid only when the reply carries a positive one.
		    if (replyHdr.getZxid() > 0) {
		        lastZxid = replyHdr.getZxid();
		    }
		    // Deserialize the body only for successful replies that expect one.
		    if (packet.response != null && replyHdr.getErr() == 0) {
		        packet.response.deserialize(bbia, "response");
		    }

		    if (LOG.isDebugEnabled()) {
		        LOG.debug("Reading reply sessionid:0x"
		                + Long.toHexString(sessionId) + ", packet:: " + packet);
		    }
		} finally {
		    // Always complete the packet, whether or not the steps above succeeded.
		    finishPacket(packet);
		}
    }
}

/**
 * Completes a packet: registers its watch (if it has one) using the reply's error
 * code, marks the packet finished, and then either wakes the threads waiting on
 * the packet (no callback) or hands the packet to the event thread (callback set).
 *
 * @param p the packet whose reply has been processed
 */
private void finishPacket(Packet p) {
    final boolean hasWatchRegistration = p.watchRegistration != null;
    if (hasWatchRegistration) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb != null) {
        // A callback is attached: let the event thread deliver the result.
        p.finished = true;
        eventThread.queuePacket(p);
        return;
    }

    // No callback: flip the flag and notify waiters while holding the packet's monitor.
    synchronized (p) {
        p.finished = true;
        p.notifyAll();
    }
}

/**
 * Adds this registration's watcher to the watcher set kept for {@code clientPath}
 * in the map returned by {@code getWatches(rc)}, provided {@code shouldAddWatch(rc)}
 * accepts the result code.
 *
 * @param rc result code of the reply that completed the request
 */
public void register(int rc) {
    if (!shouldAddWatch(rc)) {
        return;
    }
    final Map<String, Set<Watcher>> registry = getWatches(rc);
    synchronized (registry) {
        Set<Watcher> registered = registry.get(clientPath);
        if (registered == null) {
            // First watcher for this path: create its set inside the watch map.
            registered = new HashSet<Watcher>();
            registry.put(clientPath, registered);
        }
        registered.add(watcher);
    }
}


服务端处理watcher

服务端处理watcher时序图:

156df4e6545dd227a4297db313a471b3.png

FinalRequestProcessor 中的 processRequest 判断是否需要进行watcher注册

/**
 * Excerpt showing only the {@code getData} case: the request is deserialized, the
 * node is looked up, READ permission is checked, and the data is read. The
 * connection is passed along as the watcher when the request asked for a watch.
 */
public void processRequest(Request request) {
	......
	try {
		......
		switch (request.type) {
			...
			case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                // Deserialize the GetDataRequest from the raw request buffer.
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);
                // Look up the data node; a missing node is reported as NoNodeException.
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                // Check the node's ACL for READ permission.
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo);
                Stat stat = new Stat();
                // Read the data and stat; register a watch only if the request asked for one.
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null); // cnxn represents one client-to-server connection
                rsp = new GetDataResponse(b, stat);
                break;
            }
            ...
		}
		......
	}
	......
}

class WatchManager {
    /**
     * Registers {@code watcher} on {@code path}, updating both indexes:
     * path to watchers ({@code watchTable}) and watcher to paths ({@code watch2Paths}).
     *
     * @param path    node path being watched
     * @param watcher watcher to register on that path
     */
    public synchronized void addWatch(String path, Watcher watcher) {
        // Index by path.
        HashSet<Watcher> watchersOnPath = watchTable.get(path);
        if (watchersOnPath == null) {
            watchersOnPath = new HashSet<Watcher>(4);
            watchTable.put(path, watchersOnPath);
        }
        watchersOnPath.add(watcher);

        // Index by watcher.
        HashSet<String> pathsOfWatcher = watch2Paths.get(watcher);
        if (pathsOfWatcher == null) {
            pathsOfWatcher = new HashSet<String>();
            watch2Paths.put(watcher, pathsOfWatcher);
        }
        pathsOfWatcher.add(path);
    }
}

最终 watcher 会被保存在服务端的 WatchManager 中

public class WatchManager {

    // Index by node path: the watchers registered on each path.
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();
    // Index by watcher: the node paths each watcher is registered on.
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();
}

watcher 事件服务端的触发,以setData为例

watcher 事件服务端的触发时序图:

b0090853771bd0592ed12077a9818ae1.png

FinalRequestProcessor 中接收到setData操作后,最终会在 WatchManager 中进行事件触发操作

// org/apache/zookeeper/server/FinalRequestProcessor.java
/**
 * Excerpt: when the request carries a transaction header, the transaction is
 * applied through {@code zks.processTxn} while holding the monitor of
 * {@code zks.outstandingChanges}. The "// ......" markers stand for omitted code.
 */
public void processRequest(Request request) {
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        // ......
        if (request.hdr != null) {
           TxnHeader hdr = request.hdr;
           Record txn = request.txn;
           // Apply the transaction.
           rc = zks.processTxn(hdr, txn);
        }
        // ......
    }
    // ......
}

应用事务

// org/apache/zookeeper/server/ZooKeeperServer.java
/**
 * Excerpt: applies the transaction by delegating to the ZKDatabase and returns its
 * result. {@code opCode} and {@code sessionId} are read from the header here; the
 * code that uses them is omitted at the "// ......" marker.
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    // Apply the transaction to the in-memory database.
    rc = getZKDatabase().processTxn(hdr, txn);
    // ......
    return rc;
}

应用事务到内存

// org/apache/zookeeper/server/ZKDatabase.java
/**
 * Applies a transaction by delegating to the underlying data tree.
 *
 * @param hdr transaction header
 * @param txn transaction body
 * @return the result produced by {@code dataTree.processTxn}
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    final ProcessTxnResult result = dataTree.processTxn(hdr, txn);
    return result;
}

处理事务

// org/apache/zookeeper/server/DataTree.java
/**
 * Excerpt showing only the {@code setData} case: the result is pre-filled from the
 * transaction header, then the node is updated through {@code setData} and the
 * returned stat is stored in the result. Other cases and the catch clause are omitted.
 */
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        // Seed the result with the header fields and a zero error code.
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
            // case ... (other op codes omitted)
            case OpCode.setData:
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                // Update the node's data; the resulting stat goes into the result.
                rc.stat = setData(setDataTxn.getPath(), setDataTxn
                        .getData(), setDataTxn.getVersion(), header
                        .getZxid(), header.getTime());
                break;
            // case ... (other op codes omitted)
        }
    } // catch (omitted)
    // ... (omitted)
    return rc;
}

设置节点值

/**
 * Excerpt: the code that updates the node and produces {@code s} is omitted. What
 * remains shows that a {@code NodeDataChanged} event is triggered on the data
 * watches for {@code path} before the stat is returned.
 */
public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    // ... (node update omitted)
    // Fire the data-changed watches registered on this path.
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

触发节点值改变

// org/apache/zookeeper/server/WatchManager.java
/**
 * Removes every watcher registered on {@code path} and delivers an event of the
 * given type (with state {@code SyncConnected}) to each of them, skipping the
 * watchers contained in {@code supress}.
 *
 * @param path    node path whose watchers are triggered
 * @param type    type of the event to deliver
 * @param supress watchers that must not be notified; may be {@code null}
 * @return the watchers removed for the path (including suppressed ones), or
 *         {@code null} when none were registered
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // Build the event that every triggered watcher will receive.
    final WatchedEvent event = new WatchedEvent(type, KeeperState.SyncConnected, path);

    final HashSet<Watcher> triggered;
    synchronized (this) {
        // Taking the set out of the table is what removes the watches for this path.
        triggered = watchTable.remove(path);
        if (triggered == null || triggered.isEmpty()) {
            return null;
        }
        // Keep the watcher-to-paths index consistent with the removal above.
        for (Watcher removed : triggered) {
            final HashSet<String> watchedPaths = watch2Paths.get(removed);
            if (watchedPaths != null) {
                watchedPaths.remove(path);
            }
        }
    }

    // Delivery happens after the synchronized block has been left.
    for (Watcher target : triggered) {
        final boolean suppressed = supress != null && supress.contains(target);
        if (!suppressed) {
            target.process(event);
        }
    }
    return triggered;
}

调用触发操作

// org/apache/zookeeper/server/NIOServerCnxn.java
/**
 * Sends a watch notification for {@code event} to the client on this connection.
 *
 * @param event the event to deliver
 */
public synchronized void process(WatchedEvent event) {
    // The header is built with -1 as its first argument to mark a notification.
    final ReplyHeader notificationHeader = new ReplyHeader(-1, -1L, 0);
    // Wrap the WatchedEvent as a WatcherEvent, the form used for network serialization.
    final WatcherEvent wireEvent = event.getWrapper();
    // Push the notification to the client.
    sendResponse(notificationHeader, wireEvent, "notification");
}

客户端接收到事件回调处理

客户端接收到事件回调时序图:

40a85335b22538668669a87cf2b85ade.png

对于来自服务端的通知,统一由SendThread.readResponse接收处理

接收来自服务端的响应

// org/apache/zookeeper/ClientCnxn$SendThread
/**
 * Excerpt showing only the notification branch: a reply whose xid is -1 is
 * deserialized as a {@code WatcherEvent}, its path is converted from a server path
 * to a client path when a chroot is configured, and the resulting
 * {@code WatchedEvent} is queued on the event thread.
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
	...
	if (replyHdr.getXid() == -1) {
	    // -1 means notification
	    if (LOG.isDebugEnabled()) {
	        LOG.debug("Got notification sessionid:0x"
	            + Long.toHexString(sessionId));
	    }
	    // Deserialize the event from the reply body.
	    WatcherEvent event = new WatcherEvent();
	    event.deserialize(bbia, "response");

	    // convert from a server path to a client path
	    // With a chroot configured, strip the chroot prefix to get the relative path.
	    if (chrootPath != null) {
	        String serverPath = event.getPath();
	        // If the server path equals the chroot itself, the client path is "/".
	        if(serverPath.compareTo(chrootPath)==0)
	            event.setPath("/");
	        else if (serverPath.length() > chrootPath.length())
	            event.setPath(serverPath.substring(chrootPath.length()));
	        else {
	        	LOG.warn("Got server path " + event.getPath()
	        			+ " which is too short for chroot path "
	        			+ chrootPath);
	        }
	    }

	    // Convert the wire-format event back into a WatchedEvent.
	    WatchedEvent we = new WatchedEvent(event);
	    if (LOG.isDebugEnabled()) {
	        LOG.debug("Got " + we + " for sessionid 0x"
	                + Long.toHexString(sessionId));
	    }
	    // Hand the event to the event thread, which runs the watcher callbacks.
	    eventThread.queueEvent( we );
	    return;
	}
	...
}

EventThread 线程 watcher的回调

// org/apache/zookeeper/ClientCnxn$EventThread
/**
 * Resolves the watchers interested in {@code event} and queues them together with
 * the event for later processing. An event of type {@code None} that does not
 * change the recorded session state is dropped.
 *
 * @param event the event received from the server
 */
public void queueEvent(WatchedEvent event) {
    final boolean isSessionEvent = event.getType() == EventType.None;
    if (isSessionEvent && sessionState == event.getState()) {
        // Same session state as already recorded: nothing to deliver.
        return;
    }
    sessionState = event.getState();

    // Materialize the watcher set for this event and queue the
    // (watcher set, event) pair for later processing.
    waitingEvents.add(new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(), event.getPath()),
            event));
}

从 ZKWatchManager 获取相关 watcher

// org/apache/zookeeper/ZooKeeper.java
/**
 * Excerpt showing only the {@code NodeDataChanged} / {@code NodeCreated} cases:
 * the watchers stored for {@code clientPath} are removed from both the data
 * watches and the exist watches and collected into the returned set.
 *
 * @return the watchers to notify; an empty set when no case added any
 */
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath)
{
    Set<Watcher> result = new HashSet<Watcher>();

    switch (type) {
    // case ... (other event types omitted)
    case NodeDataChanged:
    case NodeCreated:
        // remove() takes the watchers out of the map as they are collected.
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    // case ... (other event types omitted)
    }

    return result;
}

EventThread 线程处理回调

// org/apache/zookeeper/ClientCnxn$EventThread
/**
 * Excerpt of the event loop: takes queued events one at a time, records that the
 * thread was killed when the {@code eventOfDeath} marker arrives, and passes every
 * other event to {@code processEvent}. The rest of the loop and the catch clause
 * are omitted, so the braces in this excerpt do not balance.
 */
public void run() {
    try {
      isRunning = true;
      while (true) {
         Object event = waitingEvents.take();
         if (event == eventOfDeath) {
            wasKilled = true;
         } else {
            processEvent(event);
         }
         // ... (rest of the loop body omitted)
   } // catch (omitted)
}

处理事件

// org/apache/zookeeper/ClientCnxn$EventThread
/**
 * Excerpt showing only the watcher branch: for a {@code WatcherSetEventPair}, each
 * watcher's {@code process} is invoked in turn with the event. A throwable from one
 * watcher is logged and does not stop the remaining watchers from being called.
 */
private void processEvent(Object event) {
  try {
      if (event instanceof WatcherSetEventPair) {
          WatcherSetEventPair pair = (WatcherSetEventPair) event;
          for (Watcher watcher : pair.watchers) {
              try {
                  // Invoke the watcher's callback.
                  watcher.process(pair.event);
              } catch (Throwable t) {
                  LOG.error("Error while calling watcher ", t);
              }
          }
      } else {
          // ... (handling of other event kinds omitted)
      }
  } catch (Throwable t) {
      LOG.error("Caught unexpected throwable", t);
  }
}
}

Watcher 的特性

  1. 一次性,无论是客户端还是服务端,Watcher 一旦被触发,就会从对应的 Watcher 存储中移除
  2. 客户端串行执行,客户端 Watcher 的回调是一个串行同步的过程
  3. 轻量,Watcher 事件非常简单只会告诉客户端发生了什么事情,事情的具体内容并没有附带
⚠️ **GitHub.com Fallback** ⚠️