type
status
date
slug
summary
tags
category
password
1、概述
一个典型的 Kafka 集群结构如下图所示:

- Zookeeper:为 Kafka 提供一致性服务,存储元数据等关键信息。
- Controller:Kafka 的核心组件,主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。Kafka 集群中始终只有一个 Controller。
- Broker:真正存储日志数据的服务器,通常有多个,受 Cotroller 统一管理。
注意,Kafka 在 2.7.2 版本之后逐渐去除对 Zookeeper 的依赖,在 2.8.0 版本之后完全去除了 Zookeeper,引入了基于 Raft 协议的 KRaft 模式。在此模式下,集群中的 Controller 不再只有一个,而是由于一部分 Broker 通过配置文件被指定为 Controller,剩下的则为 Broker。这些 Controller 提供过去由 Zookeeper 提供的共识服务,并且所有的元数据都将存储在 Kafka 主题中并在内部进行管理。
使用 KRaft 的优点是:
- Kafka 不用再依赖外部框架,能够做到独立运行。Controller 此时的角色类似于 Redis 的 Sentinel,它的本质仍然是一个 Kafka 实例。
- Controller 管理集群时,不再需要从 Zookeeper 中先读取数据,因此集群的性能得到一定的提升。
- 由于不再依赖 Zookeeper,Kafka 集群扩展时不用再受到 Zookeeper 读写能力的限制。
- Controller 不再动态选举,而是由配置文件规定。这样可以有针对性的加强 Controller 节点的配置,而不是像以前一样对随机 Controller 节点的高负载束手无策。
可能是出于对稳定性和升级成本的考虑,我接触过的大部分公司使用的还是 2.7.2 版本之前的 Kafka,也就是依赖 Zookeeper 的版本。本文介绍的也是这个版本的 Kafka,对 KRaft 模式感兴趣的同学可以另外找资料进行了解。
2、Zookeeper
在 Kafka 集群中 ZooKeeper 扮演着核心的协调者角色,主要负责集群的元数据管理、消费者组协调、控制器选举、分区和副本状态协调等关键功能。
Zookeeper 存储的元数据目录如下所示:

