type
status
date
slug
summary
tags
category
password
1、概述
Kafka 中主题(Topic)是承载真实数据的逻辑容器。在主题之下还分为若干个分区(Partition),也就是说 Kafka 的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。如下图所示:

Kafka 为什么要设计分区的概念,而不是直接使用主题?
分区的作用是提供负载均衡的能力,实现消息的并行生产和消费。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
在其他分布式系统中也有使用分区的概念,但是叫法有所不同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫 Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。
副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上。
下图是有 3 台 Broker 的 Kafka 集群上的副本分布情况。我们可以看到,主题 1 分区 0 的 3 个副本分散在 3 台Broker上,其他主题分区的副本也都散落在不同的 Broker 上,从而实现数据冗余。

Kafka 为什么要在分区的基础上再设计一个副本的概念?
副本的作用是提高系统的可用性。在主分区所在的 Broker 宕机的情况下,仍然不会发生数据丢失,且通过副本快速选主来恢复系统可用性。
2、分区机制
2.1 分区的基本概念
每个主题(Topic)可以被分成一到多个分区(Partition),每个分区是一个有序的、不可变的消息序列。分区是 Kafka 水平扩展的基础单位,不同分区可以分布在不同的 Broker 上。
分区的作用是:
- 提高吞吐量:生产者可以同时向多个分区写入数据,消费者可以组成消费者组并行消费不同分区,读写操作可以并行发生在不同分区上。
- 数据负载均衡:分区数据可以均匀分布在集群的不同 Broker 上,避免单个 Broker 成为性能瓶颈。也可以通过增加分区数量可以提高 Topic 的整体处理能力。
2.2 分区的物理存储
每个分区在磁盘上表现为一个目录。具体存储目录在
server.properties
文件里面设置,找到 log.dirs
配置项,如下所示:这里的
/kafka/data
就是 Kafka 的物理数据目录,在该目录下可以看到以下目录:每个分区在物理上对应一个文件夹,以
topicName_partitionIndex
的命名方式命名。以上目录包含以下分区:__consumer_offsets
:这是 Kafka 自动创建的 topic,用于保存 consumer 的消费 offset 信息,一共有 50 个分区。旧版 Kafka 是将这些信息保存 Zookeeper 里面的,但由于 Zookeeper 并不适合做频繁写入的操作,所以新版 Kafka 将其保存在内部 topic。
test_topic1-2.5d2d9e867e31489d9a4577dafa9d81b0-delete
:用户自定义的主题test_topic1
的第 2 个分区,但是带了delete
后缀,说明对该主题执行了删除操作,状态已经被标记为marked for deletion
,但还没有进行真正删除。
test_topic2-2
:用户自定义的主题test_topic2
的第 2 个分区。
进入
test_topic2-2
目录,包含以下文件一个 partition 由一个或者多个 segment 所构成的。每个 segment 中则保存了真实的消息数据。segment 达到一定大小之后会生成新的 segment,旧的 segment 到达过期时间后会被删除。segment 可以理解为一个逻辑概念,每个 segment 在物理上分为三个文件:
.log
文件:实际消息存储文件,存储的每条数据就是 message。每个 message 都有一个连续的序列号叫做 offset ,用于 partition 唯一标识一条消息。文件名基于第一条消息的 offset(如00000000000000000000.log
)。
.index
文件:位移索引文件,实现相对 offset 与物理地址的映射,用于快速定位消息在.log
文件中的位置。其中文件名为当前 segment 的 message 的最小 offset。index 每行数据分为 offset 和 position 两个部分:- offset:相对 offset 表示 该 message 相对于其所属数据文件中最小的 offset 的大小。相对 offset 表示消息相对于 baseOffSet 的偏移量,例如分段后的一个日志文件的 baseOffset 是32450,它的文件名就是 32450.log,那么 offset 为 32455 的消息在相对 offset 就是 32455-32450 = 5。
- position:表示该条 message 在数据文件中的绝对物理位置。
.timeindex
文件:时间索引文件,自 0.10.0.1 开始新增的索引,实现相对 offset 和时间戳的映射。

