type
status
date
slug
summary
tags
category
icon
password
1、概述
符合开发规范的最佳实践,以及适当的性能调优,可以帮助我们发挥 Kafka 的最大优势。调优收益优先级:应用层调优 > JVM 调优 > 操作系统调优。
2、应用层
2.1 Broker端
2.1.1 副本设置
这应该是 Beoker 端最重要的参数设置之一。生产环境的 Kafka 一般至少三节点集群部署。为了保证数据安全,同时考虑磁盘资源占用问题, 至少选择双副本。 但对于有更高可用要求的(比如高风险业务)可以选择三副本(这种情况下 Kafka 需要部署 5 个以上节点)。参数配置如下:
2.1.2 合理选择数据保存时间
Kafka 通过分区对 Topic 进行横向扩展,实现集群的负载均衡。更多的分区配合更多的消费者可以提高 Topic 的吞吐量。但分区并不是越多越好,因为分区越多,占用内存也会越多,完成首领选举的时间也会越长,所以不宜设置过大的分区数。
那么如何确定最后最优的分区数量呢?需要根据流量和消费能力合理设置 Topic 的分区数,有一个业界的计算公式,假如 Topic 的 Producer 和 Consumer 吞吐量的值分别是 Tp 和 Tc,单位是 MB/s;总的目标吞吐量是 Tt,那么
这样可以保证一个分区可承载 1-2MB/s的流量。如当前分区数不能承载业务流量,再进行适当分区扩容。
如果你不想这么麻烦,也有另外一个更加简单粗暴的方案。考虑到节点间负载均衡的问题,建议分区选择 Broker 节点个数的整数倍。理想情况下,分区数等于Broker 节点个数,可以达到最优性能。
另外每个分区都有自己的开销,建议控制整个集群的分区数量。每个 Broker 的 partition 数量限制在 2000 到 4000,每个 Kafka 集群中 partition 的数量限制在 10000 以内。
2.1.3 合理选择数据保存时间
不应将 Kafka 作为数据库使用,数据堆积可能会引起 Kafka 磁盘写满的问题,造成集群读写功能异常或集群不可用。这里涉及到两个参数
log.retention.{hours|minutes|ms}
:控制一条消息数据被保存多长时间。从优先级上来说 ms > minutes > hours。通常情况下情况下使用 hours 级别多一点。默认是log.retention.hours=168
,表示默认保存 7 天的数据。
log.retention.bytes
:指定 Broker 为消息保存的总磁盘容量大小,默认是 -1,表明你想在这台 Broker 上保存多少数据都可以。如果生产消息的速度过快,Broker 存在磁盘被打爆的风险。
这两个参数是属于或关系,只要有一个满足条件,Kafka 就会开始删除最老的那个日志段文件。
建议用户根据项目需求合理配置数据保存时间,建议为 72 小时,最多不超过 168 小时。
2.1.4 日志文件
日志文件的存储路径和两个参数有关:
log.dirs
:这是非常重要的参数,指定了 Broker 需要使用的若干个文件目录路径。要知道这个参数是没有默认值的,这说明什么?这说明它必须由你亲自指定。
log.dir
:注意这是 dir,结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。
通常情况下只要设置
log.dirs
,即第一个参数就好了,不要设置log.dir
。而且更重要的是,在线上生产环境中一定要为log.dirs
配置多个路径,具体格式是一个CSV格式,也就是用逗号分隔的多个路径,比如/home/kafka1,/home/kafka2,/home/kafka3
这样。如果有条件的话你最好保证这些目录挂载到不同的物理磁盘上。这样做有两个好处:- 提升读写性能:比起单块磁盘,多块物理磁盘同时读写数据有更高的吞吐量。
- 能够实现故障转移:即 Failover。这是 Kafka 1.1 版本新引入的强大功能。要知道在以前,只要 Kafka Broker 使用的任何一块磁盘挂掉了,整个Broker 进程都会关闭。但是自 1.1 开始,这种情况被修正了,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且 Broker 还能正常工作。
2.1.5 开启认证模式
为保证集群的安全,请选择开启 Kafka 集群的 SASL/PLAIN 认证。参考 https://mapan1984.github.io/kafka-security/
2.1.6 主题设置
Kafka 的 Topic 命名要求是 3-128 字符,只能包含下划线
_
、短划线 -
、数字和大小写字母。建议使用下划线 _
分割,比如 产品编号_项目编号_topic唯一标识
。另外关闭自动创建 Topic 的功能。
2.1.7 消息大小设置
建议设置
max.request.size
< message.max.bytes
< fetch.max.bytes
其中
max.request.size
在 Producer 端设置。2.1.8 线程数调优
2.1.9 关闭自动Rebalance
auto.leader.rebalance.enable
默认为 true,表示允许 Kafka 定期地对一些 Topic 分区进行Leader重选举,当然这个重选举不是无脑进行的,它要满足一定的条件才会发生。在生产环境中,建议设置为 false,避免不必要的 Leader 更换。2.2 Producer端
2.2.1 Key和Value
Kafka的消息字段只有两个:Key 和 Value。Key 是消息的标识,Value 是消息内容。为了便于追踪,重要消息最好都设置一个唯一的 Key,例如
时间戳+唯一业务ID
。相同 Key 的消息只会路由同一个分区。这样可以通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。如果消息发送量较大,建议不要设置 Key,并使用黏性分区策略。黏性分区策略详情,请参见黏性分区策略。
2.2.2 Acks
Acks的说明如下:
acks=0
:无需服务端的Response、性能较高、丢数据风险较大。
acks=1
:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。
acks=all
:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。 一般建议选择acks=1,重要的服务可以设置acks=all。
2.2.3 在创建连接时使用网关域名或所有节点
在创建连接时直接配置Kafka网关域名或加入所有节点,防止在单个节点出现连接错误时,客户端误认为 Kafka 服务不可用。
2.2.4 合理使用Producer实例
- Producer 实例是线程安全的,可以多个线程或者进程复用一个 Producer 实例,一般情况下不需要创建更多 Producer 实例。
- 使用结束后关闭 Producer 或者使用连接池。Producer 会保持与 kafka server 端的 socket 连接,请确保使用结束后调用
close()
将其关闭以释放连接,并且避免出现一些不符合期望的行为。如果频繁的创建和关闭生产者,建议使用连接池的方式。
2.2.5 消息大小设置
2.2.6 Batch机制
Producer 在向服务端发送消息时,需要先确认往哪个 Topic 的哪个分区发送。我们给同一个分区发送多条消息时,Producer 将相关消息打包成一个 Batch,批量发送到服务端。一个合适的 Batch 大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。
Producer 端的 Batch 机制主要通过两个参数进行控制:
batch.size
: 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数),默认 16KB。达到设置的数值时,就会触发一次网络请求,然后 Producer 客户端把消息批量发往服务器。如果batch.size
设置过小,有可能会影响稳定性问题。
linger.ms
:条消息在缓存中的最长时间。若超过这个时间,Producer 客户端就会忽略batch.size
的限制,立即把消息发往服务器。建议根据业务场景, 设置linger.ms
在 100~1000 之间。
buffer.memory
的默认数值是32 MB,对于单个 Producer 而言,可以保证足够的性能。但是如果启动多个 Producer,那么每个 Producer 都有可能占用 32 MB 缓存空间,此时便有可能触发 OOM。 在生产时,一般没有必要启动多个 Producer;如有特殊情况需要,则需要考虑
buffer.memory
的大小,避免触发OOM。2.2.7 消息压缩
在大流量的Kafka集群,推荐使用压缩方式发送消息,压缩格式:
- 吞吐量方面 lz4 > snappy > gzip
- 性能方面:lz4 > gzip > snappy
一般推荐 lz4 压缩算法。一般情况下,Broker 接收 Producer 的消息会原封不动的保存,消费拉取后在消费端解压。 所以压缩会极大的节省带宽和磁盘的占用。
2.2.8 黏性分区策略
只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是Producer端设置的分区策略。Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。
在消息没有指定Key的情况下,Kafka 2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,Kafka 2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。
黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。
如果您使用 Kafka Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。
2.3 Consumer端
2.3.1 避免Rebalance
第一种情况,避免因为未及时发送心跳,Consumer 被踢出 Group 导致 Rebalance。至少保证
session.timeout.ms >= 3 * heartbeat.interval.ms
。第二种情况,避免两次 poll 操作间隔太长,Coordinator 认为这个 Consumer 处理能力太弱,会将其踢出 Group 导致 Rebalance。
2.3.2 消息位移提交
Consumer 位移提交有两个相关参数:
enable.auto.commit
:是否采用自动提交位点机制。默认值为 true,表示默认采用自动提交机制。
auto.commit.interval.ms
: 自动提交位点时间间隔。默认值为 1000,即 1s。
这两个参数组合的结果就是,每次 poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过
auto.commit.interval.ms
规定的时长,则客户端会启动位点提交动作。因此,如果将 enable.auto.commit
设置为 true,则需要在每次 poll 数据时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃。如果想自己控制位点提交,请把
enable.auto.commit
设为 false,并自行控制位点提交。2.3.3 消息大小设置
3、JVM层
1、设置堆内存大小。Kafka 并不需要过高的堆内存,一般堆内存的大小不需要超过 6G。在很多公司的实际环境中,这个大小已经被证明是非常合适的,你可以安心使用。
2、将 G1 作为 Kafka 的默认垃圾回收器。G1 垃圾回收器非常适合大内存应用,可以根据工作负载进行自我调节,并且可以控制停顿时间是恒定的。正常情况下,G1 只需要很少的配置就能完成这些工作。以下是 G1 的两个主要的性能调优参数。
MaxGCPauseMillis
:该参数指定了每次垃圾回收的默认停顿时间。这个值不是固定的,G1 可以根据具体情况进行调整,默认是 200 毫秒。G1 可以决定垃圾回收的频率,以及每一轮需要回收多少个区域,这样算下来,每一轮垃圾回收大概需要 200 毫秒。
InitiatingHeapOccupancyPercent
:该参数指定了在 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认是 45。也就是说,在堆内存使用率达到 45% 之前,G1 不会启动垃圾回收。这个百分比包括新生代和老年代的内存。
Kafka 使用堆内存以及处理垃圾对象的效率是比较高的,所以可以把这些参数设置得小一些。如果一台服务器有 64 GB内存,并使用 5 GB 堆内存来运行 Kafka,那么可以将
MaxGCPauseMillis
设置为 20 毫秒,将 InitiatingHeapOccupancyPercent
设置为 35,让垃圾回收比默认的早一些启动。那么如何为 Kafka 进行设置呢?只需要设置下面这两个环境变量即可:
KAFKA_HEAP_OPTS
:指定堆大小。
KAFKA_JVM_PERFORMANCE_OPTS
:指定 GC 参数。
Kafka 要修改 JVM 参数可以直接修改启动脚本
bin/kafka-server-start.sh
中的变量值。也可以在启动 Broker 之前,先设置上这两个环境变量:
4、操作系统层
4.1 磁盘
- 文件系统选择 EXT4 或 XFS,优先选择 XFS。XFS 是很多 Linux 发行版默认的文件系统,因为它只需要做少量调优就可以承担大部分的工作负载,表现要优于 Ext4。Ext4 也可以表现得很好,但需要做更多的调优,存在较大的风险,其中就包括设置更长的提交时间间隔(默认是5),以便降低刷新频率。Ext4 引入了块分配延迟,一旦系统崩溃,更容易造成数据丢失和文件系统毁坏。XFS 也使用了分配延迟算法,不过比Ext4要安全些。XFS为Kafka提供了更好的性能,除了文件系统提供的自动调优,不需要再做额外的调优。另外,XFS 的批量磁盘写入的效率更高,所有这些组合在一起,提高了整体的I/O吞吐量。
- 禁用 atime 更新。最好在挂载(Mount)文件系统时禁掉 atime 更新。atime 的全称是access time,记录的是文件最后被访问的时间。记录 xatime 需要操作系统访问 inode 资源,而禁掉 atime 可以避免 inode 访问时间的写入操作,减少文件系统的写操作数。你可以执行
mount -o noatime
命令进行设置。
4.2 文件系统
- 调整文件句柄数。使用
ulimit -n
来查看单个进程能够打开的最大文件句柄数量(socket连接也算在里面),默认值是 1024。有时候程序会碰到 Too Many File Open 这类的错误,就是因为这个值设置的过小。建议在生产环境中适当调大此值,例如设置为 65536。
- 调整
vm.max_map_count
。vm.max_map_count
是一个与内核虚拟内存子系统相关的参数,用于控制进程可以拥有的内存映射区域的最大数量。它通常用于限制一个进程可以打开的文件数量,默认值是 65530。在一个主题数超多的Broker机器上,你会碰到 OutOfMemoryError:Map failed 的严重错误。建议在生产环境中适当调大此值,比如将其设置为 655360。具体设置方法是修改/etc/sysctl.conf
文件,增加vm.max_map_count=655360
,保存之后,执行sysctl -p
命令使它生效。
4.3 内存
- 调整虚拟内存
vm.swappiness
。对大多数应用程序(特别是那些依赖吞吐量的应用程序)来说,要尽量避免内存交换。内存页和磁盘之间的数据交换对Kafka各方面的性能都有重大影响。Kafka大量使用了系统页面缓存,如果虚拟内存被交换到磁盘,则说明已经没有多余内存可以分配给页面缓存。但是虚拟内存在某些情况下确实是需要的,它可以防止操作系统由于内存不足而突然终止进程。建议将vm.swappiness
设置成一个很小的值,比如设置为 1,以防止 Linux 的 OOM Killer 开启随意杀掉进程。可以修改/etc/sysctl.conf
文件,增加vm.swappiness=1
,然后重启机器即可。
为什么不把vm.swappiness
设置为 0?在以前,人们建议把vm.swappiness
设置为0,意思是“除非发生内存溢出,否则不要进行内存交换”。直到Linux内核3.5-rc1版本发布,这个值的含义才发生了变化。这个变化被移植到了其他发行版中,包括 Red Hat 企业版内核 2.6.32-303。于是,0 的意思就变成“在任何情况下都不要发生交换”。这也就是为什么现在建议把这个值设置为 1。
- 调整
vm.overcommit_memory
:建议将vm.overcommit_memory
设置为 0。0 表示内核会根据应用程序来确定空闲内存的数量。如果这个参数被设置为 0 以外的值,则可能会导致操作系统夺走过多内存,从而减少 Kafka 的运行内存,这种情况在高摄取率的应用程序中很常见。
- 给 Kafka 预留足够的内存作为 page cache:Kafka 本身不需要太多内存,除了 Kafka 本身所占用的内存之外,剩下的系统内存用于页面缓存。这对 Kafka 而言至关重要。在某种程度上,我们可以这样说:给 Kafka 预留的页缓存越大越好,最小值至少要容纳一个日志段的大小,也就是 Broker 端参数
log.segment.bytes
的值。该参数的默认值是 1GB。预留出一个日志段大小,至少能保证 Kafka 可以将整个日志段全部放入页缓存,这样,消费者程序在消费时能直接命中页缓存,从而避免昂贵的物理磁盘I/O操作。这就是为什么不建议把Kafka同其他重要的应用程序部署在一起,因为它们需要共享页面缓存,从而会降低 Kafka 的性能。
4.4 网络
- 调整 socket 读写缓冲区的默认内存和最大内存。socket 读写缓冲区内存对应的参数分别是
net.core.wmem_default
和net.core.rmem_default
,合理的值是131 072(也就是128 KiB)。读写缓冲区最大内存对应的参数分别是net.core.wmem_max
和net.core.rmem_max
,合理的值是2 097 152(也就是2 MiB)。需要注意的是,最大值并不意味着每个 socket 一定要分配这么大的缓冲空间,只是在必要的情况下才会达到这个上限。
- 配置 TCP socket 的读写缓冲区。对应的参数分别是
net.ipv4.tcp_wmem
和net.ipv4.tcp_rmem
。这些参数的值由 3 个整数组成,使用空格分隔,分别表示最小值、默认值和最大值。最大值不能大于 net.core.wmem_max 和 net.core.rmem_max 指定的大小。例如,“409665536 2048000”表示最小值是 4 KiB、默认值是 64 KiB、最大值是 2 MiB。根据Kafka服务器接收流量的实际情况,可能需要设置更大的最大值,以便为网络连接提供更大的缓冲空间。
4.5 CPU
与磁盘和内存相比,Kafka对计算处理能力的要求相对较低,一般情况下 CPU 不会是 Kafka 的性能瓶颈因素。
- Author:mcbilla
- URL:http://mcbilla.com/article/19e01ef7-80b3-4a3d-920c-cc183a2e2101
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!