RocketMQ如何保证消息的可靠性、顺序性?

date
Apr 26, 2023
slug
RocketMQ如何保证消息的可靠性、顺序性?
status
Published
tags
MQ
summary
type
Post

可靠性:

可靠性需要生产者,Broker,消费者三者的配合

发送端

  • 同步发送
    • 阻塞式发送,通过拿到发送结果判断是否需要重试
  • 异步发送
    • 通过回调函数来判断发送结果从而选择相应的重试车略
  • 单向发送(不能保证发送消息可靠性)
  • 重试策略
    • Broker会上传Topic与Broker的路由信息到NameServer,生产者发送失败后可以通过本地缓存或者NameServer获取Broker和Topic的路由信息轮询到可以正常发送的Broker进行重试发送
  • 消息发送失败处理方式
    • Producer的send方法本身支持内部重试,重试逻辑如下:
    • 至多重试2次(同步发送为2次,异步发送为0次)。
    • 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
    • 如果本身向broker发送消息产生超时异常,就不会再重试。
    • 以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

Broker端

CommitLog&ConsumerQueue 保存了写入的消息和消费者组(ConsumerGroup)的消费位点
目前RocketMQ存储模型使用本地磁盘进行存储,数据写入为producer -> direct memory -> pagecache -> 磁盘
  • 单机刷盘机制
    • 同步刷盘,保证消息不丢失但吞吐量低
    • 异步刷盘,有少量消息丢失风险(默认策略)
    • 过期文件删除
      • Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉,与消息大小、消费者消费状态无关。当消息堆积超过一定的程度但储存空间不够的,RocketMQ也会删除未消费的过期信息导致消息丢失
  • 主从间的消息复制

消费端

  • 重试消费
    • 消费失败可以重试消费,可设置重新投递时间间隔和次数
  • 死信队列
    • 重试消费次数达到阈值后消息放入死信队列
  • 回溯消费
    • 对于需要重新消费的消息,只要消息不过期仍然可以重新消费

其他

  • 缩容的时候先减少写队列,再减少读队列,直到readQueueNums≥writeQueueNums

顺序性

顺序发送+顺序消费才能消息的顺序性

顺序发送

一个Topic室友分不到多个Broker的队列组成的,如果需要全局顺序那么必须保证单一生产者串行发送,但这样的吞吐量太低了,一般情况下只需要保证局部顺序就可以了,比如保证某一个订单的相关消息是顺序的。
使用MessageQueueSelector 是队列选择器在发送到消息的时候把同一个业务key的消息发送到同一个队列。这种方法在Broker掉线是会造成同一个业务key的消息乱序的。

顺序消费

Push Consumer并发消费是通过在注册消费回调接口时传入MessageListenerConcurrently接口的实现来完成。顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入MessageListenerOrderly接口的实现。
notion image
在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。
 

© Ryan Tang 2021 - 2025