type
status
date
slug
summary
tags
category
password
1、概述
Kafka 中主题(Topic)是承载真实数据的逻辑容器。在主题之下还分为若干个分区(Partition),也就是说 Kafka 的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。如下图所示:

Kafka 为什么要设计分区的概念,而不是直接使用主题?
分区的作用是提供负载均衡的能力,实现消息的并行生产和消费。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
在其他分布式系统中也有使用分区的概念,但是叫法有所不同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。
副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上。
下图是有 3 台 Broker 的 Kafka 集群上的副本分布情况。我们可以看到,主题 1 分区 0 的 3 个副本分散在 3 台Broker上,其他主题分区的副本也都散落在不同的 Broker 上,从而实现数据冗余。

Kafka 为什么要在分区的基础上再设计一个副本的概念?
副本的作用是提高系统的可用性。在主分区所在的 Broker 宕机的情况下,仍然不会发生数据丢失,且通过副本快速选主来恢复系统可用性。
2、分区机制
2.1 分区的基本概念
每个主题(Topic)可以被分成一到多个分区(Partition),每个分区是一个有序的、不可变的消息序列。分区是 Kafka 水平扩展的基础单位,不同分区可以分布在不同的 Broker 上。
分区的作用是:
- 提高吞吐量:生产者可以同时向多个分区写入数据,消费者可以组成消费者组并行消费不同分区,读写操作可以并行发生在不同分区上。
- 数据负载均衡:分区数据可以均匀分布在集群的不同 Broker 上,避免单个 Broker 成为性能瓶颈。也可以通过增加分区数量可以提高 Topic 的整体处理能力。
2.2 分区的物理存储
每个分区在磁盘上表现为一个目录。具体存储目录在
server.properties
文件里面设置,找到 log.dirs
配置项,如下所示:这里的
/kafka/data
就是 Kafka 的物理数据目录,在该目录下可以看到以下目录:每个分区在物理上对应一个文件夹,以
topicName_partitionIndex
的命名方式命名。以上目录包含以下分区:__consumer_offsets
:这是 Kafka 自动创建的 topic,用于保存 consumer 的消费 offset 信息,一共有 50 个分区。旧版 Kafka 是将这些信息保存 Zookeeper 里面的,但由于 Zookeeper 并不适合做频繁写入的操作,所以新版 Kafka 将其保存在内部 topic。
test_topic1-2.5d2d9e867e31489d9a4577dafa9d81b0-delete
:用户自定义的主题test_topic1
的第 2 个分区,但是带了delete
后缀,说明对该主题执行了删除操作,状态已经被标记为marked for deletion
,但还没有进行真正删除。
test_topic2-2
:用户自定义的主题test_topic2
的第 2 个分区。
进入
test_topic2-2
目录,包含以下文件一个 partition 由一个或者多个 segment 所构成的。每个 segment 中则保存了真实的消息数据。segment 达到一定大小之后会生成新的 segment,旧的 segment 到达过期时间后会被删除。segment 可以理解为一个逻辑概念,每个 segment 在物理上分为三个文件:
.log
文件:实际消息存储文件,存储的每条数据就是 message。每个 message 都有一个连续的序列号叫做 offset ,用于 partition 唯一标识一条消息。文件名基于第一条消息的 offset(如00000000000000000000.log
)。
.index
文件:位移索引文件,实现相对 offset 与物理地址的映射,用于快速定位消息在.log
文件中的位置。其中文件名为当前 segment + message 的最小 offset。index 每行数据分为两部分:- offset:相对 offset 表示 该 message 相对于其所属数据文件中最小的 offset 的大小。相对 offset 表示消息相对于 baseOffSet 的偏移量,例如分段后的一个日志文件的 baseOffset 是32450,它的文件名就是 32450.log,那么 offset 为 32455 的消息在相对 offset 就是 32455-32450 = 5。
- position:表示该条 message 在数据文件中的绝对物理位置。
.timeindex
文件:时间索引文件,自 0.10.0.1 开始新增的索引,实现相对 offset 和时间戳的映射。

