Producer Metadata 数据结构与读取、更新策略 - tenji/ks GitHub Wiki

Producer Metadata 数据结构与读取、更新策略

一、Metadata 与 Cluster 数据结构

Metadata这个类被 client 线程和后台 sender 所共享,它只保存了所有 topic 的部分数据,当我们请求一个它上面没有的 topic meta 时,它会通过发送 metadata update 来更新 meta 信息,如果 topic meta 过期策略是允许的,那么任何 topic 过期的话都会被从集合中移除。

public final class Metadata {
    private static final Logger log = LoggerFactory.getLogger(Metadata.class);

    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;

    private final long refreshBackoffMs; // metadata 更新失败时,为避免频繁更新 meta,最小的间隔时间,默认 100ms
    private final long metadataExpireMs; // metadata 的过期时间, 默认 60,000ms
    private int version; // 每更新成功1次,version自增1,主要是用于判断 metadata 是否更新
    private long lastRefreshMs; // 最近一次更新时的时间(包含更新失败的情况)
    private long lastSuccessfulRefreshMs; // 最近一次成功更新的时间(如果每次都成功的话,与前面的值相等, 否则,lastSuccessulRefreshMs < lastRefreshMs)
    private Cluster cluster; // 集群中一些 topic 的信息
    private boolean needUpdate; // 是都需要更新 metadata
    /* Topics with expiry time */
    private final Map<String, Long> topics; // topic 与其过期时间的对应关系
    private final List<Listener> listeners; // 事件监控者
    private final ClusterResourceListeners clusterResourceListeners; //当接收到 metadata 更新时, ClusterResourceListeners的列表
    private boolean needMetadataForAllTopics; // 是否强制更新所有的 metadata
    private final boolean topicExpiryEnabled; // 默认为 true, Producer 会定时移除过期的 topic,consumer 则不会移除
}

关于 topic 的详细信息(leader 所在节点、replica 所在节点、isr 列表)都是在Cluster实例中保存的。

// 并不是一个全集,metadata的主要组成部分
public final class Cluster {

    // 从命名直接就看出了各个变量的用途
    private final boolean isBootstrapConfigured;
    private final List<Node> nodes; // node 列表
    private final Set<String> unauthorizedTopics; // 未认证的 topic 列表
    private final Set<String> internalTopics; // 内置的 topic 列表
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; // partition 的详细信息
    private final Map<String, List<PartitionInfo>> partitionsByTopic; // topic 与 partition 的对应关系
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; //  可用(leader 不为 null)的 topic 与 partition 的对应关系
    private final Map<Integer, List<PartitionInfo>> partitionsByNode; // node 与 partition 的对应关系
    private final Map<Integer, Node> nodesById; // node 与 id 的对应关系
    private final ClusterResource clusterResource;
}


// org.apache.kafka.common.PartitionInfo
// topic-partition: 包含 topic、partition、leader、replicas、isr
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
}

Cluster实例主要是保存:

  1. broker.id 与node的对应关系;
  2. topic 与 partition (PartitionInfo)的对应关系;
  3. node 与 partition (PartitionInfo)的对应关系。

二、Metadata 更新流程

Producer 在调用dosend()方法时,第一步就是通过waitOnMetadata方法获取该 topic 的 metadata 信息。

// 等待 metadata 的更新
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    metadata.add(topic); // 在 metadata 中添加 topic 后,如果 metadata 中没有这个 topic 的 meta,那么 metadata 的更新标志设置为了 true
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic); // 如果 topic 已经存在 meta 中,则返回该 topic 的 partition 数,否则返回 null

    // 当前 metadata 中如果已经有这个 topic 的 meta 的话,就直接返回
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;

    // 发送 metadata 请求,直到获取了这个 topic 的 metadata 或者请求超时
    do {
        log.trace("Requesting metadata update for topic {}.", topic);
        int version = metadata.requestUpdate(); // 返回当前版本号,初始值为0,每次更新时会自增,并将 needUpdate 设置为 true
        sender.wakeup();// 唤起 sender,发送 metadata 请求
        try {
            metadata.awaitUpdate(version, remainingWaitMs); // 等待 metadata 的更新
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); // 超时
        if (cluster.unauthorizedTopics().contains(topic)) // 认证失败,对当前 topic 没有 Write 权限
            throw new TopicAuthorizationException(topic);
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null); // 不停循环,直到 partitionsCount 不为 null(即直到 metadata 中已经包含了这个 topic 的相关信息)

    if (partition != null && partition >= partitionsCount) {
        throw new KafkaException(
                String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }

    return new ClusterAndWaitTime(cluster, elapsed);
}

