rocketmq 保证消息不丢失,如何做到,需要研究下如何维护消费进度的.
会会导致重复消费. 本文不考虑broker宕机无法启动,热备等情况
首先最终根数据保持在每个broker中的ConsumerOffsetManager类
[java]
- * Consumer消费进度管理
- *
- * shijia.wxr<vintage.wang@gmail.com>
- * 2013-8-11
- */
- public class ConsumerOffsetManager extends ConfigManager {
- private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer, Long>> offsetTable =
- new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
- }
缓存副本: 每个consumer的RemoteBrokerOffsetStore中
broker保存进度用的是 consumer传过来的commitOffset (本来以为是读取的offsetMax). (来自于consumer本地的 remoteOffSetStore里的offset ,最初来自于broker的offsetTable) .
* broker offset 更新流程:
1. 定时调用RemoteBrokerOffsetStore.persistAll 更新到broker
2. 每次pull_message时 上传的commitOffset,来自于本地offsetTable .
这个commitOffset 1. 最初来自于 consumer启动阶段, 启动负载均衡service时从broker获取的offset数据, 从而设置到pullRequest中,http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142693
2. 后续来自于consumer本地的offset值(详见consumer offset 更新流程: ) 代码在
[java]
- DefaultMQPushConsumerImpl.pullMessage(PullRequest) (com.alibaba.rocketmq.client.impl.consumer){
- long commitOffsetValue = 0L;
- if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
- commitOffsetValue =
- this.offsetStore.readOffset(pullRequest.getMessageQueue(),
- ReadOffsetType.READ_FROM_MEMORY);
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
- }
注: push模式, 后续的pull 通过 minOffset不停的获取数据. 不受消费进度offset的影响.
consumer offset 更新流程(共三处):
1.每次消费成功后,删除消费消息,获取本地 最小值( result = msgTreeMap.firstKey(); )更新
[java]
- ConsumeMessageConcurrentlyService.java 里
- public void processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest){
- ....
- long offset =consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); //笔者注:返回的是当前队列中的最小值
- if (offset >= 0) {
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),
- offset, true);
- }
- }
小技巧点:
[java]
- removeMessage(consumeRequest.getMsgs()); //笔者注:返回的是当前队列中的最小值,用的是当前队列内的最小值,而不是当前消费成功的值
例:多线程消费,一次拉取了10个消息,offset从 100 到110 ,如果最后一个先消费成功,第100还在消费中, 更改本地offset的时候 用的100(而不是110) . 这点实现保证了消费不丢失,但会出现多次消费的情况.
ps:
不管消息消费是否返回消费成功状态,都会执行上面两步.消费失败了,先发回到broker的retry队列中. 如果发送成功,再从本地队列中删除掉.如果发回到broker 因为网络原因失败.就会重新放入到本地队列. 并删除该消息并递增前进消费进度offset.
如果此时重启,由于消费进度已经前进,但消息没有被消费掉,也没有发回到broker,消息将丢失. 这算是个bug.
2. RequestCode.RESET_CONSUMER_CLIENT_OFFSET:命令
3. DefaultMQPushConsumerImpl.resetOffsetByTimeStamp
进阶用法,例如: 收集多个mq,然后异步刷新到数据库. 然后offset可能就不准了. 通过定时保存消息时间戳的方式,每次重启时自动回滚offset.保证消息不丢失. 但是会重复消费. 比较适合消息很频繁的情况.
类似与实时计算框架,先保存在内存中,然后汇总.
参考文献: 源代码 见github
store 消息不丢失:
http://blog.csdn.net/azhao_dn/article/details/7008590