- admin: 存储管理员接口操作的相关信息,主要为 topic 删除事件,分区迁移事件,优先副本选举,信息(一般为临时节点)
- brokers: 主要存储 broker 相关的信息,broker 节点以及节点上的 topic 相关信息
- cluster: 存储 kafka 集群信息
- config: 存储 broker,client,topic,user 以及 changer 相关的配置信息
- consumers: 存放消费者相关信息(一般为空)
- controller: 用于存放控制节点信息(注意: 该节点是一个临时节点,用于controller节点注册)
- controller_epoch: 用于存放 controller 节点当前的年龄
- isr_change_notification: 临时节点,用于存储 isr 的变更通知。当有 isr 进行变动时,会用于事件通知,可进行 watch 获取集群 isr 状态变更。
- latest_producer_id_block: 该节点用于存储处理事务相关的 pid 范围
- log_dir_event_notification: 日志目录事件通知
以下是各个目录对应的具体作用:
2.1 元数据状态管理
2.1.1 Broker注册和监控
每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到
/brokers/ids
下创建属于自己的节点,如 /brokers/ids/[0...N]
。 Kafka 使用了全局唯一的数字来指代每个 Broker 服务器,不同的 Broker 必须使用不同的 Broker ID 进行注册,创建完节点后,每个 Broker 就会将自己的IP 地址和端口信息记录到该节点中去。其中,Broker 创建的节点类型是临时节点,一旦 Broker 宕机,则对应的临时节点也会被自动删除。
2.1.2 记录Topic和分区的分布情况
同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护,由专门的节点来记录。
Kafka 中每个 Topic 都会以
/brokers/topics/[topic_name]
的形式被记录。Broker 服务器启动后,会到对应 Topic 节点 /brokers/topics
上注册自己的 Broker ID 并写入针对该 Topic 的分区总数,如 /brokers/topics/login/3->2
,这个节点表示 Broker ID 为 3 的一个 Broker 服务器,对于 login 这个 Topic 的消息,提供了 2 个分区进行消息存储,同样,这个分区节点也是临时节点。2.1.3 记录Consumer和分区的关系
在 Kafka 中,规定了每个消息分区只能被同一个消费组的一个消费者进行消费。每个消费者一旦确定了对一个消息分区的消费权力,就会到所属消费组节点下创建临时节点
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
,其中 [broker_id-partition_id]
就是一个消息分区的标识,节点内容就是该消息分区上消费者的 Consumer ID。2.2 消费者组协调
2.2.3 Consumer注册
每个 Consumer Group 注册的时候,Kafka都会为其分配一个全局唯一的 Group ID,然后创建节点路径为
/consumers/{group_id}
,其节点下有两个子节点,分别为 [ids, owners]
。ids
:记录该消费组中当前正在消费的消费者;
owners
:记录该消费组消费的 Topic 信息;
每个 Consumer 注册的时候,Kafka 同样会为每个消费者分配一个 Consumer ID(通常是
Hostname:UUID
),然后到所属消费者组节点下创建临时节点 /consumers/{group_id}/ids/consumer_id
2.2.2 生产者负载均衡
由于同一个 Topic 消息会被分区并将其分布在多个 Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上,那么如何实现生产者的负载均衡,Kafka 支持传统的四层负载均衡,也支持 Zookeeper 方式实现负载均衡。
2.2.3 消费者负载均衡
与生产者类似,Kafka 中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的 Broker 服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的 Topic 下面的消息,互不干扰。
每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对
/consumers/[group_id]/ids
节点注册子节点变化的 Watcher 监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。还对 Broker 服务器变化注册监听,即对 /broker/ids/[0-N]
中的节点进行监听,如果发现 Broker 服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。2.2.4 Consumer消费进度Offset记录(已废弃)
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度 Offset 记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset 在 Zookeeper 中由一个专门节点进行记录,节点路径为
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
,节点内容就是 Offset 的值。在 Kafka 2.0 以后,Offset 信息不再记录于 ZooKeeper,而是保存于 Broker 的 Topic 中,路径是
__consumer_offsets(/brokers/topics/__consumer_offsets)
2.3 控制器选举
Kafka集群需要一个主控制器(Controller)来管理分区和副本的状态(如分区 Leader 选举、副本同步等)。
所有Broker通过ZooKeeper的临时节点
/controller
竞争选举,第一个成功创建该节点的 Broker 成为 Controller。若 Controller 崩溃,ZooKeeper 会触发重新选举。2.4 分区和副本状态协调
2.4.1 ISR管理
分区的 ISR 列表(同步中的副本)由 Controller 监控并同步到 ZooKeeper(
/brokers/topics/[topic]/partitions/[partition]/state
),确保数据一致性。2.4.2 分区Leader选举
- 当某个分区的 Leader 副本失效时,Zookeeper会检测到并触发重新选举,Controller 会通过 watch 机制通知 ISR 中其他副本开始选举过程。
- 提供分布式锁确保同一时间只有一个副本尝试成为 Leader,实现"先到先得"的选举原则,避免脑裂问题。
- 成功选举后,新的 Leader 信息会写入 Zookeeper,所有副本从 Zookeeper 读取最新的 Leader 信息。
3、Controller
控制器组件(Controller)是 Kafka 的大脑,作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。
Kafka 集群启动时多个 Broker 会竞争成为 Controller,第一个会在 Zookeeper 中创建
/controller
临时(EPHEMERAL)节点成功的 Broker 会被指定为 Controlle;创建失败的 Broker 则表示竞选失败,在内存中保存当前 Controller 的 broker_id 值。集群在运行过程中始终只有一个 Controller。Controller 本身也是一个 Broker,所以也会提供 Broker 的数据服务,只是要负责一些额外的工作。Controller 负责的工作有:
- 主题管理:包括创建、删除、增加分区等,换句话说,执行
kafka-topics
脚本时,大部分的后台工作都是控制器来完成的。
- 分区重分配: 当一个新的 broker 刚加入集群时,不会自动地分担己有 topic 的负载,它只会对后续新增的 topic 生效。如果要让新增 broker 为己有的 topic 服务,用户必须手动地调整现有的 topic 的分区分布,将一部分分区搬移到新增 broker 上,根据
kafka-reassign-partitions
脚本提供的对已有主题分区进行细粒度的分配功能,这就是所谓的分区重分配 reassignment。
- 集群 Broker 管理:包括自动检测新增 Broker、Broker 主动关闭、Broker 故障。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的:
- Watch 功能:控制器组件会利用Watch 机制检查 ZooKeeper 的
/brokers/ids
节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在/brokers
下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。 - 临时节点:每个 Broker 启动后,会在
/brokers/ids
下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了。
- Preferred Leader 选举:Preferred Leader 选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。Kafka 集群长时间运行中,Broker 的宕机或崩溃是不可避免的,Leader 就会发生转移,即使 Broker 重新回来,这上面的副本也不会是 Leader了。在众多 Leader 的转移过程中,就会产生 Leader 不均衡现象,可能一小部分 Broker 上有大量的 Leader,影响了整个集群的性能,所以需要平衡一下 Leader 的分配,这就需要 Preferred Leader 选举。
- 数据服务:包括元数据的存储和推送更新。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
当你觉得 Controller 出现问题时,比如主题无法删除了,或者重分区 hang 住了,你不用重启 Broker 或 Controller。有一个简单快速的方式是,去 ZooKeeper 中手动删除
/controller
节点。具体命令是 rmr /controller
。这样做的好处是,既可以引发 Controller 的重选举,又可以避免重启 Broker 导致的消息处理中断。3.1 Controller原理
这一部分简单了解即可。

