消费者——消息订阅与抓取 - 969251639/study GitHub Wiki
KafkaConsumer是kafka消费者的实现类,它实现了Consumer接口,定义了kafka消费者的功能方法
消费者接口
public interface Consumer<K, V> extends Closeable {
//返回消费者分配到的分区
Set<TopicPartition> assignment();
//返回消费者已订阅的主题
Set<String> subscription();
//不带回调的订阅指定主题,并未消费者自动分配分区,已assign方法互斥
void subscribe(Collection<String> topics);
//带回调的订阅指定主题,并未消费者自动分配分区,已assign方法互斥
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
//手动订阅指定的topic,并且指定消费分区,主题和分区一起绑定在TopicPartition,与subscribe方法互斥
void assign(Collection<TopicPartition> partitions);
//带回调的订阅正则主题,并未消费者自动分配分区,已assign方法互斥
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
//不带回调的订阅正则主题,并未消费者自动分配分区,已assign方法互斥
void subscribe(Pattern pattern);
//取消订阅
void unsubscribe();
//拉取消息,拉取不到则阻塞timeout毫秒后返回,已过期
ConsumerRecords<K, V> poll(long timeout);
//拉取消息,拉取不到则阻塞timeout周期后返回
ConsumerRecords<K, V> poll(Duration timeout);
//同步commit
void commitSync();
//同步commit,最多阻塞timeout周期后返回
void commitSync(Duration timeout);
//指定偏移量同步commit
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
//指定偏移量同步commit,最多阻塞timeout周期后返回
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
//异步commit
void commitAsync();
//异步commit成功后回调callback方法
void commitAsync(OffsetCommitCallback callback);
//指定偏移量异步commit成功后回调callback方法
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
//指定从offset开始消费分区
void seek(TopicPartition partition, long offset);
//重头开始消费
void seekToBeginning(Collection<TopicPartition> partitions);
//从最后的偏移量开始消费
void seekToEnd(Collection<TopicPartition> partitions);
//当前消费的位置,即当前偏移量的大小
long position(TopicPartition partition);
//当前消费的位置,即当前偏移量的大小,最多阻塞timeout周期后返回
long position(TopicPartition partition, final Duration timeout);
//返回最后一次提交的偏移量
OffsetAndMetadata committed(TopicPartition partition);
//返回最后一次提交的偏移量,最多阻塞timeout周期后返回
OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);
//监控用
Map<MetricName, ? extends Metric> metrics();
//返回topic下的所有分区信息
List<PartitionInfo> partitionsFor(String topic);
//返回topic下的所有分区信息,最多阻塞timeout周期后返回
List<PartitionInfo> partitionsFor(String topic, Duration timeout);
//返回所有主题和分区的映射关系
Map<String, List<PartitionInfo>> listTopics();
//返回所有主题和分区的映射关系,最多阻塞timeout周期后返回
Map<String, List<PartitionInfo>> listTopics(Duration timeout);
//暂停消费,暂停后poll方法会返回null
Set<TopicPartition> paused();
//暂停消费指定的分区,暂停后poll方法会返回null
void pause(Collection<TopicPartition> partitions);
//恢复指定的分区继续消费
void resume(Collection<TopicPartition> partitions);
//返回大于等于指定时间戳的分区下的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
//返回大于等于指定时间戳的分区下的偏移量,最多阻塞timeout周期后返回
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
//返回每个分区的最开始一个偏移量的值的映射
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
//返回每个分区的最开始一个偏移量的值的映射,最多阻塞timeout周期后返回
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout);
//返回每个分区的最后一个偏移量的值的映射
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
//返回每个分区的最后一个偏移量的值的映射,最多阻塞timeout周期后返回
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
//关闭消费者
void close();
//关闭消费者,最多阻塞unit为单位的timeout后返回,已过期
@Deprecated
void close(long timeout, TimeUnit unit);
//关闭消费者,最多阻塞timeout周期后返回
void close(Duration timeout);
//唤醒消费者
void wakeup();
}
下面看下它的主要成员变量
clientId:设置clientId,如果没有配置那么则自动生成一个以consumer-为前缀的数字递增作为clientId,如第一个consumer:consumer-1,第二个consumer:consumer-2,以此类推(client.id)
groupId:设置组id,每个consumer都属于某一个消费组。而这个group.id就是指哪一个消费组id(group.id)
requestTimeoutMs:请求超时时间(request.timeout.ms)
defaultApiTimeoutMs:默认api超时时间?
retryBackoffMs:每次重试刷新metadata时的间隔时间,防止metadata请求过多(retry.backoff.ms)
interceptors:拦截器,和生产者一样,可以在消息消费之前或者commit之前或者close时进行回调
实例化消息Key和Value进行反序列化操作的Deserializer
keyDeserializer:键反序列化
valueDeserializer:值反序列化
clusterResourceListeners:进群资源变化的监听器
metadata:集群元数据
channelBuilder:nio通道(根据不同协议创建不同的channel,比如明文的PlaintextChannelBuilder,SSL的SslChannelBuilder)
isolationLevel:事务隔离级别
heartbeatIntervalMs:每隔多少毫秒心跳跟服务器心跳一次
netClient:NetworkClient网络客户端
client(ConsumerNetworkClient):消费者的网络客户端,包装了netClient
offsetResetStrategy:消费重置策略,共有3种
latest: 从最新的偏移量开始消费(默认)
earliest: 从最早的偏移量开始消费
none: ???
subscriptions:订阅器
assignors:分区控制
maxPollIntervalMs:poll消息的最大时间
sessionTimeoutMs:session超时时间
coordinator(ConsumerCoordinator):消费者控制器,与服务器的GroupCoodinator之间的通讯逻辑
fetcher(Fetcher):消息抓取器
kafka消费者的轻量级锁,内部很多方法都使用该锁来实现线程安全,控制多线程下kafkaconsumer的安全操作,它由currentThread 和refcount 两个变量来实现,currentThread 用来记录当前的操作线程,初始值为NO_CURRENT_THREAD(-1),refcount 用来记录重入次数
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// refcount is used to allow reentrant access by the thread who has acquired currentThread
private final AtomicInteger refcount = new AtomicInteger(0);
每次操作之前都需要aquire进行申请,操作完后需要release进行释放
private void acquireAndEnsureOpen() {
acquire();//申请
if (this.closed) {//如果消费者已关闭
release();//释放
throw new IllegalStateException("This consumer has already been closed.");
}
}
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))//如果当前线程id和持有操作的线程id不等,则cas的将初始状态下的NO_CURRENT_THREAD设置为当前线程id
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");//以上两个操作都不成功则抛出异常
refcount.incrementAndGet();//引用次数加1
}
private void release() {
if (refcount.decrementAndGet() == 0)//如果引用次数减1后返回0,重置操作线程为初始值
currentThread.set(NO_CURRENT_THREAD);
}
kafka言外之意就是希望kafka消费者的操作都是基于单线程的操作
消费者主要方法包括订阅主题,抓取消息,提交ack,设置起始的消费偏移量等
- 订阅:
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();//申请操作
try {
if (topics == null) {//如果订阅的主题集合为null,抛异常
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {//如果订阅的主题集合为空
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();//取消订阅
} else {
for (String topic : topics) {//检查订阅的主题中是否有包含空串
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
//kafka的分区由assignors控制,那么订阅的时候分到主题的那些分区都有assignors来实现,所以订阅时必须要有assignors,否则抛异常
throwIfNoAssignorsConfigured();//如果分区器没有配置,抛异常
fetcher.clearBufferedDataForUnassignedTopics(topics);//
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);//设置订阅主题和回调
metadata.setTopics(subscriptions.groupSubscription());//设置订阅数据到元数据,然后将needUpdate设置为true,强制更新元数据,告知服务器消费者的订阅情况
}
} finally {
release();//释放操作
}
}
- 抓取消息:
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
...
// poll for new data until the timeout expires
do {
...
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);//拉取消息
if (!records.isEmpty()) {//如果拉取到消息
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {//尝试继续提前发送拉取下一批消息,这样下一次就不用等到下次轮询时才去拉取等待响应,提高响应效率
client.pollNoWakeup();//发送请求
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));//再返回拉取到的消息前回调拦截器的onConsume方法
}
} while (timer.notExpired());//如果这次拉取还没超时,则继续拉取,直到有数据返回或者超时
return ConsumerRecords.empty();
} finally {
release();
}
}
- 提交ack:
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
log.debug("Committing offsets: {}", offsets);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);//提交ack,由coordinator组件控制
} finally {
release();
}
}
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();//提交前回调
if (!coordinatorUnknown()) {//如果没有宕机
doCommitOffsetsAsync(offsets, callback);//执行真正的回调
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
pendingAsyncCommits.incrementAndGet();
lookupCoordinator().addListener(new RequestFutureListener<Void>() {//重新查找一个新的coordinator后再提交
@Override
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
}
@Override
public void onFailure(RuntimeException e) {
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
}
});
}
// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();//唤醒线程
}