.index
和 .timeindex
这两个索引文件均采用了稀疏索引的方式,每隔一定字节的数据建立一条索引,这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。如下图所示,以 index 文件中的
6,1407
为例,在数据文件中表示第 6 个 message(在全局 partition 表示第 368775 个 message),以及该消息的物理偏移地址为 1407。
假如我么想要读取
offset=1066
的 message:- 通过二分查找文件列表,快速定位到具体的
.index
和.log
文件。因为文件名是当前 segement 的 message 的最小 offset,是假如索引文件依次为 00000000000000000000.index、00000000000000001018.index、00000000000000002042.index 等,当 offset=1066 时定位到 00000000000000001018.index。
- 找到具体的文件后,先计算 offset 在index 中的相对 offset,查找最接近的小于等于相对 offset 的元数据对应物理位置 position, 即 .log 文件中的物理偏移地址。 然后在 log 文件中通过顺序查找到具体的 offset。offset = 1066 在 00000000000000001018.index 中的相对 offset 为 48,查找最接近 48 且小于等于 48 的 index 行假如为 (46, 1467),通过 position 1467 在 log 文件中定位到具体 message,然后通过顺序查找查找 offset 为 1066 的消息。
稀疏索引的缺点是没有建立索引的 Message 也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
2.3 分区策略
Kafka的分区策略决定了消息如何被分配到不同的分区中,以及如何消费分区中的消息。分为两种类型:
- 生产者分区策略:决定生产者将消息发送到哪个分区。
- 消费者分区策略:决定消费者组中的消费者如何分配分区进行消费。
2.3.1 生产者分区策略
生产者分区策略是决定生产者将消息发送到哪个分区的算法。生产端将消息发送给 Broker 之前,会将 producer 发送的数据封装成一个 ProducerRecord 对象。是否依赖分区器看 partition 字段有无指定。

- 如果消息 ProducerRecord 指定了 partition 字段,那么就不需要分区器,直接将指明的值直接作为 partiton 值。
- 如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
总结:Kafka 生产者默认使用以下策略(按优先级顺序):
- 指定分区(partition):如果消息在发送时明确指定了目标分区(partition),直接使用该分区。
- 指定消息键(key):如果消息在发送时没有指明 partition,但是设置了消息键 key,则对 key 进行哈希(默认使用 Murmur2 算法),然后根据哈希值对分区总数取模,来决定消息进入哪个分区。
- 默认分区策略:如果消息在发送时没有指明 partition 值,又没有设置 key 值,则使用默认分区策略(
org.apache.kafka.clients.producer.internals.DefaultPartitioner
)。 - 轮询(Round-Robin) :在 Kafka 2.4 版本之前,生产者会使用轮询(Round-Robin) 策略,将消息依次发送到每个分区以实现绝对均匀的负载。具体实现:第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与分区数取余得到 partition 值,这样可以确保消息均匀分布在所有分区上。
- 粘性分区(Sticky Partitioning):从 Kafka 2.4+ 开始,默认策略改为粘性策略。它会随机选择一个分区,并在一段时间内(或直到批次已满)将所有无 Key 的消息都“粘”在这个分区上。当批次准备好发送或完成后,它会再随机选择另一个分区并“粘”上去。
- 为什么改为粘性分区? 为了提高性能。轮询策略会导致消息被分散到大量的小批次中,增加了网络请求的开销。而粘性策略在短时间内将消息积累到同一个分区的同一个批次中,能形成更大的批次,减少请求次数,显著提升吞吐量。
- 自定义分区策略:如果消息在发送时没有指明 partition 值,且指定了自定义的分区类,则按自定义分区器来得到 partition 值。用户可以实现
org.apache.kafka.clients.producer.Partitioner
接口自定义分区器。
然后在配置文件里面指定自定义分区类的全路径类名。
在这里补充一下,每条消息根据分区策略被 append 到对应 partition 文件的最后,属于顺序写,因此效率非常高。

2.3.2 消费者分区策略
消费者以组的名义订阅主题,消费者组一个消费者会负责消费一到多个分区。消费者分区策略就是用来定义消费者组中的消费者如何分配一个 Topic 下面的所有分区。
Kafka 中着三种消费分区分配策略(通过
partition.assignment.strategy
来设置):1、Range Assignor(范围分配器):早期默认策略,按每个主题进行分配。
- 实现原理:
- 对同一个主题的所有分区按字典序排序
- 将同一个消费者组内的所有消费者按字典序排序(例如
consumer-1
,consumer-2
)。 - 计算每个消费者应该分配的分区数:分区总数除以消费者线程总数。如果除不尽,那么排在前面的消费者线程将会多消费一个分区。 假设
n=分区数/消费者数量
,m=分区数%消费者数量
,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量 - m)个消费者每个分配 n 个分区。

