type
status
date
slug
summary
tags
category
icon
password
1、概述
所谓的消息交付可靠性保障,是指 Kafka 对Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:
- 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
- 至少一次(at least once):消息不会丢失,但有可能被重复发送。
- 精确一次(exactly once):消息不会丢失,也不会被重复发送。
目前,Kafka默认提供的交付可靠性保障是第二种,即至少一次。消息“已提交”的含义,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。
Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。
无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。
Kafka 分别通过幂等性(Idempotence)和事务(Transaction)这两种机制实现了 精确一次(exactly once)语义。
2、幂等性
幂等这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。幂等性最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。
2.1 Producer端开启幂等
在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可
enable.idempotence
被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。2.2 Kafka幂等实现原理
Kafka 为了实现幂等性,它在底层设计架构中引入了 ProducerID 和 SequenceNumber。
- ProducerID:Producer 初始化时都会被分配一个 ProducerID,对用户无感知,重启会发生变化
- SequenceNumber:Producer 为每个主题和分区分配一个从 0 开始单调递增的 SequenceNumber,在发送消息的时候为消息绑定这个 SequenceNumber。
在 ProducerID + Topic-Partition 级别上添加一个 SequenceNumber 信息,就可以实现分区级别消息的唯一性了。Broker 收到消息后会以 ProducerID 为单位存储 SequenceNumber。如果消息落盘会同时更新最大 SequenceNumber;如果新的消息带上的 SequenceNumber 不大于当前的最大 SequenceNumber,这个消息就会被 Broker 端拒绝掉。也就是说即使 Producer 重复发送了, Broker 端也会将其过滤掉。
Kafka 幂等消息的局限性:
- 只能保证单分区上的幂等性。即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。因为 SequenceNumber 是以 Topic + Partition 为单位单调递增的,如果一条消息被发送到了多个分区必然会分配到不同的 SequenceNumber ,导致重复问题。
- 只能实现单会话上的幂等性。不能实现跨会话的幂等性,当你重启 Producer 进程之后,这种幂等性保证就丧失了。因为重启 Producer 后会分配一个新的 ProducerID,相当于之前保存的 SequenceNumber 就丢失了。
3、事务
Kafka 的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的ACID,即原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 Read Committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息。再具体一点,事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
3.1 Producer端开启事务
Producer 开启事务只需要设置两个属性:
- 和幂等性 Producer 一样,开启
enable.idempotence = true
。
- 设置 Producer 端参数
transactional. id
。最好为其设置一个有意义的名字。
Procuder 开启事务后,发送消息的代码示例如下
上面这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。和普通 Producer 代码相比,事务型 Producer 的显著特点是调用了一些事务API,
initTransactions()
:初始化事务
beginTransaction()
:开启事务
sendOffsetsToTransaction()
:在事务内提交已经消费的偏移量(主要用于消费者)
commitTransaction()
:提交事务
abortTransaction()
:终止事务
3.2 Consumer端消费事务消息
Consumer 端消费事务消息的时候,只需要设置
isolation.level
参数的值即可。当前这个参数有两个取值:read_uncommitted
:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
read_committed
:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。
3.3 Kafka事务实现原理
1)启动生产者,分配协调器
我们在使用事务的时候,必须给生产者指定一个事务 ID,生产者启动时,Kafka 会根据事务 ID 来分配一个事务协调器(Transaction Coordinator) 。每个 Broker 都有一个事务协调器,负责分配 PID(Producer ID) 和管理事务。
事务协调器的分配涉及到一个特殊的主题 __transaction_state,该主题默认有50个分区,每个分区负责一部分事务;Kafka 根据
事务ID的hashcode值%50
计算出该事务属于哪个分区, 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。分配完事务协调器后,该事务协调器会给生产者分配一个 PID,接下来生产者就可以准备发送消息了。
2)发送消息
生产者分配到 PID 后,要先告诉事务协调器要把详细发往哪些分区,协调器会做一个记录,然后生产者就可以开始发送消息了,这些消息与普通的消息不同,它们带着一个字段标识自己是事务消息。
当生产者事务内的消息发送完毕,会向事务协调器发送 Commit 或 Abort 请求,此时生产者的工作已经做完了,它只需要等待 Kafka 的响应。
3)确认事务
当生产者开始发送消息时,协调器判定事务开始。它会将开始的信息持久化到主题
__transaction_state
中。当生产者发送完事务内的消息,或者遇到异常发送失败,协调器会收到 Commit 或 Abort 请求,接着事务协调器会跟所有主题通信,告诉它们事务是成功还是失败的。
如果是成功,主题会汇报自己已经收到消息,协调者收到所有主题的回应便确认了事务完成,并持久化这一结果。
如果是失败的,主题会把这个事务内的消息丢弃,并汇报给协调者,协调者收到所有结果后再持久化这一信息,事务结束;整个放弃事务的过程消费者是无感知的,它并不会收到这些数据。
- Author:mcbilla
- URL:http://mcbilla.com/article/53562ad1-5eb0-4e3f-a345-702f3f95fb52
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!