Controller 内部结构主要由以下 5 部分组成:
- ZK 监听器。
- 定时任务:举个例子,假设我们将
auto.leader.rebalance.enable
参数设为 true,那么就会启动名为auto-leader-rebalance-task
的定时任务来自动维护最优 Replica 的平衡。
- Controller上下文:在 Controller 初始化阶段,从 Zookeeper 中已存储的数据建立,并在 Controller 的生命周期中一直维护。包含集群 Broker 可达性信息,与所有Topic、Partition、Replica的状态信息。
- 事件队列:本质上为 FIFO 的阻塞队列(LinkedBlockingQueue),承载各个监听器、定时任务投递过来的状态变更信息,这些信息都包装为事件。
- 事件处理线程:顾名思义,只有单线程,用来处理各个事件,并将它们的结果反映到 Controller 上下文,以及异步地 propagate 到各个 Broker 中。使用单线程的好处是无需关心多线程的同步,无锁机制可以提升性能。
Controller 工作原理:
- Controller 在选举成功之后会读取 Zookeeper 中各个节点的数据来初始化上下文信息(ControllerContext),然后管理和监控这些上下文信息。
- 当监听器、定时任务或者是其他事件(比如ControlledShutdown)触发时,都会读取或者更新控制器中的上下文信息,Controller 需要将这些变更信息同步到其他普通的 Broker节点中。
- Controller 使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到 LinkedBlockingQueue 中,然后使用一个专用的线程(ControllerEventThread)按照FIFO(First Input First Output, 先入先出)的原则顺序处理各个事件,这样可以不需要锁机制就可以在多线程间维护线程安全。
3.2 Controller故障转移(Failover)
当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到并选举出新的控制器来代替之前失败的控制器。这个过程就被称为Failover,该过程是自动完成的,无需你手动干预。

- 前面我们提到 Controller 会创建临时节点
/controller
,当 Controller 宕机之后,ZooKeeper通过 Watch 机制感知到并删除了/controller
临时节点。之后,所有存活的 Broker 开始竞选新的 Controller。
- Broker 3 最终赢得了选举,成功地在 ZooKeeper上 重建了
/controller
节点。之后,Broker 3 会从 ZooKeeper 中读取集群元数据信息,并初始化到自己的缓存中。至此,Controller 的 Failover 完成,可以行使正常的工作职责了。
3.3 Controller脑裂问题
脑裂问题是分布式系统中经常出现的现象,脑裂问题的表现就是同一个集群出现了两个主节点,同时对外提供写服务,客户端不知道该向哪个主节点进行写入,这两个主节点的数据可能无法进行合并,最终会导致数据混乱和数据丢失等问题。
Kafka 中的脑裂问题是由于网络或其他原因导致多个 Broker 认为自己是 Controller,从而导致元数据不一致和分区状态混乱的问题。
Kafka 中发生脑裂问题的两种原因:
- Controller 进行 Full GC 停顿时间太长超过 zookeeper session timeout 出现假死。
- Controller 所在 Broker 网络出现故障。
例如下面这种场景:
- Broker 3 被选举为 Controller,在正常工作了一段时间时候,进入了长时间的 GC 暂停。它的 ZooKeeper 会话过期了,之前注册的
/controller
节点被删除。

- 集群中其他 Broker 会收到 Zookeeper 的这一通知,每个 Broker 都试图尝试成为新的 Controller。假设 Broker2 速度比较快,成为了最新的 Controller Broker。此时,每个 Broker 会收到 Broker2 成为新的 Controller的通知,但是由于 Broker3 正在进行 "stop the world" 的 GC,可能不会收到 Broker2 成为最新的 Controller 的通知。

- 等到 Broker3 的 GC 完成之后,仍会认为自己是集群的 Controller,在 Broker3 的眼中好像什么都没有发生一样。这时候集群内同时出现 Broker2 和 Broker3 两个 Controller。

Kafka 通过 epoch 机制来解决脑裂问题。
- 每次控制器选举时,ZooKeeper会递增
/controller_epoch
的纪元编号(epoch number),epoch number 是单调递增的数字,第一次选出 Controller 时,epoch number 值为 1,如果再次选出新的 Controller,则 epoch number 将为 2,依次单调递增。
- 其他 Broker 在知道当前 epoch number 后,Broker 只接收最大的 epoch number 的 Controller 发来的消息。如果收到由 Controller 发出的包含较旧(较小)的 epoch number 的消息,就会忽略它们。通过这种方式确保集群中只有一个 Controller 的命令会被其他 Broker 所接受。

回到上面这个场景
- Broker3 向 Broker1 发出命令,让 Broker1 上的某个分区副本成为 Leader,该消息的 epoch number 值为 1。
- 于此同时,Broker2 也向 Broker1 发送了相同的命令,不同的是,该消息的 epoch number 值为 2。
- Broker1 发现 Broker2 发送的命令的 epoch number 较大,所以只听从 Broker2 的命令,会忽略 Broker3 的命令。即使集群里面仍然可能存在两个 Controller,但是只有一个 Controller 的命令会被其他 Broker 接受,从而避免了脑裂问题带来的混乱。
- Author:mcbilla
- URL:http://mcbilla.com/article/38d043c2-9153-4b22-981c-4b835cd37a10
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts