tutorial - adyliu/zkclient GitHub Wiki

zkclient 快速指南

Maven依赖

最新的版本发布在Maven中央库

<dependency>
    <groupId>com.github.adyliu</groupId>
    <artifactId>zkclient</artifactId>
    <version>2.0</version>
</dependency>

如果不使用Maven依赖,那么需要以下依赖:

  • org.apache.zookeeper:zookeeper
  • log4j:log4j

zookeeper 3.4.x以上依赖于

  • org.slf4j:slf4j-api
  • org.slf4j:slf4j-log4j12

快速入门

操作的接口在com.github.zkclient.IZkClient中,默认的实现com.github.zkclient.ZkClient。

构造一个ZkClient非常简单,只需要如下代码即可:

IZkClient zkClient = new ZkClient("127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002");

读取数据

byte[] readData(String path);
byte[] readData(String path, boolean returnNullIfPathNotExists);
byte[] readData(String path, Stat stat);

写入数据

Stat writeData(String path, byte[] data);
Stat writeData(String path, byte[] data, int expectedVersion);

创建节点

void createPersistent(String path);
void createPersistent(String path, boolean createParents);
void createPersistent(String path, byte[] data);
String createPersistentSequential(String path, byte[] data);
void createEphemeral(final String path);
void createEphemeral(final String path, final byte[] data);
String createEphemeralSequential(final String path, final byte[] data);
String create(final String path, byte[] data, final CreateMode mode);

删除节点

boolean delete(final String path);
boolean deleteRecursive(String path);

查询节点

boolean exists(final String path);
List<String> getChildren(String path);
long getCreationTime(String path);
int countChildren(String path);

订阅事件

zkclient的强大之处不在于基本zookeeper api操作,而在于事件监听机制,也就是zookeeper的watches

Zookeeper的watcher在使用上存在一次性、session过期等难点,因此zkclient对这些问题进行了封装和屏蔽。 zkclient一共定义了三种事件:

  • com.github.zkclient.IZkStateListener
    • public void handleStateChanged(KeeperState state) throws Exception;
    • public void handleNewSession() throws Exception;
  • com.github.zkclient.IZkDataListener
    • public void handleDataChange(String dataPath, byte[] data) throws Exception;
    • public void handleDataDeleted(String dataPath) throws Exception;
  • com.github.zkclient.IZkChildListener
    • public void handleChildChange(String parentPath, List currentChildren) throws Exception;

IZkStateListener 定义了两种事件,一种是连接状态的改变,例如由未连接改变成连接上,连接上改为过期等;另一种创建一个新的session(连接), 通常是由于session失效然后新的session被建立时触发。一般此时需要开发者重新创建临时节点(Ephemeral Nodes)。

IZkDataListener 也定义了两种事件,一种是节点数据的变化,另一种是节点被删除。

IZkChildListener 定义了一种事件,描述子节点变化了,这时候获取到的是新的子节点列表。如果此节点被删除,那么子节点列表是null(非空列表)。

IZkClient能够非常方便的订阅这三种事件:

  • void subscribeStateChanges(IZkStateListener listener);
  • void subscribeDataChanges(String path, IZkDataListener listener);
  • List subscribeChildChanges(String path, IZkChildListener listener);

而zkclient最强大之处在于,当发送session失效时能够自动重新订阅这些事件,而不需要开发者重新订阅。

例如,订阅一个节点的数据变更:

zkclient.subscribeDataChanges('/xpower/ddd/compass', new IZkDataListener() {
    public void handleDataDeleted(String path) throws Exception {
        destroyDatasource("compass");
    }
    public void handleDataChange(String path, byte[] data) throws Exception {
        rebuildDatasource("compass",data);
    }
});

jafka中大量使用了事件监听的机制来保证数据的一致性,例如在session失效时,需要重新注册broker以及其topics:

public void handleNewSession() throws Exception {
    logger.info("re-registering broker info in zookeeper for broker " + config.getBrokerId());
    registerBrokerInZk();
    synchronized (lock) {
        logger.info("re-registering broker topics in zookeeper for broker " + config.getBrokerId());
        for (String topic : topics) {
            processTask(new TopicTask(TaskType.CREATE, topic));
        }
    }
}

public void handleStateChanged(KeeperState state) throws Exception {
    // do nothing, since zkclient will do reconnect for us.
}
⚠️ **GitHub.com Fallback** ⚠️