type
status
date
slug
summary
tags
category
password

1、日志

日志是分区在 Broker 磁盘上的物理存储文件。它不是一个巨大的文件,而是一组顺序读写段文件(Segment) 的集合。

1.1 日志分段

每个日志段实际上由以下文件组成:
  1. .log 文件真正的数据文件。存储消息(Message)本身、偏移量(Offset)、时间戳、压缩编码等信息。文件以该日志段基础偏移量(Base Offset,即第一条消息的偏移量) 命名。例如 00000000000000000000.log
  1. .index 文件偏移量索引文件。用于根据偏移量快速定位消息在 .log 文件中的物理位置
  1. .timeindex 文件时间戳索引文件。用于根据时间戳快速定位消息的偏移量,是 kafka-topics.sh --time 等操作的基础。
为什么要分段?
  • 避免大文件:单个文件过大不利于维护和查找。
  • 便于清理:Kafka 的日志清理策略(如根据时间或大小删除旧数据)可以以段为单位进行,删除旧段文件,避免影响新段文件。。
  • 提高效率:大部分读取操作发生在最新的活动段,与旧段分开,提高了效率。
  • 加速查找:配合索引文件,可以实现消息的快速二分查找,而无需扫描整个大文件。

1.2 索引的工作原理

索引是 Kafka 实现低延迟查询的关键。它们都是稀疏索引(Sparse Index),并非每条消息都建索引,而是隔一段数据建一个索引项,以在查找速度和索引文件大小之间取得平衡。
偏移量索引 (.index)
  • 结构:每个条目包含两个字段:
    • 相对偏移量(4字节):存储的是相对于该索引文件基础偏移量(Base Offset)的差值。这样做可以只用 4 字节存储,而不是 8 字节,节省空间。
    • 物理位置(4字节):对应消息在 .log 文件中的起始字节位置。
  • 查找过程(例如查找 offset=252的消息)
      1. 首先找到它可能所在的日志段(比如基础偏移量为 250 的段)。
      1. 在 00000000000000000250.index 文件中进行二分查找,找到小于等于 252 的最大索引条目。假设找到的条目是 (相对偏移量=2, 位置=1400)
      1. 计算绝对偏移量:250 + 2 = 252
      1. 然后从 .log 文件的 1400 字节位置开始顺序扫描,直到找到 offset=252 的消息。
时间戳索引 (.timeindex)
  • 结构:每个条目包含两个字段:
    • 时间戳(8字节)
    • 相对偏移量(4字节)
  • 查找过程(例如查找大于等于时间戳 T 的第一条消息)
      1. 在 .timeindex 文件中二分查找,找到时间戳 >= T 的索引条目,得到其相对偏移量。
      1. 再通过偏移量索引找到该消息在 .log 文件中的具体位置。

1.3 日志追加

活动段(Active Segment)是当前正在被写入的那个段。Kafka 的日志是 “只追加” 的。这意味着:
  • 消息只能被顺序地追加到活动日志段(Active Segment)的末尾。
  • 绝不允许修改或删除中间的某条消息
为什么只追加?
  • 极致的磁盘顺序I/O:即使是机械硬盘,顺序读写的性能也远高于随机读写。这是 Kafka 高吞吐量的基石。
  • 并发简化:不需要复杂的锁机制来保证中间数据的并发修改安全,只需要在尾部追加时加锁,大大提高了并发能力。
  • 保证有序性:严格保证了消息在分区内的顺序。
删除不是真正的“删除”
当我们说“删除”旧数据时,并不是去 .log 文件里抹掉某些消息,而是直接删除整个旧的、不再需要的日志段文件。这是一种非常高效的操作。

1.4 日志清理策略

Kafka 的消息存储在磁盘中,消息不可能无限制存储,因此 Kafka 提供了两种日志消息清理策略:
  • 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段(LogSegment)。
  • 日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
在 Kafka 的 Broker 端或 Topic 配置参数开启日志清理功能:
  • log.cleaner.enable:设置为 true 开启日志清理功能,默认为 true。
  • log.cleanup.policy:日志清理策略,有以下选项
    • delete:日志删除,默认策略
    • compact :日志压缩。
    • delete,compact :同时支持日志删除和日志压缩两种策略。

1.4.1 日志删除策略

当 Broker 端或者 Topic 没有显式设置 log.cleanup.policy 参数,或者设置 log.cleanup.policy=delete 时,将会采用日志删除策略。Broker 端可以设置参数log.retention.check.interval.ms 用来配置检测日志删除操作的周期,默认值为 300000(5分钟)。删除日志分段时:
  1. 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
  1. 将日志分段文件添加上 .deleted 的后缀(也包括日志分段对应的索引文件)
  1. Kafka 的后台定时任务会定期删除这些 .deleted 为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数来设置,默认值为 60000,即 1 分钟。
Kafka 共有三种日志删除策略:
  • 基于时间的删除策略
  • 基于文件大小的删除策略
  • 基于日志文件起始偏移量的删除策略