- 优点:实现简单。
- 缺点:容易造成分配不均匀。可以看出, C1 消费者比 C2 消费者共多消费了 2 个分区。针对 1 个 topic 而言,C1 消费者多消费 1 个分区。如果有 N 多个 topic,那么每个 topic 消费者 C1 都将多消费 1 个分区,topic 越多,C1 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了。
2、RoundRobin Assignor(轮询分配器):这个策略尝试将所有主题的所有分区作为一个整体进行轮询分配,以实现更均匀的负载。
- 实现原理:
- 将所有主题的所有分区按照字典序排序。
- 将同一个消费者组内的所有消费者按字典序排序(例如
consumer-1
,consumer-2
)。 - 通过轮询方式逐个将分区以此分配给每个消费者。
- RoundRobin 策略会尽可能保证每个消费者消费的 partion 数量一致。有两种情况:
- 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。 举例,假设消费组中有 2 个消费者 C0 和 C1,都订阅了主题 T1 和 T2,并且每个主题都有3个分区,那么最终的分配结果为:
- 如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候此消费者将分配不到这个 topic 的任何分区。 举例,假设消费组内有 3 个消费者C0、C1 和 C2,它们共订阅了3个主题:T1、T2、T3,这 3 个主题分别有 1、2、3 个分区,即整个消费组订阅了 T1p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。具体而言,消费者 C0 订阅的是主题 T1,消费者 C1 订阅的是主题 T1 和 T2,消费者 C2 订阅的是主题 T1、T2 和 T3,那么最终的分配结果为:


- 优点:在大多数情况下,分区分配比 Range 策略更均匀。
- 缺点:严重依赖于所有消费者订阅相同主题列表这一前提。如果组内消费者订阅的主题不同,分配结果可能非常不均匀,甚至会导致某些消费者被分配不到任何分区。
3、Sticky Assignor (粘性分配器):从 Kafka 0.11 版本开始引入,旨在同时具备均衡性和最大限度的减少重新分配带来的开销。
- 设计目标:
- 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
- 分区的分配尽可能的与上次分配的保持相同,只在必要时调整,减少分区重新分配带来的开销。当两者发生冲突时,第一个目标优先于第二个目标。
- 实现原理:
- 初始分配时,尽量做到均匀分配(类似 RoundRobin)。
- 发生重平衡时,它会尽力“粘住”上一次的分配结果。例如,如果一个消费者宕机,原本由它消费的分区会平均地分配给剩余存活的消费者,而不是全部重新分配。当这个消费者重新加入时,Kafka 会尽量将它之前消费的分区重新分配给它,从而减少分区在消费者间的“漂移”,提升系统稳定性。

- 举例(Rebalance 场景):
- 初始状态: 3个消费者 (C1, C2, C3) 消费 10个分区 (P0-P9),分配均衡。
- C3 宕机: 触发重平衡。
- RoundRobin: 可能会将 P0-P9 全部重新分配给 C1 和 C2。
- Sticky: 会将原本属于 C3 的分区(比如 P6-P9)平滑地转移给 C1 和 C2(例如 C1 分到 P6, P7,C2 分到 P8, P9),而 C1 和 C2 原本的分区 (P0-P5) 保持不变。
- C3 恢复: 再次触发重平衡。
- Sticky: 会尝试将之前属于 C3 的分区(P6-P9)物归原主,分配结果与初始状态完全一致,最大程度减少了不必要的分区移动。
- 优点:
- 分配均衡。
- 极大地减少了重平衡期间的分区停止服务时间,提升了系统的稳定性和性能。这是生产环境最推荐使用的策略。
2.4 分区的Rebalance
2.5 分区与顺序消息
3、副本机制
3.1 Replica机制
副本(Replica)是指分区的拷贝,这些拷贝会被复制到多个 Broker 上,从而提高分区的可用性。在没有副本的情况下,一个分区只存储在一台 Broker 上。如果这台 Broker 宕机了,那么这个分区就变得不可用,所有读写这个分区的客户端都会失败,直到 Broker 恢复。
Kafka 采用基于领导者(Leader-based)的副本机制。
- Leader 副本(Leader Replica):每个分区都有一个唯一的领导者副本。所有客户端的读写请求都必须发往这个 Leader 副本,它负责处理所有生产消费的 I/O 操作。
- Follower 副本(Follower Replica):除了 Leader 副本之外的其他副本都称为 Follower 副本,从 Leader 副本同步数据。Follower 副本是不对外提供服务的,它唯一的任务就是从 Leader 副本异步拉取消息,实现与 Leader 副本的同步。

