type
status
date
slug
summary
tags
category
icon
password

1、概述

前面我们提到 Kafka 集群里面有 Broker、Producer 和 Consumer 三种角色,为什么要给 Consumer 单独开一个篇章进行介绍呢?因为 Consumer 涉及的内容比较多,这些内容都是生产中经常接触到,或者容易引发问题的地方。本文重点介绍以下几个知识点:
  • Consumer Group
  • __consumer_offsets
  • Consumer 位移提交
  • Rebalance

2、Consumer Group

Consumer Group 我们前面已经介绍过很多次了。用一句话概括就是:Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。Consumer Group 的特点。
  1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
  1. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  1. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。
  1. Consumer Group之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。
  1. 理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。如果 Consumer 实例数少于分区总数,某个 Consumer 实例会被分配多个分区进行消费;不推荐设置大于总分区数的 Consumer 实例,多余的实例将不会被分配任何分区,只会浪费资源。

3、__consumer_offsets

针对 Consumer Group,Kafka是怎么管理位移的呢?消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在 Kafka 中,这个位置信息有个专门的术语:位移(Offset)。
看上去该 Offset 就是一个数值而已,其实对于 Consumer Group 而言,它是一组 KV 对,Key是 分区,V对应 Consumer 消费该分区的最新位移。如果用 Java 来表示的话,你大致可以认为是这样的数据结构,即 Map,其中 TopicPartition 表示一个分区,而 Long 表示位移的类型。
老版本的 Consumer Group 把位移保存在ZooKeeper中。Apache ZooKeeper是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。现在比较流行的提法是将服务器节点做成无状态的,这样可以自由地扩缩容,实现超强的伸缩性。Kafka 最开始也是基于这样的考虑,才 将Consumer Group 位移保存在独立于 Kafka 集群之外的框架中。
不过,慢慢地人们发现了一个问题,即 ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社区渐渐有了这样的共识:将 Consumer 位移保存在 ZooKeeper 中是不合适的做法。
Kafka 从 0.9 版本开始,开始将位移保存在 Kafka 内部主题 __consumer_offsets。位移主题也是普通的 Kafka 主题,用户可以手动地创建它、修改它,甚至是删除它,但是不能向这个主题写入消息。因为一旦你写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。我们通常情况下并不需要管这个主题,Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息。
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。位移主题的分区数由 Broker 端参数 offsets.topic.num.partitions 确定,默认是 50。副本数则由 Broker 端另一个参数 offsets.topic.replication.factor 决定,默认值是 3。因此 Kafka 会自动创建一个 50 个分区的位移主题,副本数是 3。在 Kafka 的数据目录下会出现以下文件夹:
位移主题的每条消息格式大致如图所示:
notion image
可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id + topic + partition,而 value 就是 offset 的值。Kafka 对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions,从而将消费者组分散到不同的位移主题分区上。
来看看几个常见的问题:
1、如何确认消费者组在哪个__consumer_offsets?
答案是 Math.abs(groupID.hashCode()) % numPartitions
2、如果查找消费者组在位移分区数中的偏移量 offset?
先通过上面计算方式确认好消费者组所在的分区,假如szz-group消费组的偏移量信息存放在 __consumer_offsets_32中,执行以下命令
输出如下:
notion image
:: 前面是 key,由 消费组+Topic+分区数 确定;后面是 value,包含了消费组的偏移量信息等等

4、Consumer 位移提交

什么地方会用到位移主题呢?答案是 Consumer 在提交位移时会写入位移主题。目前 Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。

4.1 自动提交位移

Consumer 端有个参数 enable.auto.commit,如果设置为 true(默认为 true),Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。提交间隔由参数 auto.commit.interval.ms 来控制,默认是 5s。自动提交位移的优点就是省事,不需要担心位移提交的问题,但是丧失了灵活性和可控性。
自动提交位移还有另外一个缺点,即使当前位移主题没有消息可以消费了,位移主题中还是会不停地写入最新位移的消息。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。
Kafka 使用 Compact 策略来删除位移主题中的过期消息。Kafka 提供了专门的后台线程 Log Cleaner 定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。
notion image
很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

4.2 手动提交位移

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。我们可以把 enable.auto.commit 设置为 false,然后采用手动提交位移的方式来提交位移。
一旦设置了 false,作为 Consumer 应用开发的你就要承担起位移提交的责任。处理完了 poll() 方法返回的所有消息之后再提交位移,否则有可能出现消息丢失(消息没有处理完或出现异常)。
Kafka Consumer API 主要提供了两种手动提交位移的方式:

