消费者——消息抓取器 - 969251639/study GitHub Wiki
消息抓取器 Fetcher 顾名思义就是 Kafka 消费者用来拉取消息的组件,每个消费者持有一个 Fetcher,它在 KafkaConsumer 的构造函数中创建:
// Excerpt from the private KafkaConsumer constructor; the "..." lines stand for code omitted here.
// It builds the Fetcher that this consumer stores in this.fetcher, handing over the fetch-related
// settings read from ConsumerConfig in the order the Fetcher constructor declares its parameters.
private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
...
this.fetcher = new Fetcher<>(
logContext,
this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), // becomes Fetcher's minBytes
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), // becomes Fetcher's maxBytes
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), // becomes Fetcher's maxWaitMs
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), // becomes Fetcher's fetchSize
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), // becomes Fetcher's maxPollRecords
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), // becomes Fetcher's checkCrcs
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricsRegistry.fetcherMetrics,
this.time,
this.retryBackoffMs,
this.requestTimeoutMs,
isolationLevel);
...
}
/**
 * Builds a fetcher and registers it as a listener on the given subscription state.
 *
 * @param logContext        context used to create this class's logger
 * @param client            network client used to send fetch requests
 * @param minBytes          {@code fetch.min.bytes}: the broker holds a fetch until at least this many
 *                          bytes are available (default 1), trading latency for throughput
 * @param maxBytes          {@code fetch.max.bytes}: upper bound on the bytes returned by one fetch
 *                          (default 50 * 1024 * 1024 = 52428800)
 * @param maxWaitMs         {@code fetch.max.wait.ms}: longest time the broker may hold a fetch before
 *                          answering (default 500 ms)
 * @param fetchSize         {@code max.partition.fetch.bytes}: per-partition byte limit of a fetch
 *                          (default 1 * 1024 * 1024 = 1048576)
 * @param maxPollRecords    {@code max.poll.records}: most records handed back by one call to
 *                          {@code fetchedRecords()} (default 500)
 * @param checkCrcs         {@code check.crcs}: whether fetched records are CRC-checked
 * @param keyDeserializer   deserializer for record keys
 * @param valueDeserializer deserializer for record values
 * @param metadata          cluster metadata
 * @param subscriptions     subscription state; this fetcher adds itself to it as a listener
 * @param metrics           metrics registry handed to {@code FetchManagerMetrics}
 * @param metricsRegistry   fetcher metric definitions handed to {@code FetchManagerMetrics}
 * @param time              time source (a clock abstraction, not a timestamp)
 * @param retryBackoffMs    back-off applied between retries, in milliseconds
 * @param requestTimeoutMs  request timeout, in milliseconds
 * @param isolationLevel    isolation level sent along with every fetch request
 */
