使用ZooKeeper实现负载均衡 - xbwen/xbwen.github.io GitHub Wiki

实现负载均衡最常见的几个算法包括:

(1)随机(Random)算法。即,把新的连接请求,随机分配到N个服务器的其中一个。

(2)轮叫(Round Robin)算法。即,把新的连接请求,依次分配到第1个、第2个、直至第N个服务器上。

(3)最少连接(Least Connection)算法。即,把新的连接请求,分配到当前连接数最少(负载最小)的服务器上。

接下来,我们使用ZooKeeper来实现这几个算法。我们使用Curator作为ZooKeeper Client Library,使用Netty实现和Server和BalancerServer。

0、Balancer和BalanceServer

首先,定义一个Balancer接口:

public interface Balancer {
    //返回Server的IP地址
    public String select() throws Exception;
}

然后,实现一个BalanceServer,其中关键的代码如下:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Balancer balancer = new RandomBalancer(BalanceServer.zkClient);
    //Balancer balancer = new RoundRobinBalancer(BalanceServer.zkClient);
    //Balancer balancer = new LeastConnectionBalancer(BalanceServer.zkClient);
    String ip = balancer.select();
    if(ip == null){
        ctx.close();
    }
    ByteBuf buf = ctx.alloc().buffer();
    buf.writeBytes(ip.getBytes());
    ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
}

1、随机算法

使用随机算法,ZooKeeper上只需记录有哪几个Server可用。Server程序启动的时候,在ZooKeeper上创建一个值为null的临时节点:

zkClient.create().withMode(CreateMode.EPHEMERAL).forPath("/workers/" + myIP, null);

RandomBalancer的代码如下:

public class RandomBalancer implements Balancer {
    
    private CuratorFramework zkClient;
    
    public RandomBalancer(CuratorFramework zkClient){
        this.zkClient = zkClient;
    }

    @Override
    public String select() throws Exception{
        List<String> workers = zkClient.getChildren().forPath("/workers");
        if(workers == null){
            return null;
        }
        int size = workers.size();
        if(size == 0){
            return null;
        }
        Random r = new Random();
        int index = r.nextInt(size);
        return workers.get(index);
    }

}

2、轮叫算法

使用轮叫算法,ZooKeeper上也只需记录有哪几个Server可用。Server程序启动的时候,在ZooKeeper上创建一个值为null的临时节点:

zkClient.create().withMode(CreateMode.EPHEMERAL).forPath("/workers/" + myIP, null);

RoundRobinBalancer的代码如下:

public class RoundRobinBalancer implements Balancer {
    
    private CuratorFramework zkClient;
    
    public RoundRobinBalancer(CuratorFramework zkClient){
        this.zkClient = zkClient;
    }

    @Override
    public String select() throws Exception {
        List<String> workers = zkClient.getChildren().forPath("/workers");
        if(workers == null){
            return null;
        }
        int size = workers.size();
        if(size == 0){
            return null;
        }
        
        int round = -1;
        SharedCount sc = new SharedCount(zkClient, "/round", -1);
        boolean success = false;
        try {
            sc.start();
            while(!success){
                round = sc.getCount();
                round++;
                if(round >= Integer.MAX_VALUE){
                    round = -1;
                }
                success = sc.trySetCount(round);
            }
        }finally{
            CloseableUtils.closeQuietly(sc);
        }
        
        int index = round % size;
        return workers.get(index);
    }

}

3、最少连接算法

使用最少连接算法,ZooKeeper上需要记录下每个Server节点当前的连接数。

zkClient.create().withMode(CreateMode.EPHEMERAL).forPath("/workers/" + myIP, 0);

同时,在Server程序中,当channelActive的时候,ZooKeeper记录的连接数要增加1;当channelInactive的时候,ZooKeeper记录的连接数要减少1。

LeastConnectionBalancer的源代码如下:

public class LeastConnectionBalancer implements Balancer {
    
    private CuratorFramework zkClient;
    
    public LeastConnectionBalancer(CuratorFramework zkClient){
        this.zkClient = zkClient;
    }

    @Override
    public String select() throws Exception {
        List<String> workers = zkClient.getChildren().forPath("/workers");
        if(workers == null){
            return null;
        }
        int size = workers.size();
        if(size == 0){
            return null;
        }
        int leastConnection = -1;
        int leastNode = -1;
        for(int i=0; i<size; i++){
            byte[] data = zkClient.getData().forPath("/workers/" + workers.get(i));
            int connection = Integer.parseInt(new String(data));
            if(leastConnection == -1 || connection < leastConnection){
                leastConnection = connection;
                leastNode = i;
            }
        }
        return workers.get(leastNode);
    }

}
⚠️ **GitHub.com Fallback** ⚠️