.index
和 .timeindex
这两个索引文件均采用了稀疏索引的方式,每隔一定字节的数据建立一条索引,这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。如下图所示,以 index 文件中的
6,1407
为例,在数据文件中表示第 6 个 message(在全局 partition 表示第 368775 个 message),以及该消息的物理偏移地址为 1407。
假如我么想要读取
offset=1066
的 message:- 通过二分查找文件列表,快速定位到具体的
.index
和.log
文件。因为文件名是当前 segement 的 message 的最小 offset,是假如索引文件依次为 00000000000000000000.index、00000000000000001018.index、00000000000000002042.index 等,当 offset=1066 时定位到 00000000000000001018.index|log。
- 找到具体的文件后,先计算 offset 在index 中的相对 offset,查找最接近的小于等于相对 offset 的元数据对应物理位置 position, 即 .log 文件中的物理偏移地址。 然后在 log 文件中通过顺序查找到具体的 offset。offset = 1066 在 00000000000000001018.index 中的相对 offset 为 48,查找最接近 48 且小于等于 48 的 index 行假如为 (46, 1467),通过 position 1467 在 log 文件中定位到具体 message,然后通过顺序查找查找 offset 为 1066 的消息。
稀疏索引的缺点是没有建立索引的 Message 也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
2.3 分区策略
Kafka的分区策略决定了消息如何被分配到不同的分区中,以及如何消费分区中的消息。分为两种类型:
- 生产者分区策略:决定生产者将消息发送到哪个分区。
- 消费者分区策略:决定消费者组中的消费者如何分配分区进行消费。
2.3.1 生产者分区策略
生产者分区策略是决定生产者将消息发送到哪个分区的算法。生产端将消息发送给 Broker 之前,会将 producer 发送的数据封装成一个 ProducerRecord 对象。是否依赖分区器看 partition 字段有无指定。

- 如果消息 ProducerRecord 指定了 partition 字段,那么就不需要分区器,直接将指明的值直接作为 partiton 值。
- 如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
Kafka 生产者的默认分区策略是:
- 默认分区策略(Round Robin):既没有 partition 值又没有 key 值的情况下,使用第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,这样可以确保消息均匀分布在所有分区上。Kafka 提供了默认的分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
。
- Key 哈希策略:没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。适用于相同 key 保证顺序性的场景。
- 固定分区策略:指明 partition 的情况下,直接将指明的值作为 partiton 值。
- 自定义分区策略:既没有 partition 值又指定了自定义的分区类,则按自定义分区器来得到 partition 值。用户也可以实现
org.apache.kafka.clients.producer.Partitioner
接口自定义分区器。
然后在配置文件里面指定自定义分区类的全路径类名。
在这里补充一下,每条消息根据分区策略被 append 到对应 partition 文件的最后,属于顺序写,因此效率非常高。

2.3.2 消费者分区策略
消费者以组的名义订阅主题,消费者组一个消费者会负责消费一到多个分区。消费者分区策略就是用来定义消费者组中的消费者如何分配分区进行消费。
Kafka 中着三种消费分区分配策略(通过
partition.assignment.strategy
来设置):- Range 策略:默认模式,按分区范围分配给消费者。首先对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用 Partitions 分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。 假设
n=分区数/消费者数量
,m=分区数%消费者数量
,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量 - m)个消费者每个分配 n 个分区。

可以看出, C1 消费者比 C2 消费者共多消费了 2 个分区。针对 1 个 topic 而言,C1 消费者多消费 1 个分区。如果有 N 多个 topic,那么每个 topic 消费者 C1 都将多消费 1 个分区,topic 越多,C1 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了。
- RoundRobin 策略:轮询分配,将消费组内所有消费者以及消费者所订阅的所有 topic 的 partition 按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobin 策略会尽可能保证每个消费者消费的 partion 数量一致。RoundRobin 的两种情况:
- 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。 举例,假设消费组中有 2 个消费者 C0 和 C1,都订阅了主题 T1 和 T2,并且每个主题都有3个分区,那么最终的分配结果为:
- 如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候此消费者将分配不到这个 topic 的任何分区。 举例,假设消费组内有 3 个消费者C0、C1 和 C2,它们共订阅了3个主题:T1、T2、T3,这 3 个主题分别有 1、2、3 个分区,即整个消费组订阅了 T1p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。具体而言,消费者 C0 订阅的是主题 T1,消费者 C1 订阅的是主题 T1 和 T2,消费者 C2 订阅的是主题 T1、T2 和 T3,那么最终的分配结果为:


