type
status
date
slug
summary
tags
category
icon
password

1、消息丢失

我们先来看一条消息从发送到被消费的整个过程:
  1. Producer 端发送消息给 Kafka Broker 端。
  1. Kafka Broker 将消息进行同步并持久化数据。
  1. Consumer 端从 Kafka Broker 将消息拉取并进行消费。
上这三步中每一步都可能会出现丢失数据的情况, 那么 Kafka 到底在什么情况下才能保证消息不丢失呢?
一句话总结:Kafka 只对 「已提交」的消息做「最大限度的持久化保证不丢失」。
这句话包括两个核心要素:
  • 首先是 「已提交」的消息:当 Kafka 中 N 个 Broker 成功的收到一条消息并写入到日志文件后,它们会告诉 Producer 端这条消息已成功提交了,那么这时该消息在 Kafka 中就变成 "已提交消息" 了。这里的 N 个 Broker 主要取决于对 "已提交" 的定义, 这里可以选择只要一个 Broker 成功保存该消息就算已提交,也可以是所有 Broker 都成功保存该消息才算是已提交。
  • 其次是 「最大限度的持久化保证不丢失」,也就是说 Kafka 并不能保证在任何情况下都能做到数据不丢失。即 Kafka 不丢失数据是有前提条件的。假如这时你的消息保存在 N 个 Broker 上,那么前提条件就是这 N 个 Broker 中至少有1个是存活的,就可以保证你的消息不丢失。
我们来看一下针对以上三种情况,如果解决消息丢失的问题。

1.1 Producer端避免消息丢失

Producer 端为了提升发送效率,减少 IO 操作,并不是在调用 send() 方法之后立刻将消息发送出去,将消息缓存起来,在到达最大缓冲区大小,或者到达最大发送时间间隔后,再将消息合并成一个个 RecordBatch 批量发送出去。
notion image
所以 Producer 端消息丢失更多是因为消息根本就没有发送到 Kafka Broker 端。 导致 Producer 端消息没有发送成功有以下原因:
  • 网络原因:由于网络抖动导致数据根本就没发送到 Broker 端。
  • 数据原因:消息体太大超出 Broker 承受范围而导致 Broker 拒收消息。
解决方案如下:
  1. 使用带回调函数的方法 Producer.send(msg, callback) 发送消息。这样一旦发现发送失败, 就可以做针对性处理。不要使用 Producer.send(msg) 发送消息,该方法会立即返回,由于没有回调,可能因网络原因导致 Broker 并没有收到消息,此时就丢失了。
  1. 设置 acks=all。该参数设置为 all 表示有 N 个副本 Broker 全部收到消息,才认为是消息提交成功。N 值等于 Broker 端的参数 min.insync.replicas=N
  1. 设置重试次数 retries 大于 0。该参数表示 Producer 端发送消息的重试次数,在 Kafka 2.4 版本中默认设置为 Integer.MAX_VALUE ,说明会一直进行重试直到 Broker返回 Ack。另外:
    1. 如果需要保证发送消息的顺序性,配置如下: retries = Integer.MAX_VALUE, max.in.flight.requests.per.connection = 1 ,这样 Producer 端就会一直进行重试直到 Broker 端返回 Ack,同时只有一个连接向 Broker 发送数据保证了消息的顺序性。
    2. 重试时间 retry.backoff.ms 该参数表示消息发送超时后两次重试之间的间隔时间,默认值为 100ms。为了避免无效的频繁重试,推荐设置为 300ms。

1.2 Broker端避免消息丢失

Kafka Broker 集群接收到数据后会将数据进行持久化存储到磁盘,为了提高吞吐量和性能,采用的是「异步批量刷盘的策略」,也就是说按照一定的消息量和间隔时间进行刷盘。首先会将数据存储到「PageCache」 中,至于什么时候将 Cache 中的数据刷盘是由「操作系统」根据自己的策略决定或者调用 fsync 命令进行强制刷盘,如果此时 Broker 宕机 Crash 掉,且选举了一个落后 Leader Partition。很多的 Follower Partition 成为新的 Leader Partition,那么落后的消息数据就会丢失。
notion image
所以 Broker 端丢失数据的关键是因为使用了「异步批量刷盘」的策略。解决方案通过「多分区多副本」的方式来最大限度的保证数据不丢失。我们可以设置以下参数:
  1. unclean.leader.election.enable:该参数表示是否允许非 ISR 列表的 Follower 被选举为 Leader ,默认是 true。如果一个 Follower 不在 ISR 列表里面,通常是因为它的数据落后 Leader 太多,那么一旦它被选举为新的 Leader, 数据就会丢失,因此我们要将其设置为 false,防止此类情况发生。
  1. replication.factor:该参数表示分区副本的个数。建议设置 replication.factor >=3,这样如果 Leader 副本异常 Crash 掉,Follower 副本会被选举为新的 Leader 副本继续提供服务。
  1. min.insync.replicas:该参数表示 ISR 列表最少需要有多少个副本同步消息成功,该消息才算写入成功。建议设置 min.insync.replicas > 1,这样才可以提升消息持久性,保证数据不丢失。
