第二章 深入理解RocketMQ消息笔记

mac2024-05-31  34

一、深入消息模式

RocketMQ提供两个模式进行消费

1、拉模式

代码上使用DefaultMQPullConsumer

1)获取MessageQueues并遍历(一个Topic包括多个MessageQueue),如果是特殊情况,也可以选择指定的MessageQueue来读取消息。

2)维护Offsetstore,从一个MessageQueue里拉取消息时,要传入Offset参数,随着不断的读取消息,Offset会不断增长。这个时候就需要用户把Offset存储起来,根据实际的情况存入内存、写入磁盘或者数据库中。

3)根据不同的消息状态做不同的处理。

拉取消息的请求后,会返回:FOUND(获取到消息),NO_MATCHED_MSG(没有匹配的消息),NO_NEW_MSG(没有新消息),OFFSET_ILLEGAL(非法偏移量)四种状态,其中必要重要的是FOUND(获取到消息)和NO_NEW_MSG(没有新消息)。

总结:这种模式下用户需要自己处理Queue,并且自己保存偏移量,所以这种方式太过灵活,往往我们业务的关注重点不在内部消息的处理上,所以一般情况下我们会使用推模式。

代码示例:消费者-拉模式

public class PullConsumer {     private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();     public static void main(String[] args) throws MQClientException {         //拉模式         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullconsumer");         consumer.setNamesrvAddr("127.0.0.1:9876");         //consumer.setBrokerSuspendMaxTimeMillis(1000);         System.out.println("ms:"+consumer.getBrokerSuspendMaxTimeMillis());         consumer.start();         //1.获取MessageQueues并遍历(一个Topic包括多个MessageQueue  默认4个)         Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");         for (MessageQueue mq : mqs) {             System.out.println("queueID:"+ mq.getQueueId());             //获取偏移量             long Offset = consumer.fetchConsumeOffset(mq,true);             System.out.printf("Consume from the queue: %s%n", mq);             SINGLE_MQ:             while (true) { //拉模式,必须无限循环                 try {                     PullResult pullResult = consumer.pullBlockIfNotFound(mq,null, getMessageQueueOffset(mq), 32);                     System.out.printf("%s%n",pullResult);                     //2.维护Offsetstore(这里存入一个Map)                     putMessageQueueOffset(mq, pullResult.getNextBeginOffset());                     //3.根据不同的消息状态做不同的处理                     switch (pullResult.getPullStatus()) {                         case FOUND: //获取到消息                             for (int i=0;i<pullResult.getMsgFoundList().size();i++) {                                 System.out.printf("%s%n", new String(pullResult.getMsgFoundList().get(i).getBody()));                             }                             break;                         case NO_MATCHED_MSG: //没有匹配的消息                             break;                         case NO_NEW_MSG:  //没有新消息                             break SINGLE_MQ;                         case OFFSET_ILLEGAL: //非法偏移量                             break;                         default:                             break;                     }                 } catch (Exception e) {                     e.printStackTrace();                 }             }         }         consumer.shutdown();     }     private static long getMessageQueueOffset(MessageQueue mq) {         Long offset = OFFSE_TABLE.get(mq);         if (offset != null)             return offset;         return 0;     }     private static void putMessageQueueOffset(MessageQueue mq, long offset) {         OFFSE_TABLE.put(mq, offset);     } }

2、推模式

代码上使用DefaultMQPushConsumer,Push方式是Server端接收到消息后,主动把消息推给Client端,实时性高,但是使用Push方式主动推送也存在一些问题:比如加大Server端的工作量,其次Client端的处理能力各不相同,如果Client不能及时处理Server推过来的消息,会造成各种潜在的问题。

2.1、长轮询

所以RocketMQ使用“长轮询”的方式来解决以上问题,核心思想是这样,客户端还是拉取消息,Broker端HOLD住客户端发过来的请求一小段时间,在这个时间内(5s)有新消息达到,就利用现有的连接立刻返回消息给Consunmer。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。因为长轮询方式的有局限性,是在HOLD住Consumer请求的时候需要占用资源,所以它适合在消息队列这种客户端连接数可控的场景中。

2.2、流量控制

Push模式基于拉取,消费者会判断获取但还未处理的消息个数、消息总大小、Offset的跨度3个维度来控制,如果任一值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。

两种情况会限流,限流的做法是放弃本次拉取消息的动作,并且这个队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列。

1)当前的ProcessQueue(一个主题有多个队列,每一个队列会对应有一个ProcessQueue来处理消息)正在处理的消息数量>1000。

2)队列中最大最小偏移量差距>2000,这个是为了避免一条消息堵塞,消息进度无法向前推进,可能造成大量消息重复消费。

2.3、消息队列负载与重新分布机制

在集群消费模式中,往往会有很多个消费者,对应消费一个主题(topic),一个主题中有很多个消费者队列(queue),我们要考虑的问题是,集群内多个消费者是如何负载主题下的多个消费者队列,并且如果有新的消费者加入,消息队列又会如何重新分布。

从源码的角度上看,RocketMQ消息队列重新分布是由RebalanceService线程来实现的,一个MQClientInstance持有一个RebalanceService实现,并且随着MQClientInstance的启动而启动。

备注:MQClientInstance是生产者和消费者中最大的一个实例,作为生产者或者消费者引用RocketMQ客户端,在一个JVM中所有消费者,生产者都持有同一个实例,MQClientInstance只会启动一次

RocketMQ默认提供5中分配算法:

如果有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8),有3个消费者(c1,c2,c3)

1)平均分配(AllocateMessageQueueAveragely)

c1:q1、q2、q3

c2:q4、q5、q6

c3:q7、q8

2)平均轮询分配(AllocateMessageQueueAveragelyByCircle)

c1:q1、q4、q7

c2:q2、q5、q8

c3:q3、q6

3)一直性Hash(AllocateMessageQueueConsistentHash)

不推荐使用,因为消息队列负载均衡信息不容易跟踪。

4)根据配置(AllocateMessageQueueByConfig)

为每一个消费者配置固定的消费队列。

5)根据Broker部署机房名(AllocateMessageQueueByMachineRoom)

对每一个消费者负载不同Broker上的队列。一般尽量使用“平均分配”“平均轮询分配”,因为分配算法比较直观。无论哪种算法,遵循的原则是一个消费者可以分配多个消息队列,同一个消息队列只会分配一个消费者,所以如果消费者个数大于消息队列数量,则有些消费者无法消费消息。

RebalanceService每隔20S进行一次队列负载:

每次进行队列重新负载时会查询出当前所有的消费者,并且对消息队列、消费者列表进行排序。因为在一个JVM中只会有一个pullRequestQueue对象。具体可见源码中PullMessageService类中代码实现。

public class PullMessageService extends ServiceThread {     private final InternalLogger log = ClientLogger.getLog();     private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();     private final MQClientInstance mQClientFactory;     private final ScheduledExecutorService scheduledExecutorService = Executors         .newSingleThreadScheduledExecutor(new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 return new Thread(r, "PullMessageServiceScheduledThread");             }         });     public PullMessageService(MQClientInstance mQClientFactory) {         this.mQClientFactory = mQClientFactory;     }     public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {         if (!isStopped()) {             this.scheduledExecutorService.schedule(new Runnable() {                 @Override                 public void run() {                     PullMessageService.this.executePullRequestImmediately(pullRequest);                 }             }, timeDelay, TimeUnit.MILLISECONDS);         } else {             log.warn("PullMessageServiceScheduledThread has shutdown");         }     }     public void executePullRequestImmediately(final PullRequest pullRequest) {         try {             this.pullRequestQueue.put(pullRequest);         } catch (InterruptedException e) {             log.error("executePullRequestImmediately pullRequestQueue.put", e);         }     }     public void executeTaskLater(final Runnable r, final long timeDelay) {         if (!isStopped()) {             this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);         } else {             log.warn("PullMessageServiceScheduledThread has shutdown");         }     }     public ScheduledExecutorService getScheduledExecutorService() {         return scheduledExecutorService;     }     private void pullMessage(final PullRequest pullRequest) {         final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());         if (consumer != null) {             DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;             impl.pullMessage(pullRequest);         } else {             log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);         }     }     @Override     public void run() {         log.info(this.getServiceName() + " service started");         while (!this.isStopped()) {             try {                 PullRequest pullRequest = this.pullRequestQueue.take();                 this.pullMessage(pullRequest);             } catch (InterruptedException ignored) {             } catch (Exception e) {                 log.error("Pull Message Service Run Method exception", e);             }         }         log.info(this.getServiceName() + " service end");     }     @Override     public void shutdown(boolean interrupt) {         super.shutdown(interrupt);         ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);     }     @Override     public String getServiceName() {         return PullMessageService.class.getSimpleName();     } }

2.4、消息确认(ACK)

PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。

public class PushConsumerB {     public static void main(String[] args) throws InterruptedException, MQClientException {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");         consumer.subscribe("TopicTest", "*");         consumer.setNamesrvAddr("192.168.0.128:9876");         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//每次从最后一次消费的地址         consumer.registerMessageListener(new MessageListenerConcurrently() {             @Override             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                 System.out.printf("queueID:%d:%s:Messages:%s %n",  msgs.get(0).getQueueId(),Thread.currentThread().getName(), new String(msgs.get(0).getBody()));                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;             }         });         consumer.start();         System.out.printf("ConsumerPartOrder Started.%n");     } }

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费者的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。如果某些已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息消费成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度;但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值。

 

这钟方式和传统的一条message单独ack的方式有本质的区别,性能上提升的同时,会带来一个潜在的重复问题。由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

2.5、消息进度存储

广播模式:

同一个消费组的所有消费者都需要消费主题下的所有消息,因为消费者的行为都是独立的,互不影响,固消息进度需要独立存储,所以这种模式下消息进度存储在消费者本地。

集群模式:

集群模式消息进度存储文件存放在服务器Broker上。

二、顺序消息

顺序消息(FIFO消息)是消息队列RocketMQ提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。顺序消息分为全局顺序消息分区顺序消息

1、全局顺序消息

RocketMQ在默认情况下不保证顺序,要保证全局顺序,需要把Topic的读写队列数设置为1,然后生产者和消费者的并发设置也是1。所以这样的话高并发,高吞吐量的功能完全用不上。

 

适用场景:

适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。

示例:

要确保全局顺序消息,需要先把Topic的读写队列数设置为1,然后生产者和消费者的并发设置也是1。

mqadmin update Topic -t AllOrder -c DefaultCluster -r 1 -w 1 -n 127.0.0.1:9876

在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。

2、部分顺序消息

对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费;Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

三、延时消息

1、概念介绍

延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

2、适用场景

消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

3、使用方式

Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销阿里云RocketMQ提供了任意时刻的定时消息功能ApacheRocketMQ并没有,阿里并没有开源)

发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。延迟消息是根据延迟队列的level来的,延迟队列默认是msg.setDelayTimeLevel(5)代表延迟一分钟。

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"是这18个等级(秒(s)、分(m)、小时(h)),level为1,表示延迟1秒后消费,level为5表示延迟1分钟后消费,level为18表示延迟2个小时消费。生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。

public class TimerProducer {     public static void main(String[] args) throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("sync");         producer.setNamesrvAddr("192.168.0.128:9876");         producer.start();         for (int i = 0; i < 10; i++) {             Message msg = new Message("TopicTest" ,"TagB" ,                     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)             );             // 这个是设置延时消息的属性             //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"  18个等级             msg.setDelayTimeLevel(4);             SendResult sendResult = producer.send(msg);             System.out.printf("%s%n", sendResult);         }         producer.shutdown();     } }

四、消息过滤

1、概念介绍

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是可以实现服务端的过滤。

2、表达式过滤

主要支持如下2种的过滤方式:

1)Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG可以用||分隔。其中Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层——Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同则丢弃该消息,不进行消息消费。

2)SQL92的过滤方式:这种方式的大致做法和上面的Tag过滤方式一样,只是具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。

具体使用见 http://rocketmq.apache.org/docs/filter-by-sql92-example/

注意如果开启SQL过滤的话,Broker需要开启参数enablePropertyFilter=true,然后服务器重启生效。

代码示例:生产者代码