- Sticky 策略:从 0.11.x 版本开始,尽可能保留之前的分配结果,只在必要时调整,减少分区重新分配带来的开销。它的作用有:
- 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
- 分区的分配尽可能的与上次分配的保持相同。 当两者发生冲突时,第一个目标优先于第二个目标。

这样初看上去似乎与采用 RoundRobin 策略所分配的结果相同,但是其优点是当发现分区重分配时,会尽可能的与上次分配的保持相同。 如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。Sticky 策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。
2.4 分区的Rebalance
2.5 分区与消息顺序
Kafka在以下级别提供顺序保证:
- 分区内有序:同一分区内的消息按写入顺序存储
- 跨分区无序:不同分区的消息不保证顺序
对于需要严格控制消息顺序的场景,可以使用以下策略:
- Topic 设置成单分区,可以保证发送到该 Topic 的所有消息的顺序性。
- 生产者在发送消息时指定 partition 的值,将需要顺序处理的所有消息发送到同一个分区。在生产中可以为每个用户指定 partition(例如用户号对分区数取余,得到的值就是 partition),这样可以保证该用户的所有消息都发送到同一个分区。这样既可以为 Topic 设置多个分区提高吞吐量,也可以保证单个用户的消息顺序性。
- 通过设置消息 Key,确保相同 Key 的消息进入同一分区。
3、副本机制
为了保证同一个分区的多个副本数据保持一致,Kafka 采用基于领导者(Leader-based)的副本机制。
- 副本(Replica):每个分区的数据会被复制到多个 Broker 上,这些复制品称为副本。
- Leader 副本:每个分区有一个 Leader 副本,负责处理所有读写请求。
- Follower 副本:其余的副本自动称为 Follower 副本,从 Leader 同步数据。Follower 副本是不对外提供服务的。也就是,所有的读写请求都必须发往 Leader 副本所在的 Broker,由该 Broker 负责处理。Follower 副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。

当 Leader 副本挂掉了,或者说 Leader 副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的 Leader 选举,从 Follower 副本中选一个作为新的 Leader。老 Leader 副本重启回来后,只能作为 Follower 副本加入到集群中。
对于客户端用户而言,Kafka 的 Follower 副本没有任何作用,它既不能像 MySQL 那样帮助 Leader 副本“扛读”,也不能实现将某些副本放到离客户端近的地方来改善数据局部性。既然如此,Kafka 为什么要这样设计呢?其实这种副本机制有两个方面的好处。
- 方便实现“Read-your-writes”。所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许 Follower 副本对外提供服务,由于副本同步是异步的,因此有可能出现 Follower 副本还没有从 Leader 副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
- 方便实现单调读(Monotonic Reads)。什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。如果允许 Follower 副本提供读服务,那么假设当前有 2 个 Follower 副本 F1 和 F2,它们异步地拉取 Leader 副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。
3.1 ISR机制
前面我们提到当 Leader 副本挂了,会从 Follower 副本中选出新的 Leader。为了确定新 Leader 候选人的范围,Kafka 引入了 ISR(in-sync replica)的概念。
- ISR:与分区 Leader 保持同步的副本集合(包括 Leader 本身)。每个分区都有自己的一个 ISR 集合,由 Leader 负责维护。处于 ISR 集合中的副本,意味着 Follower 副本与 Leader 副本保持同步状态,这也意味着只有处于 ISR 集合中的副本才有资格被选举为 Leader。另外 Kafka 消息只有被 ISR 中的副本都接收到,才被视为“已同步”状态。
- OSR:非同步副本,Follower 同步延迟超过阈值,从 ISR 踢出,然后加入 OSR 集合。
- AR:所有副本,即
AR = ISR + OSR
。
例如一个分区有 5 个副本,但是 ISR 列表可能只有 3 个副本,其他 2 个副本由于延迟太高等原因被踢出 ISR 列表,加入 OSR 列表。
那么什么情况下副本会被踢出 ISR 集合呢?这个由 Broker 端参数
replica.lag.time.max.ms
的参数值控制。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。如果超过了这个阈值,Kafka 就会将该副本“踢出”ISR。倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的。
3.2 消息的同步过程
Kafka 的消息同步过程主要通过副本机制和 ISR 机制来实现,过程如下:
- Producer 发送消息:
- Producer 将消息发送到分区的 Leader 副本。具体过程是先通过 ZooKeeper
get /brokers/topics/partitions/2/state
找到该 Partition 的 Leader ,无论该 Topic 的replication.factor
设置为多少,Producer 只将该消息发送到该 Partition 的 Leader。 - Leader 会将该消息以追加写的方式写入其本地 Log。
- Follower 同步消息:
- Follower 通过拉取请求(Fetch Request)从 Leader 拉取消息(基于偏移量 offset)。Follower 在拉取到该消息并写入其本地 Log 后,向 Leader 发送 ACK。
- Leader 会维护一个 High Watermark(HW),表示所有 ISR 副本已复制的消息的偏移量 offset,消费者只能读取到 HW 之前的消息。
- 确认提交:
- Leader 等待所有 ISR 副本确认收到消息(取决于
acks
和min.insync.replicas
配置)。当 Leader 收到了 ISR 中最小副本数量的 Replica 的 ACK 之后,该消息就被认为已经 commit 了,Leader 向 Producer 发送 ACK。
这里涉及到 Kafka 消息同步的三个非常重要的配置参数,这三个参数对于不丢失消息起到了很大的作用。
acks
:Producer 端参数,用来告诉 Producer 消息是否写入成功,有三个值:acks = all/-1
: Producer 发送消息后 要等 ISR 中的最少数量的副本(由min.insync.replicas
控制)同步消息成功后,才收到这条消息写入成功的响应。这个选项持久性最好,延时性最差。acks = 1
:Producer 发送消息后只需要等 Leader 副本写入数据成功后,就可以收到这条消息写入成功的响应。Kafka 的默认选项,提供了较好的持久性和较低的延迟性。适用于大多数场景。acks = 0
:Producer 发送消息后不等待任何确认。这个选项提供了最低的延迟,但是持久性最差,当服务器发生故障时,就很可能发生数据丢失。适用于允许少量数据丢失的高吞吐场景。
min.insync.replicas
:Broker 端参数,ISR 列表中最小同步副本数,表示消息至少被写入到多少个副本才算是 “已提交”。默认值是 1,也就意味着一旦消息被写入 Leader 端即被认为是“已提交”。这个参数只有在acks = all/-1
时才有效。
replication.factor
:Broker 端参数,用来设置分区的副本数。这个值为 1 表示只有一个副本,也就是只有 Leader;这个值为 2 表示有两个副本,包括一个 Leader 和一个 Follower。默认值是 3,表示每个分区有 1 个 Leader 副本和 2 个 Follower 副本。
这几个参数的取值,是 Kafka 性能和可靠性之间平衡的关键参数。Kafka 官方提供的典型场景,在有三个 Broker 的集群里面,设置参数值如下,可以同时兼顾系统可用性和写入性能。
replication.factor=3
min.insync.replicas=2
acks=all
3.3 HW机制
3.3.1 HW的核心概念
HW(High Watermark,高水位)机制是保证数据一致性和可靠性的重要机制,主要用于控制消费者能够读取的消息范围。
首先,我们要明确一下基本的定义:什么是高水位?或者说什么是水位?水位一词多用于流式处理领域,比如,Spark Streaming 或 Flink 框架中都有水位的概念。教科书中关于水位的经典定义通常是这样的:
在时刻 T,任意创建时间(Event Time)为T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。
“Streaming System”一书则是这样表述水位的:
水位是一个单调增加且表征最早未完成工作(oldest work not yet completed)的时间戳。
为了帮助你更好地理解水位,我借助这本书里的一张图来说明一下。

图中标注“Completed”的蓝色部分代表已完成的工作,标注“In-Flight”的红色部分代表正在进行中的工作,两者的边界就是水位线。
Kafka 中水位的概念有一点不同。Kafka 的水位不是时间戳,而是与消息位移相关,包含两个重要概念:
- LEO(Log End Offset):日志末端位移,每个副本最后一条消息的 offset。
- HW:高水位,分区所有 ISR 副本 LEO 的最小值。消费者只能消费 HW(不包含 HW 本身) 以下的消息。