我们 还需要确保一下 replication.factor > min.insync.replicas,如果相等,只要有一个副本异常Crash 掉,整个分区就无法正常工作了,因此推荐设置成:replication.factor = min.insync.replicas+1 ,最大限度保证系统可用性。

1.3 Consumer端避免消息丢失

Consumer 消费主要分为两个阶段:
  1. 获取元数据并从 Kafka Broker 集群拉取数据。
  1. 处理消息,并标记消息已经被消费,提交 Offset 记录。
既然 Consumer 拉取后消息最终是要提交 Offset, 那么这里就可能会丢数据
  1. 如果采用自动提交 Offset 的方式。在提交完 Offset 之后 Conumer 宕机了,在 Conumer 重启之后已经开始拉取下一条消息了,那么上一条消息就算丢了。
  1. 如果采用手动提交 Offset 的方式,这里有两种情况:
    1. 「先提交 Offset,后处理消息」,如果此时处理消息的时候异常宕机,由于 Offset 已经提交了,待 Consumer 重启后,会从之前已提交的 Offset 下一个位置重新开始消费, 之前未处理完成的消息不会被再次处理,对于该 Consumer 来说消息就丢失了。
    2. 「先处理消息,在进行提交 Offset」, 如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况,这里只能业务自己保证幂等性。
notion image
所以 Consumer 端丢失数据的关键是提交了 Offset,但是 Consumer 没有成功处理。解决方案如下:
  1. 采用手动提交位移的方式。设置 enable.auto.commit = false 。而且必须要先处理消息,处理成功之后再进行提交 Offset。
  1. 业务保证幂等性。消息重复消费的情况无法避免,业务必须自己进行幂等性控制。

2、消息重复

Kafka 对消息的默认保证是至少一次(at least once),所以 Kafka 是有可能会出现消息重复的。Broker 本身只存储消息,一般情况下并不会无缘无故产生重复的消息。所以 Kafka 的消息重复有可能发生在 Producer 端和 Consumer 端。

2.1 Producer端避免消息重复

发送消息如果配置了重试机制,比如由于网络波动,生产者未得到 Broker 收到了消息的响应,就会触发重试机制,3 秒后再次发送此消息。Broker 之前已经收到过这个消息,但生产者由于触发了重试机制,就导致了消息的重复发送。
解决办法是 Producer 支持精确一次性语义。参考这篇文章

2.2 Consumer端避免消息重复

消费者采用手动提交位移的方式消费消息,先处理消息,在进行提交 Offset。如果此时在提交之前发生异常宕机,由于没有提交成功 Offset, 待下次 Consumer 重启后还会从上次的 Offset 重新拉取消息,不会出现消息丢失的情况, 但是会出现重复消费的情况。
解决方案是 Consumer 本身进行幂等性控制。可以参考下面方案:
  1. 基于 Redis 的分布式锁:可以把订单 id 作为key,在消费消息时使用 Redis 对该 key 加锁,在 Offset 提交完成后,再在 Redis 中删除订单 id 的 key。
  1. 基于数据库的乐观锁:给数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1。

3、消息积压

3.1 实时/消费任务挂掉

问题原因:消费应用因为某种原因挂掉了,并且这个任务没有被监控程序监控发现通知。
解决方案:将任务纳入监控体系,当任务出现问题时,及时通知相关负责人处理。当然任务重启脚本也是要有的,还要求实时框架异常处理能力要强,避免数据不规范导致的不能重新拉起任务。拉起任务有两种处理方式:
  1. 任务重新启动后直接消费最新的消息,对于"滞后"的历史数据采用离线程序进行"补漏"。
  1. 任务启动从上次提交offset处开始消费处理

3.2 消费者消费能力不足