public Fetcher(LogContext logContext,
               ConsumerNetworkClient client,
               int minBytes,
               int maxBytes,
               int maxWaitMs,
               int fetchSize,
               int maxPollRecords,
               boolean checkCrcs,
               Deserializer<K> keyDeserializer,
               Deserializer<V> valueDeserializer,
               Metadata metadata,
               SubscriptionState subscriptions,
               Metrics metrics,
               FetcherMetricsRegistry metricsRegistry,
               Time time,
               long retryBackoffMs,
               long requestTimeoutMs,
               IsolationLevel isolationLevel) {
    // Logging.
    this.logContext = logContext;
    this.log = logContext.logger(Fetcher.class);

    // Collaborators supplied by the consumer.
    this.client = client;
    this.metadata = metadata;
    this.subscriptions = subscriptions;
    this.time = time;
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;

    // Fetch sizing and record validation.
    this.minBytes = minBytes;
    this.maxBytes = maxBytes;
    this.fetchSize = fetchSize;
    this.maxPollRecords = maxPollRecords;
    this.checkCrcs = checkCrcs;
    this.isolationLevel = isolationLevel;

    // Timing.
    this.maxWaitMs = maxWaitMs;
    this.retryBackoffMs = retryBackoffMs;
    this.requestTimeoutMs = requestTimeoutMs;

    // Internal state: responses waiting to be parsed, metric sensors, and fetch session handlers.
    this.completedFetches = new ConcurrentLinkedQueue<>();
    this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
    this.sessionHandlers = new HashMap<>();

    // Must stay last: from here on the subscription state may call back into this instance.
    subscriptions.addListener(this);
}
消费者每次轮询时都会尝试去拉取消息
// Excerpt from KafkaConsumer.poll ("..." marks omitted code): keeps calling pollForFetches until it
// yields records or the timer expires, and returns an empty ConsumerRecords when the timer runs out.
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
...
// poll for new data until the timeout expires
do {
...
// Fetch records (pollForFetches first hands back anything that is already buffered).
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) { // something was fetched
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
// Queue the next round of fetches right away so the responses can arrive while the caller is
// still processing this batch, instead of only being requested on the next poll.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup(); // push the queued requests out
}
// The records pass through the interceptors' onConsume first; its return value is what the caller gets.
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired()); // keep trying until data arrives or the timer expires
return ConsumerRecords.empty();
} finally {
release();
}
}
可以看到,对于消费者来说,拉取的处理都是委托给 Fetcher 组件来完成的。这里的拉取其实挺讲究:首先调用 pollForFetches 方法,该方法会先处理上一次拉取的结果(如果有的话)
/**
 * Returns the next batch of records per partition, blocking on the network for a bounded time
 * when nothing is buffered yet.
 *
 * @param timer timer of the enclosing poll; it is advanced by the time spent blocking here
 * @return buffered or freshly fetched records, or an empty map when nothing arrived in time or the
 *         group needs to rebalance
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    // Longest time the network poll below may block: whichever comes first, the coordinator's next
    // required poll or the caller's remaining time.
    long blockMs = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // Anything left over from an earlier fetch is returned without touching the network.
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> buffered = fetcher.fetchedRecords();
    if (!buffered.isEmpty()) {
        return buffered;
    }

    // Queue new fetch requests (fetches that are still pending are not re-sent).
    fetcher.sendFetches();

    // While some positions are still missing, the offset lookup may be backing off after a failure,
    // so never block for longer than the retry back-off.
    // NOTE: relying on cachedSubscriptionHashAllFetchPositions means updateAssignmentMetadataIfNeeded
    // MUST have been called before this method.
    if (!cachedSubscriptionHashAllFetchPositions) {
        blockMs = Math.min(blockMs, retryBackoffMs);
    }

    // Perform the network I/O. The condition tells the client whether blocking is still useful: a
    // background thread may already have completed a fetch, in which case there is no reason to wait.
    final Timer networkTimer = time.timer(blockMs);
    client.poll(networkTimer, () -> !fetcher.hasCompletedFetches());
    timer.update(networkTimer.currentTimeMs());

    // After the long poll, skip returning data if the group has to rebalance so that it stabilizes
    // faster.
    if (!coordinator.rejoinNeededOrPending()) {
        return fetcher.fetchedRecords();
    }
    return Collections.emptyMap();
}
这个方法其实就是整个拉取的核心过程,首先看拉取器 Fetcher 是如何去获取拉取结果的
// Drains buffered fetch responses into a per-partition map of records, returning at most
// maxPollRecords records in total. A KafkaException raised while draining is rethrown only when
// nothing has been collected yet; otherwise the records gathered so far are returned.
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>(); // records collected so far, keyed by partition
int recordsRemaining = maxPollRecords; // how many more records this call may still return
try {
while (recordsRemaining > 0) { // loop until the record budget is used up or the queue runs dry
// nextInLineRecords is the partition batch currently being handed out.
// nextInLineRecords == null: nothing has been parsed yet.
// nextInLineRecords.isFetched: the current batch is finished (presumably set by drain() - confirm in PartitionRecords).
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
// peek() leaves the entry in the queue: parseCompletedFetch below may throw, and removing the
// entry up front with poll() could lose its data.
CompletedFetch completedFetch = completedFetches.peek();
if (completedFetch == null) break; // nothing buffered any more
try {
nextInLineRecords = parseCompletedFetch(completedFetch); // wrap the response in a PartitionRecords
} catch (Exception e) {
// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
// (2) there are no fetched records with actual content preceding this exception.
// The first condition ensures that the completedFetches is not stuck with the same completedFetch
// in cases such as the TopicAuthorizationException, and the second condition ensures that no
// potential data loss due to an exception in a following record.
FetchResponse.PartitionData partition = completedFetch.partitionData;
if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
completedFetches.poll(); // a failed response that carries no data is dropped as well
}
throw e;
}
completedFetches.poll(); // parsed without error: now it can leave the queue
} else {
// Pull up to recordsRemaining records out of the current batch (the actual record decoding is
// delegated to PartitionRecords and is not visible here).
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
TopicPartition partition = nextInLineRecords.partition;
if (!records.isEmpty()) {
// Append to whatever has already been collected for the same partition.
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size(); // shrink the remaining budget
}
}
}
} catch (KafkaException e) {
// Only surface the error when there is nothing to return; otherwise hand back what was collected.
if (fetched.isEmpty())
throw e;
}
return fetched;
}
fetchedRecords 方法无非就两个步骤:第一个步骤,也就是 if 分支,从拉取结果缓存队列 completedFetches 中获取响应,调用 parseCompletedFetch 方法将其封装成 PartitionRecords 对象,也就是 nextInLineRecords 变量;第二个步骤,也就是 else 分支,调用 fetchRecords 方法对 nextInLineRecords 进行解析,解析过程中又会调用 nextInLineRecords 的 drain 方法将 nextInLineRecords 的 isFetched 设置为 true,这样下次循环又回到第一个步骤中。以此类推,一直循环,直至满足退出循环的两个条件之一:1. 已达到最大拉取数(recordsRemaining 减为 0);2. 消息缓存队列没数据了
// Validates one buffered fetch response and, when it is usable, wraps it in a PartitionRecords.
// Returns null when the partition is no longer fetchable, the response is stale, or the response
// carries an error that is handled here (metadata refresh / offset reset / logging only).
// Throws RecordTooLargeException or KafkaException when a non-empty response contains no complete
// batch, OffsetOutOfRangeException when the offset is out of range and no default reset policy
// exists, TopicAuthorizationException on authorization failure, and IllegalStateException for any
// error code not handled below.
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
PartitionRecords partitionRecords = null;
Errors error = partition.error;
try {
if (!subscriptions.isFetchable(tp)) { // the partition must be skipped for now
// this can happen when a rebalance happened or a partition consumption paused
// while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
} else if (error == Errors.NONE) { // successful fetch response
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long position = subscriptions.position(tp); // offset of the next record to consume
if (position == null || position != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
partition.records.sizeInBytes(), tp, position);
Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
partitionRecords = new PartitionRecords(tp, completedFetch, batches); // wrap the response for record-by-record consumption
if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
if (completedFetch.responseVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
" and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
"newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
recordTooLargePartitions);
} else {
// This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
fetchOffset + ". Received a non-empty fetch response from the server, but no " +
"complete records were found.");
}
}
if (partition.highWatermark >= 0) { // remember the high watermark reported by the broker
log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
subscriptions.updateHighWatermark(tp, partition.highWatermark);
}
if (partition.logStartOffset >= 0) { // remember the log start offset reported by the broker
log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
}
if (partition.lastStableOffset >= 0) { // remember the last stable offset (this is not the committed offset)
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
}
} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR) {
// These errors only trigger a metadata refresh; no records are returned for this response.
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
if (fetchOffset != subscriptions.position(tp)) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
"does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
} else if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
subscriptions.requestOffsetReset(tp);
} else {
throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
log.warn("Not authorized to read from topic {}.", tp.topic());
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
}
} finally {
// Runs on every exit path, including the early "return null" and the throws above.
// When no PartitionRecords was produced, record a zero entry for this partition with the aggregator.
if (partitionRecords == null)
completedFetch.metricAggregator.record(tp, 0, 0);
if (error != Errors.NONE)
// we move the partition to the end if there was an error. This way, it's more likely that partitions for
// the same topic can remain together (allowing for more efficient serialization).
subscriptions.movePartitionToEnd(tp);
}
return partitionRecords;
}
/**
 * Hands out up to {@code maxRecords} records from the given partition batch and advances the
 * consumed position accordingly. When the batch cannot be used (partition no longer assigned, not
 * fetchable, or the batch does not start at the current position) it is drained and an empty list
 * is returned.
 *
 * @param partitionRecords batch to read from
 * @param maxRecords       maximum number of records to return
 * @return the records read, or an empty list when the batch was discarded
 */