上图中分为几个部分:
- 已提交消息:HW 以下的消息被认为是已提交消息,即图中位移小于 8 的所有消息。消费者只能消费 HW(不包含 HW 本身) 以下的消息。
- 未提交消息:介于 HW (包含) 和 LEO(不含) 之间的消息就属于未提交消息。即图中位移为 8 到 14 之间的消息。这也说明同一个副本对象中 HW 值不会大于 LEO 值。
- 日志末端位移:注意,数字 15 所在的方框是虚线,这就说明,这个副本当前只有 15 条消息,位移值是从 0 到 14,下一条新消息的位移是 15。
3.3.2 HW的工作机制
HW 和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的 HW 和 LEO 值,通常情况下 Kafka 使用 Leader 副本的高水位来定义所在分区的 HW。换句话说,分区的 HW 就是其 Leader 副本的 HW。
实际上,在 Leader 副本所在的 Broker上,还保存了其他 Follower 副本的 LEO 值。我们一起来看看下面这张图。

在这张图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。
Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值,也就是我在图中标记为灰色的部分。
为什么要在 Broker 0 上保存这些远程副本呢?其实,它们的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位。HW 的工作过程如下图所示:

首先是初始状态,所有值都是 0。

- 生产者写入:生产者向 Leader 发送了一条消息,Leader 副本成功将消息写入了本地磁盘,更新
LEO = 1
。
- 副本同步:Follower 向 Leader 发起 fetch 请求,拉取位移值是 0 的消息(fetchOffset = 0)。Follower 在同步完消息也成功地更新
LEO = 1
。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的 HW 依然是 0,还没有被更新,它们会在下一轮的拉取中被更新。
- Leader 更新 HW:Leader 根据所有 ISR 副本的 LEO 确定 HW(取所有 ISR 副本 LEO 的最小值)。Follower 再次向 Leader 发起 fetch 请求,由于位移值是 0 的消息已经拉取成功,因此 Follower 这次请求拉取的是位移值是 1 的消息(fetchOffset = 1)。Leader 副本接收到此请求后,先更新远程副本 Remote LEO = 1,然后更新 Leader 的高水位值
HW = 1
,最后将已更新过的 HW 值(此时为 1)发送给 Follower 副本。
- Follower 更新 HW:Follower 接收到以后,也更新自己的高水位值
HW = 1
。至此,一次完整的消息同步周期就结束了。
- 消费者可见性:消费者只能看到 HW(不含) 以下的消息,HW 以上的消息对消费者不可见。
在上面的消息写入过程中,会出现消息被写入 Leader 但还没有同步到 Follower 上的中间状态,这种状态的消息明显是还不能被消费者消费的。所以 HW 值的作用就是用来定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。以下面这个例子为例

- 在某个时刻,Follower1 的同步完全跟上了 Leader ,同步了消息 3 和消息 4,而 Follower2 只同步了消息 3,这时候 Leader 的 LEO = 5,Follower1 的 LEO = 5,Follower2 的 LEO = 4,那么当前分区的 HW 取 ISR 集合的 LEO 的最小值 4,此时消费者可以消费到 offset 为 0~3 之间的消息。
- 当所有副本都成功写入消息 3 和消息 4 之后,整个分区的 HW 和 LEO 都变为 5,因此消费者可以消费到 offset 为 4 的消息了。
假如配置
min.insync.replica = 2
,这样只需要一个 Follower 复制消息成功,本次消息就算写入成功,但此时消息还不能被消费;当两个 Follower 写入成功,此时消息就才能被正常消费。3.4 Leader Epoch机制
Leader Epoch 是 Kafka 0.11 引入的机制,用于解决 HW 机制在在 Leader 切换时可能出现的数据丢失和消息不一致问题。假设有副本 A 和 B,其中 B 为 Leader 副本,A 为 Follower 副本,最开始 HW 值都是 1:

