type
status
date
slug
summary
tags
category
password

1、消息重复问题

RocketMQ 在设计上是至少一次(At least Once)语义,意思是每个消息必须投递一次,在以下情形可能会发生消息重复的问题:
  • Producer 端发送重复:消息成功发送到 Broker 并完成持久化,但网络闪断或生产者宕机导致 Broker 的确认应答失败。生产者未收到响应,认为消息发送失败并进行重试,导致 Broker 接收多条内容相同但 Message ID 可能不同的消息。
  • Consumer 端消费重复:这是更常见的情况。消费者成功处理消息后,在向 Broker 返回消费成功状态(ACK)时网络闪断。Broker 未收到 ACK,为保证消息至少被消费一次,会在之后(如网络恢复后)再次投递该消息。
  • 系统触发 Rebalance:当 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者也可能会收到少量重复消息。
因为 RocketMQ 的生产者端没有提供类似于 Kafka 那样的幂等性功能,所以 RocketMQ 解决消息重复的关键在于:确保消费端消费的幂等性。幂等性是指无论同一条消息被消费多少次,其对业务系统产生的结果都与消费一次相同。
为了实现上述流程,有几种经过验证的实践方案。
1、基于数据库唯一约束
这是最直接、可靠的方法之一,特别适用于像创建订单这样的场景。
  • 实现方式:为消息表设置唯一约束,通常使用业务的唯一标识(如订单号)作为唯一索引或主键。
  • 消费流程
      1. 开启数据库事务。
      1. 尝试将消息的业务唯一标识插入消息表。
      1. 如果插入成功,执行业务逻辑(如更新订单状态)。
      1. 提交事务。
  • 效果:如果消息重复,第二次插入会因唯一约束冲突而失败,事务回滚,从而避免重复消费。
  • 注意:此方法依赖于关系型数据库的事务特性,要求业务操作本身在同一个数据库事务中。
2、利用 Redis 的原子性
Redis 的 SETNX 命令或者 SET 命令(带 NX 选项)是实现分布式锁和幂等控制的利器,性能极高。
  • 实现方式:为每条消息创建一个唯一的 Key,通常是“业务前缀:唯一标识”(如 OrderPay:ORDER_100)。
  • 消费流程
      1. 消费前,尝试用 SETNX 命令将消息的唯一 Key 写入 Redis。
      1. 如果命令返回成功,表示当前是第一次消费,正常执行业务逻辑。
      1. 如果命令返回失败,表示 Key 已存在,消息已被消费过,直接跳过。
  • 优势:性能损耗低,并且可以利用 Redis 的 TTL(生存时间)为键值对设置自动过期,避免长期积累占用过多内存。
  • 注意:需要考虑 Redis 本身的高可用性,如果 Redis 出现故障,需要有补偿或降级策略。
3、使用乐观锁更新状态
如果业务逻辑的核心是更新某个资源的状态(如将订单从未支付更新为已支付),乐观锁是很好的选择。
  • 实现方式:在业务数据表中增加一个版本号(version)字段或状态条件。
  • 消费流程
      1. 查询当前数据的版本号。
      1. 执行业务逻辑。
      1. 更新数据时,带上版本号作为条件:update table set status = 'SUCCESS', version = version + 1 where id = #{id} and version = #{oldVersion}
  • 效果:如果消息重复消费,第一次消费已经更新了版本号,第二次的更新操作会因为条件不匹配而影响行数为0,从而判断为重复操作。
注意事项:
  1. 不要依赖 Message ID:RocketMQ 为每条消息生成的 Message ID 在消息重发时可能变化。务必使用消息中携带的业务唯一标识(即 Message Key)作为幂等判断的依据。生产者在发送消息时应设置 Key:message.setKey("ORDERID_100")
  1. 复杂业务的处理:对于流程长、涉及多个系统或 RPC 调用的复杂业务,可以考虑将业务拆解成多个原子性的小步骤,每个步骤通过消息驱动,并分别保证其幂等性。
  1. 做好监控和告警:即使有幂等控制,也建议对消息的重试次数进行监控。如果发现某条消息不断重试失败,可能需要人工介入检查业务逻辑是否存在问题。

2、消息丢失问题

消息的完整生命周期分为三个主要阶段:生产阶段存储阶段消费阶段。消息丢失可能发生在任何一个阶段。

2.1 生产阶段消息丢失

这个阶段指的是 Producer 将消息发送到 Broker 的过程中出现丢失。
产生原因:
  • 网络故障:Producer 和 Broker 之间网络中断,消息根本发不过去。
  • Broker 异常:Broker 宕机,无法接收消息。
  • 发送方式选择不当:使用了单向发送(sendOneWay),不关心发送结果。
解决方案:
  1. 使用同步发送(Sync Send)并检查发送结果
      • 这是最基本也是最重要的措施。同步发送会等待 Broker 返回确认(ACK)。
      • 代码中一定要检查 SendResult。如果发送失败(如返回 SEND_FLUSH_DISK_TIMEOUTSEND_FAILED 等状态),则进行重试。
  1. 合理配置重试机制
      • RocketMQ 的 Producer 自带重试逻辑(默认重试 2 次)。对于非常重要的消息,可以增加重试次数。
      • 重试的前提是,你的发送逻辑是幂等的,即重复发送相同的消息不会导致业务问题。

2.2 存储阶段消息丢失

这个阶段指的是消息已经成功到达 Broker,但在被写入磁盘持久化之前丢失。
产生原因:
  • 刷盘策略:Broker 的默认刷盘方式是异步刷盘。消息先写入 Page Cache(操作系统内存)就返回成功,然后由后台线程定期刷盘。如果此时 Broker 突然断电,内存中未刷盘的消息就会全部丢失。
解决方案:
  1. 将刷盘方式改为同步刷盘(SYNC_FLUSH)
      • 在 broker.conf 中配置:flushDiskType = SYNC_FLUSH
      • 同步刷盘模式下,Broker 会等待消息真正写入磁盘后,才向 Producer 返回 ACK。这保证了只要 Producer 收到成功响应,消息就一定在磁盘上。
      • 但同步刷盘会极大影响 Broker 的写入性能(TPS),因为每次写入都要进行磁盘 I/O。这是用性能换可靠性的典型场景。
  1. 主从复制(Replication)模式
      • 即使消息刷盘了,如果单机磁盘损坏,消息也会丢失。因此需要副本机制。
      • 异步复制(ASYNC_MASTER):消息写入主节点成功后即返回,然后异步复制到从节点。性能好,但如果主节点磁盘损坏且未完成复制,消息会丢失。
      • 同步复制(SYNC_MASTER):主节点等待消息被至少一个从节点成功复制后,才向 Producer 返回 ACK。
        • 在 broker.conf 中配置:brokerRole = SYNC_MASTER
最佳实践:对于金融等严格要求数据不丢失的场景,采用 同步刷盘(SYNC_FLUSH)+ 同步复制(SYNC_MASTER) 的组合。这样只有消息在主机磁盘和从机磁盘上都存在后,才会确认成功,可靠性最高,但性能也最低。

2.3 消费阶段消息丢失

这个阶段指的是 Consumer 从 Broker 拉取消息后,在处理完成之前消息被确认为已消费,从而导致丢失。
产生原因:
  • 消费失败却自动提交位移成功:Consumer 拉取到消息,如果业务处理逻辑失败或发生异常,但 Consumer 却向 Broker 返回了 CONSUME_SUCCESS(通常是自动提交),那么 Broker 就会认为消息已成功处理,从而不会再投递。这是最典型的消费阶段消息丢失,是代码逻辑bug导致的。
解决方案
  • 手动提交位移 + 消费幂等性控制
    • 使用手动管理消费位移,即在业务逻辑确保完成(比如数据库事务提交后)再手动提交消费位移,返回 CONSUME_SUCCESS
    • 如果消费失败,Broker 会自动重新投递,默认会重试 16 次。所以消费端消费逻辑必须支持幂等。可以使用数据库唯一键、分布式锁、或记录已处理消息的 Message ID/Key 到 Redis/DB 中,在处理前先判断是否已经处理过。

3、消息堆积问题

消息堆积的直接表现是:生产消息的速率远大于消费者消费消息的速率。具体表现包括:
  1. 监控告警:RocketMQ Console 或其他监控系统中,看到某些 Topic 的 Consumer Lag(消费者滞后)指标持续增长。
  1. 消息延迟:业务方发现消息从发送到被处理的时间越来越长。
  1. 磁盘压力:Broker 节点的磁盘使用率持续升高(因为所有消息都持久化在磁盘上)。
  1. 消费者客户端监控:发现消费 TPS 很低,或者有大量的消费失败重试。
生产者、Broker、消费者(最常见)这三个环节都有可能发生消息堆积问题。
  • 消费者消息堆积(最常见)
    • 消费逻辑性能瓶颈:消费逻辑中有慢查询、复杂的计算、同步调用第三方接口(耗时过长)、或者线程阻塞(如锁竞争)。
    • Queue 的数量太少:Queue 的数量远少于消费者实例数,导致消费的并发度不够。
    • 消费线程数不足:默认的消费线程数是 20,如果消息处理本身是 CPU 密集型或 IO 密集型的,这个数量可能不够。
    • GC 问题:消费者应用频繁发生 Full GC,导致应用暂停,无法消费消息。
  • 生产者消息堆积
    • 流量激增:业务高峰(如秒杀、大促)时,生产者发送了远超平时数量的消息,超过了消费者的处理能力。
    • 消息体过大:发送了过大的消息(如几MB的报文),增加了网络传输和 Broker 存储、刷盘的压力,间接影响了消费速度。
  • Broker消息堆积
    • PageCache 繁忙:Broker 读写消息主要依赖操作系统的 PageCache。如果机器内存不足,或同时有大量其他 IO 操作,会导致 PageCache 压力大,读写变慢。
    • 磁盘 IO 瓶颈:Broker 使用的磁盘(尤其是机械硬盘)IOPS 或吞吐量达到上限,导致写入和读取消息变慢。
解决方案:整体思路是先紧急处理,再考虑长期优化。
第一步,紧急处理。
  1. 增加消费者实例(最有效):这是最快、最直接的方案。通过增加 Consumer 的实例数量(水平扩容),来提升整体的消费能力。例如,从 2 个实例扩容到 5 个或 10 个。
  1. 提升单个消费者的并行度
      • 增加消费线程数:将 ConsumeThreadMax 和 ConsumeThreadMin 调大。
      • 扩大消费的批量大小:对于 PushConsumer,可以调整 consumeMessageBatchMaxSize;对于 PullConsumer,可以手动拉取更多消息。
      注意:增加线程数和批量大小需要评估消费逻辑和机器资源(CPU、内存),避免把消费者应用拖垮。
  1. 临时 Topic 处理方案
    1. 创建临时 Topic,Queue 的数量是旧 Topic 的数倍
    2. 启动多个临时消费者,消费旧 Topic 的消息,不进行业务处理,而是快速转发到临时 Topic。
    3. 启动数倍原来消费者,去消费临时 Topic 的消息,进行业务处理。
    4. 在业务低峰期,再增加旧 Topic 的 Queue 数量和原来消费者的实例数。
第二步,长期优化。
  1. 优化消费代码
      • 检查消费逻辑中的数据库查询、RPC 调用、复杂计算等,进行性能优化。
      • 使用异步化处理:如果业务允许,可以将消息内容存入内存队列(如 Disruptor)或数据库,然后立即返回消费成功,再由后台线程异步慢慢处理。
      • 保证消费逻辑的幂等性,这是安全进行水平扩容和重试的基础。
  1. 批量消费
      • 如果业务场景适合,可以将消费模式改为批量消费。一次处理一批消息,能减少网络交互和数据库 IO 的次数,显著提升吞吐量。
      • 使用 DefaultMQPushConsumer 并设置 consumeMessageBatchMaxSize,或在消费逻辑中自己实现批量处理。

4、顺序消息问题

RocketMQ 保证消息在同一个 Queue 的有序性,即在同一个 Queue 内严格按照 FIFO(先进先出)的顺序进行发布和消费。
RocketMQ 实现顺序消息有两种方式:
  • 全局顺序: Topic 只设置一个 Queue,并严格到消费者是单线程发送,消费者是单线程消费,那么该 Topic 所有消息都是全局有序的。但这会严重影响并发性能,通常不推荐。
  • 分区顺序: Topic 可以设置多个 Queue,将一组需要保证顺序的消息(例如同一个订单 ID 的所有操作:创建、付款、发货)发送到同一个 Queue,这样可以保证这组消息的消费是顺序的。具体操作是:
    • 生产者在发送消息时自定义 MessageQueueSelector 接口的实现,最常见的做法是使用订单 ID 或用户 ID 等业务键作为分片键(Sharding Key),RocketMQ 会确保同一个键的消息总是落到同一个 Queue 中。另外生产者必须使用同步发送syncSend),并等待上一条消息发送成功后再发送下一条。如果使用异步发送,无法保证网络返回的先后顺序就是实际发送的顺序。
    • 消费者以 顺序消费模式MessageListenerOrderly)来消费消息,且必须以单线程的方式消费。如果某条消息消费失败,消费者会自动在本地进行重试(默认重试次数),期间会阻塞对该队列其他消息的消费,而不是跳过失败的消息去消费后面的。

5、RocketMQ和Kafka对比

特性维度
Apache Kafka
Apache RocketMQ
说明与对比
核心定位
分布式事件流平台
金融级可靠的消息队列
Kafka 偏向于海量数据流的处理;RocketMQ 偏向于业务消息的可靠传递。
消息模型
基于 Topic 和 Partition 的拉模型
基于 Topic 和 Queue 的拉/推模型
RocketMQ 支持推和拉,对客户端更友好;Kafka 只有拉模型,更利于客户端控制。
消息顺序
分区内有序
队列内有序(支持普通顺序和严格顺序)
两者都能通过将消息路由到同一分区/队列来保证顺序。RocketMQ 的严格顺序模式牺牲了一些可用性。
消息可靠性
非常高(通过副本机制)
非常高(同步刷盘+主从同步)
两者都可通过配置达到零丢失。RocketMQ 的同步双写(同步刷盘+主从)可靠性极高。
事务消息
支持(但相对复杂,依赖外部状态)
原生支持,非常完善(两阶段提交)
RocketMQ 的事务消息是其核心亮点,与业务场景结合紧密,使用简便。
定时/延迟消息
不支持(需自实现)
原生支持(18个延迟级别)
RocketMQ 开箱即用,非常适合电商场景中的超时关单等需求。
消息回溯
支持(基于偏移量 Offset)
支持(基于时间戳和偏移量)
两者都支持,RocketMQ 可按时间回溯,更灵活。
吞吐量
极致的高吞吐
非常高,通常略低于 Kafka
Kafka 在顺序读写和 Zero-Copy 方面优化得更极致,吞吐量是其最大优势。
延迟
毫秒到秒级(吞吐优先)
亚毫秒到毫秒级(低延迟优化)
RocketMQ 在写入和消费延迟上通常更低,更适合在线业务。
消息过滤
弱(主要通过 Topic 和 Consumer Group)
(支持 Tag 和 SQL92 语法过滤)
RocketMQ 的 Tag 过滤非常实用,可以在服务端过滤,减少网络传输。
语言生态
极其丰富(官方支持多种语言,社区活跃)
主要支持 Java,其他语言客户端由社区维护
Kafka 的生态更强大,尤其是在大数据领域。
社区与成熟度
极高,Apache 顶级项目,业界事实标准
,Apache 顶级项目,在国内经过阿里海量业务考验
Kafka 全球社区更活跃;RocketMQ 在国内拥有巨大影响力。
管理工具
有官方和第三方工具(如 Kafka Manager)
功能强大的官方控制台
RocketMQ 自带的管理控制台功能非常全面,易于运维。
存储方式
每个 Partition 对应一个顺序追加的日志文件。Consumer 对该文件进行“顺序读”,这种存储方式,对于每个文件来说是顺序 IO,性能提高。
所有 Topic 的消息都写入一个统一的 Commit Log 文件,然后异步构建消费队列(ConsumeQueue)索引,Consumer 通过索引定位到消息。这种写单一文件的方式也能获得极高的顺序写性能。
两者都是顺序写,性能极高
选择 Kafka 的场景:
  • 大数据日志采集、实时数据管道:需要将海量数据(如用户行为日志、应用监控数据)从一个系统传输到另一个系统(如 Hadoop、Spark、ES)。
  • 流式处理:需要基于消息流进行实时计算、分析、聚合等(通常配合 Kafka Streams、Flink、Spark Streaming)。
  • 活动追踪、运营指标:需要高吞吐、可持久化的消息流来记录事件。
  • 核心诉求是吞吐量,并且技术团队有能力进行深度运维和调优。
选择 RocketMQ 的场景:
  • 金融、电商等核心业务系统:对事务一致性有严格要求,如支付、订单等场景。
  • 需要高可靠、低延迟的异步通信:如业务解耦、微服务间的通信。
  • 需要丰富的消息功能:如定时消息(延时任务)、消息过滤(Tag)、消息回溯等。
  • Java 技术栈为主,希望有开箱即用的强大管理控制台,降低运维成本。

6、RocketMQ为什么这么快

  1. 顺序写:写入 CommitLog 的时候是顺序写入的,而且是所有 Topic 的消息都写入同一份 CommitLog 文件,这样比随机写入的性能就会提高很多。
  1. Page Cache + 零拷贝
      • Page Cache:消息首先写入操作系统的 Page Cache,而不是直接刷盘。后续的读取操作也会优先从 Page Cache 中查找。这相当于使用了速度极快的内存作为缓存,只有当内存不足时,才会进行磁盘交换。
      • 零拷贝技术
        • 传统方式:磁盘文件 -> 内核缓冲区 -> 用户缓冲区 -> 内核缓冲区(Socket)-> 网络设备。经历了 4 次上下文切换和 4 次数据拷贝。
        • RocketMQ 的方式:主要使用 mmap(内存映射)和 sendfile 系统调用。
          • mmap:CommitLog,ConsumeQueue,IndexFile 都是通过 mmap() 创建出来的文件。在消息存储(写 CommitLog)和消息拉取(读 ConsumeQueue)时,它将磁盘文件直接映射到进程的虚拟地址空间,使得应用程序可以像操作内存一样操作文件,避免了数据从内核空间到用户空间的拷贝。
          • sendfile:在消息发送时(消费者拉取消息),数据直接从文件描述符(映射了 CommitLog 的文件)传输到网络套接字,完全绕开了用户空间,实现了真正的“零拷贝”。

7、RocketMQ为什么自研NameServer而不是使用Zookeeper

简单来说就是 Zookeeper 太重了,在消息队列这个场景,服务发现中心只需要提供服务发现、心跳、路由这几个功能就可以了,用不到 Zookeeper 那么强大的功能。
Zookeeper的“重”:Zookeeper 是一个功能完备的分布式协调系统,它提供了强一致性(通过ZAB协议)、选举、分布式锁、配置管理等一系列强大功能。但这些功能对于 RocketMQ 的核心需求来说,是过度的沉重的。引入 Zookeeper 意味着引入了额外的复杂性,如需要部署和维护 Zookeeper 集群、处理 Zookeeper 的会话管理、监听机制等。
NameServer的“轻”:RocketMQ的核心需求非常简单直接:
  • 注册:Broker 启动后向所有 NameServer 注册自己的路由信息(Topic、队列等)。
  • 心跳:Broker 定期向 NameServer 发送心跳,证明自己还活着。
  • 路由查询:Producer 和 Consumer 客户端定期从 NameServer 拉取最新的路由信息,知道要去哪个Broker上收发消息。
NameServer完美地、且仅完美地满足了这三个需求。它没有选举、没有复杂的分布式协议,逻辑非常简单,就是一张动态的路由表
RocketMQ 使用 NameServer 相较于 Zookeeper 的优势:
  1. 运维简单:不需要引入一个重量级的中间件,简化系统部署和后期运维的难度。
  1. 功能满足需求:RocketMQ 需要的三个核心功能(服务发现、心跳和路由查询),NameServer 已经可以提供,不需要再引入额外的复杂性。
  1. 写入和读取性能更高:NameServer 采用去状态化的设计,任何一个节点都可以读取和写入数据,通过最终一致性模型保证所有的节点的数据一致;而 Zookeeper 采用的是强一致性模型,同一时间只有 Leader 可以写入数据,且需要同步到所有 Follwer 后数据才能被读取。所以 NameServer 的写入和读取性能远高于 Zookeeper。
特性
RocketMQ NameServer
Apache Zookeeper
对RocketMQ的意义
一致性模型
最终一致性 (AP)
强一致性 (CP)
满足需求,读性能极高,实现简单
数据同步
无同步,Broker分别上报
基于ZAB协议的Quorum同步
简单,无同步开销,延迟低
功能
极其简单:注册、心跳、路由查询
功能丰富:选举、锁、配置管理等
够用就好,避免不必要的复杂度和重量级依赖
性能
高吞吐,低延迟(尤其是读)
写有瓶颈,读尚可
适应海量Topic和客户端的路由查询场景
扩展性
极易水平扩展,节点间无状态
扩展性相对较差,写性能随节点增加可能下降
轻松应对集群规模增长
容错性
某个节点宕机不影响服务,Broker会向其他节点汇报
依赖Quorum,少数节点宕机不影响,Leader选举期间不可用
满足高可用需求,且故障恢复更快
运维
简单,无额外依赖
相对复杂,需要独立运维ZK集群
降低整体系统的运维成本
RocketMQ自研NameServer不是一个技术上的“重复造轮子”,而是一个极具针对性的架构设计。它深刻理解了消息队列场景的特定需求(高吞吐、低延迟、最终一致性可接受),并据此做出了最合适的技术选型。
领域驱动(DDD)简介RocketMQ系列:集群管理
Loading...