public class SqlProducer {     public static void main(String[] args) {         DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");         producer.setNamesrvAddr("127.0.0.1:9876");         try {             producer.start();         } catch (MQClientException e) {             e.printStackTrace();             return;         }         for (int i = 0; i < 10; i++) {             try {                 String tag;                 int div = i % 3;                 if (div == 0) {                     tag = "TagA";                 } else if (div == 1) {                     tag = "TagB";                 } else {                     tag = "TagC";                 }                 Message msg = new Message("SQL", tag,                     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                 msg.putUserProperty("a", String.valueOf(i));                 SendResult sendResult = producer.send(msg);                 System.out.printf("%s%n", sendResult);             } catch (Exception e) {                 e.printStackTrace();                 try {                     Thread.sleep(1000);                 } catch (InterruptedException e1) {                     e1.printStackTrace();                 }             }         }         producer.shutdown();     } }

消费者代码:

public class SqlConsumer {     public static void main(String[] args) {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");         consumer.setNamesrvAddr("127.0.0.1:9876");         try {             consumer.subscribe("SQL",MessageSelector.bySql(" TAGS is not null and TAGS in ('TagA', 'TagB')  "));         } catch (MQClientException e) {             e.printStackTrace();             return;         }         consumer.registerMessageListener(new MessageListenerConcurrently() {             @Override             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,                 ConsumeConcurrentlyContext context) {                 System.out.printf("queueID:%d:%s:Messages:%s %n",  msgs.get(0).getQueueId(),                         msgs.get(0).getTags(), new String(msgs.get(0).getBody()));                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;             }         });         try {             consumer.start();         } catch (MQClientException e) {             e.printStackTrace();             return;         }         System.out.printf("ConsumerPartOrder Started.%n");     } }

3、类过滤

新版本(>=4.3.0)已经不支持(代码中FilterServerConsumer 新版本已经不支持了)

五、概要设计与事务

1、RocketMQ存储概要设计

目前的MQ中间件从存储模型来,分为需要持久化和不需要持久化的两种模型,现在大多数的是支持持久化存储的,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ。ZeroMQ却不需要支持持久化存储而业务系统也大多需要MQ有持久存储的能力,这样可以大大增加系统的高可用性。

从存储方式和效率来看,文件系统高于KV存储,KV存储又高于关系型数据库,直接操作文件系统肯定是最快的,但如果从可靠性的角度出发直接操作文件系统是最低的,而关系型数据库的可靠性是最高的。

RocketMQ主要存储的文件包括Commitlog文件、ConsumeQueue文件、IndexFile。RocketMQ将所有主题的消息存储在同一文件,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。

但由于一般的消息中间件是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每个消息队列有一个消息文件IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,可以根据消息的属性快速从Commitlog文件中检索消息,整体如下:

 

1 ) CommitLog:消息存储文件,所有消息主题的消息都存储在CommitLog文件中。

2 ) ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到消息消费队列,供消息消费者消费。

3 ) IndexFile:消息索引文件,主要存储消息Key与Offset的对应关系。

1.1、消息存储结构

1)CommitLog

CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享,文件地址:$ {user.home}\store\$ { commitlog} \ $ { fileName}。在CommitLog中,一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向CommitLog中顺序写 ,但是随机读。commitlog文件默认大小为1G,可通过在broker置文件中设置mapedFileSizeCommitLog属性来改变默认大小。

 

Commitlog文件存储的逻辑视图如下,每条消息的前面4个字节存储该条消息的总长度。但是一个消息的存储长度是不固定的。

 

2)ConsumeQueue

ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每Message Queue都有一个对应ConsumeQueue文件,文件地址在:

$ {$storeRoot} \consumequeue\$ {topicName} \$ { queueld} \$ {fileName}。

 

 

ConsumeQueue中存储的是消息条目,为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,每一个ConsumeQueue条目不会存储消息的全量信息,消息条目如下:

 

ConsumeQueue即为Commitlog文件的索引文件,其构建机制是当消息到达Commitlog文件后由专门的线程产生消息转发任务,从而构建消息消费队列文件ConsumeQueue与下文提到的索引文件。存储机制这样设计有以下几个好处:

① CommitLog顺序写,可以大大提高写入效率。

实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍。

② 虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。

③ 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message Key、Tag等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。

3)IndexFile

index存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息检索消息的速度,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。

 

4)Config

config文件夹中存储着Topic和Consumer等相关信息,主题和消费者群组相关的信息就存在在此。

topics.json : topic 配置属性;

subscriptionGroup.json :消息消费组配置信息;

delayOffset.json :延时消息队列拉取进度;

consumerOffset.json :集群消费模式消息消进度;

consumerFilter.json :主题消息过滤信息。

 

5)其他

abort:如果存在abort文件说明Broker非正常闭,该文件默认启动时创建,正常退出之前删除。

checkpoint:文件检测点,存储commitlog文件最后一次刷盘时间戳、ConsumeQueue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。

1.2、内存映射

内存映射文件,是由一个文件到一块内存的映射。文件的数据就是这块区域内存中对应的数据,读写文件中的数据,直接对这块区域的地址操作就可以,减少了内存复制的环节。所以说,内存映射文件比起文件I/O操作,效率要高,而且文件越大,体现出来的差距越大。

RocketMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog,ConsumeQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

1.3、文件刷盘机制

RocketMQ存储与读写是基于JDK NIO的内存映射机制,具体使用MappedByteBuffer(基于MappedByteBuffer操作大文件的方式,其读写性能极高)RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息超出内存的限制,RocketMQ为了提高性能,会尽可能地保证磁盘的顺序在通过Producer写 RocketMQ的时候写消息,有两种写磁盘方式:

 

1)异步刷盘方式

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息积累到一定程度时,统一触发写磁盘动作,快速写入。

2)同步刷盘方式

在返回写成功状态时,消息已经被写人磁盘具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

消息存储时首先将消息追加到内存,再根据配值的刷盘策略在不同时间进行刷写磁盘,如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer force()方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端RocketMQ使用,一个单独的线程按照某个设定的频执行刷盘操作。

通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘), SYNC_FLUSH(同步刷盘)默认为异步。

3.3、总结

实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作,会明显降低性能;通常情况下,应该把Rocket置成异步刷盘方式。

1.4、过期文件删除

由于RocketMQ操作CommitLog,ConsumeQueue文件是基于内存映射机制并在启动的时候会加载 commitlog,ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。

删除过程分别执行清理消息存储文件(Commitlog)与消息消费队列文件(ConsumeQueue文件),消息消费队列文件与消息存储文件(Commitlog)共用一套过期文件机制。

RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为42小时(不同版本的默认值不同,这里以4.4.0为例),通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。

触发文件清除操作的是一个定时任务,而且只有一个定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s执行一次。

1)过期判断

文件删除主要是由这个配置属性:fileReservedTime:文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除,另外还有其他两个配置参数:

deletePhysicFilesInterval:删除物理文件的时间间隔(默认是100ms),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除, 因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。

destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用小于等于0为止,即可删除该文件。

2)删除条件

A、指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次;删除过期文件操作, 默认为凌晨4点。

B、磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio磁盘空间强制删除文件水位,默认是85),会触发过期文件删除操作。另外还有 RocketMQ的磁盘配置参数:①物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入。②物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75%) 表示磁盘使用正常。

2、RocketMQ中的事务消息

2.1、事务消息实现思想

RocketMQ事务消息,是指发送消息事件和其他事件需要同时成功或同失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。

RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是:先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,RocketMQ实现的具体流程如下:

 

1)发送方向RocketMQ发送“待确认”Prepare消息。

2)RocketMQ将收到的“待确认”(一般写入一个HalfTopic主题<RMQ_SYS_TRANS_HALF_TOPIC>)消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。

发送方开始执行本地事件逻辑:

3)发送方根据事件执行结果向RocketMQ发送二次确认(Commit还是Rollback)消息RocketMQ收到Commit则将第一阶段消息标记为可投递(这些消息才会进入生产时发送实际的主题RealTopic),订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。

4)如果出现异常情况,步骤3提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求。

5)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit Roolback状态。

2.2、两阶段提交

提交半事务是一个阶段,提交全事务和事务回查是另外一个阶段,所以称之为两阶段提交。

事务状态回查机制:RocketMQ通过TransactionalMessageCheckService线程定时去检测RMQ_SYS_ TRANS_ HALF_TOPIC主题中的消息,回查消息的事务状态;TransactionalMessageCheckService的检测频率默认为1分钟,可通过在broker.conf文件中设置transactionChecklnterval来改变默认值,单位为毫秒。

2.3、代码实现

com.chj.transaction包中,LocalTransactionState枚举类,COMMIT_MESSAGE提交消息,即broker确认了这条消息的正确性之后执行提交,标记这条消息可被消费,这样的话consumer就可以正常消费这条消息了;ROLLBACK_MESSAGE回滚消息,意思是当我们的本地主事务发生异常的时候,回滚本地事务的同时,同样需要一种方法通知到Rocketmq不要继续发送消息了,当broker收到这个命令时候就会标记消息为RollBack的状态,consumer就不能收到UNKNOW消息回查的状态。

消费者-事务消息的生产者代码实现:

public class TransactionProducer {     public static void main(String[] args) throws MQClientException, InterruptedException {         TransactionListener transactionListener = new TransactionListenerImpl();         // 支持事务的生产者 TransactionMQProducer         TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");         producer.setNamesrvAddr("192.168.0.128:9876");         // 设置用于事务消息的处理线程池         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 Thread thread = new Thread(r);                 thread.setName("client-transaction-msg-check-thread");                 return thread;             }         });         producer.setExecutorService(executorService);         //TODO 设置事务监听器,监听器实现接口org.apache.rocketmq.client.producer.TransactionListener         //TODO 监听器中实现需要处理的交易业务逻辑的处理,以及MQ Broker中未确认的事务与业务的确认逻辑         producer.setTransactionListener(transactionListener);         producer.start();         // TODO 生成不同的Tag,用于模拟不同的处理场景         String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};         for (int i = 0; i < 10; i++) {//10条消息             try {                 //组装产生消息                 Message msg = new Message("TopicTransaction", tags[i % tags.length], "KEY" + i,                                 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                 //TODO 以事务发送消息,并在事务消息被成功预写入到RocketMQ中后,执行用户定义的交易逻辑,    //TODO 交易逻辑执行成功后,再实现实现业务消息的提交逻辑                 SendResult sendResult = producer.sendMessageInTransaction(msg, null);                 System.out.printf("%s%n", sendResult);                 Thread.sleep(10);             } catch (MQClientException | UnsupportedEncodingException e) {                 e.printStackTrace();             }         }         for (int i = 0; i < 100000; i++) {             Thread.sleep(1000);         }         producer.shutdown();     } }

事务监听机制,主要是执行事务以及事务回查代码:

public class TransactionListenerImpl implements TransactionListener {     private AtomicInteger transactionIndex = new AtomicInteger(0);     //使用transactionId     private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();     //TODO 执行事务     @Override     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {         //TODO 执行本地事务开始         int value = transactionIndex.getAndIncrement();         //业务处理         System.out.println("执行本地事务..."+value);         int status = value % 3;         localTrans.put(msg.getTransactionId(), status);         switch (status) {             case 0:                 //LocalTransactionState.UNKNOW表示未知的事件,需要RocketMQ进一步服务业务进行确认该交易的处理                 return LocalTransactionState.UNKNOW;             case 1:                 return LocalTransactionState.COMMIT_MESSAGE;             case 2:                 return LocalTransactionState.ROLLBACK_MESSAGE; //这条消息抛弃了(账户余额不足 1W)             default:                 return LocalTransactionState.COMMIT_MESSAGE;         }         //TODO 执行本地事务结束     }     //该方法用于RocketMQ与业务确认未提交事务的消息的状态(一分钟执行一次)     @Override     public LocalTransactionState checkLocalTransaction(MessageExt msg) {         System.out.println("事务回查-----UNKNOW-----------");         Integer status = localTrans.get(msg.getTransactionId());         //业务处理(1分钟)         int mod = msg.getTransactionId().hashCode() % 2;         if (null != status) {             switch (mod) {                 case 0:                     return LocalTransactionState.ROLLBACK_MESSAGE;                 case 1:                     return LocalTransactionState.COMMIT_MESSAGE;                 default:                     return LocalTransactionState.COMMIT_MESSAGE;             }         }         return LocalTransactionState.COMMIT_MESSAGE;     } }

执行结果:

发送10条消息,其中四条未处理,三条成功,三条抛弃;然后等待事务回查,把未处理的四条休息成功处理两条,回滚两条。

执行本地事务...0

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803780000, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=3], queueOffset=0]

执行本地事务...1

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803900001, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=0], queueOffset=1]

执行本地事务...2

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE8039E0002, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=1], queueOffset=2]

执行本地事务...3

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803AA0003, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=2], queueOffset=3]

执行本地事务...4

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803B50004, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=3], queueOffset=4]

执行本地事务...5

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803C00005, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=0], queueOffset=5]

执行本地事务...6

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803CB0006, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=1], queueOffset=6]

执行本地事务...7

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803D60007, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=2], queueOffset=7]

执行本地事务...8

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803E20008, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=3], queueOffset=8]

执行本地事务...9

SendResult [sendStatus=SEND_OK, msgId=0AD6022F29E018B4AAC29EE803ED0009, offsetMsgId=null,

messageQueue=MessageQueue [topic=TopicTransaction, brokerName=陈化静LX18081303, queueId=0], queueOffset=9]

等待10秒后执行事务回查方法:

事务回查-----UNKNOW-----------

事务回查-----UNKNOW-----------

事务回查-----UNKNOW-----------

事务回查-----UNKNOW-----------

 

最新回复(0)