Kafka Replica Protocol - tenji/ks GitHub Wiki

Kafka 副本备份机制

一、术语定义

  • Replica

    副本(Replica),Kafka 分区下有可能有很多个副本用于实现冗余,从而进一步实现高可用,从本质上来讲,Leader, Follower, ISR 都属于不同角色的副本。

  • Leader

    响应客户端读写的请求的副本。Leader 会追踪和维护 ISR 中所有 Follower 的滞后状态。如果滞后太多(数量滞后和时间滞后两个维度,可通过replica.lag.time.max.msreplica.lag.max.message这两个参数来配置),Leader 会把该副本从 ISR 中移除。被移除 ISR 的副本会一直追赶 Leader。Leader 写入数据后并不会 Commit,只有 ISR 列表中的所有 Folower 同步之后才会 Commit,把滞后的 Follower 移除 ISR 主要是避免写消息延迟。

  • Follower

    被动地备份 Leader 副本中的数据,不能响应客户端读写请求。

  • ISR (In-Sync Replicas)

    包含了 Leader 副本和所有与 Leader 副本保持同步的 Follower 副本,具体怎么判断是否与 Leader 同步请参考下文。设置 ISR 主要是为了 Broker 宕掉之后,重新选举分区的 Leader 从 ISR 列表中选择(通过配置unclean.leader.election.enable参数为true也可以强制从非 ISR 中选举分区的 Leader)。

  • LEO

    Log End Offset(日志末端位移),记录了该副本底层日志(Log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO = 10,那么表示该副本保存了 10 条消息,位移值范围是 [0, 9]。另外,Leader LEO 和 follower LEO 的更新是有区别的,细节请看下文。

  • HW

    High Watermark(水位值),对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(Replicated)。同理,Leader 副本和 Follower副本的 HW 更新是有区别的,细节请看下文。

我们使用下图来形象化地说明 LEO 和 HW 两者的关系:

HW vs LEO

上图中,HW 值是 7,表示位移是 0 ~ 7 的所有消息都已经处于“已备份状态”(Committed),而 LEO 值是 15,那么 8 ~ 14 的消息就是尚未完全备份(Fully Replicated)。为什么没有 15?因为刚才说过了,LEO 指向的是下一条消息到来时的位移,故上图使用虚线框表示。我们总说 Consumer 无法消费未提交消息。这句话如果用以上名词来解读的话,应该表述为:Consumer 无法消费分区下 Leader 副本中位移值大于分区 HW 的任何消息。这里需要特别注意分区 HW 就是 Leader 副本的 HW 值。

二、High Watermark 机制

既然副本分为 Leader 副本和 Follower 副本,而每个副本又都有 HW 和 LEO,那么它们是怎么被更新的呢?它们更新的机制又有什么区别呢?我们一一来分析下:

2.1 Follower 副本何时更新 LEO?

如前所述,Follower 副本只是被动地向 Leader 副本请求数据,具体表现为 Follower 副本不停地向 Leader 副本所在的 Broker 发送 FETCH 请求,一旦获取消息后写入自己的日志中进行备份。那么 Follower 副本的 LEO 是何时更新的呢?首先我必须言明,Kafka 有两套 Follower 副本 LEO:

  1. 一套 LEO 保存在 Follower 副本所在 Broker 的副本管理机中;
  2. 另一套 LEO 保存在 Leader 副本所在 Broker 的副本管理机中——换句话说,Leader 副本机器上保存了所有的 Follower 副本的 LEO。

为什么要保存两套?这是因为 Kafka 使用前者帮助 Follower 副本更新其 HW 值;而利用后者帮助 Leader 副本更新其 HW 使用。下面我们分别看下它们被更新的时机。

2.1.1 Follower 副本端的 Follower 副本 LEO 何时更新?

Follower 副本端的 LEO 值就是其底层日志的 LEO 值,也就是说每当新写入一条消息,其 LEO 值就会被更新(类似于 LEO += 1)。当 Follower 发送 FETCH 请求后,Leader 将数据返回给 Follower,此时 follower 开始向底层 Log 写数据,从而自动地更新 LEO 值。

2.1.2 Leader 副本端的 Follower 副本 LEO 何时更新?

Leader 副本端的 Follower 副本 LEO 的更新发生在 Leader 在处理 Follower FETCH 请求时。一旦 Leader 接收到 Follower 发送的 FETCH 请求,它首先会从自己的 Log 中读取相应的数据,但是在给 Follower 返回数据之前它先去更新 Follower 的 LEO (即上面所说的第二套 LEO)

2.2 Follower 副本何时更新 HW?

Follower 更新 HW 发生在其更新 LEO 之后,一旦 Follower 向 Log 写完数据,它会尝试更新它自己的 HW 值。具体算法就是比较当前 LEO 值与 FETCH 响应中 Leader 的 HW 值,取两者的小者作为新的 HW 值。这告诉我们一个事实:如果 Follower 的 LEO 值超过了 Leader 的 HW 值,那么 Follower HW 值也是不会越过 Leader HW 值的。

2.3 Leader 副本何时更新 LEO?

和 Follower 更新 LEO 道理相同,Leader 写 Log 时就会自动地更新它自己的 LEO 值。

2.4 Leader 副本何时更新 HW 值?

前面说过了,Leader 的 HW 值就是分区 HW 值,因此何时更新这个值是我们最关心的,因为它直接影响了分区数据对于 Consumer 的可见性 。以下4种情况下 Leader 会尝试去更新分区 HW - 切记是尝试,有可能因为不满足条件而不做任何更新:

  • 副本成为 Leader 副本时

    当某个副本成为了分区的 Leader 副本,Kafka 会尝试去更新分区 HW。这是显而易见的道理,毕竟分区 Leader 发生了变更,这个副本的状态是一定要检查的!不过,本文讨论的是当系统稳定后且正常工作时备份机制可能出现的问题,故这个条件不在我们的讨论之列。

  • Broker 出现崩溃导致副本被踢出 ISR 时

    若有 Broker 崩溃则必须查看下是否会波及此分区,因此检查下分区 HW 值是否需要更新是有必要的,本文不对这种情况做深入讨论。

  • Producer 向 Leader 副本写入消息时

    因为写入消息会更新 Leader 的 LEO,故有必要再查看下 HW 值是否也需要修改。

  • Leader 处理 Follower FETCH 请求时

    当 Leader 处理 Follower 的 FETCH 请求时首先会从底层的 Log 读取数据,之后会尝试更新分区 HW 值。

特别注意上面4个条件中的最后两个。它揭示了一个事实——当 Kafka Broker 都正常工作时,分区 HW 值的更新时机有两个:Leader 处理 Produce 请求时和 Leader 处理 FETCH 请求时。另外,Leader 是如何更新它的 HW 值的呢?前面说过了,Leader Broker 上保存了一套 Follower 副本的 LEO 以及它自己的 LEO。当尝试确定分区 HW 时,它会选出所有满足条件的副本,比较它们的 LEO(当然也包括 Leader 自己的 LEO),并选择最小的 LEO 值作为 HW 值。这里的满足条件主要是指副本要满足以下两个条件之一:

  • 处于 ISR 中
  • 副本 LEO 落后于 Leader LEO 的时长不大于replica.lag.time.max.ms参数值(默认是10s)

乍看上去好像这两个条件说得是一回事,毕竟 ISR 的定义就是第二个条件描述的那样。但某些情况下 Kafka 的确可能出现副本已经“追上”了 Leader 的进度,但却不在 ISR 中,比如某个从 Failure 中恢复的副本。如果 Kafka 只判断第一个条件的话,确定分区 HW 值时就不会考虑这些未在 ISR 中的副本,但这些副本已经具备了“立刻进入ISR”的资格,因此就可能出现分区 HW 值越过 ISR 中副本 LEO 的情况——这肯定是不允许的,因为分区 HW 实际上就是 ISR 中所有副本 LEO 的最小值。

好了,理论部分我觉得说的差不多了,下面举个实际的例子。我们假设有一个 Topic,单分区,副本因子是 2,即一个 Leader 副本和一个 Follower 副本。我们看下当 Producer 发送一条消息时,Broker 端的副本到底会发生什么事情以及分区 HW 是如何被更新的。

下图是初始状态,我们稍微解释一下:初始时 leader 和 follower 的 HW 和 LEO 都是 0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。Leader 中的 Remote LEO 指的就是 Leader 端保存的 Follower LEO,也被初始化成 0。此时,Producer 没有发送任何消息给 Leader,而 Follower 已经开始不断地给 Leader 发送 FETCH 请求了,但因为没有数据因此什么都不会发生。值得一提的是,follower发送过来的FETCH请求因为无数据而暂时会被寄存到 Leader 端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间 Producer 端发送过来数据,那么会 Kafka 会自动唤醒该 FETCH 请求,让 Leader 继续处理之。

虽然 Purgatory 不是本文的重点,但 FETCH 请求发送和 PRODUCE 请求处理的时机会影响我们的讨论。因此后续我们也将分两种情况来讨论分区 HW 的更新。

第一种情况:Follower 发送 FETCH 请求在 Leader 处理完 PRODUCE 请求之后

Producer 给该 Topic 分区发送了一条消息。此时的状态如下图所示:

如图所示,Leader 接收到 PRODUCE 请求主要做两件事情:

  1. 把消息写入写底层 Log(同时也就自动地更新了 Leader 的 LEO)

  2. 尝试更新 Leader HW 值(前面 Leader 副本何时更新 HW 值 中的第三个条件触发)

    我们已经假设此时 Follower 尚未发送 FETCH 请求,那么 Leader 端保存的 Remote LEO 依然是 0,因此 Leader 会比较它自己的 LEO 值和 Remote LEO 值,发现最小值是 0,与当前 HW 值相同,故不会更新分区 HW 值

所以,PRODUCE 请求处理完成后 leader 端的 HW 值依然是 0,而 LEO 是 1,Remote LEO 是 1。假设此时 Follower 发送了 FETCH 请求(或者说 Follower 早已发送了 FETCH 请求,只不过在 Broker 的请求队列中排队),那么状态变更如下图所示:

本例中当 Follower 发送 FETCH 请求时,Leader 端的处理依次是:

  1. 读取底层 Log 数据

  2. 更新 Remote LEO = 0

    为什么是 0? 因为此时 Follower 还没有写入这条消息。Leader 如何确认 Follower 还未写入呢?这是通过 Follower 发来的 FETCH 请求中的 Fetch Offset 来确定的

  3. 尝试更新分区 HW - 此时leader LEO = 1,remote LEO = 0,故分区 HW 值= min(leader LEO, follower remote LEO) = 0

  4. 把数据和当前分区 HW 值(依然是 0)发送给 Follower 副本

而 Follower 副本接收到 FETCH Response 后依次执行下列操作:

  1. 写入本地 Log(同时更新 Follower LEO)
  2. 更新 Follower HW - 比较本地 LEO 和当前 Leader HW 取小者,故 Follower HW = 0

此时,第一轮 FETCH RPC 结束,我们会发现虽然 Leader 和 Follower 都已经在 Log 中保存了这条消息,但分区 HW 值尚未被更新。实际上,它是在第二轮 FETCH RPC 中被更新的,如下图所示:

第二种情况:FETCH 请求保存在 Purgatory 中 PRODUCE 请求到来

这种情况实际上和第一种情况差不多。前面说过了,当 Leader 无法立即满足FECTH返回要求的时候(比如没有数据),那么该 FETCH 请求会被暂存到 Leader 端的 Purgatory 中,待时机成熟时会尝试再次处理它。不过 Kafka 不会无限期地将其缓存着,默认有个超时时间(500ms),一旦超时时间已过,则这个请求会被强制完成。不过我们要讨论的场景是在寄存期间,producer 发送 PRODUCE 请求从而使之满足了条件从而被唤醒。此时,Leader 端处理流程如下:

  1. Leader 写入本地 Log(同时自动更新 Leader LEO)
  2. 尝试唤醒在 Purgatory 中寄存的 FETCH 请求
  3. 尝试更新分区 HW

至于唤醒后的 FETCH 请求的处理与第一种情况完全一致,故这里不做详细展开了。

以上所有的东西其实就想说明一件事情:Kafka 使用 HW 值来决定副本备份的进度,而HW值的更新通常需要额外一轮 FETCH RPC 才能完成,故而这种设计是有问题的。它们可能引起的问题包括:

  • 备份数据丢失
  • 备份数据不一致

我们一一分析下:

三、High Watermark 机制可能引起的问题

3.1 数据丢失

3.2 数据离散

四、Leader Epoch(Kafka 0.11.0.0 版本之后引入)

参考

⚠️ **GitHub.com Fallback** ⚠️