type
status
date
slug
summary
tags
category
icon
password

1、概述

符合开发规范的最佳实践,以及适当的性能调优,可以帮助我们发挥 Kafka 的最大优势。调优收益优先级:应用层调优 > JVM 调优 > 操作系统调优

2、应用层

2.1 Broker端

2.1.1 副本设置

这应该是 Beoker 端最重要的参数设置之一。生产环境的 Kafka 一般至少三节点集群部署。为了保证数据安全,同时考虑磁盘资源占用问题, 至少选择双副本。 但对于有更高可用要求的(比如高风险业务)可以选择三副本(这种情况下 Kafka 需要部署 5 个以上节点)。参数配置如下:

2.1.2 合理选择数据保存时间

Kafka 通过分区对 Topic 进行横向扩展,实现集群的负载均衡。更多的分区配合更多的消费者可以提高 Topic 的吞吐量。但分区并不是越多越好,因为分区越多,占用内存也会越多,完成首领选举的时间也会越长,所以不宜设置过大的分区数。
那么如何确定最后最优的分区数量呢?需要根据流量和消费能力合理设置 Topic 的分区数,有一个业界的计算公式,假如 Topic 的 Producer 和 Consumer 吞吐量的值分别是 Tp 和 Tc,单位是 MB/s;总的目标吞吐量是 Tt,那么
这样可以保证一个分区可承载 1-2MB/s的流量。如当前分区数不能承载业务流量,再进行适当分区扩容。
如果你不想这么麻烦,也有另外一个更加简单粗暴的方案。考虑到节点间负载均衡的问题,建议分区选择 Broker 节点个数的整数倍。理想情况下,分区数等于Broker 节点个数,可以达到最优性能。
另外每个分区都有自己的开销,建议控制整个集群的分区数量。每个 Broker 的 partition 数量限制在 2000 到 4000,每个 Kafka 集群中 partition 的数量限制在 10000 以内。

2.1.3 合理选择数据保存时间

不应将 Kafka 作为数据库使用,数据堆积可能会引起 Kafka 磁盘写满的问题,造成集群读写功能异常或集群不可用。这里涉及到两个参数
  • log.retention.{hours|minutes|ms} :控制一条消息数据被保存多长时间。从优先级上来说 ms > minutes > hours。通常情况下情况下使用 hours 级别多一点。默认是log.retention.hours=168 ,表示默认保存 7 天的数据。
  • log.retention.bytes :指定 Broker 为消息保存的总磁盘容量大小,默认是 -1,表明你想在这台 Broker 上保存多少数据都可以。如果生产消息的速度过快,Broker 存在磁盘被打爆的风险。
这两个参数是属于或关系,只要有一个满足条件,Kafka 就会开始删除最老的那个日志段文件。
建议用户根据项目需求合理配置数据保存时间,建议为 72 小时,最多不超过 168 小时。

2.1.4 日志文件

日志文件的存储路径和两个参数有关:
  • log.dirs:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
  • log.dir:注意这是 dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。
通常情况下只要设置log.dirs,即第一个参数就好了,不要设置log.dir。而且更重要的是,在线上生产环境中一定要为log.dirs配置多个路径,具体格式是一个CSV格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:
  • 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
  • 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。

2.1.5 开启认证模式

为保证集群的安全,请选择开启 Kafka 集群的 SASL/PLAIN 认证。参考 https://mapan1984.github.io/kafka-security/

2.1.6 主题设置

Kafka 的 Topic 命名要求是 3-128 字符,只能包含下划线 _ 、短划线 - 、数字和大小写字母。建议使用下划线 _ 分割,比如 产品编号_项目编号_topic唯一标识
另外关闭自动创建 Topic 的功能。

2.1.7 消息大小设置

建议设置 max.request.size < message.max.bytes < fetch.max.bytes
其中 max.request.size 在 Producer 端设置。

2.1.8 线程数调优

2.1.9 关闭自动Rebalance

auto.leader.rebalance.enable 默认为 true,表示允许 Kafka 定期地对一些 Topic 分区进行Leader重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。在生产环境中,建议设置为 false,避免不必要的 Leader 更换。

