消费者——消费协调器 - 969251639/study GitHub Wiki

kafka用ConsumerCoordinator组件作为消费者的协调器,主要用来控制偏移量的提交,心跳,消费重平衡reblance等,Coordinator对于消费者来说就是一个服务器节点,至于哪一个broker持有控制Coordinator的权限,消费者并不关心,消费者只需要知道服务器告诉消费者Coordinator在哪台机器,消费者主动去跟该服务器保持连接即可,如果Coordinator宕机时,消费者也会重新向服务器请求返回最新的Coordinator的机器节点,然后再往该Coordinator节点进行连接。消费者每次轮询时都会调用协调者的poll方法,该方法也是其最核心的方法

    public boolean poll(Timer timer) {
        invokeCompletedOffsetCommitCallbacks();//每次轮询时候都会回调偏移量提交的回调方法,该方法也会在每次提交偏移量或者关闭协调者时调用,后面分析

        if (subscriptions.partitionsAutoAssigned()) {//如果是自动分区
            // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
            // group proactively due to application inactivity even if (say) the coordinator cannot be found.
            // 因为是自动分区,每当服务器的主题或者消费者发生变化时都会发生reblance操作,所以必须保持消费者对服务器的心跳
            pollHeartbeat(timer.currentTimeMs());//尝试发送心跳
            if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {//如果Coordinator协调器宕机且重新查找新的Coordinator节点重连超时,退出该轮询
                return false;
            }

            if (rejoinNeededOrPending()) {//是否需要执行reblance
                // due to a race condition between the initial metadata fetch and the initial rebalance,
                // we need to ensure that the metadata is fresh before joining initially. This ensures
                // that we have matched the pattern against the cluster's topics at least once before joining.
                if (subscriptions.hasPatternSubscription()) {
                    // For consumer group that uses pattern-based subscription, after a topic is created,
                    // any consumer that discovers the topic after metadata refresh can trigger rebalance
                    // across the entire consumer group. Multiple rebalances can be triggered after one topic
                    // creation if consumers refresh metadata at vastly different times. We can significantly
                    // reduce the number of rebalances caused by single topic creation by asking consumer to
                    // refresh metadata before re-joining the group as long as the refresh backoff time has
                    // passed.
                    if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
                        this.metadata.requestUpdate();
                    }

                    if (!client.ensureFreshMetadata(timer)) {
                        return false;
                    }
                }

                if (!ensureActiveGroup(timer)) {
                    return false;
                }
            }
        } else {//如果是手动分区
            // For manually assigned partitions, if there are no ready nodes, await metadata.
            // If connections to all nodes fail, wakeups triggered while attempting to send fetch
            // requests result in polls returning immediately, causing a tight loop of polls. Without
            // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
            // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
            // When group management is used, metadata wait is already performed for this scenario as
            // coordinator is unknown, hence this check is not required.
            //对应手动分配则简单许多,不需要考虑自动分配的问题,也就不需要与服务器保持心跳,因为该过程完全由用户控制,无需服务器参与,也就无需与服务器的心跳了
            //如果需要更新metadata元数据且有节点没有READY好,则阻塞更新元数据
            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
                client.awaitMetadataUpdate(timer);
            }
        }

        maybeAutoCommitOffsetsAsync(timer.currentTimeMs());//对于自动提交偏移量的情况下,尝试提交
        return true;
    }

上面的代码虽然行数少,但信息量十分庞大,其中最难的最复杂都是在自动分区那部分里面,所以下面只分析自动分区的情况

  1. 判断是否是自动分区
    public boolean partitionsAutoAssigned() {
        return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
    }

很简单,只要订阅类型是AUTO_TOPICS或者是AUTO_PATTERN即可

  1. 发送心跳
    protected synchronized void pollHeartbeat(long now) {
        if (heartbeatThread != null) {//如果心跳线程存在
            if (heartbeatThread.hasFailed()) {//线程是否正常运行
                // set the heartbeat thread to null and raise an exception. If the user catches it,
                // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
                RuntimeException cause = heartbeatThread.failureCause();
                heartbeatThread = null;
                throw cause;
            }
            // Awake the heartbeat thread if needed
            if (heartbeat.shouldHeartbeat(now)) {//这里会判断是否需要重新发送心跳请求
                notify();//如果需要重新发送心跳请求的话就唤醒休眠的心跳线程
            }
            heartbeat.poll(now);//刷新心跳时间
        }
    }
  1. 宕机检查
    public boolean coordinatorUnknown() {//是否宕机
        return checkAndGetCoordinator() == null;
    }
    protected synchronized Node checkAndGetCoordinator() {//检查是否宕机
        //宕机无非就两种情况,一是coordinator等于null,二是coordinator节点连接失败
        if (coordinator != null && client.isUnavailable(coordinator)) {
            markCoordinatorUnknown(true);//标记宕机
            return null;
        }
        return this.coordinator;
    }
    protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
        if (this.coordinator != null) {
            log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
            Node oldCoordinator = this.coordinator;//记录旧的coordinator节点

            // Mark the coordinator dead before disconnecting requests since the callbacks for any pending
            // requests may attempt to do likewise. This also prevents new requests from being sent to the
            // coordinator while the disconnect is in progress.
            this.coordinator = null;//重新将coordinator设置为null

            // Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
            // Pending callbacks will be invoked with a DisconnectException on the next call to poll.
            if (!isDisconnected)//如果旧的coordinator节点连接
                client.disconnectAsync(oldCoordinator);//断开连接
        }
    }
  1. 重连coordinator节点,直至超时
    protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
        if (!coordinatorUnknown())//如果没有宕机,直接返回true
            return true;

        do {
            final RequestFuture<Void> future = lookupCoordinator();//从服务器查找coordinator节点的服务器
            client.poll(future, timer);//发送查找请求,该方法会阻塞

            if (!future.isDone()) {//如果响应OK,跳出
                // ran out of time
                break;
            }

            if (future.failed()) {//响应失败
                if (future.isRetriable()) {//判断是否需要重试
                    log.debug("Coordinator discovery failed, refreshing metadata");
                    client.awaitMetadataUpdate(timer);//刷新metadata元数据后继续重试
                } else
                    throw future.exception();
            } else if (coordinator != null && client.isUnavailable(coordinator)) {//如果新的coordinator节点依旧不可用
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                markCoordinatorUnknown();//标记coordinator节点宕机
                timer.sleep(retryBackoffMs);//休眠一段时间重试,直到下面的用户指定的时间范围内超时
            }
        } while (coordinatorUnknown() && timer.notExpired());//如果还是宕机或者没超时

        return !coordinatorUnknown();//再次检查是否宕机
    }

首先看coordinator的查找

    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            // find a node to ask about the coordinator
            Node node = this.client.leastLoadedNode();//查找负载最低的节点
            if (node == null) {
                log.debug("No broker available to send FindCoordinator request");
                return RequestFuture.noBrokersAvailable();//没有任何节点可用
            } else
                findCoordinatorFuture = sendFindCoordinatorRequest(node);//写到unsent缓冲队列等待发送
        }
        return findCoordinatorFuture;
    }

    private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
        // initiate the group metadata request
        log.debug("Sending FindCoordinator request to broker {}", node);
        FindCoordinatorRequest.Builder requestBuilder =
                new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);//构造请求体
        return client.send(node, requestBuilder)
                     .compose(new FindCoordinatorResponseHandler());//发送unsent缓冲队列,并设置响应的回调handler
    }
   
    private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {

        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
            log.debug("Received FindCoordinator response {}", resp);
            clearFindCoordinatorFuture();//清掉findCoordinatorFuture,可以下次继续查找coordinator

            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
            Errors error = findCoordinatorResponse.error();
            if (error == Errors.NONE) {//如果响应成功
                synchronized (AbstractCoordinator.this) {
                    // use MAX_VALUE - node.id as the coordinator id to allow separate connections
                    // for the coordinator in the underlying network client layer
                    int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();

                    AbstractCoordinator.this.coordinator = new Node(
                            coordinatorConnectionId,
                            findCoordinatorResponse.node().host(),
                            findCoordinatorResponse.node().port());//重置coordinator节点的服务器信息
                    log.info("Discovered group coordinator {}", coordinator);
                    client.tryConnect(coordinator);//根据重置后的节点信息,重连该服务器的coordinator
                    heartbeat.resetSessionTimeout();//重置会话超时时间
                }
                future.complete(null);//done
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));//设置失败的异常
            } else {
                log.debug("Group coordinator lookup failed: {}", error.message());
                future.raise(error);//设置失败的异常
            }
        }

        @Override
        public void onFailure(RuntimeException e, RequestFuture<Void> future) {//响应失败
            clearFindCoordinatorFuture();//清掉findCoordinatorFuture,可以下次继续查找coordinator
            super.onFailure(e, future);//设置失败的异常
        }
    }

查找到coordinator节点后发起真正的网络请求,该请求会阻塞等待

client.poll(future, timer);//发送查找请求,该方法会阻塞,由第一个参数控制是否阻塞

之所以阻塞因为RequestFuture对象本身就实现了PollCondition接口,重写了shouldBlock方法

public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
    private static final Object INCOMPLETE_SENTINEL = new Object();
    private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
    ...
    public boolean isDone() {//判断result是否是INCOMPLETE_SENTINEL初始对象,如果不是则说明已经调用了下面的complete方法
        return result.get() != INCOMPLETE_SENTINEL;
    }
    ...
    public void complete(T value) {
        try {
            if (value instanceof RuntimeException)
                throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");

            if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))//设置result对象为value
                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
            fireSuccess();
        } finally {
            completedLatch.countDown();
        }
    }
    ...
    @Override
    public boolean shouldBlock() {//是否阻塞
        return !isDone();
    }
}

很明显,当请求响应之后回调了FindCoordinatorResponseHandler的onSuccess方法时会调用future.complete(null)方法,这时isDone才会返回true,如果isDone返回true,那么shouldBlock方法就会返回false

  1. 是否需要自行reblance
    kafka满足reblance的条件如下
  • 有新的消费者加入消费者组
  • 有消费者宕机下线,也有可能不一定真正的下线,例如因为FullGC,网络延迟等导致消费者长时间未向coordinator节点发送心跳,coordinator节点会认为该消费者已下线
  • 有消费者主动离开消费者组
  • 消费者组订阅的任一主题出现了分区数量的变化
  • 有消费者取消了对某一主题的订阅
    @Override
    public boolean rejoinNeededOrPending() {
        if (!subscriptions.partitionsAutoAssigned())//如果不是自动分配,返回false
            return false;

        // we need to rejoin if we performed the assignment and metadata has changed
        //如果已分配完分区的时候的元数据跟每次更新完的最新元数据的快照进行比对,如果发生了分区数量的变化则两者肯定不等
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
            return true;

        // we need to join if our subscription has changed since the last join
        //如果最后一次订阅的主题跟现有最新的订阅主题发生了变化,比如该消费者又订阅了其他主题,那么两者肯定不等
        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
            return true;
        
        return super.rejoinNeededOrPending();//其他有可能发生reblance的情况    
    }

5.1 元数据不一致场景
当有消费者join到一个消费者组后会调用performAssignment分配分区并记录assignmentSnapshot为当前的分配分区成功后的元数据

    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        ...
        assignmentSnapshot = metadataSnapshot;
        ...
    }

而每次刷新完新的metadata后都会尝试更新metadataSnapshot,来保存最新的元数据的快照

    //添加更新元数据的监听器
    private void addMetadataListener() {
        this.metadata.addListener(new Metadata.Listener() {//更新元数据后回调
            @Override
            public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
                ...

                // check if there are any changes to the metadata which should trigger a rebalance
                if (subscriptions.partitionsAutoAssigned()) {//如果是自动分配
                    MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);//创建新的快照
                    if (!snapshot.equals(metadataSnapshot))//旧快照与新快照不等,则更新快照
                        metadataSnapshot = snapshot;
                }
                ...
            }
        });
    }

    private static class MetadataSnapshot {
        private final Map<String, Integer> partitionsPerTopic;
        ...
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            MetadataSnapshot that = (MetadataSnapshot) o;
            //每个主题下的分区进行对比
            return partitionsPerTopic != null ? partitionsPerTopic.equals(that.partitionsPerTopic) : that.partitionsPerTopic == null;
        }
        ...
    }

5.2 已订阅主题不一致场景
当有消费者join到一个消费者组后会调用onJoinComplete处理完成join的逻辑

    @Override
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        ...
        Set<String> addedTopics = new HashSet<>();
        //this is a copy because its handed to listener below
        Set<TopicPartition> assignedPartitions = new HashSet<>(subscriptions.assignedPartitions());
        for (TopicPartition tp : assignedPartitions) {
            if (!joinedSubscription.contains(tp.topic()))//原有的已订阅的主题不包含新的以分配的主题
                addedTopics.add(tp.topic());
        }

        if (!addedTopics.isEmpty()) {//新订阅的主题不为空
            Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
            Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
            newSubscription.addAll(addedTopics);
            newJoinedSubscription.addAll(addedTopics);//合并新旧主题

            this.subscriptions.subscribeFromPattern(newSubscription);
            this.joinedSubscription = newJoinedSubscription;//设置当前已订阅的主题
        }
        ...
    }

很明显,当有新有新订阅的主题时会导致下面的判断为true

        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
            return true;

5.3 其他场景
对于其他场景有两个变量进行控制

    protected synchronized boolean rejoinNeededOrPending() {
        // if there's a pending joinFuture, we should try to complete handling it.
        return rejoinNeeded || joinFuture != null;
    }

rejoinNeeded可能发生的变化

以上几种场景都可能会将rejoinNeeded设置为true
另外当加入消费者组成功后会将rejoinNeeded设置为false

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    ...
    joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
        @Override
        public void onSuccess(ByteBuffer value) {
            // handle join completion in the callback so that the callback will be invoked
            // even if the consumer is woken up before finishing the rebalance
            synchronized (AbstractCoordinator.this) {
                log.info("Successfully joined group with generation {}", generation.generationId);
                state = MemberState.STABLE;
                rejoinNeeded = false;
                if (heartbeatThread != null)
                    heartbeatThread.enable();
                }
            }
        }
        ...
    }
    ...
}

joinFuture可能的变化
当需要加入消费者组的时候会构造该future,当其不为空的时候表示需要处理该future

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        // we store the join future in case we are woken up by the user after beginning the
        // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
        // to rejoin before the pending rebalance has completed.
        if (joinFuture == null) {
            joinFuture = sendJoinGroupRequest();
            ...
        }
    }

当该future响应回来后也会进行情况操作

    boolean joinGroupIfNeeded(final Timer timer) {
        while (rejoinNeededOrPending()) {
            ...
            final RequestFuture<ByteBuffer> future = initiateJoinGroup();
            ...

            if (future.succeeded()) {
                ...
                resetJoinGroupFuture();
                ...
            } else {
                ...
                resetJoinGroupFuture();
                ...
            }
        }
    }

    private synchronized void resetJoinGroupFuture() {
        this.joinFuture = null;
    }
  1. 如果是正则匹配的订阅模式,则会先尝试刷新元数据
                if (subscriptions.hasPatternSubscription()) {
                    // For consumer group that uses pattern-based subscription, after a topic is created,
                    // any consumer that discovers the topic after metadata refresh can trigger rebalance
                    // across the entire consumer group. Multiple rebalances can be triggered after one topic
                    // creation if consumers refresh metadata at vastly different times. We can significantly
                    // reduce the number of rebalances caused by single topic creation by asking consumer to
                    // refresh metadata before re-joining the group as long as the refresh backoff time has
                    // passed.
                    //按照上面的解释由于是正则匹配,如果有新的主题创建并且满足正则,则在这里先去更新元数据,消费者可以立马感知到,那么下面在下面的reblance时会对按照最新的元数据的分区进行分配,减少reblance的次数
                    if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
                        this.metadata.requestUpdate();
                    }

                    if (!client.ensureFreshMetadata(timer)) {
                        return false;
                    }
                }
  1. 激活消费者,也是真正执行reblance和心跳的地方
    boolean ensureActiveGroup(final Timer timer) {
        // always ensure that the coordinator is ready because we may have been disconnected
        // when sending heartbeats and does not necessarily require us to rejoin the group.
        if (!ensureCoordinatorReady(timer)) {//检查coordinator节点是否OK
            return false;
        }

        startHeartbeatThreadIfNeeded();//如果心跳线程没创建则创建并启动心跳线程
        return joinGroupIfNeeded(timer);//申请加入消费者组    
    }

    boolean joinGroupIfNeeded(final Timer timer) {
        while (rejoinNeededOrPending()) {//是否需要申请加入消费者组或正在加入中
            if (!ensureCoordinatorReady(timer)) {//检查coordinator节点是否OK
                return false;
            }

            // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
            // time if the client is woken up before a pending rebalance completes. This must be called
            // on each iteration of the loop because an event requiring a rebalance (such as a metadata
            // refresh which changes the matched subscription set) can occur while another rebalance is
            // still in progress.
            if (needsJoinPrepare) {//申请加入消费者组时是否需要做些准备工作,当加入失败时跳过该操作
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }

            final RequestFuture<ByteBuffer> future = initiateJoinGroup();//发起加入消费者组请求到unsent缓冲队列
            client.poll(future, timer);//发起网络请求
            if (!future.isDone()) {//如果响应为处理返回false,每次轮询时都会检查是否响应已处理,知道Reblance处理完后返回true
                // we ran out of time
                return false;
            }

            if (future.succeeded()) {//加入成功
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();
                //完成对申请加入消费者的处理
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);

                // We reset the join group future only after the completion callback returns. This ensures
                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                resetJoinGroupFuture();//清除future
                needsJoinPrepare = true;//重置needsJoinPrepare为true,这样下次申请请求加入时就会再次做准备加入工作
            } else {
                resetJoinGroupFuture();//清除future
                final RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())//如果不需要重试,抛出异常
                    throw exception;

                timer.sleep(retryBackoffMs);//休眠一段时间重试
            }
        }
        return true;
    }

其实kafka的rebalance分为2步:Join和Sync
1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定,且是在客户端,而非服务端。

2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        // we store the join future in case we are woken up by the user after beginning the
        // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
        // to rejoin before the pending rebalance has completed.
        if (joinFuture == null) {//如果joinFuture为空,发请求,这样确保只有请求完成后才会继续下一次申请加入消费者组的请求
            // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
            // Note that this must come after the call to onJoinPrepare since we must be able to continue
            // sending heartbeats if that callback takes some time.
            disableHeartbeatThread();//暂停心跳

            state = MemberState.REBALANCING;//设置消费者协调器状态为REBALANCING状态
            joinFuture = sendJoinGroupRequest();//发送申请加入消费者组请求到unsent缓冲队列
            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    synchronized (AbstractCoordinator.this) {
                        log.info("Successfully joined group with generation {}", generation.generationId);
                        state = MemberState.STABLE;//设置消费者协调器状态为STABLE状态,表示加入成功
                        rejoinNeeded = false;//设置是否需要重新申请加入消费者组为false

                        if (heartbeatThread != null)
                            heartbeatThread.enable();//恢复心跳
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                    synchronized (AbstractCoordinator.this) {
                        state = MemberState.UNJOINED;//失败则设置消费者协调器状态为UNJOINED状态,表示未加入消费者组
                    }
                }
            });
        }
        return joinFuture;
    }

    RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())//确保coordinator节点没有宕机
            return RequestFuture.coordinatorNotAvailable();

        // send a join group request to the coordinator
        log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                groupId,
                this.sessionTimeoutMs,
                this.generation.memberId,
                protocolType(),
                metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);//构造申请加入消费者组的请求体

        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);

        // Note that we override the request timeout using the rebalance timeout since that is the
        // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.

        int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
        return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                .compose(new JoinGroupResponseHandler());//发送到unsent缓冲队列,设置JoinGroupResponseHandler作为响应回调handler
    }


    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = joinResponse.error();
            if (error == Errors.NONE) {//正常响应情况
                log.debug("Received successful JoinGroup response: {}", joinResponse);
                sensors.joinLatency.record(response.requestLatencyMs());

                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {//如果在reblance还没有完成前,消费者主动离开消费者组,那么这里注解给出异常
                        // if the consumer was woken up before a rebalance completes, we may have already left
                        // the group. In this case, we do not want to continue with the sync group.
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                                joinResponse.memberId(), joinResponse.groupProtocol());
                        //下面的chain方法会调用future的complete方法,这样在joinGroupIfNeeded方法调用future的isDone方法时会返回true,也就是真正标记完成了reblance的操作,这里的chain是当Sync请求响应后会已拦截器的发送回调future中的complete方法
                        if (joinResponse.isLeader()) {//如果是消费者组的leader
                            onJoinLeader(joinResponse).chain(future);//发起附带分配方案的Sync请求,,等待最终的分配到分区
                        } else {//如果是非消费者组的leader
                            onJoinFollower().chain(future);//发起Sync请求,等待最终的分配到分区
                        }
                    }
                }
            //下面都是各种异常的处理
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
                // backoff and retry
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                // reset the member id and retry immediately
                resetGeneration();
                log.debug("Attempt to join group failed due to unknown member id.");
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                // re-discover the coordinator and retry with backoff
                markCoordinatorUnknown();
                log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                    || error == Errors.INVALID_SESSION_TIMEOUT
                    || error == Errors.INVALID_GROUP_ID) {
                // log the error and re-throw the exception
                log.error("Attempt to join group failed due to fatal error: {}", error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                // unexpected error, throw the exception
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }
    }

    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {//leader
        try {
            // perform the leader synchronization and send back the assignment for the group
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());//定制分配分区方案

            SyncGroupRequest.Builder requestBuilder =
                    new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);//构造sync请求体,最后一个参数为分配分区的方案
            log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
            return sendSyncGroupRequest(requestBuilder);//发送sync请求
        } catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

    private RequestFuture<ByteBuffer> onJoinFollower() {//follower
        // send follower's sync group with an empty assignment
        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                        Collections.<String, ByteBuffer>emptyMap());//构造sync请求体,最后一个参数为空方案
        log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);//发送sync请求
    }

    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {//定制分区方案
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);//根据分区分配策略找出对应的分区器
        if (assignor == null)//找不到抛出异常
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        Set<String> allSubscribedTopics = new HashSet<>();//记录所有主题
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {//将服务器返回所有组成员订阅的主题反序列化出来
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

        // the leader will begin watching for changes to any of the topics the group is interested in,
        // which ensures that all metadata changes will eventually be seen
        this.subscriptions.groupSubscribe(allSubscribedTopics);//只有leader才能定制分配方案,所以这里也只有leader才能保存组内所有消费者的订阅的主题的分配情况,由groupSubscription成员变量保存
        metadata.setTopics(this.subscriptions.groupSubscription());//通知元数据,有新成员加入组,需要刷新元数据

        // update metadata (if needed) and keep track of the metadata used for assignment so that
        // we can check after rebalance completion whether anything has changed
        if (!client.ensureFreshMetadata(time.timer(Long.MAX_VALUE))) throw new TimeoutException();//等待刷新元数据,超时抛异常

        isLeader = true;//设置该消费者为leader,只有leader才能进入到此方法

        log.debug("Performing assignment using strategy {} with subscriptions {}", assignor.name(), subscriptions);

        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);//根据获取到的分区器进行分配

        // user-customized assignor may have created some topics that are not in the subscription list
        // and assign their partitions to the members; in this case we would like to update the leader's
        // own metadata with the newly added topics so that it will not trigger a subsequent rebalance
        // when these topics gets updated from metadata refresh.
        //
        // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
        //       we may need to modify the PartitionAssignor API to better support this case.
        Set<String> assignedTopics = new HashSet<>();//记录了已分配出去的主题
        for (Assignment assigned : assignment.values()) {
            for (TopicPartition tp : assigned.partitions())//循环已分配的分区
                assignedTopics.add(tp.topic());//将该分区中主题塞到assignedTopics
        }

        if (!assignedTopics.containsAll(allSubscribedTopics)) {//如果已分配的主题中不包含所有的主题,那么肯定有一部分主题没有分配出去
            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
            notAssignedTopics.removeAll(assignedTopics);//算出为分配的主题,将已提醒的方式打印到日志中
            log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
        }

        if (!allSubscribedTopics.containsAll(assignedTopics)) {//如果所有的主题中不包含已分配的主题,那么有未订阅的主题被分配了,这种情况应该是正则匹配动态分配分区情况???
            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
            newlyAddedTopics.removeAll(allSubscribedTopics);//算出那些未订阅但已分配的主题,并打印日志
            log.info("The following not-subscribed topics are assigned, and their metadata will be " +
                    "fetched from the brokers: {}", newlyAddedTopics);

            allSubscribedTopics.addAll(assignedTopics);//将这部分也添加到所有订阅的主题中
            this.subscriptions.groupSubscribe(allSubscribedTopics);//从新更新groupSubscription变量
            metadata.setTopics(this.subscriptions.groupSubscription());//从新通知元数据,刷新元数据
            if (!client.ensureFreshMetadata(time.timer(Long.MAX_VALUE))) throw new TimeoutException();//等待刷新元数据,超时抛异常

        }

        assignmentSnapshot = metadataSnapshot;//将新的分配分区的元数据的快照赋给assignmentSnapshot保存

        log.debug("Finished assignment for group: {}", assignment);

        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {//序列化分配方案,用于网络发送
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }

        return groupAssignment;
    }


  1. 完成分配结果的处理
    @Override
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
        if (!isLeader)//非leader
            assignmentSnapshot = null;//设置assignmentSnapshot为null,也就是leader才需要维护assignmentSnapshot

        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);//根据分区策略获取对应的分区器
        if (assignor == null)//如果获取到分区器为空,抛异常
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);//反序服务器响应的分区结果,这里意味这每个消费者都已经从知道了自己的分区结果
        subscriptions.assignFromSubscribed(assignment.partitions());//将分配结果保存到订阅状态器中,由订阅状态器SubscriptionState的assignment变量进行维护

        // check if the assignment contains some topics that were not in the original
        // subscription, if yes we will obey what leader has decided and add these topics
        // into the subscriptions as long as they still match the subscribed pattern
        //
        // TODO this part of the logic should be removed once we allow regex on leader assign
        Set<String> addedTopics = new HashSet<>();
        //this is a copy because its handed to listener below
        Set<TopicPartition> assignedPartitions = new HashSet<>(subscriptions.assignedPartitions());
        for (TopicPartition tp : assignedPartitions) {
            if (!joinedSubscription.contains(tp.topic()))//原有的已订阅的主题不包含新的以分配的主题
                addedTopics.add(tp.topic());//记录新增加的主题
        }

        if (!addedTopics.isEmpty()) {//新订阅的主题不为空
            Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
            Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
            newSubscription.addAll(addedTopics);
            newJoinedSubscription.addAll(addedTopics);//合并新旧主题

            this.subscriptions.subscribeFromPattern(newSubscription);
            this.joinedSubscription = newJoinedSubscription;//设置当前已订阅的主题
        }

        // Update the metadata to include the full group subscription. The leader will trigger a rebalance
        // if there are any metadata changes affecting any of the consumed partitions (whether or not this
        // instance is subscribed to the topics).
        this.metadata.setTopics(subscriptions.groupSubscription());//对于leader来说有可能因为组内的消费者的主题发生变化而导致需要更新metadata

        // give the assignor a chance to update internal state based on the received assignment
        assignor.onAssignment(assignment);//钩子方法,用户自定义分配完分区后的回调

        // reschedule the auto commit starting from now
        if (autoCommitEnabled)//如果是自动commit
            this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);//更新自动提交的时间

        // execute the user's callback after rebalance
        ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
        log.info("Setting newly assigned partitions {}", assignedPartitions);
        try {
            listener.onPartitionsAssigned(assignedPartitions);//reblance后的回调,默认空实现,由用户自定义实现
        } catch (WakeupException | InterruptException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
        }
    }
  1. 是否需要自动commit
    public void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled) {//如果是自动commit
            nextAutoCommitTimer.update(now);//更新当前时间,判断是否过期
            if (nextAutoCommitTimer.isExpired()) {//是否到了自动commit的时间,也就是自动commit时间过期
                nextAutoCommitTimer.reset(autoCommitIntervalMs);//重置下一次自动commit的时间
                doAutoCommitOffsetsAsync();//执行commit操作
            }
        }
    }

    private void doAutoCommitOffsetsAsync() {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();//获取所有已消费的偏移量
        log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);

        commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {//提交偏移量
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    if (exception instanceof RetriableException) {
                        log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
                                exception);
                        nextAutoCommitTimer.updateAndReset(retryBackoffMs);//重试
                    } else {
                        log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
                    }
                } else {
                    log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                }
            }
        });
    }

    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        invokeCompletedOffsetCommitCallbacks();//提交偏移量时回调方法,默认为空实现

        if (!coordinatorUnknown()) {//如果coordinator节点没宕机
            doCommitOffsetsAsync(offsets, callback);//发起提交偏移量请求
        } else {
            // we don't know the current coordinator, so try to find it and then send the commit
            // or fail (we don't want recursive retries which can cause offset commits to arrive
            // out of order). Note that there may be multiple offset commits chained to the same
            // coordinator lookup request. This is fine because the listeners will be invoked in
            // the same order that they were added. Note also that AbstractCoordinator prevents
            // multiple concurrent coordinator lookup requests.
            pendingAsyncCommits.incrementAndGet();//记录等待提交commit次数加1,close的时候会处理掉这些待提交的commit
            lookupCoordinator().addListener(new RequestFutureListener<Void>() {//重新查找一个新的coordinator后再次提交
                @Override
                public void onSuccess(Void value) {
                    pendingAsyncCommits.decrementAndGet();//记录等待提交commit次数减1
                    doCommitOffsetsAsync(offsets, callback);//发起提交偏移量请求
                    client.pollNoWakeup();//不可中断的发送网络IO
                }

                @Override
                public void onFailure(RuntimeException e) {
                    pendingAsyncCommits.decrementAndGet();//记录等待提交commit次数减1
                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
                            new RetriableCommitFailedException(e)));//将异常结果放到completedOffsetCommits队列
                }
            });
        }

        // ensure the commit has a chance to be transmitted (without blocking on its completion).
        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
        // explicitly allow heartbeats through delayed task execution.
        client.pollNoWakeup();//不可中断的发送网络IO
    }

    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);//构造请求并发送unsent缓冲队列
        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;//回调方法,默认是DefaultOffsetCommitCallback,只显示了提交异常时打印日志
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);//拦截器回调

                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
            }

            @Override
            public void onFailure(RuntimeException e) {
                Exception commitException = e;

                if (e instanceof RetriableException)
                    commitException = new RetriableCommitFailedException(e);

                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
            }
        });
    }

可以看到commit时都会将其放到completedOffsetCommits缓冲队列中,这个队列只有在invokeCompletedOffsetCommitCallbacks方法中被消费

    void invokeCompletedOffsetCommitCallbacks() {
        while (true) {
            OffsetCommitCompletion completion = completedOffsetCommits.poll();
            if (completion == null)
                break;
            completion.invoke();//回调,默认是DefaultOffsetCommitCallback,只有提交异常时打印日志,用户可以自定义回调方法覆盖它实现自己的逻辑
        }
    }

    private static class OffsetCommitCompletion {
        private final OffsetCommitCallback callback;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final Exception exception;

        private OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            this.callback = callback;
            this.offsets = offsets;
            this.exception = exception;
        }

        public void invoke() {
            if (callback != null)
                callback.onComplete(offsets, exception);
        }
    }

    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null)//提交异常时打印日志
                log.error("Offset commit with offsets {} failed", offsets, exception);
        }
    }
⚠️ **GitHub.com Fallback** ⚠️