消费者——订阅状态器 - 969251639/study GitHub Wiki

SubscriptionState定义了消费者的订阅规则,它默认有四种方式,以枚举方式定义如下

    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }
  • NONE:空,初始状态
  • AUTO_TOPICS:按照指定的topic进行订阅,自动分配分区
  • AUTO_PATTERN:按照指定的正则匹配topic进行订阅,自动分配分区
  • USER_ASSIGNED:用户手动指定topic下的具体分区号

其中AUTO_TOPICS和AUTO_PATTERN和USER_ASSIGNED都是互斥的关系,也就是只能选一种订阅模式,这三种关系都是通过setSubscriptionType方法来实现互斥,下面是其主要的成员变量

    //订阅类型
    private SubscriptionType subscriptionType;

    //订阅topic的正则
    private Pattern subscribedPattern;

    //订阅到的主题集合
    private Set<String> subscription;

    //consumer group有个leader,如果该consumer是leader则保存了所有consumer订阅的topic,否则follower则保存了自身订阅的topic
    private final Set<String> groupSubscription;

    //分区分配状态
    private final PartitionStates<TopicPartitionState> assignment;

    //默认的重载消费策略
    private final OffsetResetStrategy defaultResetStrategy;

    //发送reblance时的监听器
    private ConsumerRebalanceListener rebalanceListener;

它有个内部类用来记录分区的消费状态TopicPartitionState

    private static class TopicPartitionState {
        //最后一次消费的偏移量
        private Long position; // last consumed position
        //抓取到最新能看到的偏移量
        private Long highWatermark; // the high watermark from last fetch
        //最开始的偏移量
        private Long logStartOffset; // the log start offset
        //最后一个稳定的偏移量???
        private Long lastStableOffset;
        //是否暂停消费
        private boolean paused;  // whether this partition has been paused by the user
        //消费重置策略
        private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
        //允许下一次重试的时间
        private Long nextAllowedRetryTimeMs;
        ...
    }

AUTO_TOPICS

    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        setSubscriptionType(SubscriptionType.AUTO_TOPICS);//设置订阅类型
        this.rebalanceListener = listener;//设置reblance监听器
        changeSubscription(topics);//修改订阅主题列表
    }

AUTO_PATTERN

    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        setSubscriptionType(SubscriptionType.AUTO_PATTERN);//设置订阅类型
        this.rebalanceListener = listener;//设置reblance监听器
        this.subscribedPattern = pattern;//设置正则匹配规则表达式
    }

USER_ASSIGNED

    public void assignFromUser(Set<TopicPartition> partitions) {
        setSubscriptionType(SubscriptionType.USER_ASSIGNED);//设置订阅类型

        if (!this.assignment.partitionSet().equals(partitions)) {//如果新手动分配到的分区没在该consumer中
            fireOnAssignment(partitions);//触发手动分配监听器的onAssignment回调方法
            //重新标记分区与分区状态的映射,保存到assignment中
            Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
            for (TopicPartition partition : partitions) {
                TopicPartitionState state = assignment.stateValue(partition);
                if (state == null)
                    state = new TopicPartitionState();
                partitionToState.put(partition, state);
            }
            this.assignment.set(partitionToState);
        }
    }