Follower 副本为什么设计为不对外提供服务?
对于客户端用户而言,Kafka 的 Follower 副本没有任何作用,它既不能像 MySQL 那样帮助 Leader 副本“扛读”,也不能实现将某些副本放到离客户端近的地方来改善数据局部性。既然如此,Kafka 为什么要这样设计呢?简单来说就是为了保证数据的强一致性和避免脏读。
- 如果 Follower 对外提供服务:
- 场景:生产者发送一条消息到 Leader,Leader 将其写入本地日志,并同步给 Follower。Follower A 很快写入了,但 Follower B 因为网络问题还没写入。
- 此时,如果一个消费者从 Follower A 读取,它能读到这条新消息。但另一个消费者从 Follower B 读取,则读不到这条消息。
- 这就导致了不一致: 两个消费者看到了不同的数据状态。更糟糕的是,如果此时 Leader 突然宕机,而 Follower A 还没来得及被选举为新的 Leader(或者 Follower A 的数据是未提交的),那么这条消息实际上会丢失,但从 Follower A 读到它的消费者却以为消息存在。这就是脏读(Dirty Read)。
- Kafka 的解决方案:只从 Leader 读取
- 所有消费者都从一个唯一的、权威的数据源(Leader副本)读取。这样就保证了所有消费者看到的都是同一份已提交的数据视图,实现了线性一致性(Linearizability) 的读操作。消费者永远不会读到一条最终会消失的未提交消息。
3.2 ISR机制
前面我们提到当 Leader 副本挂了,会从 Follower 副本中选出新的 Leader。为了确定新 Leader 候选人的范围,Kafka 引入了 ISR(in-sync replica)的概念。
- ISR:与 Leader 副本保持同步的副本集合(包括 Leader 本身)。每个分区都有自己的一个 ISR 集合,由 Leader 负责维护。
- 这里的“同步”不是指 100% 完全同步,而是指在一定的阈值时间(由
replica.lag.time.max.ms
参数配置,默认 10 秒)内追上了 Leader 的进度。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。 - 处于 ISR 集合中的副本,意味着 Follower 副本与 Leader 副本保持同步状态,这也意味着只有处于 ISR 集合中的副本才有资格被选举为 Leader。
- Kafka 消息只有被 ISR 中的副本都接收到,才被视为“已同步”状态。
- ISR 是一个动态调整的集合,而非静态不变的。倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。
- OSR:非同步副本,Follower 副本落后 Leader 副本的时间间隔超过了阈值(由
replica.lag.time.max.ms
控制),Kafka 就会将该副本踢出 ISR,然后加入 OSR 集合。
- AR:所有副本,即
AR = ISR + OSR
。
例如一个分区有 5 个副本,但是 ISR 列表可能只有 3 个副本,其他 2 个副本由于延迟太高等原因被踢出 ISR 列表,加入 OSR 列表。
3.3 副本机制的工作流程
Kafka 副本机制的工作过程:
- Producer 发送消息:Producer 将消息发送到分区的 Leader 副本。具体过程是先通过 ZooKeeper
get /brokers/topics/partitions/2/state
找到该 Partition 的 Leader ,无论该 Topic 的replication.factor
设置为多少,Producer 只将该消息发送到该 Partition 的 Leader。
- Leader 写入消息:Leader 会将该消息以追加写的方式写入其本地 Log,这时候消息处于未 commit 的状态。
- Follower 同步消息:ISR 集合中的所有 Follower 通过拉取请求(Fetch Request)从 Leader 拉取消息(基于偏移量 offset)。Follower 在拉取到该消息并写入其本地 Log 后,向 Leader 发送 ACK。
- Leader 提交消息:当 Leader 收到了 ISR 中最小副本数量的 Follower 副本的 ACK 之后(由
acks
和min.insync.replicas
配置),该消息就被认为已经 commit 了,Leader 向 Producer 发送 ACK。
- Leader 更新 HW:Leader 会更新其 High Watermark(HW),HW 表示所有 ISR 副本已复制的消息的偏移量 offset,消费者只能读取到 HW 之前的消息。更新完 HW 之后消费者就可以看到这条消息并进行消费。
这里涉及到 Kafka 消息同步的三个非常重要的配置参数,这三个参数对于不丢失消息起到了很大的作用。
acks
:Producer 端参数,用来告诉 Producer 消息是否写入成功,有三个值:acks = all/-1
: Producer 发送消息后 要等 ISR 中的最少数量的副本(由min.insync.replicas
控制)同步消息成功后,才收到这条消息写入成功的响应。这个选项持久性最好,延时性最差。acks = 1
:Producer 发送消息后只需要等 Leader 副本写入数据成功后,就可以收到这条消息写入成功的响应。Kafka 的默认选项,提供了较好的持久性和较低的延迟性。适用于大多数场景。acks = 0
:Producer 发送消息后不等待任何确认。这个选项提供了最低的延迟,但是持久性最差,当服务器发生故障时,就很可能发生数据丢失。适用于允许少量数据丢失的高吞吐场景。
min.insync.replicas
:Broker 端参数,ISR 列表中最小同步副本数,表示消息至少被写入到多少个副本才算是 “已提交”。默认值是 1,也就意味着一旦消息被写入 Leader 端即被认为是“已提交”。这个参数只有在acks = all/-1
时才有效。
replication.factor
:Broker 端参数,用来设置分区的副本数。这个值为 1 表示只有一个副本,也就是只有 Leader;这个值为 2 表示有两个副本,包括一个 Leader 和一个 Follower。默认值是 3,表示每个分区有 1 个 Leader 副本和 2 个 Follower 副本。
这几个参数的取值,是 Kafka 性能和可靠性之间平衡的关键参数。Kafka 官方提供的典型场景,在有三个 Broker 的集群里面,设置参数值如下,可以同时兼顾系统可用性和写入性能。
replication.factor=3
min.insync.replicas=2
acks=all
3.4 Leader副本选举
触发选举的场景:
- 分区当前的 Leader 副本所在的 Broker 宕机(这是最常见的原因)。
- 分区副本进行重分配(例如,使用
kafka-reassign-partitions.sh
工具后)。
- 某个分区的 Leader 副本发生网络分区,与集群断开连接。
- 管理员手动删除了 Leader 副本。
选举过程:
- 触发:Controller 通过监视器(ZooKeeper)或元数据事件(KRaft)检测到某个 Broker 下线。
- 识别:Controller 确定在这个宕机的 Broker 上,有哪些分区是 Leader 分区(即受影响的分区列表)。
- 选举:对于每一个受影响的分区,Controller 会从该分区的 ISR 列表中挑选一个新的 Leader。
- 首选策略:默认情况下,Controller 会直接选择 ISR 列表中的第一个副本作为新 Leader。例如,如果一个分区的 ISR 是
[1, 2, 3]
(1 是原 Leader),那么当 Broker 1 宕机后,Broker 2 会成为新 Leader。这个列表的顺序是固定的,由分区分配决定。 - 这样做的目的是为了最大限度保证数据一致性,因为 ISR 中的副本都拥有最新的数据,可以立即提供服务而不会造成数据丢失。
- 更新:Controller 将新 Leader 和更新后的 ISR(移除了宕机的副本)信息写入 ZooKeeper 或元数据日志。
- 广播:Controller 将最新的元数据变化通知给所有存活的 Broker。
- 生效:生产者(Producers)和消费者(Consumers)从 Broker 获取到新的元数据后,会将请求发送到新的 Leader 副本。
如果某个分区的 所有 ISR 副本都宕机了(这是一个非常极端且危险的情况),此时该怎么办?Kafka 提供了一个配置选项
unclean.leader.election.enable
(默认是 false
):false
(推荐):宁可不可用,也绝不丢数据。Kafka 不会从非 ISR(即不同步的、滞后的)副本中选举 Leader。这意味着该分区将不可用,直到原 ISR 中至少有一个副本恢复并重新上线。这是为了优先保证数据一致性。
true
:宁可丢数据,也要保证可用性。允许从非 ISR(Out-of-Sync Replicas)副本中选举 Leader。这个被选中的副本可能缺少最近的一些消息,从而导致数据丢失。只有在可用性绝对优先于一致性的场景下才应启用此配置。
3.5 HW机制
3.5.1 HW的核心概念
HW(High Watermark,高水位)是一个分区级别的偏移量(Offset)标记,主要用于控制消费者能够读取的消息范围。换句话说,HW 之前的消息被认为是已提交的,而对消费者可见、可消费的。HW 及之后的消息,消费者是看不到的。
首先,我们要明确一下基本的定义:什么是高水位?或者说什么是水位?水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的:
在时刻 T,任意创建时间(Event Time)为T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
“Streaming System”一书则是这样表述水位的:
水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。
为了帮助你更好地理解水位,我借助这本书里的一张图来说明一下。

图中标注“Completed”的蓝色部分代表已完成的工作,标注“In-Flight”的红色部分代表正在进行中的工作,两者的边界就是水位线。
Kafka 中水位的概念有一点不同。Kafka 的水位不是时间戳,而是与消息位移相关,包含两个重要概念:
- LEO(Log End Offset):日志末端位移,每个副本最后一条消息的偏移量(Offset)+ 1。它代表了下一条待写入消息的位置。
- HW:高水位,分区所有 ISR 副本 LEO 的最小值。消费者只能消费 HW(不包含 HW 本身) 以下的消息。

上图中分为几个部分:
- 已提交消息:HW 以下的消息被认为是已提交消息,即图中位移小于 8 的所有消息。消费者只能消费 HW(不包含 HW 本身) 以下的消息。
- 未提交消息:介于 HW (包含) 和 LEO(不含) 之间的消息就属于未提交消息。即图中位移为 8 到 14 之间的消息。这也说明同一个副本对象中 HW 值不会大于 LEO 值。
- 日志末端位移(LEO):注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15。
3.5.2 HW的工作机制
HW 和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的 HW 和 LEO 值,通常情况下 Kafka 使用 Leader 副本的高水位来定义所在分区的 HW。换句话说,分区的 HW 就是其 Leader 副本的 HW。
实际上,在 Leader 副本所在的 Broker上,还保存了其他 Follower 副本的 LEO 值。我们一起来看看下面这张图。