如果 metadata 中不存在这个 topic 的 metadata,那么就请求更新 metadata,如果 metadata 没有更新的话,方法就一直处在 do ... while 的循环之中,在循环之中,主要做以下操作:

  1. metadata.requestUpdate() 将 metadata 的 needUpdate 变量设置为 true(强制更新),并返回当前的版本号(version),通过版本号来判断 metadata 是否完成更新;
  2. sender.wakeup() 唤醒 sender 线程,sender 线程又会去唤醒 NetworkClient 线程,NetworkClient 线程进行一些实际的操作(后面详细介绍);
  3. metadata.awaitUpdate(version, remainingWaitMs) 等待 metadata 的更新。
// 更新 metadata 信息(根据当前 version 值来判断)
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0) {
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    }
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) { // 不断循环,直到 metadata 更新成功,version 自增
        if (remainingWaitMs != 0)
            wait(remainingWaitMs); // 阻塞线程,等待 metadata 的更新
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs) // timeout
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}

在 Metadata.awaitUpdate() 方法中,线程会阻塞在 while 循环中,直到 metadata 更新成功或者 timeout。

从前面可以看出,此时 Producer 线程会阻塞在两个 while 循环中,直到 metadata 信息更新,那么 metadata 是如何更新的呢?如果有印象的话,前面应该已经介绍过了,主要是通过 sender.wakeup() 来唤醒 sender 线程,间接唤醒 NetworkClient 线程,NetworkClient 线程来负责发送 Metadata 请求,并处理 Server 端的响应。

sender 线程会调用 NetworkClient.poll() 方法进行实际的操作,其源码如下:

public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now); // 判断是否需要更新 meta,如果需要就更新(请求更新 metadata 的地方)
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleAbortedSends(responses);
    handleCompletedSends(responses, updatedNow); // 通过 selector 中获取 Server 端的 response
    handleCompletedReceives(responses, updatedNow); // 在返回的 handler 中,会处理 metadata 的更新
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);

    // invoke callbacks
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
    return responses;
}

在这个方法中,主要会以下操作:

  • metadataUpdater.maybeUpdate(now):判断是否需要更新 Metadata,如果需要更新的话,先与 Broker 建立连接,然后发送更新 metadata 的请求;
  • 处理 Server 端的一些响应,这里主要讨论的是 handleCompletedReceives(responses, updatedNow) 方法,它会处理 Server 端返回的 Metadata 结果。

先看一下 metadataUpdater.maybeUpdate() 的具体实现:

public long maybeUpdate(long now) {
    // should we update our metadata?
    // metadata 是否应该更新
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);// metadata 下次更新的时间(需要判断是强制更新还是 metadata 过期更新,前者是立马更新,后者是计算 metadata 的过期时间)
    // 如果一条 metadata 的 fetch 请求还未从 server 收到恢复,那么时间设置为 waitForMetadataFetch(默认30s)
    long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;

    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0) { // 时间未到时,直接返回下次应该更新的时间
        return metadataTimeout;
    }

    Node node = leastLoadedNode(now); // 选择一个连接数最小的节点
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }

    return maybeUpdate(now, node); // 可以发送 metadata 请求的话,就发送 metadata 请求
}

/**
 * Add a metadata request to the list of sends if we can make one
 */
// 判断是否可以发送请求,可以的话将 metadata 请求加入到发送列表中
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    if (canSendRequest(nodeConnectionId)) { // 通道已经 ready 并且支持发送更多的请求
        this.metadataFetchInProgress = true; // 准备开始发送数据,将 metadataFetchInProgress 置为 true
        MetadataRequest.Builder metadataRequest; // 创建 metadata 请求
        if (metadata.needMetadataForAllTopics()) // 强制更新所有 topic 的 metadata(虽然默认不会更新所有 topic 的 metadata 信息,但是每个 Broker 会保存所有 topic 的 meta 信息)
            metadataRequest = MetadataRequest.Builder.allTopics();
        else // 只更新 metadata 中的 topics 列表(列表中的 topics 由 metadata.add() 得到)
            metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));


        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now); // 发送 metadata 请求
        return requestTimeoutMs;
    }

    // If there's any connection establishment underway, wait until it completes. This prevents
    // the client from unnecessarily connecting to additional nodes while a previous connection
    // attempt has not been completed.
    if (isAnyNodeConnecting()) { // 如果 client 正在与任何一个 node 的连接状态是 connecting,那么就进行等待
        // Strictly the timeout we should return here is "connect timeout", but as we don't
        // have such application level configuration, using reconnect backoff instead.
        return reconnectBackoffMs;
    }

    if (connectionStates.canConnect(nodeConnectionId, now)) { // 如果没有连接这个 node,那就初始化连接
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now); // 初始化连接
        return reconnectBackoffMs;
    }
    return Long.MAX_VALUE;
}

// 发送 Metadata 请求   
private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
                                     String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);// 创建 metadata 请求
    doSend(clientRequest, true, now);
}

所以,每次 Producer 请求更新 metadata 时,会有以下几种情况:

  1. 如果 node 可以发送请求,则直接发送请求;
  2. 如果该 node 正在建立连接,则直接返回;
  3. 如果该 node 还没建立连接,则向 broker 初始化链接。