基于时间的删除策略
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retenionMs) 来寻找可删除的日志分段文件集合。阈值 retenionMs 可以通过 Broker 端参数设置:
  • log.retention.hour
  • log.retention.minutes
  • log.retention.ms
如果同时设置了多个 retenionMs 相关的参数,优先级:log.retention.ms > log.retention.minutes > log.retention.hour
默认情况下,设置了 log.retention.hours=168 ,即默认日志的保留时间为 168 小时,相当于保留 7 天。
基于文件大小的删除策略
日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段文件集合。阈值 retentionSize 可以通过 Broker 端参数设置:
  • log.retention.bytes
  • log.segment.bytes
默认情况下,log.retention.bytes=-1,表示无穷大(表示的所有日志的总大小);log.segment.bytes=1073741824,即 1G。
基于日志文件起始偏移量的删除策略
每个 segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

1.4.2 日志压缩策略

日志压缩(Log Compaction)是默认的日志删除(Log Retention)之外的一种清理过时数据的方式。 Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
notion image
  • Log Compaction 执行后,offset 将不再连续,但依然可以查询 Segment
  • Log Compaction 执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction 会生成一个新的 Segment 文件
  • Log Compaction 是针对 key 的,在使用的时候注意每个消息的 key 不为空
  • 基于Log Compaction可以保留 key 的最新更新,可以基于 Log Compaction 来恢复消费者的最新状态

2、消息

2.1 消息格式

Kafka 的日志消息层次分为两层:消息集(message set)以及消息(message)。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。一个消息集合包含若干条日志项 (record item),而日志项才是真正封装消息的地方。
随着 Kafka 的迅猛发展,其日志消息格式也在不断升级改进中,Kafka 的日志格式总共经历了3 个大版本:V0,V1 和 V2 版本。
  • V0 格式:0.10.0 版本以前。
  • V1 格式:从 V0.10.0 开始到 V0.11.0 版本,比 V0 版本就多了一个 timestamp 字段。
  • V2 格式:从 0.11.0.0 版本开始,进行较多优化:
    • 将 CRC 值从消息中移除,被抽取到消息批次中。
    • 增加了 procuder id、producer epoch、序列号等信息主要是为了支持幂等性以及事务消息的。
    • 使用增量形式来保存时间戳和位移。
    • 消息批次最小为 61 字节,比 V0、V1 版本要大很多,但是在批量消息发送场景下,会提供发送效率,降低使用空间。1
以最新的 V2 格式为例,格式内容如下图所示:
notion image
首先看看RecordBatch的关键字段:
  • first offset:表示当前RecordBatch的起始偏移量。
  • length:计算从partition leader epoch到末尾的长度。
  • partition leader epoch:分区leader纪元,可以看做是分区leader的版本号或者更新次数。
  • magic:消息格式版本号,v2版本是2。
  • crc32:crc32校验值。
  • attributes:消息属性,这里占用2个字节。低三位表示压缩格式,第4位表示时间戳类型,第5位表示此RecordBatch是否在事务中,第6位表示是否为控制消息。
  • last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要用于broker确保RecordBatch中Recoord组装的正确性。
  • first timestamp:RecordBatch中第一条Record的时间戳。
  • max timestamp:RecordBatch中最大的时间戳。一般情况是最后一条Record的时间戳。
  • producer id:PID,用来支持事务和幂等。暂不解释。
  • producer epoch:用来支持事务和幂等。暂不解释。
  • first sequeue:用来支持事务和幂等。暂不解释。
  • records count:RecordBatch中record的个数。
  • records:消息记录集合
Record的关键字段:
  • length:消息总长度
  • attributes:弃用。这里仍然占用了1B大小,供未来扩展。
  • timestamp delta:时间戳增量。
  • offset delta:偏移量增量。保存与RecordBatch起始偏移量的差值。
  • key length:消息key长度。
  • key value:消息key的值。
  • value length:消息体的长度。
  • value:消息体的值。
  • headers:消息头。用来支持应用级别的扩展。
Header的关键字段:
  • header key length:消息头key的长度。
  • header key:消息头key的值。
  • header value length:消息头值的长度。
  • header value:消息头的值。

2.2 消息压缩

上面我们提到 Kafka 操作一般是基于消息集合的,所以消息的压缩和解压缩也是基于消息集合。
注意消息的压缩和日志文件的压缩是两个不同应用场景,前者是为了减少网络传输的带宽,而后者则是一种日志清理策略。
消息压缩和解压缩的过程用一句话总结:Producer 端压缩,Broker 端保持,Consumer 端解压缩。

2.2.1 何时压缩