在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。
Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。
为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。HW 的工作过程如下图所示:

首先是初始状态,所有值都是 0。

- 生产者写入:生产者向 Leader 发送了一条消息,Leader 副本成功将消息写入了本地磁盘,更新
LEO = 1
。
- 副本同步:Follower 向 Leader 发起 fetch 请求,拉取位移值是 0 的消息(fetchOffset = 0)。Follower 在同步完消息也成功地更新
LEO = 1
。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的 HW 依然是 0,还没有被更新,它们会在下一轮的拉取中被更新。
- Leader 更新 HW:Leader 根据所有 ISR 副本的 LEO 确定 HW(取所有 ISR 副本 LEO 的最小值)。Follower 再次向 Leader 发起 fetch 请求,由于位移值是 0 的消息已经拉取成功,因此 Follower 这次请求拉取的是位移值是 1 的消息(fetchOffset = 1)。Leader 副本接收到此请求后,先更新远程副本 Remote LEO = 1,然后更新 Leader 的高水位值
HW = 1
,最后将已更新过的 HW 值(此时为 1)发送给 Follower 副本。
- Follower 更新 HW:Follower 接收到以后,也更新自己的高水位值
HW = 1
。至此,一次完整的消息同步周期就结束了。
- 消费者可见性:消费者只能看到 HW(不含) 以下的消息,HW 以上的消息对消费者不可见。
在上面的消息写入过程中,会出现消息被写入 Leader 但还没有同步到 Follower 上的中间状态,这种状态的消息明显是还不能被消费者消费的。所以 HW 值的作用就是用来定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。以下面这个例子为例

- 在某个时刻,Follower1 的同步完全跟上了 Leader ,同步了消息 3 和消息 4,而 Follower2 只同步了消息 3,这时候 Leader 的 LEO = 5,Follower1 的 LEO = 5,Follower2 的 LEO = 4,那么当前分区的 HW 取 ISR 集合的 LEO 的最小值 4,此时消费者可以消费到 offset 为 0~3 之间的消息。
- 当所有副本都成功写入消息 3 和消息 4 之后,整个分区的 HW 和 LEO 都变为 5,因此消费者可以消费到 offset 为 4 的消息了。
假如配置
min.insync.replica = 2
,这样只需要一个 Follower 复制消息成功,本次消息就算写入成功,但此时消息还不能被消费;当两个 Follower 写入成功,此时消息就才能被正常消费。3.6 Leader Epoch机制
Leader Epoch 是 Kafka 0.11 引入的机制,用于解决 HW 机制在在 Leader 切换时可能出现的数据丢失和消息不一致问题。假设有副本 A 和 B,其中 B 为 Leader 副本,A 为 Follower 副本,最开始 HW 值都是 1:

