Producer — Metadata
When the Kafka producer is created, it builds the metadata:
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
}
The parameters passed when creating the Metadata are:
retryBackoffMs=retry.backoff.ms//minimum backoff to wait between metadata refresh attempts
metadataExpireMs=metadata.max.age.ms//how long cached metadata stays valid before a periodic refresh
allowAutoTopicCreation=true//let the broker auto-create the topic if it does not exist
topicExpiryEnabled=true//unused topics are expired out of the metadata refresh set
clusterResourceListeners//listeners notified when cluster resources (e.g. the cluster id) change
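For orientation, the first two values map straight onto ordinary producer settings. A minimal configuration sketch (broker address and values are placeholders, not taken from this wiki):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");    // retry.backoff.ms: backoff between refresh attempts
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000"); // metadata.max.age.ms: max age of cached metadata
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);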
Here is the Metadata constructor:
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;//backoff between refresh attempts
this.metadataExpireMs = metadataExpireMs;//metadata expiry, drives the periodic refresh; set by the producer's metadata.max.age.ms
this.allowAutoTopicCreation = allowAutoTopicCreation;//whether broker-side auto topic creation is allowed
this.topicExpiryEnabled = topicExpiryEnabled;//whether unused topics expire out of the refresh set
this.lastRefreshMs = 0L;//time of the last refresh attempt
this.lastSuccessfulRefreshMs = 0L;//time of the last successful refresh
this.version = 0;//current version, incremented on every update
this.cluster = Cluster.empty();//holds the cluster data
this.needUpdate = false;//flag: does the metadata need updating
this.topics = new HashMap<>();//topics this client tracks, mapped to their expiry times
this.listeners = new ArrayList<>();//listeners invoked when the metadata is updated
this.clusterResourceListeners = clusterResourceListeners;//listeners notified of cluster resource changes (e.g. the cluster id)
this.needMetadataForAllTopics = false;//whether to fetch metadata for all topics (false for a plain producer)
this.isClosed = false;//closed flag
}
Right after constructing the Metadata, the producer immediately calls its update method:
public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(newCluster, "cluster should not be null");
if (isClosed())//already closed: updating is an error
throw new IllegalStateException("Update requested after metadata close");
this.needUpdate = false;//clear the update flag
this.lastRefreshMs = now;//record the refresh attempt time
this.lastSuccessfulRefreshMs = now;//record the successful refresh time
this.version += 1;//the version increases monotonically
//expire unused topics out of the refresh set
if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Long> entry = it.next();
long expireMs = entry.getValue();
if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
entry.setValue(now + TOPIC_EXPIRY_MS);
else if (expireMs <= now) {
it.remove();
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
}
}
}
for (Listener listener: listeners)//invoke each registered listener's callback
listener.onMetadataUpdate(newCluster, unavailableTopics);
String previousClusterId = cluster.clusterResource().clusterId();
if (this.needMetadataForAllTopics) {//are we fetching metadata for all topics?
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(newCluster);
} else {
this.cluster = newCluster;
}
// The bootstrap cluster is guaranteed not to have any useful information
if (!newCluster.isBootstrapConfigured()) {//notify the cluster resource listeners (skipped for the bootstrap cluster)
String newClusterId = newCluster.clusterResource().clusterId();
if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
log.info("Cluster ID: {}", newClusterId);
clusterResourceListeners.onUpdate(newCluster.clusterResource());
}
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
The update method does the actual refresh: every time fresh metadata comes back, update is called, and after each update these four fields record the refresh state:
this.needUpdate = false;//clear the update flag
this.lastRefreshMs = now;//record the last refresh time
this.lastSuccessfulRefreshMs = now;//record the last successful refresh time
this.version += 1;//bump the version
Then the most important step: replacing the cluster field, which holds the cluster information:
this.cluster = newCluster;
At the very start, the KafkaProducer constructor builds the initial cluster information via Cluster.bootstrap:
public static Cluster bootstrap(List<InetSocketAddress> addresses) {
List<Node> nodes = new ArrayList<>();
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
return new Cluster(null, true, nodes, new ArrayList<>(0),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null);
}
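A quick hypothetical demo (assuming kafka-clients on the classpath; not part of this wiki's source) showing what bootstrap yields — note the temporary negative node ids, which only get replaced once a real metadata response arrives:
import java.net.InetSocketAddress;
import java.util.Arrays;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;

Cluster cluster = Cluster.bootstrap(Arrays.asList(
        new InetSocketAddress("broker1", 9092),
        new InetSocketAddress("broker2", 9092)));
for (Node node : cluster.nodes())
    System.out.println(node.id() + " -> " + node.host() + ":" + node.port());
// prints -1 -> broker1:9092 and -2 -> broker2:9092 (list order may vary)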
Metadata has a key flag, needUpdate, that controls whether the metadata needs refreshing; it is flipped to "refresh needed" mainly by this method:
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
The metadata starts out empty, so when is it actually fetched? Kafka loads it lazily, on the first message send: KafkaProducer.doSend calls waitOnMetadata, which blocks until the metadata response arrives.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
...
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
...
}
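From the caller's point of view, this means the very first send() to a topic may block while the metadata is fetched, bounded by max.block.ms (the source of maxBlockTimeMs above). A small sketch continuing the earlier configuration (topic name is a placeholder):
import org.apache.kafka.clients.producer.ProducerRecord;

props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000"); // cap the metadata wait at 10 s
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// First send to "demo-topic": doSend -> waitOnMetadata blocks here until the topic's
// metadata arrives, or throws TimeoutException after 10 s.
producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
// Later sends to the same topic hit the cached metadata and do not block.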
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Cluster cluster = metadata.fetch();//get the cluster information
if (cluster.invalidTopics().contains(topic))//check topic validity; invalidTopics is empty by default
throw new InvalidTopicException(topic);
metadata.add(topic);//add the topic to the metadata
//read the topic's partition count; initially this is also null
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
//if the partition count is known and (no partition was specified or the partition is within range)
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();//start time
long remainingWaitMs = maxWaitMs;//the maximum time a send may block
long elapsed;//tracks the time consumed talking to the server
// Issue metadata requests until we have metadata for the topic and the requested partition,
// or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
//log according to whether a partition was specified
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic);//add the topic to the metadata
int version = metadata.requestUpdate();//returns the current version (starts at 0, incremented on each update) and sets needUpdate to true
sender.wakeup();//wake the sender to issue the metadata request
try {
//block until the metadata is updated
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
cluster = metadata.fetch();//re-fetch the cluster information
elapsed = time.milliseconds() - begin;//time elapsed since begin
if (elapsed >= maxWaitMs) {//timed out?
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
if (cluster.unauthorizedTopics().contains(topic))//authorization failed: no Write permission on this topic
throw new TopicAuthorizationException(topic);
if (cluster.invalidTopics().contains(topic))//the topic name is invalid
throw new InvalidTopicException(topic);
remainingWaitMs = maxWaitMs - elapsed;//remaining wait budget for the next iteration
partitionsCount = cluster.partitionCountForTopic(topic);//re-read the topic's partition count
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));//loop again if the partition count is still unknown, or a partition was specified and it is >= the partition count
return new ClusterAndWaitTime(cluster, elapsed);//elapsed is the time spent waiting on the server
}
Now let's walk through waitOnMetadata in detail.
It takes three parameters:
String topic: the topic
Integer partition: the partition number
long maxWaitMs: the maximum time to block
It starts by fetching the cluster information and validating the topic:
Cluster cluster = metadata.fetch();//get the cluster information
if (cluster.invalidTopics().contains(topic))//check topic validity; invalidTopics is empty by default
throw new InvalidTopicException(topic);
metadata.add(topic);//add the topic to the metadata
Crucially, the topic is added to the metadata cache, and Metadata.add deserves a closer look:
public synchronized void add(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");//the topic argument must not be null
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
requestUpdateForNewTopics();
}
}
topics is a HashMap, which is not thread-safe, so the method is synchronized. It calls put with the topic as the key and the constant TOPIC_EXPIRY_NEEDS_UPDATE as the value; the topic cache really only uses the map's keys. HashMap.put returns the previous value when it overwrites an entry and null otherwise, so the first time a topic is added, put returns null and requestUpdateForNewTopics is called, which in turn calls requestUpdate to set needUpdate to true. In other words, any topic not yet in the cache makes the client refresh the metadata; a non-null return means the topic is already cached and no refresh is needed.
private synchronized void requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0;
requestUpdate();
}
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
To sum up: if the put is an overwrite (the topic is already in the metadata), it returns the old value; that value is non-null, so needUpdate is not set to true.
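The return-value trick is plain java.util.HashMap behavior and easy to verify in isolation:
import java.util.HashMap;
import java.util.Map;

Map<String, Long> topics = new HashMap<>();
Long first = topics.put("orders", -1L);  // new key: returns null -> Metadata requests an update
Long second = topics.put("orders", -1L); // existing key: returns the old value -> no update requested
System.out.println(first + ", " + second); // null, -1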
Next, the partition information decides whether a real metadata refresh is needed:
//read the number of partitions for the topic; initially this is also null
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
//if the partition count is known and (no partition was specified or the partition is within range)
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
The partition count for the topic is read from the cluster and checked:
If the partition count is non-null (the topic already has partitions) and (no partition was specified, or the specified partition number is less than the partition count), then the broker's partition layout has not outgrown our cached view, no refresh is needed, and the cached cluster is returned with a wait time of 0.
private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
ClusterAndWaitTime(Cluster cluster, long waitedOnMetadataMs) {
this.cluster = cluster;
this.waitedOnMetadataMs = waitedOnMetadataMs;
}
}
Past this point we reach the logic that really refreshes the metadata.
First requestUpdate sets needUpdate to true, then the sender thread is woken to issue the refresh request:
int version = metadata.requestUpdate();//returns the current version (starts at 0, incremented on each update) and sets needUpdate to true
sender.wakeup();//wake the sender to issue the metadata request
After the request goes out, the caller blocks waiting for the result:
metadata.awaitUpdate(version, remainingWaitMs);
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0)//a negative wait time is invalid
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;//remaining wait budget
while ((this.version <= lastVersion) && !isClosed()) {//loop until the version moves past the one observed at request time, or the metadata is closed
AuthenticationException ex = getAndClearAuthenticationException();//surface any authentication failure
if (ex != null)
throw ex;
if (remainingWaitMs != 0)//if there is wait budget left
wait(remainingWaitMs);//block until update() calls notifyAll or the timeout elapses
long elapsed = System.currentTimeMillis() - begin;//time spent waiting so far
if (elapsed >= maxWaitMs)//waited at least the full budget: timed out
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;//recompute the remaining wait budget
}
if (isClosed())//closed while we were waiting?
throw new KafkaException("Requested metadata update after close");
}
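awaitUpdate pairs with the notifyAll() at the end of update() shown earlier: waiters park on the Metadata monitor until the version moves past the one they observed. A stripped-down sketch of that handshake (class and names are illustrative, not Kafka code):
class VersionedMetadata {
    private int version = 0;

    public synchronized void update() {
        version += 1; // mirrors Metadata.update bumping the version
        notifyAll();  // wakes every thread parked in awaitUpdate
    }

    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long begin = System.currentTimeMillis();
        long remaining = maxWaitMs;
        while (version <= lastVersion) { // the loop also guards against spurious wakeups
            if (remaining <= 0)
                throw new IllegalStateException("no update after " + maxWaitMs + " ms");
            wait(remaining); // releases the monitor while blocked
            remaining = maxWaitMs - (System.currentTimeMillis() - begin);
        }
    }
}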
Next, how the metadata request actually gets sent.
Sender is a thread, so its run method drives the work:
public void run() {
...
run(time.milliseconds());
...
}
void run(long now) {
...
client.poll(pollTimeout, now);//performs the real network I/O
}
@Override
public List<ClientResponse> poll(long timeout, long now) {
...
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
//the NIO reads and writes on the channels happen in here
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
...
handleCompletedReceives(responses, updatedNow);//process the responses
...
return responses;
}
poll belongs to NetworkClient, the component that manages the network layer; when it is constructed it creates a metadataUpdater, DefaultMetadataUpdater by default:
if (metadataUpdater == null) {
if (metadata == null)
throw new IllegalArgumentException("`metadata` must not be null");
this.metadataUpdater = new DefaultMetadataUpdater(metadata);
} else {
this.metadataUpdater = metadataUpdater;
}
So whether a metadata request actually gets sent comes down to maybeUpdate:
@Override
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {
return metadataTimeout;
}
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now);//pick the least-loaded node
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
return maybeUpdate(now, node);
}
As the name suggests, maybeUpdate will not necessarily refresh; it runs through the following checks.
First it computes how long until the next update is allowed:
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
}
public synchronized long timeToAllowUpdate(long nowMs) {
return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
}
needUpdate decides whether this is a forced update; needUpdate == false would mean an ordinary periodic refresh. At this point we know needUpdate is true, so timeToExpire = 0. Then timeToAllowUpdate enforces the refresh backoff: if lastRefreshMs + refreshBackoffMs is still in the future it returns the milliseconds left, otherwise 0. For example, with lastRefreshMs = 2019-04-28 00:00:00.000, refreshBackoffMs = 100 and nowMs = 2019-04-28 00:00:00.010, nowMs has not yet reached lastRefreshMs + refreshBackoffMs, so the backoff window is still open and the update must wait another 90 ms; had nowMs passed that point, the update could go ahead. The result, timeToNextMetadataUpdate, is 0 when an update may go out now, and otherwise the number of milliseconds until the next update is allowed.
long waitForMetadataFetch = this.metadataFetchInProgress ? defaultRequestTimeoutMs : 0;
metadataFetchInProgress records whether a metadata request is already in flight; if one is, this returns defaultRequestTimeoutMs (the request timeout in milliseconds), otherwise 0.
Finally the two values together decide whether the update may proceed:
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) { //metadataTimeout == 0 means we may update now; otherwise it is how long to wait before the next attempt
return metadataTimeout;
}
In short, retry.backoff.ms together with the timestamp of the last refresh gates whether the KafkaProducer may refresh the metadata right now.
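Plugging made-up numbers into the two formulas makes the gating concrete (all values illustrative):
long lastRefreshMs    = 1_000_000L; // previous refresh attempt
long refreshBackoffMs = 100L;       // retry.backoff.ms
long nowMs            = 1_000_010L; // only 10 ms after the last attempt
long timeToExpire = 0;              // needUpdate == true forces this to 0
long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0); // 90
long metadataTimeout = Math.max(timeToExpire, timeToAllowUpdate); // 90 > 0 -> poll waits ~90 ms before sending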
Further down, the MetadataRequest is built and sent:
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId, now)) {//can we send to this node, i.e. is the connection ready
this.metadataFetchInProgress = true;//mark a metadata fetch as in flight
MetadataRequest.Builder metadataRequest;
if (metadata.needMetadataForAllTopics())//if metadata for all topics is needed
metadataRequest = MetadataRequest.Builder.allTopics();//build a request covering all topics
else
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
metadata.allowAutoTopicCreation());//build a request covering just the topics this metadata tracks
log.debug("Sending metadata request {} to node {}", metadataRequest, node);
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);//send the request
return defaultRequestTimeoutMs;
}
...
}
sendInternalMetadataRequest ultimately registers interest in the NIO write event; once the selector (multiplexer) reports the channel writable, the real network write to the broker takes place:
private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
doSend(clientRequest, true, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
...
doSend(clientRequest, isInternalRequest, now, builder.build(version));
...
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
...
selector.send(send);
...
}
public void send(Send send) {
...
channel.setSend(send);
...
}
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
@Override
public void addInterestOps(int ops) {
key.interestOps(key.interestOps() | ops);
}
As noted earlier, poll, running on the Sender thread, is where the I/O happens:
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
@Override
public void poll(long timeout) throws IOException {
...
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
...
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
...
if (channel.ready() && key.isWritable()) {
Send send;
try {
send = channel.write();
} catch (Exception e) {
sendFailed = true;
throw e;
}
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
...
}
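The same register-write-interest pattern in bare java.nio, as a hypothetical sketch (Kafka wraps this in its own Selector/KafkaChannel classes, but the mechanics are identical):
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

class WriteInterestSketch {
    // equivalent of KafkaChannel.setSend + TransportLayer.addInterestOps
    void stageSend(SelectionKey key, ByteBuffer pending) {
        key.attach(pending);
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // equivalent of the key.isWritable() branch in pollSelectionKeys
    void onSelect(SelectionKey key) throws IOException {
        if (key.isWritable()) {
            ByteBuffer pending = (ByteBuffer) key.attachment();
            ((SocketChannel) key.channel()).write(pending);
            if (!pending.hasRemaining()) // send finished: stop watching writability
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}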
After the request goes out, poll runs a series of handlers over the responses:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
...
if (req.isInternalRequest && body instanceof MetadataResponse)//if this is a metadata response body
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
...
}
@Override
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
this.metadataFetchInProgress = false;
Cluster cluster = response.cluster();//the latest cluster information returned by the broker
...
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, response.unavailableTopics(), now);//call update to refresh the metadata
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now, null);
}
}
With the send path covered, back to waitOnMetadata for the remaining part:
cluster = metadata.fetch();//fetch the latest cluster information from the metadata
elapsed = time.milliseconds() - begin;//time spent waiting
if (elapsed >= maxWaitMs) {//the wait exceeded the maximum block time: timed out
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
if (cluster.unauthorizedTopics().contains(topic))//authorization failed: no Write permission on this topic
throw new TopicAuthorizationException(topic);
if (cluster.invalidTopics().contains(topic))//the topic name is invalid
throw new InvalidTopicException(topic);
remainingWaitMs = maxWaitMs - elapsed;//remaining wait budget for the next iteration
partitionsCount = cluster.partitionCountForTopic(topic);//re-read the topic's partition count
After each fetch, the loop condition checks whether the partition requirements are met:
while (partitionsCount == null || (partition != null && partition >= partitionsCount))
If the topic's partition count is still null, we do not even know where to send, so the loop keeps fetching metadata until it times out or partitions appear.
If a partition number was specified and it is greater than or equal to the partition count, the loop also continues: the chosen partition number must be smaller than the partition count. For example, if the topic has 3 partitions, valid partition numbers are 0, 1 and 2; a computed partition number of 3 or more means our view of the cluster is stale and the metadata must be refreshed.
Finally a ClusterAndWaitTime is returned, carrying the latest cluster information plus the time spent refreshing the metadata:
private static class ClusterAndWaitTime {
final Cluster cluster;
final long waitedOnMetadataMs;
ClusterAndWaitTime(Cluster cluster, long waitedOnMetadataMs) {
this.cluster = cluster;
this.waitedOnMetadataMs = waitedOnMetadataMs;
}
}
Metadata is updated in the following two situations:
- KafkaProducer forces an update on the first message send; otherwise updates happen periodically, driven by Metadata's metadataExpireMs and lastSuccessfulRefreshMs fields;
- Forced update: calling Metadata.requestUpdate() sets needUpdate to true. Every NetworkClient poll() checks both mechanisms, and either one triggers an update. A forced update of the Metadata happens in the following cases:
- when initConnect is called to initialize a connection;
- when handleDisconnections() inside poll() handles a disconnected node;
- when handleTimedOutRequests() inside poll() handles timed-out requests;
- when sending a message and no leader can be found for the partition;
- when handling a produce response (handleProduceResponse) that carries a metadata-staleness error, e.g. there is no metadata for the topic-partition, or the client lacks permission to fetch its metadata.
That wraps up this quick tour of the metadata; the key takeaway is that the cluster field holds the producer's current view of the Kafka cluster.