而 KafkaProducer 线程之前是一直阻塞在两个 while 循环中,直到 metadata 更新

  1. sender 线程第一次调用 poll() 方法时,初始化与 node 的连接;
  2. sender 线程第二次调用 poll() 方法时,发送 Metadata 请求;
  3. sender 线程第三次调用 poll() 方法时,获取 metadataResponse,并更新 metadata。

经过上述 sender 线程三次调用 poll()方法,所请求的 metadata 信息才会得到更新,此时 Producer 线程也不会再阻塞,开始发送消息。

NetworkClient 接收到 Server 端对 Metadata 请求的响应后,更新 Metadata 信息。

// 处理任何已经完成的接收响应
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        AbstractResponse body = parseResponse(receive.payload(), req.header);
        log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
        if (req.isInternalRequest && body instanceof MetadataResponse) // 如果是 meta 响应
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body); // 如果是其他响应
        else
            responses.add(req.completed(body, now));
    }
}

// 处理 Server 端对 Metadata 请求处理后的 response
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
    this.metadataFetchInProgress = false;
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    if (!errors.isEmpty())
        log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);

    // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
    // created which means we will get errors and no nodes until it exists
    if (cluster.nodes().size() > 0) {
        this.metadata.update(cluster, now); // 更新 meta 信息
    } else { // 如果 metadata 中 node 信息无效,则不更新 metadata 信息
        log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
        this.metadata.failedUpdate(now);
    }
}

三、Metadata 失效检测

  • initiateConnect 失败时

org.apache.kafka.clients.NetworkClient类:

private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        this.connectionStates.connecting(nodeConnectionId, now);
        selector.connect(nodeConnectionId,
                         new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer,
                         this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnected(nodeConnectionId, now);
        /* maybe the problem is our metadata, update it */
        metadataUpdater.requestUpdate(); // 尝试更新 Metadata
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
  • Poll 有连接断开时

org.apache.kafka.clients.NetworkClient类:

private void handleDisconnections(List<ClientResponse> responses, long now) {
    for (String node : this.selector.disconnected()) {
        log.debug("Node {} disconnected.", node);
        processDisconnection(responses, node, now);
    }
    // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
    if (this.selector.disconnected().size() > 0)
        metadataUpdater.requestUpdate(); // 尝试更新 Metadata
}
  • Poll 有连接超时时

org.apache.kafka.clients.NetworkClient类:

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
    for (String nodeId : nodeIds) {
        // close connection to the node
        this.selector.close(nodeId);
        log.debug("Disconnecting from node {} due to request timeout.", nodeId);
        processDisconnection(responses, nodeId, now);
    }

    // we disconnected, so we should probably refresh our metadata
    if (!nodeIds.isEmpty())
        metadataUpdater.requestUpdate(); // 尝试更新 Metadata
}
  • 生产时有部分 Partition 的 Leader 找不到

org.apache.kafka.clients.producer.internals.Sender类:

void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate(); // 尝试更新 Metadata
    }
  • 返回的 Response 有异常时

org.apache.kafka.clients.producer.internals.Sender类:

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
    int correlationId = response.requestHeader().correlationId();
    if (response.wasDisconnected()) {
        log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
    } else if (response.versionMismatch() != null) {
        log.warn("Cancelled request {} due to a version mismatch with node {}",
                response, response.destination(), response.versionMismatch());
        for (RecordBatch batch : batches.values())
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST), correlationId, now);
    } else {
        ...
    }
}
private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now) {
    Errors error = response.error;
    if (error != Errors.NONE && canRetry(batch, error)) {
        // retry
        log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                 correlationId,
                 batch.topicPartition,
                 this.retries - batch.attempts - 1,
                 error);
        this.accumulator.reenqueue(batch, now);
        this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
    } else {
        RuntimeException exception;
        if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
            exception = new TopicAuthorizationException(batch.topicPartition.topic());
        else
            exception = error.exception();
        // tell the user the result of their request
        batch.done(response.baseOffset, response.logAppendTime, exception);
        this.accumulator.deallocate(batch);
        if (error != Errors.NONE)
            this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    }
    if (error.exception() instanceof InvalidMetadataException) {
        if (error.exception() instanceof UnknownTopicOrPartitionException)
            log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
        metadata.requestUpdate(); // 尝试更新 Metadata
    }

    // Unmute the completed partition.
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition);
}

总而言之,发生各式各样的异常,数据不同步,都认为 metadata 可能出问题了,都尝试去更新。

另外,Metadata 更新的特点:

  1. 更新请求 MetadataRequest 是 nio 异步发送的,在 poll 的返回中,处理 MetadataResponse 的时候,才真正更新 Metadata;
  2. 更新的时候,是从 metadata 保存的所有 Node,或者说 Broker 中,选负载最小的那个,也就是当前接收请求最少的那个。向其发送 MetadataRequest 请求,获取新的 Cluster 对象。

参考链接

⚠️ **GitHub.com Fallback** ⚠️