- A 进行第二段 fetch 请求,B 收到请求后将 HW 更新为 2,并发送响应。
- A 还没处理完响应就崩溃了,即 Follower 没有及时更新 HW 值(此时 A 上的 HW 值还是 1)。A 重启之后,会自动将 LEO 值调整到之前的 HW 值也就是 1,然后 A 会进行日志截断,删除 offsets = 1 的消息,接着会向 B 发送 fetch 请求。
- 很不幸的是此时 B 也发生宕机了,A 被选举为新的分区 Leader。
- 当 B 重启后,会降级成为 A 的 Follower,然后从 向 A 发送 fetch 请求,从 A 中拿到 HW 值(这个值是 1),并更新本地 HW 值,此时 B 的 HW 值被调整为 1(之前是 2),这时 B 会做日志截断,删除 offsets = 1 的消息。
- 至此,offsets = 1 的消息在两个副本上被永久地删除了。
严格来说,这个场景发生的前提是 Broker 端参数
min.insync.replicas
设置为 1。根本原因就是 Follower 同步消息有两轮 fetch 操作,Leader 中保存的 remote LEO 值的更新在第二轮 fetch 请求才能完成,这过程中如果发生了 Leader 切换,就会发生数据丢失以及数据不一致的问题。基于此,Kafka 在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。Kafka 在每个副本目录下都创建一个
leader-epoch-checkpoint
文件,用于保存 Leader 的 epoch 信息。它的格式为 (epoch offset)
,由两部分数据组成:- Epoch:版本号,一个单调增加的正整数。每当 Leader 发生变更时,epoch 版本都会加 1。
- Start Offset:每一代 Leader 副本在该 Epoch 值上写入的第一条消息的位移。
假设现在有两个 Leader Epoch:
(0, 0)
是起始 Leader Epoch ,这一代的版本号是 0,起始位移是 0,开始保存消息。
- 在保存了 120 条消息之后,Leader 发生了变更,版本号增加到 1。offset = 120 这条消息既是 Epoch = 1 的 Start Offset,也是 Epoch = 0 的 Last Offset。
- 之后产生了新的 Leader Epoch
(1, 120)
,这一代的版本号是 1,起始位移是 120。
Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,当 Leader 副本写入消息到磁盘时,Broker 会更新这部分缓存,然后定期地将这些信息持久化到
leader-epoch-checkpoint
文件中。每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移。我们看一下是怎么通过这种方式规避数据丢失和不一致的问题。
- 场景和之前大致是类似的,A 还没收到第二轮 fetch 请求响应前就崩溃了,此时 A 的 HW = 1,LEO = 2,B 的 HW = 2,LEO = 2。
- 引用 Leader Epoch 机制后,A 在重启之后,会发送一个特殊的请求 LeaderEpochRequest 请求给 B。
- B 会返回一个 LastOffset 值给 A。LastOffset 的取值方式:如果 Follower last epoch = Leader last epoch,则 LastOffset = Leader LEO,否则取大于 Follower last epoch 中最小的 Leader epoch 的 offset 值。假设 Follower last epoch = 1,此时 Leader 有
(1, 20)
、(2, 80)
、(3, 120)
三个 Leader epoch,则 LastOffset = 80。在上面这个例子里,B 和 A 的 last epoch 相等(都是 0),所以 B 返回的 LastOffset = 2。
- A 获取到 LastOffset 值之后会判断自身的 LEO 值是否大于 LastOffset,如果是的话则从 LastOffset 截断日志。在这个例子里,B 拿到的 LastOffset = 2 后,它本身的 LEO = 2,两者相等,所以并不需要进行日志截断。
- 同样的,当 B 重启回来后,执行和 A 相同的逻辑判断,发现也不用进行日志截断。至此 offsets = 1 的消息在两个副本中都得以保存。
- 后续在 A 中生成了新的 Leader Epoch
(1, 2)
,之后 A 会使用这个新的 Leader Epoch 帮助判断后续是否执行日志截断操作。
Leader Epoch 机制是对 HW 机制的改进,副本是否执行日志截断不再依赖于高水位进行判断,可以解决 Leader 切换特殊情况下的数据丢失问题。
- Author:mcbilla
- URL:http://mcbilla.com/article/26c36ede-d379-4249-b28b-a42d0c5adf83
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts