ZooKeeper Installation - wtdig/study GitHub Wiki

Using ZooKeeper

I. Introduction to ZooKeeper

Overview of principles

Reference material

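1. Characteristics of ZooKeeper:

Typical uses: 1. naming service 2. configuration management 3. cluster management 4. distributed locks 5. queue management

1. Eventual consistency: whichever server a client connects to, it is shown the same view of the data. This is ZooKeeper's most important property.

2. Reliability: ZooKeeper is simple and robust and performs well. Once a message has been accepted by one server, it will be accepted by all servers.

3. Timeliness: ZooKeeper guarantees that a client receives updates from the servers, or learns that a server has failed, within a bounded time interval. Because of network latency and similar factors, it cannot guarantee that two clients see newly updated data at the same moment. If a client needs the latest data, it should call the sync() interface before reading.

4. Wait-free: slow or failed clients must not interfere with the requests of fast clients, so no client is held up waiting on another.

5. Atomicity: an update either succeeds or fails; there is no intermediate state.

6. Ordering: there are two kinds, total order and partial order. Total order means that if message a is published before message b on one server, then a is published before b on every server. Partial order means that if message b is published after message a by the same sender, a is always ordered before b.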

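2. How it works:

At the core of ZooKeeper is atomic broadcast, the mechanism that keeps the servers in sync. The protocol that implements it is called Zab. Zab has two modes: recovery mode (leader election) and broadcast mode (synchronization). Zab enters recovery mode when the service starts or after the leader crashes. Recovery mode ends once a leader has been elected and a majority of the servers have synchronized their state with the leader. State synchronization ensures that the leader and the other servers hold the same system state.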
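The phrase "a majority of the servers" can be made concrete with a little arithmetic. The following is a minimal sketch and not ZooKeeper code: the class and method names (`QuorumSketch`, `majority`, `tolerableFailures`) are hypothetical and only illustrate the strict-majority rule for an ensemble of n voting servers.

```java
/** Illustrative arithmetic only; not part of the ZooKeeper API. */
final class QuorumSketch {

    /** Smallest number of servers that forms a strict majority of n. */
    static int majority(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        return n / 2 + 1;
    }

    /** Number of servers that may fail while a majority is still available. */
    static int tolerableFailures(int n) {
        return n - majority(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: majority=" + majority(n)
                    + ", tolerable failures=" + tolerableFailures(n));
        }
    }
}
```

Under this rule 3 servers and 4 servers both tolerate a single failure, which is one reason ensembles are usually sized with an odd number of machines.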

II. Installing standalone ZooKeeper

1. Download the ZooKeeper package

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

2. Extract the package

tar -zxvf zookeeper-3.4.10.tar.gz

3. Edit the configuration file

Copy zoo_sample.cfg in the zookeeper-3.4.10/conf/ directory to zoo.cfg.

Edit zoo.cfg (both directories must be created beforehand):

dataDir=/usr/local/../data

dataLogDir=/usr/local/../logs
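Since zoo.cfg uses Java properties syntax (key=value lines), a quick check that the configured directories exist can be sketched with `java.util.Properties`. This is only an illustration: the class name `ZooCfgSketch` and the sample paths under /var/lib/zookeeper are assumptions, not values from this guide.

```java
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that reads zoo.cfg-style text; not part of ZooKeeper. */
final class ZooCfgSketch {

    /** Parses key=value lines; lines starting with '#' are comments. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Sample values for illustration only
        String sample = String.join("\n",
                "# hypothetical zoo.cfg",
                "tickTime=2000",
                "clientPort=2181",
                "dataDir=/var/lib/zookeeper/data",
                "dataLogDir=/var/lib/zookeeper/logs");
        Properties cfg = parse(sample);
        for (String key : new String[] {"dataDir", "dataLogDir"}) {
            File dir = new File(cfg.getProperty(key));
            String state = dir.isDirectory() ? "exists" : "missing, create it first";
            System.out.println(key + " = " + dir + " (" + state + ")");
        }
    }
}
```

Note that in this syntax any text after the value on the same line becomes part of the value, so remarks belong on their own `#` lines.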

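4. Set environment variables

To make the zkServer.sh script easier to run, add ZooKeeper's bin directory to PATH in /etc/profile so that it is exported globally. After editing the file, run source /etc/profile for the change to take effect.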

export ZOO_HOME=/home/zookeeper-3.4.10
export PATH=$PATH:$ZOO_HOME/bin:$ZOO_HOME/conf

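5. Run

Change to the zookeeper-3.4.10/bin directory.

Start ZooKeeper: ./zkServer.sh start

Stop ZooKeeper: ./zkServer.sh stop

Check status: ./zkServer.sh status

ZooKeeper client: zkCli.sh -server [IP]:12182 # [IP] is the IP address of the ZooKeeper service; the port must match clientPort in zoo.cfg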

III. Using a ZooKeeper cluster (example with 3 machines: 192.168.2.51, 192.168.2.52, 192.168.2.53)

1. On each of the 3 machines, add the following to the zoo.cfg configuration file:

server.0=192.168.2.51:2888:3888
server.1=192.168.2.52:2888:3888
server.2=192.168.2.53:2888:3888

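2. On each of the 3 machines, create a myid file in the data directory (the one already configured for the standalone setup)

On 192.168.2.51, write 0 in myid

On 192.168.2.52, write 1 in myid

On 192.168.2.53, write 2 in myid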
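The value in each myid file has to match the N of that machine's server.N line. As a rough sketch of that relationship, the snippet below derives the id for each host from the configuration text. The class name `MyIdSketch` and its method are hypothetical helpers, not something ZooKeeper ships.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps hosts to the id that belongs in their myid file. */
final class MyIdSketch {

    /** Reads "server.N=host:peerPort:electionPort" lines and returns host -> N. */
    static Map<String, Integer> idsByHost(String cfgText) {
        Map<String, Integer> ids = new LinkedHashMap<>();
        for (String raw : cfgText.split("\\R")) {
            String line = raw.trim();
            int eq = line.indexOf('=');
            if (!line.startsWith("server.") || eq < 0) {
                continue; // not a server entry
            }
            int id = Integer.parseInt(line.substring("server.".length(), eq).trim());
            String host = line.substring(eq + 1).trim().split(":")[0];
            ids.put(host, id);
        }
        return ids;
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "server.0=192.168.2.51:2888:3888",
                "server.1=192.168.2.52:2888:3888",
                "server.2=192.168.2.53:2888:3888");
        for (Map.Entry<String, Integer> e : idsByHost(cfg).entrySet()) {
            System.out.println("myid on " + e.getKey() + " should contain " + e.getValue());
        }
    }
}
```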

3. Start ZooKeeper (one machine at a time)

./zkServer.sh start
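Once the cluster is running, clients can be given all of its members as a comma-separated list of host:port pairs instead of a single address. The sketch below builds such a connection string. The class name `ConnectStringSketch` is hypothetical, and the client port 2181 is an assumption; use the clientPort value from your own zoo.cfg.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that builds a ZooKeeper connection string. */
final class ConnectStringSketch {

    /** Joins hosts as "host1:port,host2:port,...". */
    static String connectString(List<String> hosts, int clientPort) {
        return hosts.stream()
                .map(host -> host + ":" + clientPort)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("192.168.2.51", "192.168.2.52", "192.168.2.53");
        // 2181 is assumed here; it must match clientPort in zoo.cfg
        System.out.println(connectString(hosts, 2181));
    }
}
```

The resulting string can be passed wherever the examples in the next section use a single address such as CONNECT_STRING or CONNECT_ADDR.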

IV. Using the ZooKeeper APIs

1) Native API

pom file

<!-- ZooKeeper native API -->
    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
    </dependencies>

Java code

package com.wtdig;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
 * @author wb-wt261136
 * @version 2018/7/3. 14:37
 */
public class ZookeeperDemo {
    private static final String CONNECT_STRING = "45.78.9.159:12181";
    private static final int SESSION_TIMEOUT = 3000;
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {

            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
                    if (watchedEvent.getType().equals(Event.EventType.None)) {
                        connectedSemaphore.countDown();
                        System.out.println("ZooKeeper session connected");
                    }
                }
            }
        });

        // Block until the session is connected
        connectedSemaphore.await();

        System.out.println("..");
        // Create the parent node
        //zk.create("/testRoot", "testRoot".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Create a child node. An EPHEMERAL node is valid only within this session and is deleted
        // once the main program finishes and the session closes; it can serve as a distributed lock.
        //zk.create("/testRoot/children", "children data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Read the node's data and children
//		byte[] data = zk.getData("/testRoot", false, null);
//		System.out.println(new String(data));
//    	System.out.println(zk.getChildren("/testRoot", false));

        // Update the node's value. A version of -1 skips the version check; any other value must match
        // the node's current version, otherwise the update fails. The same applies to delete.
//		zk.setData("/testRoot", "modify data root".getBytes(), -1);
//		byte[] data = zk.getData("/testRoot", false, null);
//		System.out.println(new String(data));

        // Check whether the node exists
//		System.out.println(zk.exists("/testRoot/children", false));
        // Delete the node
//		zk.delete("/testRoot/children", -1);
//		System.out.println(zk.exists("/testRoot/children", false));

        zk.close();
    }
}
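The version argument of setData and delete acts as an optimistic concurrency check. The following in-memory sketch imitates that rule without contacting a server. `VersionedNodeSketch` is a hypothetical class written for this page, and it returns false on a version mismatch, whereas the real client signals the mismatch with a KeeperException.BadVersionException.

```java
import java.nio.charset.StandardCharsets;

/** In-memory imitation of a znode's version check; not the ZooKeeper API. */
final class VersionedNodeSketch {
    private byte[] data;
    private int version = 0;

    VersionedNodeSketch(String initial) {
        this.data = initial.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Applies the update if expectedVersion is -1 (skip the check) or equals
     * the current version; each successful update increments the version.
     */
    synchronized boolean setData(String newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false; // stale version: someone else updated the node first
        }
        data = newData.getBytes(StandardCharsets.UTF_8);
        version++;
        return true;
    }

    synchronized int version() {
        return version;
    }

    synchronized String data() {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch("a");
        System.out.println(node.setData("b", 0));  // matches version 0
        System.out.println(node.setData("c", 0));  // stale, node is now at version 1
        System.out.println(node.setData("d", -1)); // -1 skips the check
        System.out.println(node.data() + " @ version " + node.version());
    }
}
```

A client that wants a safe read-modify-write would read the node together with its version, then pass that version back in the update and retry if the update is rejected.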

2) ZkClient API

pom file

<!-- https://mvnrepository.com/artifact/com.github.sgroschupf/zkclient -->
        <dependency>
            <groupId>com.github.sgroschupf</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.1</version>
        </dependency>

Java code

package com.wtdig;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * Demo of the ZkClient client API for ZooKeeper
 *
 * @author wb-wt261136
 * @version 2018/7/4. 10:07
 */
public class ZookeeperClientDemo {


    public static void main(String[] args) {
        String path = "45.78.9.159:12181";
        ZkClient zkClient = new ZkClient(path, 10000, 10000, new SerializableSerializer());
        System.out.println("Connected to ZooKeeper");
        // User is assumed to be a Serializable bean with name and city properties (not shown here)
        User user = new User();
        user.setName("wt11");
        user.setCity("city111");

        // Create nodes
        String message = zkClient.create("/testUserNode5", user, CreateMode.PERSISTENT);
        zkClient.createPersistent("/testUserNode4", "1111122222");
        System.out.println(message);

        System.out.println(">>>>>>>>>>>>>>>>>>>");

        // Read node data (the paths read, deleted and updated below are assumed to exist already)
        Stat stat = new Stat();
        //User o = zkClient.readData("/testUserNode", stat);
        User o1 = zkClient.readData("/testUserNode/test");
        System.out.println(o1);
        System.out.println(stat);


        System.out.println(">>>>>>>>>>>>>>>>>>>");

        // Check whether a node exists
        boolean exists = zkClient.exists("/testUserNode");
        System.out.println(exists);

        // Delete nodes
        // Delete a single node
        boolean delete = zkClient.delete("/testUserNode");
        // Delete a node together with its children
        boolean b = zkClient.deleteRecursive("/test");
        System.out.println(delete);
        System.out.println(b);
        System.out.println(">>>>>>>>>>>>>>>>>>>");
        // Update node data
        User user1 = new User();
        user1.setName("wt1111");
        user1.setCity("city1111");
        zkClient.writeData("/testUserNode/test", user1);
        User o2 = zkClient.readData("/testUserNode/test");
        System.out.println(o2.getCity());
        zkClient.writeData("/testUserNode3", "www1112223333");
        zkClient.close();


    }
}



package com.wtdig;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.List;

/**
 * Subscribes to changes on a node (node created, node deleted, child added)
 */
public class SubscribeChildChanges {
    private static class ZKChildListener implements IZkChildListener {
        /**
         * handleChildChange: handles notifications sent by the server
         * parentPath: path of the parent node
         * currentChilds: relative paths of the child nodes
         */
        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            System.out.println("Node changed");
            System.out.println(parentPath);
            // Printed without toString() because the list may be null, e.g. when the parent node is deleted
            System.out.println(currentChilds);
              
        }  
          
    }  
      
    public static void main(String[] args) throws InterruptedException {  
        // Address of the ZooKeeper server (or cluster)
        String ZKServers = "45.78.9.159:12181";
        ZkClient zkClient = new ZkClient(ZKServers,10000,10000,new SerializableSerializer());
        System.out.println("connected ok!");
        /**
         * "/testUserNode3" is the node being watched; it may or may not exist yet
         */
        zkClient.subscribeChildChanges("/testUserNode3", new ZKChildListener());  
        Thread.sleep(Integer.MAX_VALUE);  
    }  
}  


package com.wtdig;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

/**
 * Subscribes to changes in a node's data
 */
public class SubscribeDataChanges {
    private static class ZKDataListener implements IZkDataListener {

        public void handleDataChange(String dataPath, Object data) throws Exception {
            System.out.println("Node data changed");
            System.out.println(dataPath + ":" + data.toString());
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println("Node data removed");
            System.out.println(dataPath);

        }


    }

    public static void main(String[] args) throws InterruptedException {
        // Address of the ZooKeeper server (or cluster)
        String ZKServers = "45.78.9.159:12181";
        ZkClient zkClient = new ZkClient(ZKServers, 10000, 10000, new SerializableSerializer());
        System.out.println("connected ok!");

        zkClient.subscribeDataChanges("/testUserNode4", new ZKDataListener());
        Thread.sleep(Integer.MAX_VALUE);

    }
}  

3) Curator framework API

Curator jars required: curator-framework-2.4.2.jar, curator-recipes-2.4.2.jar, curator-client-2.4.2.jar

1. Basic usage
package bjsxt.curator.base;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;

public class CuratorBase {
	
	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
//					.namespace("super")
					.build();
		//3 Start the client
		cf.start();
		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1 content".getBytes());
		
//		System.out.println(States.CONNECTED);
//		System.out.println(cf.getState());
		
		// Create and delete
		/**
		//4 Create a node, specifying the node type (persistent by default when withMode is omitted), path and data
		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1 content".getBytes());
		//5 Delete the node
		cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
		*/
		
		// Read and update
		/**
		// Create nodes
//		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1 content".getBytes());
//		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2 content".getBytes());
		// Read a node
//		String ret1 = new String(cf.getData().forPath("/super/c2"));
//		System.out.println(ret1);
		// Update a node
//		cf.setData().forPath("/super/c2", "updated c2 content".getBytes());
//		String ret2 = new String(cf.getData().forPath("/super/c2"));
//		System.out.println(ret2);	
		*/
		
		// Attach a callback
		/**
		ExecutorService pool = Executors.newCachedThreadPool();
		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
		.inBackground(new BackgroundCallback() {
			@Override
			public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
				System.out.println("code:" + ce.getResultCode());
				System.out.println("type:" + ce.getType());
				System.out.println("thread: " + Thread.currentThread().getName());
			}
		}, pool)
		.forPath("/super/c3","c3 content".getBytes());
		Thread.sleep(Integer.MAX_VALUE);
		*/
		
		
		// Read children with getChildren and check whether a node exists with checkExists
		/**
		List<String> list = cf.getChildren().forPath("/super");
		for(String p : list){
			System.out.println(p);
		}
		
		Stat stat = cf.checkExists().forPath("/super/c3");
		System.out.println(stat);
		
		Thread.sleep(2000);
		cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
		*/
		
		
		//cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
		
	}
}

2. Event listeners
package bjsxt.curator.watcher;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class CuratorWatcher1 {
	
	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		
		//3 Start the client
		cf.start();
		
		//4 Create a NodeCache
		final NodeCache cache = new NodeCache(cf, "/super", false);
		cache.start(true);
		cache.getListenable().addListener(new NodeCacheListener() {
			/**
			 * <B>Method:</B> nodeChanged<BR>
			 * <B>Summary:</B> Triggered when the node is created or updated. getCurrentData() returns
			 * null when the node does not exist (for example after a delete), hence the null check below.<BR>
			 * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged()
			 */
			@Override
			public void nodeChanged() throws Exception {
				if (cache.getCurrentData() == null) {
					System.out.println("Node does not exist");
					return;
				}
				System.out.println("Path: " + cache.getCurrentData().getPath());
				System.out.println("Data: " + new String(cache.getCurrentData().getData()));
				System.out.println("Stat: " + cache.getCurrentData().getStat());
				System.out.println("---------------------------------------");
			}
		});
		
		Thread.sleep(1000);
		cf.create().forPath("/super", "123".getBytes());
		
		Thread.sleep(1000);
		cf.setData().forPath("/super", "456".getBytes());
		
		Thread.sleep(1000);
		cf.delete().forPath("/super");
		
		Thread.sleep(Integer.MAX_VALUE);
		
		

	}
}


package bjsxt.curator.watcher;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class CuratorWatcher2 {
	
	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		
		//3 Start the client
		cf.start();
		
		//4 Create a PathChildrenCache; the third argument controls whether node data is cached (false means it is not)
		PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
		//5 Prime the cache at startup and begin listening
		cache.start(StartMode.POST_INITIALIZED_EVENT);
		cache.getListenable().addListener(new PathChildrenCacheListener() {
			/**
			 * <B>Method:</B> listens for child node changes<BR>
			 * <B>Summary:</B> child created, updated or deleted<BR>
			 * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
			 */
			@Override
			public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
				switch (event.getType()) {
				case CHILD_ADDED:
					System.out.println("CHILD_ADDED :" + event.getData().getPath());
					break;
				case CHILD_UPDATED:
					System.out.println("CHILD_UPDATED :" + event.getData().getPath());
					break;
				case CHILD_REMOVED:
					System.out.println("CHILD_REMOVED :" + event.getData().getPath());
					break;
				default:
					break;
				}
			}
		});

		// Creating the watched node itself does not fire a child event
		cf.create().forPath("/super", "init".getBytes());
		
		// Add child nodes
		Thread.sleep(1000);
		cf.create().forPath("/super/c1", "c1 content".getBytes());
		Thread.sleep(1000);
		cf.create().forPath("/super/c2", "c2 content".getBytes());
		
		// Update a child node
		Thread.sleep(1000);
		cf.setData().forPath("/super/c1", "c1 updated content".getBytes());
		
		// Delete a child node
		Thread.sleep(1000);
		cf.delete().forPath("/super/c2");		
		
		// Delete the watched node itself
		Thread.sleep(1000);
		cf.delete().deletingChildrenIfNeeded().forPath("/super");
		
		Thread.sleep(Integer.MAX_VALUE);
		

	}
}

3. Distributed locks
package bjsxt.curator.lock;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Ordinary lock inside a single JVM
 * @author Administrator
 *
 */
public class Lock1 {

	static ReentrantLock reentrantLock = new ReentrantLock();
	static int count = 10;
	public static void genarNo(){
		// Acquire the lock before entering try so that unlock() only runs after lock() has succeeded
		reentrantLock.lock();
		try {
			count--;
			//System.out.println(count);
		} finally {
			reentrantLock.unlock();
		}
	}
	
	public static void main(String[] args) throws Exception{
		
		final CountDownLatch countdown = new CountDownLatch(1);
		for(int i = 0; i < 10; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						countdown.await();
						genarNo();
						SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
						System.out.println(sdf.format(new Date()));
						//System.out.println(System.currentTimeMillis());
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
					}
				}
			},"t" + i).start();
		}
		Thread.sleep(50);
		countdown.countDown();
	}
}


package bjsxt.curator.lock;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Distributed lock
 * @author Administrator
 *
 */
public class Lock2 {

	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	static int count = 10;
	public static void genarNo(){
		try {
			count--;
			System.out.println(count);
		} finally {
		
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
//					.namespace("super")
					.build();
		//3 Start the client
		cf.start();
		
		//4 Distributed lock
		final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
		//final ReentrantLock reentrantLock = new ReentrantLock();
		final CountDownLatch countdown = new CountDownLatch(1);
		
		for(int i = 0; i < 10; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						countdown.await();
						// Acquire the lock
						lock.acquire();
						//reentrantLock.lock();
						//------------- business logic starts
						//genarNo();
						SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
						System.out.println(sdf.format(new Date()));
						//System.out.println(System.currentTimeMillis());
						//------------- business logic ends
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						try {
							// Release the lock
							lock.release();
							//reentrantLock.unlock();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				}
			},"t" + i).start();
		}
		Thread.sleep(100);
		countdown.countDown(); 
	}
}

4. Distributed counter
package bjsxt.curator.atomicinteger;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
/**
 * Distributed counter
 * @author Administrator
 *
 */
public class CuratorAtomicInteger {

	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		//3 Start the client
		cf.start();
		//cf.delete().forPath("/super");
		

		//4 Use DistributedAtomicInteger
		DistributedAtomicInteger atomicInteger = 
				new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000));
		
		AtomicValue<Integer> value = atomicInteger.add(1);
		System.out.println(value.succeeded());
		System.out.println(value.postValue());	// value after the update
		System.out.println(value.preValue());	// value before the update
		
	}
}

5. Barriers: start together, finish together
package bjsxt.curator.barrier;

import java.util.Random;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Start together and finish together
 * @author Administrator
 *
 */
public class CuratorBarrier1 {

	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		for(int i = 0; i < 5; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
						CuratorFramework cf = CuratorFrameworkFactory.builder()
									.connectString(CONNECT_ADDR)
									.retryPolicy(retryPolicy)
									.build();
						cf.start();
						
						DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
						Thread.sleep(1000 * (new Random()).nextInt(3)); 
						System.out.println(Thread.currentThread().getName() + " is ready");
						barrier.enter();
						System.out.println("Starting together...");
						Thread.sleep(1000 * (new Random()).nextInt(3));
						System.out.println(Thread.currentThread().getName() + " has finished");
						barrier.leave();
						System.out.println("Leaving together...");
						

					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}

		
		
	}
}

package bjsxt.curator.barrier;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
/**
 * Start together, but not necessarily finish together
 * @author Administrator
 *
 */
public class CuratorBarrier2 {

	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	static DistributedBarrier barrier = null;
	
	public static void main(String[] args) throws Exception {
		for(int i = 0; i < 5; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
						CuratorFramework cf = CuratorFrameworkFactory.builder()
									.connectString(CONNECT_ADDR)
									.sessionTimeoutMs(SESSION_OUTTIME)
									.retryPolicy(retryPolicy)
									.build();
						cf.start();
						barrier = new DistributedBarrier(cf, "/super");
						System.out.println(Thread.currentThread().getName() + " is setting the barrier!");
						barrier.setBarrier();	// set the barrier
						barrier.waitOnBarrier();	// wait on the barrier
						System.out.println("---------program starts running----------");
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}

		Thread.sleep(5000);
		barrier.removeBarrier();	// release the barrier
		
		
	}
}

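6. Event notification and repeated registration

One event listener can serve multiple clients. Each time the class that listens on a node is restarted, it is notified about every node currently under the watched node. For example: I listen for node-creation events (the listening class is running), then stop that class and create a child node under the watched node. When the listening class is started again, it is notified about all previously created nodes. Likewise, if I register one address and later register another, the listener is given every registered address.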
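The replay behaviour described above can be pictured with a small in-memory model. This is only a sketch of the idea and does not use Curator: `ChildReplaySketch` and its methods are hypothetical, and the event strings simply mimic the output format of the listener shown below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** In-memory model of "a newly started listener is told about every existing child". */
final class ChildReplaySketch {
    // Children that already exist on the (simulated) server, kept in sorted order
    private final Set<String> children = new TreeSet<>();

    /** Registers a child, e.g. a service address, while no listener is running. */
    void createChild(String name) {
        children.add(name);
    }

    /** Events a listener would receive when it starts now: one per existing child. */
    List<String> startListener(String parentPath) {
        List<String> events = new ArrayList<>();
        for (String child : children) {
            events.add("CHILD_ADDED :" + parentPath + "/" + child);
        }
        return events;
    }

    public static void main(String[] args) {
        ChildReplaySketch server = new ChildReplaySketch();
        server.createChild("c1"); // registered while the listener was stopped
        server.createChild("c2"); // registered later
        for (String event : server.startListener("/super")) {
            System.out.println(event);
        }
    }
}
```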

package bjsxt.curator.cluster;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class CuratorWatcher {

	/** Parent node path */
	static final String PARENT_PATH = "/super";
	
	/** ZooKeeper server address */
	public static final String CONNECT_ADDR = "45.78.9.159:12181";
	
	/** Session timeout */
	public static final int SESSION_TIMEOUT = 30000;
	
	public CuratorWatcher() throws Exception{
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_TIMEOUT)
					.retryPolicy(retryPolicy)
					.build();
		//3 Start the client
		cf.start();
		
		//4 Create the root node
		if(cf.checkExists().forPath(PARENT_PATH) == null){
			cf.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH,"super init".getBytes());
		}

		//5 Create a PathChildrenCache; the third argument controls whether node data is cached (false means it is not)
		PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true);
		//6 Prime the cache at startup and begin listening
		cache.start(StartMode.POST_INITIALIZED_EVENT);
		cache.getListenable().addListener(new PathChildrenCacheListener() {
			/**
			 * <B>Method:</B> listens for child node changes<BR>
			 * <B>Summary:</B> child created, updated or deleted<BR>
			 * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent)
			 */
			@Override
			public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
				switch (event.getType()) {
				case CHILD_ADDED:
					System.out.println("CHILD_ADDED :" + event.getData().getPath());
					System.out.println("CHILD_ADDED :" + new String(event.getData().getData()));
					break;
				case CHILD_UPDATED:
					System.out.println("CHILD_UPDATED :" + event.getData().getPath());
					System.out.println("CHILD_UPDATED :" + new String(event.getData().getData()));
					break;
				case CHILD_REMOVED:
					System.out.println("CHILD_REMOVED :" + event.getData().getPath());
					System.out.println("CHILD_REMOVED :" + new String(event.getData().getData()));
					break;
				default:
					break;
				}
			}
		});
	}

}


package bjsxt.curator.cluster;



public class Client1 {

	public static void main(String[] args) throws Exception{
		
		CuratorWatcher watcher = new CuratorWatcher();
		Thread.sleep(100000000);
	}
}


package bjsxt.curator.cluster;


public class Client2 {

	public static void main(String[] args) throws Exception{
		
		CuratorWatcher watcher = new CuratorWatcher();
		Thread.sleep(100000000);
	}
}


package bjsxt.curator.cluster;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Test {

	/** ZooKeeper address */
	static final String CONNECT_ADDR = "45.78.9.159:12181";
	/** Session timeout */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 Retry policy: base sleep time of 1s, at most 10 retries
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 Create the client through the factory
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		//3 Start the client
		cf.start();

		
		Thread.sleep(3000);
		// Print the whole child list; it may be empty, so no element is indexed
		System.out.println(cf.getChildren().forPath("/super"));
		
		//4 Create nodes
//		Thread.sleep(1000);
		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1 content".getBytes());
		Thread.sleep(1000);
		// /super/c2 must exist before it is updated in step 6
		cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2 content".getBytes());
//		Thread.sleep(1000);
//		
//		
//		
//		//5 Read a node
//		Thread.sleep(1000);
//		String ret1 = new String(cf.getData().forPath("/super/c1"));
//		System.out.println(ret1);
//
//		
//		//6 Update a node
		Thread.sleep(1000);
		cf.setData().forPath("/super/c2", "new c2 content".getBytes());
		String ret2 = new String(cf.getData().forPath("/super/c2"));
		System.out.println(ret2);	
//		
//
//		
//		//7 Delete a node
//		Thread.sleep(1000);
//		cf.delete().forPath("/super/c1");
		
		
		
		
	}
}

V. ZooKeeper viewer tools:

1. ZooInspector, a viewer for ZooKeeper data

Reference document
