type
status
date
slug
summary
tags
category
icon
password

1、概述

前面的文章我们提到 Kafka 的分区文件目录下保存着三类文件:
  • .log :真实的日志消息文件。
  • .index :偏移量索引文件。
  • .timeindex :时间戳索引文件。
两类索引文件均采用稀疏索引的方式,我们前面已经进行了简单介绍。这一篇我们深入了解一下日志消息文件 .log

2、日志格式

Kafka 的日志消息层次分为两层:消息集(message set)以及消息(message)。一个消息集合包含若干条日志项 (record item),而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
随着 Kafka 的迅猛发展,其日志消息格式也在不断升级改进中,Kafka 的日志格式总共经历了3 个大版本:V0,V1 和 V2 版本。
  • V0 格式:0.10.0 版本以前。
  • V1 格式:从 V0.10.0 开始到 V0.11.0 版本,比 V0 版本就多了一个 timestamp 字段。
  • V2 格式:从 0.11.0.0 版本开始,进行较多优化:
    • 将 CRC 值从消息中移除,被抽取到消息批次中。
    • 增加了 procuder id、producer epoch、序列号等信息主要是为了支持幂等性以及事务消息的。
    • 使用增量形式来保存时间戳和位移。
    • 消息批次最小为 61 字节,比 V0、V1 版本要大很多,但是在批量消息发送场景下,会提供发送效率,降低使用空间。
以最新的 V2 格式为例,格式内容如下图所示:
notion image
首先看看RecordBatch的关键字段:
  • first offset:表示当前RecordBatch的起始偏移量。
  • length:计算从partition leader epoch到末尾的长度。
  • partition leader epoch:分区leader纪元,可以看做是分区leader的版本号或者更新次数。
  • magic:消息格式版本号,v2版本是2。
  • crc32:crc32校验值。
  • attributes:消息属性,这里占用2个字节。低三位表示压缩格式,第4位表示时间戳类型,第5位表示此RecordBatch是否在事务中,第6位表示是否为控制消息。
  • last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要用于broker确保RecordBatch中Recoord组装的正确性。
  • first timestamp:RecordBatch中第一条Record的时间戳。
  • max timestamp:RecordBatch中最大的时间戳。一般情况是最后一条Record的时间戳。
  • producer id:PID,用来支持事务和幂等。暂不解释。
  • producer epoch:用来支持事务和幂等。暂不解释。
  • first sequeue:用来支持事务和幂等。暂不解释。
  • records count:RecordBatch中record的个数。
  • records:消息记录集合
Record的关键字段:
  • length:消息总长度
  • attributes:弃用。这里仍然占用了1B大小,供未来扩展。
  • timestamp delta:时间戳增量。
  • offset delta:偏移量增量。保存与RecordBatch起始偏移量的差值。
  • key length:消息key长度。
  • key value:消息key的值。
  • value length:消息体的长度。
  • value:消息体的值。
  • headers:消息头。用来支持应用级别的扩展。
Header的关键字段:
  • header key length:消息头key的长度。
  • header key:消息头key的值。
  • header value length:消息头值的长度。
  • header value:消息头的值。

3、消息压缩

上面我们提到 Kafka 操作一般是基于消息集合的,所以消息的压缩和解压缩也是基于消息集合。
注意消息的压缩和日志文件的压缩是两个不同应用场景,前者是为了减少网络传输的带宽,而后者则是一种日志清理策略。
消息压缩和解压缩的过程用一句话总结:Producer 端压缩,Broker 端保持,Consumer 端解压缩。

3.1 何时压缩?

Kafka中,压缩可能会发生在两个地方:生产者端和 Broker 端。
生产者程序中配置 compression.type 参数即表示启动指定类型的压缩算法。如果配置为 uncompressed,则表示不压缩。
表明 Producer 的压缩算法使用的是 gzip。这样 Producer 启动后生产的每个消息集合都是经过 gzip 压缩过的,故而能很好地节省网络传输带以及 Broker 端的磁盘占用。
大部分情况下,Broker 从 Producer 端接收到消息后仅仅是原封不动地保存,而不会对其进行任何修改。但有两种例外的情况会让 Broker 端重新压缩消息。
  • 第一种情况,Broker 端指定了和 Producer 端不同的压缩算法。 Broker端也有一个参数叫 compression.type,和 Producer端的参数设置一样。这个参数的默认值是 producer,表示 Broker端会尊重 Producer端使用的压缩算法。但是一旦在 Broker端 设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩/解压缩操 作,导致 Broker端CPU使用率飙升。比 Broker 端接收到 gzip 压缩消息后,Broker 端指定了 snappy 压缩算法,这样 Broker 只能解压缩然后使用 snappy 重新压缩一遍。
  • 第二种情况,Broker 端发生了消息格式变化。 所谓的消息格式变化主要是为了兼容老版本的消费者程序。在一个生产环境中,Kafka 集群中同时保存多种版本的消息格式非常常见。为了兼容老版本的格式,Broker 端会对新版本消息执行向老版本格式的转换。这个过程就会涉及到消息的解压和重新压缩。一般情况下这种消息格式的转换对性能是有很大的影响的,除了这里讲的压缩外,还会让 Kafka 丧失了引以为豪的 Zero Copy 特性。所以尽量保证消息格式的统一,这样不仅可以避免不必要的解压缩/重新压缩,对提升其他方面的性能也很有裨益。