4.2.1 同步提交API

调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,失败了会自动重试。例如下面的代码:
同步提交位移的问题在于,它可能会出现重复消费。在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

4.2.2 异步提交API

调用 commitAsync() 之后,会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。例如下面的代码:
异步提交位移的问题在于,它提交失败了不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义。

4.2.3 同步+异步提交

所以如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能达到最理想的效果,原因有两个:
  • 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  • 另外我们不希望程序总处于阻塞状态,影响 TPS。

4.2.4 批量提交

上面提到的手动位移提交,都是提交 poll 方法返回的所有消息的位移,比如 poll 方法一次返回了500条消息,当你处理完这 500 条消息之后,前面我们提到的各种方法会一次性地将这 500 条消息的位移一并处理。简单来说,就是直接提交最新一条消息的位移
但如果我想更加细粒度化地提交位移,该怎么办呢?设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。例如我们希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。
这时候就需要用到下面这两个批量提交的方法:
  • commitSync(Map<TopicPartition, OffsetAndMetadata>) 
  •  commitAsync(Map<TopicPartition, OffsetAndMetadata>)
这两个 API 可以实现批量提交一组位移信息,而不是一个个提交。以一个具体例子看,下面的代码实现了每 100 条消息提交一次:

5、Rebalance

下面来介绍一下大名鼎鼎的 Rebalance。Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅Topic的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

5.1 Coordinator

在 Rebalance 过程中,所有消费者实例共同参与,在协调者帮助下完成订阅分区的分配。这个协调者是 Kafka 中对应的术语是 Coordinator,由某台 Broker 担任,专为 Consumer Group 执行 Rebalance 以及提供提供位移管理和组成员管理等。
Kafka 里面 Coordinator 和 Controller 虽然都是 Broker 来担任的角色,但是两个不同的概念:
  • Controller 是针对于整个集群的,用于管理整个集群的元数据和状态,整个集群中只有一个 Broker 充当 Controller 的角色。
  • Coordinator 是针对 Consumer Group 的,每个 Consumer Group 都有一个 Coordinator。所在 Broker 启动时,都会创建和开启相应 Coordinator 组件,也就是说,所有 Broker 都有各自的 Coordinator 组件。Kafka 会为每个 Consumer Group 从 Broker 集群中选一个作为 Coordinator。
具体来讲,Consumer 在提交位移时,其实是向 Coordinator 所在 Broker 提交位移。同样地,当 Consumer 启动时,也是向 Coordinator 所在Broker发送请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据的管理操作。那么,Consumer 如何确认为它服务的 Coordinator 在那台 Broker 上呢?答案就在 __consumer_offsets 身上。
目前,Consumer Group 确定 Coordinator 所在 Broker 的算法有两步:
第一步:确定由位移主体的哪个分区来保存 Group 数据:
第二步:找出该分区 Leader 副本所在 Broker,该 Broker 即为 Coordinator。
在实际使用过程中,Consumer 应用程序,特别是 Java Consumer API,能够自动发现并连接正确的 Coordinator,我们不用操心这个问题。知晓这个算法的最大意义在于,能帮我们解决定位问题。当 Consumer Group 出现问题,需要快速排查 Broker 日志时,可以根据这个算法准确定位 Coordinator 对 应的 Broker,不必一台一台 Broker 地盲查。

5.2 Rebalance的弊端

Rebalance 的弊端:
  • 影响 Consumer 端TPS,因为一旦 Rebalance 开始,所有 Consumer 实例就会停止消费。
  • Rebalance 很慢,如果你的组成员很多,就一定会有这个痛点。
  • Rebalance 效率不高,即所有Group都要参与重新分配分区,通常不会考虑局部性原理,但局部性原理会大大提升系统性能,因为它能最大限度地减少 Rebalance 对剩余 Consumer 的冲击。
在真实的业务场景中,很多 Rebalance 都是计划外的,而客户端的 TPS 都是被这类原因拖慢的, 所以要尽量避免这类原因。
要避免 Rebalance,首先要知道 Rebalance 发生的时机,Rebalance 的触发条件有 3 个:
  1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,或者是有 Consumer 实例崩溃被“踢出”组。
  1. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该Group订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  1. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
