RocketMQ如何保证消息的可靠性、顺序性?
date
Apr 26, 2023
slug
RocketMQ如何保证消息的可靠性、顺序性?
status
Published
tags
MQ
summary
type
Post
可靠性:
可靠性需要生产者,Broker,消费者三者的配合
发送端
- 同步发送
阻塞式发送,通过拿到发送结果判断是否需要重试
- 异步发送
通过回调函数来判断发送结果从而选择相应的重试车略
单向发送(不能保证发送消息可靠性)
- 重试策略
Broker会上传Topic与Broker的路由信息到NameServer,生产者发送失败后可以通过本地缓存或者NameServer获取Broker和Topic的路由信息轮询到可以正常发送的Broker进行重试发送
- 消息发送失败处理方式
- 至多重试2次(同步发送为2次,异步发送为0次)。
- 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
- 如果本身向broker发送消息产生超时异常,就不会再重试。
Producer的send方法本身支持内部重试,重试逻辑如下:
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
Broker端
CommitLog&ConsumerQueue 保存了写入的消息和消费者组(ConsumerGroup)的消费位点
目前RocketMQ存储模型使用本地磁盘进行存储,数据写入为producer -> direct memory -> pagecache -> 磁盘
- 单机刷盘机制
- 同步刷盘,保证消息不丢失但吞吐量低
- 异步刷盘,有少量消息丢失风险(默认策略)
- 过期文件删除
Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉,与消息大小、消费者消费状态无关。当消息堆积超过一定的程度但储存空间不够的,RocketMQ也会删除未消费的过期信息导致消息丢失
- 主从间的消息复制
消费端
- 重试消费
消费失败可以重试消费,可设置重新投递时间间隔和次数
- 死信队列
重试消费次数达到阈值后消息放入死信队列
- 回溯消费
对于需要重新消费的消息,只要消息不过期仍然可以重新消费
其他
- 缩容的时候先减少写队列,再减少读队列,直到readQueueNums≥writeQueueNums
顺序性
顺序发送+顺序消费才能消息的顺序性
顺序发送
一个Topic室友分不到多个Broker的队列组成的,如果需要全局顺序那么必须保证单一生产者串行发送,但这样的吞吐量太低了,一般情况下只需要保证局部顺序就可以了,比如保证某一个订单的相关消息是顺序的。
使用MessageQueueSelector 是队列选择器在发送到消息的时候把同一个业务key的消息发送到同一个队列。这种方法在Broker掉线是会造成同一个业务key的消息乱序的。
顺序消费
Push Consumer并发消费是通过在注册消费回调接口时传入MessageListenerConcurrently接口的实现来完成。顺序消费设置与并发消费API层面只有一处不同,在注册消费回调接口时传入MessageListenerOrderly接口的实现。
在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。