RocketMQ 源码分析 07 消息消费02

mac2022-06-30  88

 

每隔20秒就进行一下doRebalance;遍历订阅信息,对每个topic的队列进行重新负载

广播模式下,根据topic更新messageQueue

集群模式下,会使用负载均衡的策略,分配消费队列

1.机房就近

2.平均分配

3.平均轮询分配

4.根据配置分配

5.根据机房分配

6.一致性hash

 

然后根据分配结果,发生变更的话,就更新消费队列

1.CONSUME_FROM_LAST_OFFSET : 从队列最新偏移量开始消费

2.CONSUME_FROM_FIRST_OFFSET : 从队列头开始消费

3.CONSUME_FROM_TIMESTAMP : 从消费者启动的时间戳对应的消费进度开始消费

 

下面重点分步骤来详细探究 MQClientInstance.doRebalance 方法的执行流程。

2.1.1 MQClientInstance.doRebalance

1、根据 topic 来进行负载。

2、移除 MessageQueue,如果 MesageQueue 的 topic 不在订阅的主题中,接下来重点关注 rebalanceByTopic 方法。

RebalanceImpl rebalanceByTopic详解:

part1:根据消息消费模式(集群还是广播)我们先重点看集群模式。

part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行。

part3: 主要是对主题的消息队列排序、消费者ID进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列。

part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变。

目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将 MessageQueue 消息消费进度 持久化,然后丢弃,重新被其他消费者加载。顺序消息将会本系列的后续文章中详细介绍。

接下来处理 MessageQueue 的 ProcessQueue,也就是在 ProcessQueueTable 中没有 mq 的处理队列(因为重新负载后,可能会分配一些新的队列)。 主要就是在内存中移除 MessageQueue 的 offerset, 然后计算下一个拉取偏移量,然后每一个MessageQueue创建一个拉取任务(PullRequest)。 

part5:如果消息负载发生变化,需处理

本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:

1、首先RebalanceService线程启动,为消费者分配消息队列,其实每一个MessageQueue 会构建一个 PullRequest 对象,然后通过 RebalanceImpl 将 PullRequest放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImpl 的 pullMessage,通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService中(DefaultMQPushConsumerImpl 的机制就是pullInterval 为 0;

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

下面对消息消费重试做一个简单的总结:

1、如果返回结果是 CONSUME_SUCCESS,此时 ackIndex = msg.size() - 1, 再看发送 sendMessageBack 循环的条件,for (int i = ackIndex + 1; i < msg.size() ;) 从这里可以看出如果消息成功,则无需发送 sendMsgBack 给 broker;如果返回结果是RECONSUME_LATER, 此时 ackIndex = -1 ,则这批所有的消息都会发送消息给 Broker,也就是这一批消息都得重新消费。

如果发送ack消息失败,则会延迟5s后重新在消费端重新消费。

首先消费者向 Broker 发送 ACK 消息,如果发生成功,重试机制由 broker 处理,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次将本次消费任务,默认延迟5S后在消费者重新消费。

根据消费结果,设置ackIndex的值。 如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送sendMessageBack。 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)。 2、需要延迟执行的消息,在存入 commitlog 之前,会备份原先的主题(retry+消费组名称)、与消费队列ID,然后将主题修改为SCHEDULE_TOPIC_XXXX,会被延迟任务 ScheduleMessageService 延迟拉取。

3、ScheduleMessageService 在执行过程中,会再次存入 commitlog 文件中放入之前,会清空延迟等级,并恢复主题与队列,这样,就能被消费者所消费,因为消费者在启动时就订阅了该消费组的重试主题。

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

RocketMQ之消息ACK机制

1、消息消费进度概述 首先简要阐述一下消息消费进度:

消费者订阅消息消费队列(MessageQueue), 当生产者将消息负载发送到 MessageQueue 中时,消费订阅者开始消费消息,消息消费过程中,为了避免重复消费,需要一个地方存储消费进度(消费偏移量)。

消息模式主要分为集群模式、广播模式:

集群模式:一条消息被集群中任何一个消费者消费。 广播模式:每条消息都被每一个消费者消费。 广播模式,既然每条消息要被每一个消费者消费,则消费进度可以与消费者保存在一起,也就是本地保存,但由于集群模式下,一条消息只能被集群内的一个消费者消费,进度不能保存在消费端,只能集中保存在一个地方,比较合适的是在 Broker 端。

 

广播模式下: 消费进度存储在消费者本地

集群模式下: 消费进度保存在Broker

消息消费进度保存在磁盘

2、消息消费进度存储接口

接下来我们先分析一下消息消费进度接口:OffsetStore。

根据消息消费模式(集群模式、广播模式)会创建不同的 OffsetStore 对象。

由于上篇文章,谈到广播模式消息,如果返回 CONSUME_LATER,竟然不会重试,而是直接丢弃,为什么呢?由于这个原因,这次破天荒的从广播模式的OffsetStore开始学习。

消息进度以本地文件方式保存。源码路径:org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore。

LocalFileOffsetStore 首先在 DefaultMQPushConsumerImpl#start 方法中创建,并 执行load方法加载消费进度。接下来结束一下几个关键的实现方法。

保存逻辑很简单,就没必要一一分析,其调用入口为:MQClientInstance#startScheduledTask。

顺藤摸瓜,原来是一个定时任务,默认消费端启动10秒后,每隔5s的频率持久化一次。

广播模式消费进度存储容易,但其实还是不明白为什么RocketMQ广播模式,如果消费失败,则丢弃,因为广播模式有时候也必须确保每个消费者都成功消费,通常的场景为,通过MQ刷新本地缓存等。

在集群模式下,多个消费者会负载到不同的消费队列上,因为消息消费进度是基于消息队列进行保存的,也就是不同的消费者之间的消费进度保存是不会存在并发的,但是在同一个消费者,非顺序消息消费时,一个消费者(多个线程)并发消费消息,比如m1 < m2,,但m2先消费完,此时是如何保存的消费进度呢?举个例子,如果m2的offset为5,而m1的offset为4,如果m2先消费完,保存进度为5,那m1消息消费完,保存进度为4,这样岂不乱来了,该如何处理呢

2.2.2 updateOffset

代码@1:如果当前并没有存储该mq的offset,则把传入的offset放入内存中(map)。

代码@3:如果offsetOld不为空,这里如果不为空,说明同时对一个MQ消费队列进行消费,并发执行。

代码@4,@5:根据 increaseOnly 更新原先的 offsetOld 的值。

2.2.3 readOffset 根据读取来源,读取消费队列的消费进度。

综上所述,我们了解到的情况是,广播模式,存放在消费者本地,集群模式,存储在Broker,存储文件,存放的是JSON。

也就是 OffsetStore 提供保存消费进度方法,也就是 {“consumeGroup" : [ {”ConsumeQueue1“:offset} ] }。

2.2.4 问题答疑 现在我们思考如下问题:下面讨论还是基于非顺序消息:

1、集群模式,一个消费组是多个线程消费该队列中的消息,并发执行,例如在q1中存在 m1,m2,m3,m4,m5

最后消费成功的顺序有可能是 m1,m3,m2,m5,m4,如果消费消息,就将该消息的offset存入offset中,岂不是会乱,如果一批拉取了多条消息,消费进度是如何保存的。要解决上述问题,我们移步到到调用offsetStore.updateStore方法,重点看一下那块逻辑:

主要一下,msgTreeMap 的类型,TreeMap, 按消息的 offset 升序排序,返回的 result, 如果 treemap 中不存在任何消息,那就返回该处理队列最大的偏移量+1,如果移除自己本批消息后,处理队列中,还存在消息,则返回该处理队列中最小的偏移量,也就是此时返回的偏移量有可能不是消息本身的偏移量,而是处理队列中最小的偏移量。

优点:防止消息丢失(也就是没有消费到)。

缺点:会造成消息重复消费。

 

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

定时消息机制

 

 

最新回复(0)