private List<ConsumerRecord<K, V>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
    final TopicPartition tp = partitionRecords.partition;

    if (!subscriptions.isAssigned(tp)) {
        // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
        log.debug("Not returning fetched records for partition {} since it is no longer assigned",
                tp);
        partitionRecords.drain();
        return emptyList();
    }

    if (!subscriptions.isFetchable(tp)) {
        // this can happen when a partition is paused before fetched records are returned to the consumer's
        // poll call or if the offset is being reset
        log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
                tp);
        partitionRecords.drain();
        return emptyList();
    }

    // Offset of the next record the consumer expects for this partition.
    final long consumedPosition = subscriptions.position(tp);
    if (partitionRecords.nextFetchOffset != consumedPosition) {
        // these records aren't next in line based on the last consumed position, ignore them
        // they must be from an obsolete request
        log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                tp, partitionRecords.nextFetchOffset, consumedPosition);
        partitionRecords.drain();
        return emptyList();
    }

    // The batch starts exactly where the consumer left off: read from it and move the position on.
    final List<ConsumerRecord<K, V>> batch = partitionRecords.fetchRecords(maxRecords);
    final long advancedTo = partitionRecords.nextFetchOffset;
    log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
            "position to {}", consumedPosition, tp, advancedTo);
    subscriptions.position(tp, advancedTo);

    // Report lag and lead metrics when the subscription state can provide them.
    final Long lag = subscriptions.partitionLag(tp, isolationLevel);
    if (lag != null) {
        this.sensors.recordPartitionLag(tp, lag);
    }
    final Long leadValue = subscriptions.partitionLead(tp);
    if (leadValue != null) {
        this.sensors.recordPartitionLead(tp, leadValue);
    }
    return batch;
}
回到pollForFetches方法继续往下走,调用了fetcher的sendFetches方法发起拉取请求
// Builds one fetch request per target node and hands each to the network client, attaching a
// listener that buffers every partition's response in completedFetches on success.
// Returns the number of nodes a request was prepared for.
public synchronized int sendFetches() {
// Per-node request data from prepareFetchRequests(); per the original notes it also checks node
// connectivity and whether metadata needs refreshing (not visible in this excerpt).
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { // one request per node
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget()); // assemble the fetch request body
if (log.isDebugEnabled()) {
log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
}
client.send(fetchTarget, request) // hand the request to the network client
.addListener(new RequestFutureListener<ClientResponse>() { // response callbacks
@Override
public void onSuccess(ClientResponse resp) { // a response arrived
synchronized (Fetcher.this) {
FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler == null) {
log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) { // the session handler rejected the response: drop it
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
// The offset that was requested for this partition, taken from the request's session data.
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, fetchData);
// Buffer the partition's response; fetchedRecords() consumes this queue later.
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
}
}
@Override
public void onFailure(RuntimeException e) { // the request failed
synchronized (Fetcher.this) {
FetchSessionHandler handler = sessionHandler(fetchTarget.id());
if (handler != null) {
handler.handleError(e); // let the session handler react to the failure
}
}
}
});
}
return fetchRequestMap.size();
}
可以看到,最重要的一步就是将响应的消息放到 completedFetches 缓存队列中,这样在处理拉取结果时就能通过该队列去获取消息数据了。注意上面的 send 方法并没有发生网络 IO,只是将请求写到了 unsent 缓冲队列中,接下来会调用 poll 方法发起真正的网络 IO 请求
// The second argument decides whether poll may block: when completed fetches are already waiting in
// the buffer there is no need to block and processing continues right away; otherwise the call
// waits for a while for data to come back.
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
最后再次处理拉取的结果并返回
return fetcher.fetchedRecords(); // drain whatever the completed fetches have buffered