3.2 何时解压缩?

通常来说解压缩发生在消费者程序中。Kafka 会将启用了哪种压缩算法封装进消息集合中(RecordBatchattributes 属性),这样当 Consumer 读取到集合时,它自然就知道了这些消息使用了哪种压缩算法。

3.3 各种压缩算法对比

评估一个压缩算法的优劣,主要有两个指标:压缩比、压缩/解压缩吞吐量。
在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:gzip、snappy 和 lz4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比。
对于 Kafka 测试而言
  • 在吞吐方面:lz4 > snappy > zstd > gzip;
  • 在压缩比方面: zstd > lz4 > gzip > snappy。
物理资源占用:
  • 带宽:使用 snappy 算法占用的网络带宽资源最多,zstd 最少,这是合理的,毕竟 zstd 就是要提供超高的压缩比;
  • CPU:在 CPU 使用率方面,各个算法表现得差不 多,只是在压缩时 snappy 使用的 CPU 较多一些,而在解压缩时 gzip 算法则可能使用更多的CPU。
因此,正常情况下三种压缩算法的推荐排序为:lz4 > gzip > snappy。 经过长时间的现网运行试验,发现在大多数情况下上面的模型是没问题的。但是在某些极端情况下 lz4 压缩算法会导致 CPU 负载增大。 经分析是业务的源数据内容不一样,导致压缩算法的性能表现不一样。 故建议对 CPU 指标敏感的用户采用更为稳定的 snappy 压缩算法。

4、日志清理策略

Kafka 的消息存储在磁盘中,消息不可能无限制存储,因此 Kafka 提供了两种日志清理策略:
  • 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段(LogSegment)。
  • 日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
在 Kafka 的 Broker 端或 Topic 配置参数开启日志清理功能:
  • log.cleaner.enable:设置为 true 开启日志清理功能,默认为 true。
  • log.cleanup.policy:日志清理策略,有以下选项
    • delete:日志删除,默认策略
    • compact :日志压缩。
    • delete,compact :同时支持日志删除和日志压缩两种策略。

4.1 日志删除策略

当 Broker 端或者 Topic 没有显式设置 log.cleanup.policy 参数,或者设置 log.cleanup.policy=delete 时,将会采用日志删除策略。Broker 端可以设置参数log.retention.check.interval.ms 用来配置检测日志删除操作的周期,默认值为 300000(5分钟)。删除日志分段时:
  1. 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
  1. 将日志分段文件添加上 .deleted 的后缀(也包括日志分段对应的索引文件)
  1. Kafka 的后台定时任务会定期删除这些 .deleted 为后缀的文件,这个任务的延迟执行时间可以通过 file.delete.delay.ms 参数来设置,默认值为 60000,即 1 分钟。
Kafka 共有三种日志删除策略:
  • 基于时间的删除策略
  • 基于文件大小的删除策略
  • 基于日志文件起始偏移量的删除策略

4.1.1 基于时间的删除策略

日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retenionMs) 来寻找可删除的日志分段文件集合。阈值 retenionMs 可以通过 Broker 端参数设置:
  • log.retention.hour
  • log.retention.minutes
  • log.retention.ms
如果同时设置了多个 retenionMs 相关的参数,优先级:log.retention.ms > log.retention.minutes > log.retention.hour
默认情况下,设置了 log.retention.hours=168 ,即默认日志的保留时间为 168 小时,相当于保留 7 天。

4.1.2 基于文件大小的删除策略

日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段文件集合。阈值 retentionSize 可以通过 Broker 端参数设置:
  • log.retention.bytes
  • log.segment.bytes
默认情况下,log.retention.bytes=-1,表示无穷大(表示的所有日志的总大小);log.segment.bytes=1073741824,即 1G。

4.1.3 基于日志文件起始偏移量的删除策略

每个 segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

4.2 日志压缩策略

日志压缩(Log Compaction)是默认的日志删除(Log Retention)之外的一种清理过时数据的方式。 Kafka 会定期将相同 key 的消息进行合并,只保留最新的 value 值。
notion image
  • Log Compaction 执行后,offset 将不再连续,但依然可以查询 Segment
  • Log Compaction 执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction 会生成一个新的 Segment 文件
  • Log Compaction 是针对 key 的,在使用的时候注意每个消息的 key 不为空
  • 基于Log Compaction可以保留 key 的最新更新,可以基于 Log Compaction 来恢复消费者的最新状态
Kafka系列:集群管理Kafka系列:副本机制
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