后面两个通常都是运维操作,一般难以避免,这里主要说说第 1 个时机该如何避免。一般碰到的 Rebalance 绝大部分都是第一个时机下发生的
Consumer实例增加的情况很好理解,当我们启动一个配置有相同 group.id 值的 Consumer 程序时,实际上就向这个 Group 添加一个实例。此时, Coordinator 会接纳这个新实例,将其加入组中,并重新分配分区。通常来说,增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。总之,它不属于我们要规避的那类“不必要Rebalance”。
我们更在意的是Group下实例数减少的情况,如果你就是要停掉某些 Consumer 实例,那自不必说。关键是在某些情况,Consumer 实例会被 Coordinator 错误地认为“已停止”,从而被“踢出” Group,从而导致 Rebalance。

5.3 如何避免Rebalance

那么,Coordinator 会在什么情况下认为实例已挂从而退其出组呢? 接下来了解一下 Consumer 提供了 3 个参数,它们都会影响 Rebalance 的发生:
  • session.timeout.ms,默认值10秒,即如果 Coordinator 没在 10 秒内收到 Group 下某 Consumer 实例的心跳,就会认为这个实例已挂,从而将其移出 Group,开启新一轮 Rebalance。可以说,这个参数决定了 Consumer 存活性的时间间隔。
  • heartbeat.interval.ms,这个值越小,Consumer 实例发送心跳请求的频率越高。但是频繁地发送心跳,会额外消耗带宽资源,好处是能更快知晓当前是否开启 Rebalance,因为 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的 响应体中。
  • max.poll.interval.ms,限定了两次 poll 调用的最大时间间隔,默认值5分钟。表示你的 Consumer 程序如果无法在 5min 内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 就会开启新一轮 Rebalance。
上面说的参数配置,对应使用过程中可能出现的两类“不必要的”Rebalance情况:
第一类,因为未及时发送心跳,导致 Consumer 被“踢出” Group 而引发的。所以你需要仔细设置前面两个参数,这里有最佳实践,即 session.timeout.ms=6sheartbeat.interval.ms=2s,要保证实例被判定“dead”之前,能够发送至少 3 轮心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms
第二类,Consumer 消费时间过长导致的 Rebalance。如某个用户,在他们的场景中,要将消息处理后的结果写入 MongoDB,这里 MongoDB 的一丁点不稳定,都会导致 Consumer 程序消费时长的增加。最好将这个参数设置成比你下游最大处理时长大一点。总之,你要为你的业务处理逻辑留下充足的时间。如果消费时间过慢,超过 max.poll.interval.ms设置的值(默认5分钟),未进行 poll 拉取消息,则会导致客户端主动离开队列,而引发 Rebalance。
这种情况可以通过 Kafka 运维平台的 Topic 流出流量排查原因,如果是每五分钟有一个流出的尖峰流量,表示消费端无法在 5 分钟内完成 poll 拉取的消息,那么就是两次 poll 拉取时间超过了五分钟,这种情况不仅会导致 Rebalance,还会导致重复消费的情况。
notion image
为了避免 Rebalance,可以进行下面的参数调整:
  • session.timeout.ms:Consumer 与 Coordinator 之间的会话超时时间可适当提高该参数值,需要大于消费一批数据的时间,但不要超过 30s,建议设置为 25s;而 0.10.2 及其之后的版本,保持默认值 10s 即可。
  • heartbeat.interval.ms :Consumer 向 Coordinator 发送心跳的时间间隔,保证实例被判定“dead”之前,能够发送至少 3 轮心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms,例如 session.timeout.ms 被设置为 10s,heartbeat.interval.ms=2s 设置为 3s。
  • max.poll.records:Consumer 一次从 Broker 中拉取的最大消息数,默认值为 500。如果消费过慢,可以适当降低该参数值。
  • max.poll.interval.ms: 两次 poll 调用的最大时间间隔,默认 5 分钟,如果消费过慢,,可以适当调大该参数值。
  • 尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
  • 减少 Group 订阅Topic的数量,一个 Group 订阅的 Topic 最好不要超过 5 个,建议一个 Group 只订阅一个 Topic。
如果按照上面的推荐数值恰当地设置了这几个参数,再去排查一下 Consumer 端的GC表现,比如是否出现频繁的 Full GC,从而引发 Rebalance,这也是一种常见现象。
Kafka系列:精确一次性语义Kafka系列:集群管理
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