Consumer: Partition Assignment Strategies - 969251639/study GitHub Wiki

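The PartitionAssignor interface defines the basic behavior of partition assignment; the abstract class AbstractPartitionAssignor implements it.

PartitionAssignor has two nested classes: Subscription, which holds the topics a consumer subscribes to, and Assignment, which holds the partitions assigned to a consumer.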

class Subscription {
        private final List<String> topics; // topics the consumer subscribes to
        private final ByteBuffer userData; // custom user data, carried as serialized bytes

        public Subscription(List<String> topics, ByteBuffer userData) {
            this.topics = topics;
            this.userData = userData;
        }

        public Subscription(List<String> topics) {
            this(topics, ByteBuffer.wrap(new byte[0]));
        }

        public List<String> topics() {
            return topics;
        }

        public ByteBuffer userData() {
            return userData;
        }

        @Override
        public String toString() {
            return "Subscription(" +
                    "topics=" + topics +
                    ')';
        }
    }

    class Assignment {
        private final List<TopicPartition> partitions; // partitions assigned to the consumer
        private final ByteBuffer userData; // custom user data, carried as serialized bytes

        public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
            this.partitions = partitions;
            this.userData = userData;
        }

        public Assignment(List<TopicPartition> partitions) {
            this(partitions, ByteBuffer.wrap(new byte[0]));
        }

        public List<TopicPartition> partitions() {
            return partitions;
        }

        public ByteBuffer userData() {
            return userData;
        }

        @Override
        public String toString() {
            return "Assignment(" +
                    "partitions=" + partitions +
                    ')';
        }
    }

The abstract class AbstractPartitionAssignor implements the PartitionAssignor interface:

public abstract class AbstractPartitionAssignor implements PartitionAssignor {
    private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);

    /**
     * Perform the group assignment given the partition counts and member subscriptions
     * @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
     *                           from this map.
     * @param subscriptions Map from the memberId to their respective topic subscription
     * @return Map from each member to the list of partitions assigned to them.
     */
    // abstract method; each concrete subclass supplies the assignment algorithm
    public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                             Map<String, Subscription> subscriptions);

    @Override
    public Subscription subscription(Set<String> topics) { // wrap the subscribed topics in a Subscription
        return new Subscription(new ArrayList<>(topics));
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { // assign partitions
        Set<String> allSubscribedTopics = new HashSet<>(); // every topic subscribed to by any member in subscriptions
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet())
            allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());

        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (String topic : allSubscribedTopics) { // iterate over the topics
            Integer numPartitions = metadata.partitionCountForTopic(topic); // number of partitions in the topic
            if (numPartitions != null && numPartitions > 0) // the topic has at least one partition
                partitionsPerTopic.put(topic, numPartitions); // record the count in partitionsPerTopic
            else
                log.debug("Skipping assignment for topic {} since no metadata is available", topic);
        }

        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, subscriptions); // delegate to the concrete algorithm; returns the partitions assigned to each member

        // this class maintains no user data, so just wrap the results
        // wrap each member's partition list in an Assignment and return the result
        Map<String, Assignment> assignments = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
        return assignments;
    }

    @Override
    public void onAssignment(Assignment assignment) { // callback invoked when the consumer receives the leader's assignment; empty here, typically overridden by subclasses
        // this assignor maintains no internal state, so nothing to do
    }

    protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
        List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
        list.add(value);
    }

    protected static List<TopicPartition> partitions(String topic, int numPartitions) {
        List<TopicPartition> partitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++)
            partitions.add(new TopicPartition(topic, i));
        return partitions;
    }
}
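The first half of assign(Cluster, ...) can be tried out without a Kafka dependency. The following is a minimal sketch, not Kafka code: the class name PartitionCountSketch is hypothetical, a plain Map<String, Integer> stands in for the Cluster metadata, and a list of topic names stands in for Subscription.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-alone sketch; plain collections replace Cluster and Subscription.
final class PartitionCountSketch {

    // Collect every topic that any member subscribes to, then keep only the
    // topics for which the metadata map reports at least one partition.
    static Map<String, Integer> partitionsPerTopic(Map<String, Integer> metadata,
                                                   Map<String, List<String>> subscriptions) {
        Set<String> allSubscribedTopics = new TreeSet<>();
        for (List<String> topics : subscriptions.values())
            allSubscribedTopics.addAll(topics);

        Map<String, Integer> result = new TreeMap<>();
        for (String topic : allSubscribedTopics) {
            Integer numPartitions = metadata.get(topic);
            if (numPartitions != null && numPartitions > 0)
                result.put(topic, numPartitions);
            // topics without metadata are skipped, as in the listing above
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("t1", 3);
        metadata.put("t2", 3);

        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("c1", Arrays.asList("t1", "t2"));
        subscriptions.put("c2", Arrays.asList("t2", "t3")); // t3 has no metadata

        System.out.println(partitionsPerTopic(metadata, subscriptions)); // prints {t1=3, t2=3}
    }
}
```

The resulting map is what the abstract assign(partitionsPerTopic, subscriptions) method receives, so a concrete algorithm never sees a topic without partitions.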

Kafka provides three assignment algorithms built on AbstractPartitionAssignor:

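  • RangeAssignor: computes the assignment from the number of consumers and the number of partitions, splitting the partitions as evenly as those two numbers allow
  • RoundRobinAssignor: assigns partitions to consumers in round-robin order
  • StickyAssignor: sticky assignment; it can distribute partitions more evenly than the two algorithms above, but its implementation is more complex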
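To make the round-robin idea concrete, here is a minimal sketch under stated assumptions. It is not Kafka's RoundRobinAssignor: the class name RoundRobinSketch is hypothetical, partitions are plain strings such as "t1p0", and members and topics are visited in sorted order, with members that do not subscribe to a topic skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch of round-robin assignment.
final class RoundRobinSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>(); // sorted by member id
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());
        List<String> members = new ArrayList<>(assignment.keySet());

        int next = 0; // position in the circular member list
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String topic = e.getKey();
            boolean hasSubscriber = false;
            for (List<String> topics : subscriptions.values())
                hasSubscriber |= topics.contains(topic);
            if (!hasSubscriber)
                continue; // nobody wants this topic, so there is nothing to hand out

            for (int p = 0; p < e.getValue(); p++) {
                // advance to the next member that subscribes to this topic
                while (!subscriptions.get(members.get(next % members.size())).contains(topic))
                    next++;
                assignment.get(members.get(next % members.size())).add(topic + "p" + p);
                next++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t1", 3);
        partitionsPerTopic.put("t2", 3);

        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("c1", Arrays.asList("t1", "t2"));
        subscriptions.put("c2", Arrays.asList("t1", "t2"));

        // prints {c1=[t1p0, t1p2, t2p1], c2=[t1p1, t2p0, t2p2]}
        System.out.println(assign(partitionsPerTopic, subscriptions));
    }
}
```

With two consumers and two topics of three partitions each, every consumer ends up with three partitions, because the rotation carries over from one topic to the next.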

Kafka uses RangeAssignor by default; the partition.assignment.strategy configuration option lets the user choose which assignment algorithm to use:

.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    Type.LIST,
    Collections.singletonList(RangeAssignor.class),
    new ConfigDef.NonNullValidator(),
    Importance.MEDIUM,
    PARTITION_ASSIGNMENT_STRATEGY_DOC)
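As an illustration of overriding the default, the sketch below builds consumer properties that select RoundRobinAssignor. Only java.util.Properties is used so that the snippet runs on its own; the class name AssignorConfigSketch, the broker address, and the group id are placeholders, and in a real application the properties would be passed to a KafkaConsumer.

```java
import java.util.Properties;

// Hypothetical sketch: builds consumer properties that override the default assignor.
final class AssignorConfigSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "demo-group");              // placeholder group id
        // fully qualified class name of the assignor to use instead of RangeAssignor
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```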

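For example, take two consumers c1 and c2 and two topics t1 and t2, where each topic has three partitions numbered 0 to 2: t1p0, t1p1, t1p2, t2p0, t2p1, t2p2. RangeAssignor then produces the following assignment:
c1[t1p0, t1p1, t2p0, t2p1], c2[t1p2, t2p2]. The rule behind it is as follows.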

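For each topic, let n = number of partitions / number of consumers (integer division) and m = number of partitions % number of consumers. The first m consumers are each assigned n + 1 partitions, and the remaining (number of consumers - m) consumers are each assigned n partitions.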

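In the example above there are 2 consumers and 2 topics with 3 partitions each, so every topic is assigned as follows:
n = 3 / 2 = 1, m = 3 % 2 = 1, so the first consumer (c1) is assigned 1 + 1 = 2 partitions and the remaining 2 - 1 = 1 consumer (c2) is assigned 1 partition. Once both topics have been assigned this way, c1 holds four partitions and c2 holds two.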
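The per-topic rule can be sketched in a few lines of Java. This is an illustrative sketch rather than Kafka's RangeAssignor source: the class name RangeSketch is hypothetical, partitions are plain strings such as "t1p0", and consumers are ordered by sorted member id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch of the range rule described above.
final class RangeSketch {

    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>(); // sorted by member id
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());

        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String topic = e.getKey();
            int numPartitions = e.getValue();

            // consumers subscribed to this topic, in sorted order
            List<String> consumers = new ArrayList<>();
            for (String member : assignment.keySet())
                if (subscriptions.get(member).contains(topic))
                    consumers.add(member);
            if (consumers.isEmpty())
                continue;

            int n = numPartitions / consumers.size(); // base share per consumer
            int m = numPartitions % consumers.size(); // consumers that get one extra
            int start = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int length = n + (i < m ? 1 : 0);
                for (int p = start; p < start + length; p++)
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
                start += length;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t1", 3);
        partitionsPerTopic.put("t2", 3);

        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("c1", Arrays.asList("t1", "t2"));
        subscriptions.put("c2", Arrays.asList("t1", "t2"));

        // prints {c1=[t1p0, t1p1, t2p0, t2p1], c2=[t1p2, t2p2]}
        System.out.println(assign(partitionsPerTopic, subscriptions));
    }
}
```

Because the rule is applied to each topic independently, the first consumer receives the extra partition of every topic, which is why c1 ends up with four partitions and c2 with two.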
