type
status
date
slug
summary
tags
category
icon
password

1、概述

Kafka 中主题(Topic)是承载真实数据的逻辑容器。在主题之下还分为若干个分区,也就是说Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。如下图所示:
notion image
Kafka 要做这样的设计?为什么使用分区的概念而不是直接使用多个主题呢?
其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
不同的分布式系统对分区的叫法也不尽相同。比如在 Kafka 中叫分区,在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中则叫Region,在 Cassandra 中又被称作 vnode。从表面看起来它们实现原理可能不尽相同,但对底层分区(Partitioning)的整体思想却从未改变。
副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上。副本的作用是提高系统的可用性,在主分区所在的 Broker 宕机的情况下,仍然不会发生数据丢失,且通过副本快速选主来恢复系统可用性。

2、Partition

2.1 Partition的物理存储

Kafka 的物理数据的存储目录是在 server.properties 文件里面设置的,找到 log.dirs 配置项,如下所示:
这里的 /kafka/data 就是 Kafka 的物理数据目录,在该目录下可以看到以下文件:
每个分区在物理上对应一个文件夹,以 topicName_partitionIndex 的命名方式命名。以上目录包含以下分区:
  • __consumer_offsets:这是 Kafka 自动创建的 topic,用于保存 consumer 的消费 offset 信息,一共有 50 个分区。旧版 Kafka 是将这些信息保存 Zookeeper 里面的,但由于 Zookeeper 并不适合做频繁写入的操作,所以新版 Kafka 将其保存在内部 topic。
  • test_topic1-2.5d2d9e867e31489d9a4577dafa9d81b0-delete :用户自定义的主题 test_topic1 的第 2 个分区,但是带了 delete 后缀,说明对该主题执行了删除操作,状态已经被标记为 marked for deletion ,但还没有进行真正删除。
  • test_topic2-2 :用户自定义的主题 test_topic2 的第 2 个分区。
进入 test_topic2-2 目录,包含以下文件
一个 partition 由一个或者多个 segment 所构成的。每个 segment 中则保存了真实的消息数据。segment 达到一定大小之后会生成新的 segment,旧的 segment 到达过期时间后会被删除。segment 可以理解为一个逻辑概念,每个 segment 在物理上分为三个文件:
  • log(数据文件):后缀为 .log,存储的每一条数据就是 message。每个 message 都有一个连续的序列号叫做 offset ,用于 partition 唯一标识一条消息。
  • index(索引文件):后缀为 .index,实现相对 offset 与物理地址的映射,其中文件名为当前 segment 的 message 的最小 offset。index 每行数据分为 offset 和 position 两个部分:
    • 相对 offset:相对 offset 表示 该 message 相对于其所属数据文件中最小的 offset 的大小。相对 offset 表示消息相对于 baseOffSet 的偏移量,例如分段后的一个日志文件的 baseOffset 是32450,它的文件名就是 32450.log,那么 offset 为 32455 的消息在相对 offset 就是 32455-32450 = 5。 position:表示该条 message 在数据文件中的绝对物理位置。
  • time index(时间索引文件):后缀为“.timeindex”,自0.10.0.1开始新增的索引,实现相对 offset 和时间戳的映射。
notion image
其中 index 和 timeindex 这两个索引文件均采用了稀疏索引的方式,每隔一定字节的数据建立一条索引,这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
如下图所示,以 index 文件中的 6,1407 为例,在数据文件中表示第 6 个 message(在全局 partition 表示第 368775 个 message),以及该消息的物理偏移地址为 1407。
notion image
假如我么想要读取 offset=1066 的 message:
  1. 通过二分查找文件列表,快速定位到具体的 .index.log 文件。因为文件名是当前 segement 的 message 的最小 offset,是假如索引文件依次为 00000000000000000000.index、00000000000000001018.index、00000000000000002042.index 等,当 offset=1066 时定位到 00000000000000001018.index|log。
  1. 找到具体的文件后,先计算 offset 在index 中的相对 offset,查找最接近的小于等于相对 offset 的元数据对应物理位置 position, 即 .log 文件中的物理偏移地址。 然后在 log 文件中通过顺序查找到具体的 offset。offset = 1066 在 00000000000000001018.index 中的相对 offset 为 48,查找最接近 48 且小于等于 48 的 index 行假如为 (46, 1467),通过 position 1467 在 log 文件中定位到具体 message,然后通过顺序查找查找 offset 为 1066 的消息。
