博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq 的cluster模式下并发消费同一队列确保消息不丢失之消费进度维护
阅读量:5877 次
发布时间:2019-06-19

本文共 2634 字,大约阅读时间需要 8 分钟。

hot3.png

rocketmq 保证消息不丢失,如何做到,需要研究下如何维护消费进度的.

会会导致重复消费. 本文不考虑broker宕机无法启动,热备等情况

 

首先最终根数据保持在每个broker中的ConsumerOffsetManager类

 

[java]  

  1.  * Consumer消费进度管理  
  2.  *   
  3.  *  shijia.wxr<vintage.wang@gmail.com>  
  4.  *  2013-8-11  
  5.  */  
  6. public class ConsumerOffsetManager extends ConfigManager {  
  7.    private ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer, Long>> offsetTable =  
  8.             new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);  
  9. }  

 

 
 

缓存副本: 每个consumer的RemoteBrokerOffsetStore中 

 

 

 

broker保存进度用的是 consumer传过来的commitOffset (本来以为是读取的offsetMax). (来自于consumer本地的 remoteOffSetStore里的offset ,最初来自于broker的offsetTable) .

* broker offset 更新流程:

 

1. 定时调用RemoteBrokerOffsetStore.persistAll 更新到broker                   

2. 每次pull_message时 上传的commitOffset,来自于本地offsetTable .

                        这个commitOffset 1. 最初来自于 consumer启动阶段, 启动负载均衡service时从broker获取的offset数据, 从而设置到pullRequest中,http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142693

                                                       2. 后续来自于consumer本地的offset值(详见consumer offset 更新流程: ) 代码在

 

[java]  

  1.  DefaultMQPushConsumerImpl.pullMessage(PullRequest)  (com.alibaba.rocketmq.client.impl.consumer){  
  2.      long commitOffsetValue = 0L;  
  3.         if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {  
  4.             commitOffsetValue =  
  5.                     this.offsetStore.readOffset(pullRequest.getMessageQueue(),  
  6.                         ReadOffsetType.READ_FROM_MEMORY);  
  7.             if (commitOffsetValue > 0) {  
  8.                 commitOffsetEnable = true;  
  9.             }  
  10.         }  
  11. }  

 

 

                      注:  push模式, 后续的pull 通过 minOffset不停的获取数据. 不受消费进度offset的影响.

consumer offset 更新流程(共三处): 

                    1.每次消费成功后,删除消费消息,获取本地 最小值( result = msgTreeMap.firstKey(); )更新

 

[java]  

  1. ConsumeMessageConcurrentlyService.java 里  
  2. public void processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest){  
  3.  ....  
  4.        long offset =consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); //笔者注:返回的是当前队列中的最小值  
  5.         if (offset >= 0) {  
  6.             this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),  
  7.                 offset, true);  
  8.         }  
  9. }  

 

小技巧点:  

[java]  

  1. removeMessage(consumeRequest.getMsgs()); //笔者注:返回的是当前队列中的最小值,用的是当前队列内的最小值,而不是当前消费成功的值  

 

例:多线程消费,一次拉取了10个消息,offset从 100 到110 ,如果最后一个先消费成功,第100还在消费中, 更改本地offset的时候 用的100(而不是110) . 这点实现保证了消费不丢失,但会出现多次消费的情况.

ps: 

    不管消息消费是否返回消费成功状态,都会执行上面两步.消费失败了,先发回到broker的retry队列中. 如果发送成功,再从本地队列中删除掉.如果发回到broker 因为网络原因失败.就会重新放入到本地队列. 并删除该消息并递增前进消费进度offset.  

   如果此时重启,由于消费进度已经前进,但消息没有被消费掉,也没有发回到broker,消息将丢失. 这算是个bug.

 

 

 

        2. RequestCode.RESET_CONSUMER_CLIENT_OFFSET:命令

         3.  DefaultMQPushConsumerImpl.resetOffsetByTimeStamp 

        进阶用法,例如: 收集多个mq,然后异步刷新到数据库. 然后offset可能就不准了. 通过定时保存消息时间戳的方式,每次重启时自动回滚offset.保证消息不丢失. 但是会重复消费. 比较适合消息很频繁的情况.

类似与实时计算框架,先保存在内存中,然后汇总.

参考文献: 源代码 见github

store 消息不丢失:

http://blog.csdn.net/azhao_dn/article/details/7008590

转载于:https://my.oschina.net/xiaominmin/blog/1795694

你可能感兴趣的文章
洛谷P2925 [USACO08DEC]干草出售Hay For Sale
查看>>
MapReduce工作原理流程简介
查看>>
那些年追过的......写过的技术博客
查看>>
小米手机解锁bootload教程及常见问题
查看>>
Python内置函数property()使用实例
查看>>
Spring MVC NoClassDefFoundError 问题的解决方法。
查看>>
CentOS 6.9配置网卡IP/网关/DNS命令详细介绍及一些常用网络配置命令(转)
查看>>
python基础教程_学习笔记19:标准库:一些最爱——集合、堆和双端队列
查看>>
C# 解决窗体闪烁
查看>>
CSS魔法堂:Transition就这么好玩
查看>>
【OpenStack】network相关知识学习
查看>>
centos 7下独立的python 2.7环境安装
查看>>
[日常] 算法-单链表的创建
查看>>
前端工程化系列[01]-Bower包管理工具的使用
查看>>
使用 maven 自动将源码打包并发布
查看>>
Spark:求出分组内的TopN
查看>>
Python爬取豆瓣《复仇者联盟3》评论并生成乖萌的格鲁特
查看>>
关于跨DB增量(增、改)同步两张表的数据小技巧
查看>>
飞秋无法显示局域网好友
查看>>
学员会诊之03:你那惨不忍睹的三层架构
查看>>