2.2 Producer端

2.2.1 Key和Value

Kafka的消息字段只有两个:Key 和 Value。Key 是消息的标识,Value 是消息内容。为了便于追踪,重要消息最好都设置一个唯一的 Key,例如 时间戳+唯一业务ID。相同 Key 的消息只会路由同一个分区。这样可以通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。
如果消息发送量较大,建议不要设置 Key,并使用黏性分区策略。黏性分区策略详情,请参见黏性分区策略。

2.2.2 Acks

Acks的说明如下:
  • acks=0:无需服务端的Response、性能较高、丢数据风险较大。
  • acks=1:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。
  • acks=all:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。 一般建议选择acks=1,重要的服务可以设置acks=all。

2.2.3 在创建连接时使用网关域名或所有节点

在创建连接时直接配置Kafka网关域名或加入所有节点,防止在单个节点出现连接错误时,客户端误认为 Kafka 服务不可用。

2.2.4 合理使用Producer实例

  • Producer 实例是线程安全的,可以多个线程或者进程复用一个 Producer 实例,一般情况下不需要创建更多 Producer 实例。
  • 使用结束后关闭 Producer 或者使用连接池。Producer 会保持与 kafka server 端的 socket 连接,请确保使用结束后调用 close() 将其关闭以释放连接,并且避免出现一些不符合期望的行为。如果频繁的创建和关闭生产者,建议使用连接池的方式。

2.2.5 消息大小设置

2.2.6 Batch机制

Producer 在向服务端发送消息时,需要先确认往哪个 Topic 的哪个分区发送。我们给同一个分区发送多条消息时,Producer 将相关消息打包成一个 Batch,批量发送到服务端。一个合适的 Batch 大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。
Producer 端的 Batch 机制主要通过两个参数进行控制:
  • batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数),默认 16KB。达到设置的数值时,就会触发一次网络请求,然后 Producer 客户端把消息批量发往服务器。如果 batch.size 设置过小,有可能会影响稳定性问题。
  • linger.ms:条消息在缓存中的最长时间。若超过这个时间,Producer 客户端就会忽略 batch.size 的限制,立即把消息发往服务器。建议根据业务场景, 设置 linger.ms 在 100~1000 之间。
 
另外还有一个 buffer.memory 参数。所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.sizelinger.ms 的限制。
buffer.memory 的默认数值是32 MB,对于单个 Producer 而言,可以保证足够的性能。但是如果启动多个 Producer,那么每个 Producer 都有可能占用 32 MB 缓存空间,此时便有可能触发 OOM。
在生产时,一般没有必要启动多个 Producer;如有特殊情况需要,则需要考虑 buffer.memory 的大小,避免触发OOM。

2.2.7 消息压缩

在大流量的Kafka集群,推荐使用压缩方式发送消息,压缩格式:
  • 吞吐量方面 lz4 > snappy > gzip
  • 性能方面:lz4 > gzip > snappy
一般推荐 lz4 压缩算法。一般情况下,Broker 接收 Producer 的消息会原封不动的保存,消费拉取后在消费端解压。 所以压缩会极大的节省带宽和磁盘的占用。

2.2.8 黏性分区策略

只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是Producer端设置的分区策略。Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。
在消息没有指定Key的情况下,Kafka 2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,Kafka 2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。
黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。
如果您使用 Kafka Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。

2.3 Consumer端

2.3.1 避免Rebalance

第一种情况,避免因为未及时发送心跳,Consumer 被踢出 Group 导致 Rebalance。至少保证 session.timeout.ms >= 3 * heartbeat.interval.ms
第二种情况,避免两次 poll 操作间隔太长,Coordinator 认为这个 Consumer 处理能力太弱,会将其踢出 Group 导致 Rebalance。

2.3.2 消息位移提交

Consumer 位移提交有两个相关参数:
  • enable.auto.commit:是否采用自动提交位点机制。默认值为 true,表示默认采用自动提交机制。
  • auto.commit.interval.ms: 自动提交位点时间间隔。默认值为 1000,即 1s。
这两个参数组合的结果就是,每次 poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过 auto.commit.interval.ms 规定的时长,则客户端会启动位点提交动作。因此,如果将 enable.auto.commit 设置为 true,则需要在每次 poll 数据时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,请把 enable.auto.commit 设为 false,并自行控制位点提交。

2.3.3 消息大小设置

3、JVM层

1、设置堆内存大小。Kafka 并不需要过高的堆内存,一般堆内存的大小不需要超过 6G。在很多公司的实际环境中,这个大小已经被证明是非常合适的,你可以安心使用。
2、将 G1 作为 Kafka 的默认垃圾回收器。G1 垃圾回收器非常适合大内存应用,可以根据工作负载进行自我调节,并且可以控制停顿时间是恒定的。正常情况下,G1 只需要很少的配置就能完成这些工作。以下是 G1 的两个主要的性能调优参数。
  • MaxGCPauseMillis:该参数指定了每次垃圾回收的默认停顿时间。这个值不是固定的,G1 可以根据具体情况进行调整,默认是 200 毫秒。G1 可以决定垃圾回收的频率,以及每一轮需要回收多少个区域,这样算下来,每一轮垃圾回收大概需要 200 毫秒。
  • InitiatingHeapOccupancyPercent:该参数指定了在 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认是 45。也就是说,在堆内存使用率达到 45% 之前,G1 不会启动垃圾回收。这个百分比包括新生代和老年代的内存。
Kafka 使用堆内存以及处理垃圾对象的效率是比较高的,所以可以把这些参数设置得小一些。如果一台服务器有 64 GB内存,并使用 5 GB 堆内存来运行 Kafka,那么可以将 MaxGCPauseMillis 设置为 20 毫秒,将 InitiatingHeapOccupancyPercent 设置为 35,让垃圾回收比默认的早一些启动。
那么如何为 Kafka 进行设置呢?只需要设置下面这两个环境变量即可:
  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。
Kafka 要修改 JVM 参数可以直接修改启动脚本 bin/kafka-server-start.sh中的变量值。
也可以在启动 Broker 之前,先设置上这两个环境变量:

4、操作系统层

4.1 磁盘

  • 文件系统选择 EXT4 或 XFS,优先选择 XFS。XFS 是很多 Linux 发行版默认的文件系统,因为它只需要做少量调优就可以承担大部分的工作负载,表现要优于 Ext4。Ext4 也可以表现得很好,但需要做更多的调优,存在较大的风险,其中就包括设置更长的提交时间间隔(默认是5),以便降低刷新频率。Ext4 引入了块分配延迟,一旦系统崩溃,更容易造成数据丢失和文件系统毁坏。XFS 也使用了分配延迟算法,不过比Ext4要安全些。XFS为Kafka提供了更好的性能,除了文件系统提供的自动调优,不需要再做额外的调优。另外,XFS 的批量磁盘写入的效率更高,所有这些组合在一起,提高了整体的I/O吞吐量。
  • 禁用 atime 更新。最好在挂载(Mount)文件系统时禁掉 atime 更新。atime 的全称是access time,记录的是文件最后被访问的时间。记录 xatime 需要操作系统访问 inode 资源,而禁掉 atime 可以避免 inode 访问时间的写入操作,减少文件系统的写操作数。你可以执行 mount -o noatime命令进行设置。

4.2 文件系统

  • 调整文件句柄数。使用 ulimit -n 来查看单个进程能够打开的最大文件句柄数量(socket连接也算在里面),默认值是 1024。有时候程序会碰到 Too Many File Open 这类的错误,就是因为这个值设置的过小。建议在生产环境中适当调大此值,例如设置为 65536。
  • 调整 vm.max_map_countvm.max_map_count是一个与内核虚拟内存子系统相关的参数,用于控制进程可以拥有的内存映射区域的最大数量。它通常用于限制一个进程可以打开的文件数量,默认值是 65530。在一个主题数超多的Broker机器上,你会碰到 OutOfMemoryError:Map failed 的严重错误。建议在生产环境中适当调大此值,比如将其设置为 655360。具体设置方法是修改 /etc/sysctl.conf 文件,增加 vm.max_map_count=655360,保存之后,执行 sysctl -p 命令使它生效。

4.3 内存

  • 调整虚拟内存vm.swappiness。对大多数应用程序(特别是那些依赖吞吐量的应用程序)来说,要尽量避免内存交换。内存页和磁盘之间的数据交换对Kafka各方面的性能都有重大影响。Kafka大量使用了系统页面缓存,如果虚拟内存被交换到磁盘,则说明已经没有多余内存可以分配给页面缓存。但是虚拟内存在某些情况下确实是需要的,它可以防止操作系统由于内存不足而突然终止进程。建议将 vm.swappiness 设置成一个很小的值,比如设置为 1,以防止 Linux 的 OOM Killer 开启随意杀掉进程。可以修改 /etc/sysctl.conf 文件,增加 vm.swappiness=1,然后重启机器即可。
    • 为什么不把 vm.swappiness 设置为 0?
      在以前,人们建议把 vm.swappiness 设置为0,意思是“除非发生内存溢出,否则不要进行内存交换”。直到Linux内核3.5-rc1版本发布,这个值的含义才发生了变化。这个变化被移植到了其他发行版中,包括 Red Hat 企业版内核 2.6.32-303。于是,0 的意思就变成“在任何情况下都不要发生交换”。这也就是为什么现在建议把这个值设置为 1。
  • 调整 vm.overcommit_memory :建议将 vm.overcommit_memory 设置为 0。0 表示内核会根据应用程序来确定空闲内存的数量。如果这个参数被设置为 0 以外的值,则可能会导致操作系统夺走过多内存,从而减少 Kafka 的运行内存,这种情况在高摄取率的应用程序中很常见。
  • 给 Kafka 预留足够的内存作为 page cache:Kafka 本身不需要太多内存,除了 Kafka 本身所占用的内存之外,剩下的系统内存用于页面缓存。这对 Kafka 而言至关重要。在某种程度上,我们可以这样说:给 Kafka 预留的页缓存越大越好,最小值至少要容纳一个日志段的大小,也就是 Broker 端参数 log.segment.bytes 的值。该参数的默认值是 1GB。预留出一个日志段大小,至少能保证 Kafka 可以将整个日志段全部放入页缓存,这样,消费者程序在消费时能直接命中页缓存,从而避免昂贵的物理磁盘I/O操作。这就是为什么不建议把Kafka同其他重要的应用程序部署在一起,因为它们需要共享页面缓存,从而会降低 Kafka 的性能。

4.4 网络

  • 调整 socket 读写缓冲区的默认内存和最大内存。socket 读写缓冲区内存对应的参数分别是 net.core.wmem_defaultnet.core.rmem_default,合理的值是131 072(也就是128 KiB)。读写缓冲区最大内存对应的参数分别是 net.core.wmem_maxnet.core.rmem_max,合理的值是2 097 152(也就是2 MiB)。需要注意的是,最大值并不意味着每个 socket 一定要分配这么大的缓冲空间,只是在必要的情况下才会达到这个上限。
  • 配置 TCP socket 的读写缓冲区。对应的参数分别是 net.ipv4.tcp_wmemnet.ipv4.tcp_rmem。这些参数的值由 3 个整数组成,使用空格分隔,分别表示最小值、默认值和最大值。最大值不能大于 net.core.wmem_max 和 net.core.rmem_max 指定的大小。例如,“409665536 2048000”表示最小值是 4 KiB、默认值是 64 KiB、最大值是 2 MiB。根据Kafka服务器接收流量的实际情况,可能需要设置更大的最大值,以便为网络连接提供更大的缓冲空间。

4.5 CPU

与磁盘和内存相比,Kafka对计算处理能力的要求相对较低,一般情况下 CPU 不会是 Kafka 的性能瓶颈因素。
Kafka系列:面试常见问题Kafka系列:精确一次性语义
mcbilla
mcbilla
一个普通的干饭人🍚
Announcement
type
status
date
slug
summary
tags
category
icon
password
🎉欢迎来到飙戈的博客🎉
-- 感谢您的支持 ---
👏欢迎学习交流👏