可见不管调用哪种订阅模式,都会先调用setSubscriptionType方法对上面的三种模式互斥

    private void setSubscriptionType(SubscriptionType type) {
        if (this.subscriptionType == SubscriptionType.NONE)//如果是初始化状态则设置订阅类型
            this.subscriptionType = type;
        else if (this.subscriptionType != type)//如果订阅类型被修改成不等,则抛出异常
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

其他主要方法

    private void changeSubscription(Set<String> topicsToSubscribe) {//修改主题列表
        if (!this.subscription.equals(topicsToSubscribe)) {//如果订阅的主题列表和当前已订阅的主题列表不等
            this.subscription = topicsToSubscribe;//新主题列表覆盖原有主题列表
            this.groupSubscription.addAll(topicsToSubscribe);//添加到组订阅列表中
        }
    }

    public void groupSubscribe(Collection<String> topics) {//该方法会在group leader中使用
        if (this.subscriptionType == SubscriptionType.USER_ASSIGNED)//由于consumer group是用来辅助自动分配分区用的,所以使用手动分配分区时,该方法抛异常
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
        this.groupSubscription.addAll(topics);//添加新的主题列表到组中
    }

    //两种自动分配模式的分配方式,与上面的assignFromUser方法互斥
    public void assignFromSubscribed(Collection<TopicPartition> assignments) {
        if (!this.partitionsAutoAssigned())//如果不是AUTO_TOPICS,AUTO_PATTERN两种自动分配模式,抛出异常
            throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");

        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments);//和assignFromUser方法一样,记录了分区和分区状态的映射,保存在assignment中
        fireOnAssignment(assignedPartitionStates.keySet());//触发手动分配监听器的onAssignment回调方法

        if (this.subscribedPattern != null) {//如果是AUTO_PATTERN模式订阅主题
            for (TopicPartition tp : assignments) {//遍历主题
                if (!this.subscribedPattern.matcher(tp.topic()).matches())//如果订阅的正则表达式中不匹配待分配的主题,抛出异常
                    throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic regex pattern; subscription pattern is " + this.subscribedPattern);
            }
        } else {//如果是AUTO_TOPICS模式订阅主题
            for (TopicPartition tp : assignments)//遍历主题
                if (!this.subscription.contains(tp.topic()))//如果订阅的主题列表中不包含待分配的主题,抛出异常
                    throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic; subscription is " + this.subscription);
        }

        this.assignment.set(assignedPartitionStates);
    }

    public void unsubscribe() {//取消订阅
        this.subscription = Collections.emptySet();//清空所有订阅列表
        this.assignment.clear();//情况分配的分区
        this.subscribedPattern = null;//正则匹配设置为null
        this.subscriptionType = SubscriptionType.NONE;//订阅类型设置为初始状态
        fireOnAssignment(Collections.<TopicPartition>emptySet());//回调分配监听器方法
    }

    public Set<TopicPartition> pausedPartitions() {//获取暂停的分区
        HashSet<TopicPartition> paused = new HashSet<>();
        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {//遍历订阅到的所有主题分区状态
            if (state.value().paused) {//该分区属于暂停状态
                paused.add(state.topicPartition());//添加到set集合中
            }
        }
        return paused;
    }

    public void seek(TopicPartition tp, long offset) {//设置消费的起始偏移量
        assignedState(tp).seek(offset);
    }
    private TopicPartitionState assignedState(TopicPartition tp) {
        TopicPartitionState state = this.assignment.stateValue(tp);//获取分区状态
        if (state == null)
            throw new IllegalStateException("No current assignment for partition " + tp);
        return state;
    }

TopicPartitionState.seek方法用于设置分区偏移量

        private void seek(long offset) {
            this.position = offset;//设置偏移量
            this.resetStrategy = null;//清空重置策略
            this.nextAllowedRetryTimeMs = null;//清空下一次重试时间
        }

SubscriptionState还有许多方法,都比较简单,很多都是基于TopicPartitionState的属性进行操作,另外在上面的assignFromSubscribed方法或assignFromUser方法中,最后都会将分区和分区状态的映射保存在assignment变量中,assignment变量是一个PartitionStates类,PartitionStates本身是一个泛型类,记录了已分配到的分区与泛型的一个集合记录体,在SubscriptionState中它是一个记录了TopicPartitionState分区状态的泛型

private final PartitionStates<TopicPartitionState> assignment;

PartitionStates内部有一个LinkedHashMap的map的成员变量来记录分区与这些泛型的映射关系,同时也用了一个不可修改的partitionSetView来保存已分配到的分区的视图,它方法基本都是对map的操作,它最重要的方法是set方法,负责更新整个分区关系

public class PartitionStates<S> {
    //分区与泛型的关系映射
    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
    //分区视图,随map的变化而变化
    private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());

    /* the number of partitions that are currently assigned available in a thread safe manner */
    private volatile int size = 0;//当前已分配到的分区数量

    ...

    /**
     * Update the builder to have the received map as its state (i.e. the previous state is cleared). The builder will
     * "batch by topic", so if we have a, b and c, each with two partitions, we may end up with something like the
     * following (the order of topics and partitions within topics is dependent on the iteration order of the received
     * map): a0, a1, b1, b0, c0, c1.
     */
    public void set(Map<TopicPartition, S> partitionToState) {
        map.clear();//先清空
        update(partitionToState);//从新设置新的映射关系
        updateSize();//更新分区数量
    }
    ...
}
⚠️ **GitHub.com Fallback** ⚠️