消费者——心跳 - 969251639/study GitHub Wiki
消费者必须定时的发送心跳来告知服务器自己还活着,这样服务器就能感知到消费者的消费存在,否则服务器收不到消费者的心跳而认为消费者下线后会发生reblance操作,心跳由ConsumerCoordinator组件维护,创建消费者时会创建该组件
private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
...
this.coordinator = new ConsumerCoordinator(logContext,
this.client,
groupId,
maxPollIntervalMs,
sessionTimeoutMs,
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
...
}
可以看到创建ConsumerCoordinator时会创建Heartbeat,Heartbeat记录了心跳基本信息,可以通过该组件快速的判断心跳的时间间隔等
public final class Heartbeat {
private final int sessionTimeoutMs;//session超时时间,session.timeout.ms配置项控制
private final int heartbeatIntervalMs;//心跳间隔时间,heartbeat.interval.ms配置项控制
private final int maxPollIntervalMs;//最大轮询间隔时间,max.poll.interval.ms配置项控制
private final long retryBackoffMs;//每次发送心跳的最少间隔时间,metadata.max.age.ms配置项控制
private final Time time;//当前时间
private final Timer heartbeatTimer;//心跳时间器
private final Timer sessionTimer;//session时间器
private final Timer pollTimer;//poll时间器
private volatile long lastHeartbeatSend;//记录上一次心跳发送的时间戳
public Heartbeat(Time time,
int sessionTimeoutMs,
int heartbeatIntervalMs,
int maxPollIntervalMs,
long retryBackoffMs) {
if (heartbeatIntervalMs >= sessionTimeoutMs)//心跳间隔时间不能大于等于会话时间,否则抛异常,会话级的肯定是要更高层
throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
this.time = time;
this.sessionTimeoutMs = sessionTimeoutMs;
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.maxPollIntervalMs = maxPollIntervalMs;
this.retryBackoffMs = retryBackoffMs;
this.heartbeatTimer = time.timer(heartbeatIntervalMs);
this.sessionTimer = time.timer(sessionTimeoutMs);
this.pollTimer = time.timer(maxPollIntervalMs);
}
private void update(long now) {//更新过期时间
heartbeatTimer.update(now);//更新心跳时间
sessionTimer.update(now);//更新session时间
pollTimer.update(now);//更新poll时间
}
public void poll(long now) {//每次poll时都需要更新时间
update(now);
pollTimer.reset(maxPollIntervalMs);//重置
}
public void sentHeartbeat(long now) {//发送心跳
this.lastHeartbeatSend = now;//记录最后一次发送的时间戳
update(now);//更新时间
heartbeatTimer.reset(heartbeatIntervalMs);//重置下一次心跳时间间隔
}
public void failHeartbeat() {//心跳失败
update(time.milliseconds());//重置为最初始的时间
heartbeatTimer.reset(retryBackoffMs);//重置下一次心跳时间间隔为下一次matedata的刷新间隔时间retryBackoffMs
}
public void receiveHeartbeat() {//收到心跳响应
update(time.milliseconds());//重置为最初始的时间
sessionTimer.reset(sessionTimeoutMs);//重置会话超时时间
}
public boolean shouldHeartbeat(long now) {//是否可以发送心跳
update(now);//更新当前时间
return heartbeatTimer.isExpired();//心跳是否过期
}
public long lastHeartbeatSend() {//返回最后一次发送心跳时间戳
return this.lastHeartbeatSend;
}
public long timeToNextHeartbeat(long now) {//返回下一次发送心跳的时间戳
update(now);//更新当前时间
return heartbeatTimer.remainingMs();
}
public boolean sessionTimeoutExpired(long now) {//会话是否过期
update(now);//更新当前时间
return sessionTimer.isExpired();
}
public void resetTimeouts() {//重置超时时间
update(time.milliseconds());//重置为最初始的时间
sessionTimer.reset(sessionTimeoutMs);//重置为session超时时间
pollTimer.reset(maxPollIntervalMs);//重置为poll间隔时间
heartbeatTimer.reset(heartbeatIntervalMs);//重置心跳间隔时间
}
public void resetSessionTimeout() {//重置会话超时时间
update(time.milliseconds());//重置为最初始的时间
sessionTimer.reset(sessionTimeoutMs);
}
public boolean pollTimeoutExpired(long now) {//poll是否超时
update(now);//更新当前时间
return pollTimer.isExpired();
}
public long lastPollTime() {//返回最后一次poll时间戳
return pollTimer.currentTimeMs();
}
}
Heartbeat 用了3个Timer来记录heartbeat,session,poll的时间控制,Timer是kafka封装的基础时间控制器
public class Timer {
private final Time time;//当前时间的time
private long startMs;//开始时间戳
private long currentTimeMs;//当前时间戳
private long deadlineMs;//到期时间戳
Timer(Time time, long timeoutMs) {
this.time = time;
update();//更新当前时间
reset(timeoutMs);//重置到期时间
}
public boolean isExpired() {//是否过期
return currentTimeMs >= deadlineMs;//如果当前时间大于等于到期时间,则过期
}
public boolean notExpired() {//是否没过期
return !isExpired();
}
public void updateAndReset(long timeoutMs) {//更新当前时间并重置到期时间时间
update();
reset(timeoutMs);
}
public void reset(long timeoutMs) {//重置到期时间
if (timeoutMs < 0)
throw new IllegalArgumentException("Invalid negative timeout " + timeoutMs);
this.startMs = this.currentTimeMs;//重新标记开始时间为当前时间作为起始时间
if (currentTimeMs > Long.MAX_VALUE - timeoutMs)//最大超时时间为Long.MAX_VALUE
this.deadlineMs = Long.MAX_VALUE;
else
this.deadlineMs = currentTimeMs + timeoutMs;//当前时间加上上次到期时间,也就是续期
}
public void update() {
update(time.milliseconds());//更新当前时间
}
public void update(long currentTimeMs) {//更新当前时间
this.currentTimeMs = Math.max(currentTimeMs, this.currentTimeMs);
}
public long remainingMs() {//返回剩余时间
return Math.max(0, deadlineMs - currentTimeMs);
}
public long currentTimeMs() {//返回当前时间
return currentTimeMs;
}
public long elapsedMs() {//返回已流逝的时间,也就是运行时长
return currentTimeMs - startMs;
}
public void sleep(long durationMs) {//休眠
long sleepDurationMs = Math.min(durationMs, remainingMs());//最大休眠时间为剩余时间
time.sleep(sleepDurationMs);//失眠
update();//休眠醒来后继续更新当前时间
}
}
这样,Heartbeat就可以通过Timer来简单控制各种时间的判断,接下来看发送心跳的线程。上面说过心跳由ConsumerCoordinator组件维护,ConsumerCoordinator又是继承了AbstractCoordinator抽象类,在AbstractCoordinator类中有一个成员变量heartbeatThread,就是用来运行心跳的线程。
public abstract class AbstractCoordinator implements Closeable {
...
private final Heartbeat heartbeat;
...
}
另外coordinator记录服务器对消费者的一个协调器,可以把它看做一个服务器节点Node
private class HeartbeatThread extends KafkaThread {//AbstractCoordinator的内部类,继承了KafkaThread,而KafkaThread 又继承了Thread,所以HeartbeatThread本身就是一个Thread,里面的run方法肯定是线程执行体
private boolean enabled = false;//是否可用标记
private boolean closed = false;//是否关闭
private AtomicReference<RuntimeException> failed = new AtomicReference<>(null);//记录心跳失败的异常,一个消费者一个线程,这里用Atomic没看懂???
private HeartbeatThread() {//构造方法
//设置线程名称,由kafka-coordinator-heartbeat-thread +groupId组成,并与守护线程方式运行(第二个参数控制是否是守护线程)
super(HEARTBEAT_THREAD_PREFIX + (groupId.isEmpty() ? "" : " | " + groupId), true);
}
public void enable() {//设置心跳可用
synchronized (AbstractCoordinator.this) {
log.debug("Enabling heartbeat thread");
this.enabled = true;//设置可用为true
heartbeat.resetTimeouts();//重置时间
AbstractCoordinator.this.notify();//唤醒等待心跳而休眠的线程(其实就是HeartbeatThread本身)
}
}
public void disable() {//设置心跳不可用
synchronized (AbstractCoordinator.this) {
log.debug("Disabling heartbeat thread");
this.enabled = false;//设置可用为false
}
}
public void close() {//关闭
synchronized (AbstractCoordinator.this) {
this.closed = true;//设置关闭标记为true
AbstractCoordinator.this.notify();//唤醒等待心跳而休眠的线程(其实就是HeartbeatThread本身)
}
}
private boolean hasFailed() {//是否心跳失败
return failed.get() != null;
}
private RuntimeException failureCause() {//获取心跳失败的异常
return failed.get();
}
@Override
public void run() {
try {
log.debug("Heartbeat thread started");
while (true) {//死循环
synchronized (AbstractCoordinator.this) {//上锁
if (closed)//如果处于关闭状态,直接返回结束该线程
return;
if (!enabled) {//如果心跳不可用,休眠等待被上面的enable方法或close方法唤醒后继续循环
AbstractCoordinator.this.wait();
continue;
}
//MemberState.STABLE表示消费者已经join到该消费组并正在正常的发送心跳中
if (state != MemberState.STABLE) {
// the group is not stable (perhaps because we left the group or because the coordinator
// kicked us out), so disable heartbeats and wait for the main thread to rejoin.
disable();//消费者如果还没有join到一个消费者组或者心跳不正常,设置不可用
continue;
}
client.pollNoWakeup();//不可中断的发送心跳请求
long now = time.milliseconds();
if (coordinatorUnknown()) {//如果服务端的coordinator是宕机的
//尝试阻塞的发送查找服务器的coordinator的并重连coordinator请求,得到响应后判断是否是查找失败
if (findCoordinatorFuture != null || lookupCoordinator().failed())
// the immediate future check ensures that we backoff properly in the case that no
// brokers are available to connect to.
AbstractCoordinator.this.wait(retryBackoffMs);//等待休眠
} else if (heartbeat.sessionTimeoutExpired(now)) {//如果是会话过期
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
markCoordinatorUnknown();//标记coordinator宕机,需要重连coordinator
} else if (heartbeat.pollTimeoutExpired(now)) {//如果是nio的poll超时
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
log.warn("This member will leave the group because consumer poll timeout has expired. This " +
"means the time between subsequent calls to poll() was longer than the configured " +
"max.poll.interval.ms, which typically implies that the poll loop is spending too " +
"much time processing messages. You can address this either by increasing " +
"max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +
"with max.poll.records.");
maybeLeaveGroup();//可能需要退出消费者组
} else if (!heartbeat.shouldHeartbeat(now)) {//判断是否需要发送心跳
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);//不需要发送心跳,休眠
} else {
heartbeat.sentHeartbeat(now);//发送心跳,则更新心跳时间
//构建心跳请求HeartbeatRequest并将心跳请求写到消费者的unsent缓冲队列中等待发送,并添加监听器
sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat();//收到心跳,重置超时时间
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {//如果是因为Rebalance中发生异常
// it is valid to continue heartbeating while the group is rebalancing. This
// ensures that the coordinator keeps the member in the group for as long
// as the duration of the rebalance timeout. If we stop sending heartbeats,
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat();//收到心跳,重置超时时间
} else {
heartbeat.failHeartbeat();//心跳失败
// wake up the thread if it's sleeping to reschedule the heartbeat
AbstractCoordinator.this.notify();
}
}
}
});
}
}
}
} catch (AuthenticationException e) {
log.error("An authentication error occurred in the heartbeat thread", e);
this.failed.set(e);
} catch (GroupAuthorizationException e) {
log.error("A group authorization error occurred in the heartbeat thread", e);
this.failed.set(e);
} catch (InterruptedException | InterruptException e) {
Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread", e);
this.failed.set(new RuntimeException(e));
} catch (Throwable e) {
log.error("Heartbeat thread failed due to unexpected error", e);
if (e instanceof RuntimeException)
this.failed.set((RuntimeException) e);
else
this.failed.set(new RuntimeException(e));
} finally {
log.debug("Heartbeat thread has closed");
}
}
}
心跳的过程中会考虑到服务器的coordinator是否宕机,会话是否超时等情况,来逐一分析下这些场景
- coordinator服务宕机
public boolean coordinatorUnknown() {
return checkAndGetCoordinator() == null;//检查coordinator
}
protected synchronized Node checkAndGetCoordinator() {
if (coordinator != null && client.isUnavailable(coordinator)) {//coordinator不等于空且coordinator不可用则标记coordinator宕机
markCoordinatorUnknown(true);
return null;
}
return this.coordinator;
}
public boolean isUnavailable(Node node) {
lock.lock();
try {//coordinator服务连接失败
return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
} finally {
lock.unlock();
}
}
protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
if (this.coordinator != null) {
log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
Node oldCoordinator = this.coordinator;//记录旧的coordinator
// Mark the coordinator dead before disconnecting requests since the callbacks for any pending
// requests may attempt to do likewise. This also prevents new requests from being sent to the
// coordinator while the disconnect is in progress.
this.coordinator = null;//设置coordinator为空
// Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
// Pending callbacks will be invoked with a DisconnectException on the next call to poll.
if (!isDisconnected)//如果是断开连接
client.disconnectAsync(oldCoordinator);//设置coordinator断开连接
}
}
如果是宕机情况,则会进行两个判断来确认是否需要休眠
findCoordinatorFuture != null || lookupCoordinator().failed()
findCoordinatorFuture != null表示已重新对服务器发送查找coordinator的请求,但可能还没有收到服务器的响应,这时候findCoordinatorFuture!=null,还没触发响应监听器的逻辑,接收到服务器的响应后会回调响应监听器,将findCoordinatorFuture 设置为null
lookupCoordinator方法会去服务器查找coordinator,找到后重连coordinator
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
if (node == null) {
log.debug("No broker available to send FindCoordinator request");
return RequestFuture.noBrokersAvailable();
} else
findCoordinatorFuture = sendFindCoordinatorRequest(node);//发起查找coordinator的请求
}
return findCoordinatorFuture;
}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// initiate the group metadata request
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);//构造查找coordinator的请求
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());//发送请求到unsent缓冲队列等待发送,并设置FindCoordinatorResponseHandler作为响应回调处理的handler
}
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received FindCoordinator response {}", resp);
clearFindCoordinatorFuture();//清掉findCoordinatorFuture,可以下次继续查找coordinator
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {//如果响应成功
synchronized (AbstractCoordinator.this) {
// use MAX_VALUE - node.id as the coordinator id to allow separate connections
// for the coordinator in the underlying network client layer
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.node().host(),
findCoordinatorResponse.node().port());//重置coordinator节点的服务器信息
log.info("Discovered group coordinator {}", coordinator);
client.tryConnect(coordinator);//根据重置后的节点信息,重连该服务器的coordinator
heartbeat.resetSessionTimeout();//重置会话超时时间
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));//设置失败的异常
} else {
log.debug("Group coordinator lookup failed: {}", error.message());
future.raise(error);//设置失败的异常
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {//响应失败
clearFindCoordinatorFuture();//清掉findCoordinatorFuture,可以下次继续查找coordinator
super.onFailure(e, future);//设置失败的异常
}
}
如果上面的两个判断成立则进行休眠一小段时间后重试
AbstractCoordinator.this.wait(retryBackoffMs);
- 会话超时
heartbeat.sessionTimeoutExpired(now)
如果是会话超时那么表示coordinator服务器宕机了,标记当前的coordinator服务器宕机
markCoordinatorUnknown();//标记coordinator服务器宕机
- 轮询超时,可以理解为消费者处理时长超时
heartbeat.pollTimeoutExpired(now)
如果是轮询超时,那么有肯能消费着本身出了问题,比如最大轮询100ms,而消费者却执行了200ms,很明显超出了消费者最大的处理能力范围,对于这种情况kafka直接将该消费者踢出消费者组,对于这种情况,可以通过加大max.poll.interval.ms的配置或者减少max.poll.records最大轮询数量来控制
public synchronized void maybeLeaveGroup() {
//如果coordinator没有宕机且不属于还没有消费者组且当前的“代”不属于初始“代”
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
log.info("Sending LeaveGroup request to coordinator {}", coordinator);
LeaveGroupRequest.Builder request =
new LeaveGroupRequest.Builder(groupId, generation.memberId);//构造退出消费者组的请求
client.send(coordinator, request)
.compose(new LeaveGroupResponseHandler());//发送到unsent缓冲队列等待发送
client.pollNoWakeup();//不可中断发送请求
}
resetGeneration();//重置当前“代”
}
protected synchronized void resetGeneration() {
this.generation = Generation.NO_GENERATION;//设置“代”为初始状态
this.rejoinNeeded = true;//标记需要重新申请加入消费者组
this.state = MemberState.UNJOINED;//状态设置为未加入消费者组
}
- 是否需要发送心跳
heartbeat.shouldHeartbeat(now))
上面的分析heartbeat的时候已经知道shouldHeartbeat会根据他的到期时间和当前时间判断心跳是否已过期,如果已过期,则发送心跳,否则休眠一小段时间,继续检测
AbstractCoordinator.this.wait(retryBackoffMs);//不需要发送心跳,休眠
分析完了整个心跳的执行逻辑后最后看下心跳的启动时机
kafka的消费者时pull模式,也就是说每次都需要消费者主动去从kafka服务器拉消息,而在第一次拉的时候会启动心跳线程
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
...
if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}
...
}
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (!coordinator.poll(timer)) {
return false;
}
...
}
public boolean poll(Timer timer) {
...
if (!ensureActiveGroup(timer)) {
return false;
}
}
boolean ensureActiveGroup(final Timer timer) {
...
startHeartbeatThreadIfNeeded();
...
}
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();//启动心跳线程
}
}