- A 进行第二段 fetch 请求,B 收到请求后将 HW 更新为 2,并发送响应。
- A 还没处理完响应就崩溃了,即 Follower 没有及时更新 HW 值(此时 A 上的 HW 值还是 1)。A 重启之后,会自动将 LEO 值调整到之前的 HW 值也就是 1,然后 A 会进行日志截断,删除 offsets = 1 的消息,接着会向 B 发送 fetch 请求。
- 很不幸的是此时 B 也发生宕机了,A 被选举为新的分区 Leader。
- 当 B 重启后,会降级成为 A 的 Follower,然后从 向 A 发送 fetch 请求,从 A 中拿到 HW 值(这个值是 1),并更新本地 HW 值,此时 B 的 HW 值被调整为 1(之前是 2),这时 B 会做日志截断,删除 offsets = 1 的消息。
- 至此,offsets = 1 的消息在两个副本上被永久地删除了。
严格来说,这个场景发生的前提是 Broker 端参数
min.insync.replicas
设置为 1。根本原因就是 Follower 同步消息有两轮 fetch 操作,Leader 中保存的 remote LEO 值的更新在第二轮 fetch 请求才能完成,这过程中如果发生了 Leader 切换,就会发生数据丢失以及数据不一致的问题。基于此,Kafka 在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。Kafka 在每个副本目录下都创建一个
leader-epoch-checkpoint
文件,用于保存 Leader 的 epoch 信息。它的格式为 (epoch offset)
,由两部分数据组成:- Epoch:版本号,一个单调增加的正整数。每当 Leader 发生变更时,epoch 版本都会加 1。
- Start Offset:每一代 Leader 副本在该 Epoch 值上写入的第一条消息的位移。
假设现在有两个 Leader Epoch:
(0, 0)
是起始 Leader Epoch ,这一代的版本号是 0,起始位移是 0,开始保存消息。
- 在保存了 120 条消息之后,Leader 发生了变更,版本号增加到 1。offset = 120 这条消息既是 Epoch = 1 的 Start Offset,也是 Epoch = 0 的 Last Offset。
- 之后产生了新的 Leader Epoch
(1, 120)
,这一代的版本号是 1,起始位移是 120。
Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,当 Leader 副本写入消息到磁盘时,Broker 会更新这部分缓存,然后定期地将这些信息持久化到
leader-epoch-checkpoint
文件中。每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移。我们看一下是怎么通过这种方式规避数据丢失和不一致的问题。
- 场景和之前大致是类似的,A 还没收到第二轮 fetch 请求响应前就崩溃了,此时 A 的 HW = 1,LEO = 2,B 的 HW = 2,LEO = 2。
- 引用 Leader Epoch 机制后,A 在重启之后,会发送一个特殊的请求 LeaderEpochRequest 请求给 B。
- B 会返回一个 LastOffset 值给 A。LastOffset 的取值方式:如果 Follower last epoch = Leader last epoch,则 LastOffset = Leader LEO,否则取大于 Follower last epoch 中最小的 Leader epoch 的 offset 值。假设 Follower last epoch = 1,此时 Leader 有
(1, 20)
、(2, 80)
、(3, 120)
三个 Leader epoch,则 LastOffset = 80。在上面这个例子里,B 和 A 的 last epoch 相等(都是 0),所以 B 返回的 LastOffset = 2。
- A 获取到 LastOffset 值之后会判断自身的 LEO 值是否大于 LastOffset,如果是的话则从 LastOffset 截断日志。在这个例子里,B 拿到的 LastOffset = 2 后,它本身的 LEO = 2,两者相等,所以并不需要进行日志截断。
- 同样的,当 B 重启回来后,执行和 A 相同的逻辑判断,发现也不用进行日志截断。至此 offsets = 1 的消息在两个副本中都得以保存。
- 后续在 A 中生成了新的 Leader Epoch
(1, 2)
,之后 A 会使用这个新的 Leader Epoch 帮助判断后续是否执行日志截断操作。
Leader Epoch 机制是对 HW 机制的改进,副本是否执行日志截断不再依赖于高水位进行判断,可以解决 Leader 切换特殊情况下的数据丢失问题。
- Author:mcbilla
- URL:http://mcbilla.com/article/26c36ede-d379-4249-b28b-a42d0c5adf83
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts