type
status
date
slug
summary
tags
category
icon
password

1、概述

我们之前谈到过,Kafka是有主题概念的,而每个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
在实际生产环境中,每台 Broker 都可能保存有各个主题下不同分区的不同副本,因此,单个 Broker 上存有成百上千个副本的现象是非常正常的。
接下来我们来看一张图,它展示的是一个有 3 台 Broker 的 Kafka 集群上的副本分布情况。从这张图中,我们可以看到,主题 1 分区 0 的 3 个副本分散在 3 台Broker上,其他主题分区的副本也都散落在不同的 Broker 上,从而实现数据冗余。
notion image

2、副本角色

为了保证同一个分区的多个副本数据保持一致,Kafka 采用基于领导者(Leader-based)的副本机制
notion image
  • 在 Kafka 中,副本分成两类:领导者副本(Leader Replica)追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为 Leader 副本,其余的副本自动称为 Follower 副本。
  • Follower 副本是不对外提供服务的。这就是说,任何一个 Follower 副本都不能响应消费者和生产者的读写请求。所有的请求都必须由 Leader 副本来处理,或者说,所有的读写请求都必须发往 Leader 副本所在的 Broker,由该 Broker 负责处理。Follower 副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
  • 当 Leader 副本挂掉了,或者说 Leader 副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的 Leader 选举,从 Follower 副本中选一个作为新的 Leader。老 Leader 副本重启回来后,只能作为 Follower 副本加入到集群中。
对于客户端用户而言,Kafka 的 Follower 副本没有任何作用,它既不能像 MySQL 那样帮助 Leader 副本“扛读”,也不能实现将某些副本放到离客户端近的地方来改善数据局部性。既然如此,Kafka 为什么要这样设计呢?其实这种副本机制有两个方面的好处。
  1. 方便实现“Read-your-writes”。所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许 Follower 副本对外提供服务,由于副本同步是异步的,因此有可能出现 Follower 副本还没有从 Leader 副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
  1. 方便实现单调读(Monotonic Reads)。什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。如果允许 Follower 副本提供读服务,那么假设当前有 2 个 Follower 副本 F1 和 F2,它们异步地拉取 Leader 副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。

3、副本的同步机制

3.1 In-sync Replicas(ISR)概念

前面我们提到当 Leader 副本挂了,会从 Follower 副本中选出新的 Leader。为了确定新 Leader 候选人的范围,Kafka 引入了 ISR(in-sync replica)的概念。ISR 是 Kafka 为某个分区维护的一组同步集合,即每个分区都有自己的一个 ISR 集合(包括 Leader 本身),由 Leader 负责维护。ISR 的特点:
  • 处于 ISR 集合中的副本,意味着 Follower 副本与 Leader 副本保持同步状态,这也意味着只有处于 ISR 集合中的副本才有资格被选举为 Leader。
  • 一条 Kafka 消息,只有被 ISR 中的副本都接收到,才被视为“已同步”状态。
被踢出 ISR 的副本,会加入 OSR 集合:
  • OSR:非同步副本,Follower 同步延迟超过阈值,从 ISR 踢出,然后加入 OSR。
  • AR:所有副本,即 AR = ISR + OSR
例如一个分区有 5 个副本,但是 ISR 列表可能只有 3 个副本,其他 2 个副本由于延迟太高等原因被踢出 ISR 列表,加入 OSR 列表。
那么什么情况下副本会被踢出 ISR 副本呢?这个由 Broker 端参数 replica.lag.time.max.ms 的参数值控制。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后Leader副本的时间不连续超过 10 秒,那么 Kafka 就认为该Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。如果超过了这个阈值,Kafka 就会将该副本“踢出”ISR。
倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的。

3.2 消息的写入和同步过程

Kafka 里面消息同步有三个非常重要的配置参数,这三个参数对于不丢失消息起到了很大的作用。
  • acks :Producer 端参数,用来告诉 Producer 消息是否写入成功,有三个值:
    • acks = all/-1 : 表示 Producer 发送消息后, 要等 ISR 中的最少数量的副本同步消息成功后,才收到这条消息写入成功的响应。这个选项持久性最好,延时性最差。通常搭配 min.insync.replicas 参数使用。
    • acks = 1 :表示 Producer 发送消息后,只需要等 Leader 副本写入数据成功后,就可以收到这条消息写入成功的响应。这个是 Kafka 的默认选项,提供了较好的持久性和较低的延迟性。
    • acks = 0:Producer 只要把消息发出去,不管发送出去的数据有没有同步完成,都认为这个消息发送成功了。这个选项提供了最低的延迟,但是持久性最差,当服务器发生故障时,就很可能发生数据丢失。
  • min.insync.replicas :Broker 端参数,ISR 列表中最小同步副本数,表示消息至少被写入到多少个副本才算是 “已提交”。默认值是 1,也就意味着一旦消息被写入 Leader 端即被认为是“已提交”。这个参数只有在 acks = all/-1 时才有效。
  • replication.factor :Broker 端参数,用来设置分区的副本数。这个值为 1 表示只有一个副本,也就是只有 Leader;这个值为 2 表示有两个副本,包括一个 Leader 和一个 Follower。默认值是 3,表示每个分区有 1 个 Leader 副本和 2 个 Follower 副本。
💡
这几个参数的取值,是 Kafka 性能和可靠性之间平衡的关键参数。Kafka 官方提供的典型场景,在有三个 Broker 的集群里面,设置参数值如下,可以同时兼顾系统可用性和写入性能。 replication.factor=3 min.insync.replicas=2 acks=all
结合 ISR 机制和上面这几个参数,Producer 发送一条消息的写入过程如下:
  1. 先通过 ZooKeeper get /brokers/topics/partitions/2/state找到该 Partition 的 Leader ,无论该 Topic 的 replication.factor 设置为多少,Producer 只将该消息发送到该 Partition 的 Leader。
  1. Leader 会将该消息写入其本地 Log。每个 Follower 都从 Leader pull 数据。通过这种方式保证 Follower 存储的数据顺序与 Leader 一致。
  1. Follower 在收到该消息并写入其 Log 后,向 Leader 发送 ACK。
  1. 根据 min.insync.replicas 参数配置,当 Leader 收到了 ISR 中最小副本数量的 Replica 的 ACK 之后,该消息就被认为已经 commit 了,Leader 向 Producer 发送 ACK。

3.3 High Watermark(HW)机制

3.3.1 HW机制是什么

回顾上面 Producer 写入消息的过程,我们发现一个问题,当消息写入了 Leader 副本之后,但还没有完全同步到所有 Follower 副本之前(例如只同步到其中某一个 Follower 副本),Leader 副本就挂了,新的 Leader 副本被选出来之后,这条消息该如何处理?Kafka 是如何解决 Leader 副本变更时消息不会出错?这就需要用到高水位(High Watermark)机制了。
首先,我们要明确一下基本的定义:什么是高水位?或者说什么是水位?水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的:
在时刻 T,任意创建时间(Event Time)为T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
“Streaming System”一书则是这样表述水位的:
水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。
为了帮助你更好地理解水位,我借助这本书里的一张图来说明一下。
notion image
图中标注“Completed”的蓝色部分代表已完成的工作,标注“In-Flight”的红色部分代表正在进行中的工作,两者的边界就是水位线
在 Kafka 的世界中,水位的概念有一点不同。Kafka 的水位不是时间戳,更与时间无关。它是和位置信息绑定的,具体来说,它是用消息位移来表述的。下面这个就是某个分区 Leader 副本的高水位图。
notion image
图中有两个很重要的概念:
  • 高水位:在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息,消费者只能消费已提交消息,即图中位移小于 8 的所有消息。位移值等于高水位的消息都属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。
  • 日志末端位移:即 Log End Offset(LEO),表示副本写入下一条消息的位移值。注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15。显然,介于高水位和 LEO 之间的消息就属于未提交消息。这也说明:同一个副本对象,其高水位值不会大于LEO值
高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是Leader副本。只不过Leader副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其Leader副本的高水位

3.3.2 HW的更新机制

现在,我们知道了每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker上,还保存了其他 Follower 副本的 LEO 值。我们一起来看看下面这张图。
notion image
在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。
Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。
为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。HW 机制的同步过程如下图所示:
notion image
首先是初始状态,所有值都是 0。
notion image
  1. 生产者向 Leader 发送了一条消息,Leader 副本成功将消息写入了本地磁盘,更新 LEO = 1。
  1. Follower 向 Leader 发起 fetch 请求,拉取位移值是 0 的消息(fetchOffset = 0)。Follower 在同步完消息也成功地更新 LEO=1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的 HW 依然是 0,还没有被更新,它们需要在下一轮的拉取中被更新。
  1. Follower 再次向 Leader 发起 fetch 请求,由于位移值是 0 的消息已经拉取成功,因此 Follower 这次请求拉取的是位移值是 1 的消息(fetchOffset = 1)。Leader 副本接收到此请求后,先更新远程副本 Remote LEO = 1,然后更新 Leader 的高水位值 HW = 1,最后将已更新过的 HW 值(此时为 1)发送给 Follower 副本。
  1. Follower 接收到以后,也更新自己的高水位值 HW = 1。至此,一次完整的消息同步周期就结束了。
如此大费周章地确定高水位 HW 的值,那么这个 HW 值又有什么作用呢?根据上面的消息写入过程,消息在写入的过程中,会出现消息被写入 Leader 但还没有同步到 Follower 上的中间状态,这种状态的消息明显是还不能被消费者消费的。HW 值的作用就是用来定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。以下面这个例子为例
notion image
  • 在某个时刻,Follower1 的同步完全跟上了 Leader ,同步了消息 3 和消息 4,而 Follower2 只同步了消息 3,这时候 Leader 的 LEO = 5,Follower1 的 LEO = 5,Follower2 的 LEO = 4,那么当前分区的 HW 取最小值 4,此时消费者可以消费到 offset 为 0~3 之间的消息。
  • 当所有副本都成功写入消息 3 和消息 4 之后,整个分区的 HW 和 LEO 都变为 5,因此消费者可以消费到 offset 为 4 的消息了。
假如配置 min.insync.replica=2,这样只需要一个 Follower 复制消息成功,本次消息就算写入成功,但此时消息还不能被消费;当两个 Follower写入成功,此时消息就才能被正常消费。

3.4 Leader Epoch机制

HW 机制既界定了消息的对外可见性,又实现了异步的副本同步机,看似很完美,但是在 Leader 切换的情况下,会存在数据丢失以及数据不一致的问题。假设有副本 A 和 B,其中 B 为 Leader 副本,A 为 Follower 副本,最开始 HW 值都是 1:
notion image
  1. A 进行第二段 fetch 请求,B 收到请求后将 HW 更新为 2,并发送响应。
  1. A 还没处理完响应就崩溃了,即 Follower 没有及时更新 HW 值(此时 A 上的 HW 值还是 1)。A 重启之后,会自动将 LEO 值调整到之前的 HW 值也就是 1,然后 A 会进行日志截断,删除 offsets = 1 的消息,接着会向 B 发送 fetch 请求。
  1. 很不幸的是此时 B 也发生宕机了,A 被选举为新的分区 Leader。
  1. 当 B 重启后,会降级成为 A 的 Follower,然后从 向 A 发送 fetch 请求,从 A 中拿到 HW 值(这个值是 1),并更新本地 HW 值,此时 B 的 HW 值被调整为 1(之前是 2),这时 B 会做日志截断,删除 offsets = 1 的消息。
  1. 至此,offsets = 1 的消息在两个副本上被永久地删除了。
严格来说,这个场景发生的前提是 Broker 端参数 min.insync.replicas 设置为 1。根本原因就是 Follower 同步消息有两轮 fetch 操作,Leader 中保存的 remote LEO 值的更新在第二轮 fetch 请求才能完成,这过程中如果发生了 Leader 切换,就会发生数据丢失以及数据不一致的问题。基于此,Kafka 在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
Kafka 在每个副本目录下都创建一个 leader-epoch-checkpoint 文件,用于保存 Leader 的 epoch 信息。它的格式为 (epoch offset),由两部分数据组成:
  1. Epoch:版本号,一个单调增加的正整数。每当 Leader 发生变更时,epoch 版本都会加 1。
  1. Start Offset:每一代 Leader 副本在该 Epoch 值上写入的第一条消息的位移。
假设现在有两个 Leader Epoch:
  • (0, 0) 是起始 Leader Epoch ,这一代的版本号是 0,起始位移是 0,开始保存消息。
  • 在保存了 120 条消息之后,Leader 发生了变更,版本号增加到 1。offset = 120 这条消息既是 Epoch = 1 的 Start Offset,也是 Epoch = 0 的 Last Offset。
  • 之后产生了新的 Leader Epoch (1, 120) ,这一代的版本号是 1,起始位移是 120。
Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,当 Leader 副本写入消息到磁盘时,Broker 会更新这部分缓存,然后定期地将这些信息持久化到 leader-epoch-checkpoint 文件中。每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移。我们看一下是怎么通过这种方式规避数据丢失和不一致的问题。
notion image
  1. 场景和之前大致是类似的,A 还没收到第二轮 fetch 请求响应前就崩溃了,此时 A 的 HW = 1,LEO = 2,B 的 HW = 2,LEO = 2。
  1. 引用 Leader Epoch 机制后,A 在重启之后,会发送一个特殊的请求 LeaderEpochRequest 请求给 B。
  1. B 会返回一个 LastOffset 值给 A。LastOffset 的取值方式:如果 Follower last epoch = Leader last epoch,则 LastOffset = Leader LEO,否则取大于 Follower last epoch 中最小的 Leader epoch 的 offset 值。假设 Follower last epoch = 1,此时 Leader 有 (1, 20)(2, 80)(3, 120) 三个 Leader epoch,则 LastOffset = 80。在上面这个例子里,B 和 A 的 last epoch 相等(都是 0),所以 B 返回的 LastOffset = 2。
  1. A 获取到 LastOffset 值之后会判断自身的 LEO 值是否大于 LastOffset,如果是的话则从 LastOffset 截断日志。在这个例子里,B 拿到的 LastOffset = 2 后,它本身的 LEO = 2,两者相等,所以并不需要进行日志截断。
  1. 同样的,当 B 重启回来后,执行和 A 相同的逻辑判断,发现也不用进行日志截断。至此 offsets = 1 的消息在两个副本中都得以保存。
  1. 后续在 A 中生成了新的 Leader Epoch (1, 2) ,之后 A 会使用这个新的 Leader Epoch 帮助判断后续是否执行日志截断操作。
Leader Epoch 机制是对 HW 机制的改进,副本是否执行日志截断不再依赖于高水位进行判断,可以解决 Leader 切换特殊情况下的数据丢失问题。
Kafka系列:日志消息Kafka系列:分区机制
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