Kafka中,压缩可能会发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type 参数即表示启动指定类型的压缩算法。如果配置为 uncompressed,则表示不压缩。
表明 Producer 的压缩算法使用的是 gzip。这样 Producer 启动后生产的每个消息集合都是经过 gzip 压缩过的,故而能很好地节省网络传输带以及 Broker 端的磁盘占用。
大部分情况下,Broker 从 Producer 端接收到消息后仅仅是原封不动地保存,而不会对其进行任何修改。但有两种例外的情况会让 Broker 端重新压缩消息。
  • 第一种情况,Broker 端指定了和 Producer 端不同的压缩算法。 Broker端也有一个参数叫 compression.type,和 Producer端的参数设置一样。这个参数的默认值是 producer,表示 Broker端会尊重 Producer端使用的压缩算法。但是一旦在 Broker端 设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩/解压缩操 作,导致 Broker端CPU使用率飙升。比 Broker 端接收到 gzip 压缩消息后,Broker 端指定了 snappy 压缩算法,这样 Broker 只能解压缩然后使用 snappy 重新压缩一遍。
  • 第二种情况,Broker 端发生了消息格式变化。 所谓的消息格式变化主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程就会涉及到消息的解压和重新压缩。一般情况下这种消息格式的转换对性能是有很大的影响的,除了这里讲的压缩外,还会让 Kafka 丧失了引以为豪的 Zero Copy 特性。所以尽量保证消息格式的统一,这样不仅可以避免不必要的解压缩/重新压缩,对提升其他方面的性能也很有裨益。

2.2.2 何时解压缩

通常来说解压缩发生在消费者程序中。Kafka 会将启用了哪种压缩算法封装进消息集合中(RecordBatchattributes 属性),这样当 Consumer 读取到集合时,它自然就知道了这些消息使用了哪种压缩算法。

2.2.3 各种压缩算法对比

评估一个压缩算法的优劣,主要有两个指标:压缩比、压缩/解压缩吞吐量。
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:gzip、snappy 和 lz4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比。
对于 Kafka 测试而言
  • 在吞吐方面:lz4 > snappy > zstd > gzip;
  • 在压缩比方面: zstd > lz4 > gzip > snappy。
物理资源占用:
  • 带宽:使用 snappy 算法占用的网络带宽资源最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
  • CPU:在 CPU 使用率方面,各个算法表现得差不 多,只是在压缩时 snappy 使用的 CPU 较多一些,而在解压缩时 gzip 算法则可能使用更多的CPU。
因此,正常情况下三种压缩算法的推荐排序为:lz4 > gzip > snappy。 经过长时间的现网运行试验,发现在大多数情况下上面的模型是没问题的。但是在某些极端情况下 lz4 压缩算法会导致 CPU 负载增大。 经分析是业务的源数据内容不一样,导致压缩算法的性能表现不一样。 故建议对 CPU 指标敏感的用户采用更为稳定的 snappy 压缩算法。

2.3 顺序消息

Kafka 只能保证单个分区内的消息是有序的,不保证全局消息的顺序性。对于需要严格控制消息顺序的场景,可以使用以下策略:
  • 单分区方案:可以保证发送到该 Topic 的所有消息的顺序性。但是完全丧失了 Kafka 的并行处理和水平扩展能力,吞吐量极低,不推荐使用。
    • 创建只有一个 Partition 的 Topic
    • 全局使用一个 Producer
    • 全局使用一个 Consumer(并严格到一个消费线程)
  • 使用消息 Key(推荐):生产者发送消息时,可以指定一个 Key。Kafka 通过 Key 的哈希值来决定消息应该被发送到哪个分区。所有具有相同 Key 的消息都会被发送到同一个分区
    • 在生产中,例如以订单 ID 作为消息的 Key。这样,所有相同订单 ID 的消息都会被路由到同一个分区。
  • 指定 partition:生产者在发送消息构建 ProducerRecord 时,可以直接传入一个 partition 参数,指定 partition 的值,将需要顺序处理的所有消息发送到同一个分区。
    • 在生产中,可以为每个用户指定 partition(例如用户 ID 对分区数取余,得到的值就是 partition),这样可以保证该用户的所有消息都发送到同一个分区。这样既可以为 Topic 设置多个分区提高吞吐量,也可以保证单个用户的消息顺序性。
另外,消息 Producer 在发送消息是按 Batch 批量发送的,虽然 A 和 B 消息在缓存里面是顺序的,但是由于存在未知的确认关系,有可能存在 A 发送失败,B 发送成功,A 需要重试的时候顺序关系就变成了 BA。为了解决这个问题,Producer 提供以下参数支持:
  • max.in.flight.requests.per.connection。这个参数的作用是在发送阻塞前对于每个连接,正在发送但是发送状态未知的最大消息数量。如果设置大于 1,那么就有可能存在有发送失败的情况下,因为重试发送导致的消息乱序问题。所以我们应该将其设置为 1,保证在后一条消息发送前,前一条的消息状态已经是可知的。
Kafka系列:集群管理(Zookeeper、Controller)Kafka系列:分区和副本机制(Partition、ISR、HW)
Loading...