稀疏索引的缺点是没有建立索引的 Message 也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

2.2 Partition分区策略

2.2.1 生产者分区策略

生产者分区策略是决定生产者将消息发送到哪个分区的算法。生产端将消息发送给Broker之前,会将producer发送的数据封装成一个 ProducerRecord 对象。是否依赖分区器看 partition 字段有无指定。
notion image
  • 如果消息 ProducerRecord 指定了 partition 字段,那么就不需要分区器,直接将指明的值直接作为 partiton 值。
  • 如果消息 ProducerRecord 没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。
Kafka为我们提供了默认的分区器 org.apache.kafka.clients.producer.internals.DefaultPartitioner 。用户也可以实现 org.apache.kafka.clients.producer.Partitioner 接口自定义分区器。
然后在配置文件里面指定自定义分区类的全路径类名。
Kafka 生产者的默认分区策略是
  • 指明 partition 的情况下,直接将指明的值直接作为 partiton 值。
  • 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。
  • 既没有 partition 值又没有 key 值的情况下,使用第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的轮询(Round-Robin)策略,这样可以确保消息均匀分布在所有分区上。
  • 既没有 partition 值又指定了自定义的分区类,则按自定义分区器来得到 partition 值。
在这里补充一下,每条消息根据分区策略被 append 到对应 Partition 的最后,属于顺序写磁盘,因此效率非常高。
notion image

2.2.2 消费者分区策略

消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只能被组中的一个消费者实例消费。
  • 如果分区数大于或者等于组中的消费者实例数,一个消费者会负责多个分区
  • 如果分区数小于组中的消费者实例数,有些消费者将处于空闲状态并且无法接收消息
Kafka 中着三种消费分区分配策略,通过 partition.assignment.strategy 来设置。
  • RangeAssignor 范围分区策略,也是默认模式。
  • RoundRobinAssignor 分配策略,轮询分区模式。
  • StickyAssignor 策略,同时结合了 RangeAssignor 和 RoundRobinAssignor 的优点。
1、RangeAssignor 策略:默认分区侧裂,首先对同一个 Topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用 Partitions 分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。 假设 n=分区数/消费者数量m=分区数%消费者数量,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m)个消费者每个分配 n 个分区。
notion image
可以看出, C1消费者线程比其他消费者线程多消费了 2 个分区 如上,只是针对 1 个 topic 而言,C1 消费者多消费 1 个分区影响不是很大。如果有 N 多个 topic,那么针对每个 topic,消费者 C1 都将多消费 1 个分区,topic 越多,C1 消费的分区会比其他消费者明显多消费 N 个分区。这就是 Range 范围分区的一个很明显的弊端了。
2、RoundRobinAssignor 策略:即顺序分配,将消费组内所有消费者以及消费者所订阅的所有 topic 的 partition 按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者。RoundRobin 策略会尽可能保证每个消费者消费的 partion 数量一致。RoundRobin 的 两种情况:
  • 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobin 策略的分区分配会是均匀的。 举例,假设消费组中有 2 个消费者 C0 和 C1,都订阅了主题 T1 和 T2,并且每个主题都有3个分区,那么最终的分配结果为:
    • notion image
  • 如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 topic,那么在分配分区的时候此消费者将分配不到这个 topic 的任何分区。 举例,假设消费组内有 3 个消费者C0、C1 和 C2,它们共订阅了3个主题:T1、T2、T3,这 3 个主题分别有 1、2、3 个分区,即整个消费组订阅了 T1p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。具体而言,消费者 C0 订阅的是主题 T1,消费者 C1 订阅的是主题 T1 和 T2,消费者 C2 订阅的是主题 T1、T2 和 T3,那么最终的分配结果为:
    • notion image
3、StickyAssignor 策略:Kafka 从 0.11.x版本开始引入这种分配策略,它主要有两个目的:
  • 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个;
  • 分区的分配尽可能的与上次分配的保持相同。 当两者发生冲突时,第一个目标优先于第二个目标。
鉴于这两个目标,Sticky 策略的具体实现要比 Range 和 RoundRobin 这两种分配策略要复杂很多。Sticky 策略的实际分配效果如下:
notion image
这样初看上去似乎与采用 RoundRobin 策略所分配的结果相同,但是其优点是当发现分区重分配时,会尽可能的与上次分配的保持相同。 如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。Sticky 策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。
Kafka系列:副本机制Java并发系列(六):ThreadLocal
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