问题原因:下面三种情况都会导致 Broker 积压大量未消费消息:
  1. 发送方发送消息速度过快。
  1. Kafka分区数设置的不合理(太少)。分区数是 Kafka 并行度调优的最小单元,如果分区数设置的太少,会影响 Consumer 消费的吞吐量。
  1. 消费方处理消息过慢。
解决方案:
  1. 紧急处理:新增消费端程序,只负责转发消息,让其将收到的消息快速转发到其他 Topic(分区数是原来 Topic 的多倍),然后再启动多个消费者同时消费新主题的不同分区。
    1. notion image
  1. 在业务低峰期,增加 Partition 和 Consumer 数量,触发 Topic 的 Rebalance。

3.3 消息的Key不均匀导致分区间数据不均衡

问题原因:Producer 消息时可以为消息指定 Key,如果 Key 不均匀会出现 Kafka 分区间数据不均衡。
解决方案:修改 Key 的设置规则,例如给 Key 加随机后缀,解决数据倾斜的问题。

4、顺序消息

Kafka 只保证单分区内有序。基于这个前提,Kafka 在保证顺序消息有两种方案:
  • 全局顺序:全局顺序适合于性能要求不高,但是对消息顺序性要求非常高的场景,例如 binlog 日志传输,不能有任何的乱序。
    • 全局使用一个 Producer
    • Topic 只包含一个 Partition
    • 全局使用一个 Consumer(并严格到一个消费线程)
  • 局部顺序:大部分业务场景下,只需要保证消息局部有序即可。局部有序是指在某个业务功能场景下保证消息的发送和接收顺序是一致的。如:订单场景,要求订单的创建、付款、发货、收货、完成消息在同一订单下是有序发生的,即消费者在接收消息时需要保证在接收到订单发货前一定收到了订单创建和付款消息。局部有序的解决方案是:
    • 消息内容指定 Key,可以保证相同的 Key 会发到同一个 Partition,因为一个 Partition 只能被同一个 Consumer 消费,可以保证相同 Key 的消息被顺序消费。
另外,消息 Producer 在发送消息是按 Batch 批量发送的,虽然 A 和 B 消息在缓存里面是顺序的,但是由于存在未知的确认关系,有可能存在 A 发送失败,B 发送成功,A 需要重试的时候顺序关系就变成了 BA。为了解决这个问题,Producer 提供以下参数支持:
  • max.in.flight.requests.per.connection。这个参数的作用是在发送阻塞前对于每个连接,正在发送但是发送状态未知的最大消息数量。如果设置大于 1,那么就有可能存在有发送失败的情况下,因为重试发送导致的消息乱序问题。所以我们应该将其设置为 1,保证在后一条消息发送前,前一条的消息状态已经是可知的。

5、Kafka为什么这么快

这个是面试经典题,必须要掌握,原因总结如下:
  • 磁盘顺序读写:message 是不断追加到 Partition 本地磁盘文件末尾,这种基于磁盘的顺序读写性能很高。
  • 利用 Page Cache 技术:Kafka 利用了操作系统的 Page Cache,相当于直接操作系统自身的内存而不是 JVM 堆内存。这样对内存的使用效率更高,而且可以避免 GC 问题。
  • 零拷贝技术
    • Producer—>Broker:mmap。Producer 生产的数据存到 Broker,写入 Broker 上内核 mmap 技术映射的 Page Cache 区域,在内核空间就可以完成落盘,这样就不用再拷贝数据到用户空间,省去了内核空间到用户空间的拷贝开销。
    • Broker—>Consumer:sendfile。Broker 上的消息被 Consumer消费的时候,磁盘数据通过 DMA 拷贝到内核 Page Cache 后,使用 sendfile 直接通过 DMA 拷贝到 NIC Buffer,消除了 CPU 数据拷贝。
    • notion image
  • 批量发送:Kafka 发送消息不是单条发送的,而是使用批次批量发送,可以减少网络 IO 次数。
  • 批量压缩:Kafka支持批量压缩消息,减少网络传输的文件大小。批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。
Kafka 速度的秘诀在于,它把多条消息变成一个个批次,进行合理的批量压缩后再发送,减少网络 IO 损耗,写入数据的时候使用追加写的方式添加到 Partition 文件的末尾,并通过 mmap 提高响应速度;读取数据的时候配合 sendfile 直接暴力输出。
Logstash笔记Kafka系列:最佳实践
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