第一章 RocketMQ入门笔记

mac2024-01-25  37

一、RocketMQ介绍

kafka最初是LinkedIn的一个内部基础设施系统。最初开发的起因是LinkedIn虽然有了数据库和其他系削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双11使用的核心产品。RocketMQ的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。

1)NameServer设计及其简单,RocketMQ摈弃了业界常用的Zookeeper充当消息管理的“注册中心”,而是使用自主研发的NameServer来实现各种元数据的管理(Topic路由信息等)。2)高效的I/O存储,RocketMQ追求消息发送的高吞吐量,RocketMQ的消息存储设计成文件组的概念,组内单个文件固定大小,引入了内存映射机制,所有主题的消息存储基于顺序读写,极大提高消息写性能,同时为了兼顾消息消费与消息查找,引入消息消费队列文件与索引文件。3)容忍存在设计缺陷,适当将某些工作下放给RocketMQ的使用者,比如消息只消费一次,这样极大的简化了消息中间件的内核,使得RocketMQ的实现发送变得非常简单与高效。RocketMQ原先阿里巴巴内部使用,于2017年提交到Apache基金会成为Apache基金会的顶级开源项目,GitHub 代码库链接: https://github.com/apache/rocketmq.git。

1、核心概念

 

1.1、NameServer

NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Rocket中的Broker。Broker在启动时向所有NameServer注册(主要是服务器地址等),生产者在发送消息之前先从NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

NameServer与每台Broker服务保持长连接,并间隔30S检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除,这样就可以实现RocketMQ的高可用。

1.2、主题

主题Topic:消息主题,一级消息类型,生产者向其发送消息,消费者负责从Topic接收并消费消息。

1.3、生产者

生产者:也称为消息发布者,负责生产并发送消息至Topic。

1.4、消费者

消费者:也称为消息订阅者,负责从Topic接收并消费消息。

1.5、消息

消息:生产者或消费者进行消息发送或消费的主题,对于RocketMQ来说,消息就是字节数组。

以下我们将总结下Rocket的整体运转: 

1)NameServer先启动。

2)Broker启动时向NameServer注册。

3)生产者在发送某个主题的消息之前先从NamerServer获取Broker服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker进行消息发送。

4)NameServer与每台Broker服务器保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除。

5)消费者在订阅某个主题的消息之前从NamerServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。

2、RocketMQ的设计理念和目标

2.1、设计理念

基于主题的发布和订阅,其核心功能,消息发送、消息存储和消息消费,整体设计追求简单与性能。

NameServer性能对比Zookeeper有极大的提升:

高效的IO存储机制;基于文件顺序读写;内存映射机制;容忍设计缺陷,比如消息只消费一次,Rocket自身不保证,从而简化Rocket的内核使得Rocket简单与高效,这个问题交给消费者去实现(幂等)。 

2.2、设计目标

架构模式:发布订阅模式,主要组件:消息发送者、消息服务器(消息存储)、消息消费、路由发现。

顺序消息:RocketMQ可以严格保证消息有序

消息过滤:消息消费是,消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息,可以支持在服务端与消费端的消息过滤机制。

消息存储:一般MQ核心就是消息的存储,对存储一般来说两个维度:消息堆积能力和消息存储性能。RocketMQ追求消息存储的高性能,引入内存映射机制,所有的主题消息顺序存储在同一个文件中。同时为了防止无限堆积,引入消息文件过期机制和文件存储空间报警机制。

消息高可用:

1)Rocket关机、断电等情况下,Rocket可以确保不丢失消息(同步刷盘机制不丢失,异步刷盘会丢失少量)。

2)另外如果Rocket服务器因为CPU、内存、主板、磁盘等关键设备损坏导致无法开机,这个属于单点故障,该节点上的消息全部丢失,如果开启了异步复制机制,Rocket可以确保只丢失很少量消息。

3)如果引入双写机制,这样基本上可以满足消息可靠性要求极高的场景(毕竟两台主服务器同时故障的可能性还是非常小)。

消息消费低延迟:RocketMQ在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。

确保消息必须被消费一次:消息确认机制(ACK)来确保消息至少被消费一次,一般ACK机制只能做到消息只被消费一次,有重复消费的可能。

消息回溯:已经消费完的消息,可以根据业务要求重新消费消息。

消息堆积:消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,RocketMQ采用磁盘文件存储,所以堆积能力比较强,同时提供文件过期删除机制。

定时消息:定时消息,定时消息是指消息发送到Rocket Broker上之后,不被消费者理解消费,要到等待一定的时间才能进行消费,apache的版本目前只支持等待指定的时间才能被消费,不支持任意精度的定时消息消费(一个说法是任意精度的定时消息会带来性能损耗,但是阿里云版本的RocketMQ却提供这样的功能,充值收费优先策略?)。

消息重试机制:消息重试是指在消息消费时,如果发送异常,那么消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制。

二、RocketMQ中消息的发送

 普通消息是指消息队列RocketMQ中无特性的消息,区别于有特性的定时/延时消息、顺序消息和事务消息。RocketMQ发送普通消息有三种实现方式:单向(OneWay)发送、可靠同步发送、可靠异步发送。

消息生产的客户端依赖如下:

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-client</artifactId>     <version>4.4.0</version></dependency>

配置文件broker.conf

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

1、单向(OneWay)发送

单向发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答;此方式发送消息的过程耗时非常短,一般在微秒级别。

代码演示:

public class OnewayProducer {     public static void main(String[] args) throws Exception{         //生产者实例化         DefaultMQProducer producer = new DefaultMQProducer("oneway");         //指定rocket服务器地址         //producer.setNamesrvAddr("localhost:9876");         producer.setNamesrvAddr("localhost:9876");         //启动实例         producer.start();         for (int i = 0; i < 10; i++) {             //创建一个消息实例,指定topic、tag和消息体             Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,                     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)             );             //发送消息             producer.sendOneway(msg);             System.out.printf("%s%n",  new String(msg.getBody()));         }         //生产者实例不再使用时关闭.         producer.shutdown();     } }

1.1、Producer Group(生产者分组)

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组,在这里可以不用关心,只要知道有这么一个概念即可。RocketMQ中的生产者组只能有一个在用的生产者。

分组的作用如下(简单的场景不需要了解这个概念):

1)标识一类Producer;

2)可以通过运维工具查询这个发送消息应用下有多个Producer实例;

3)发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。

1.2、Producer实例

Producer的一个对象实例,不同的Producer实例可以运行在不同进程内或者不同机器上,Producer实例线程安全,可在同一进程内多线程之间共享。

1.3、Message Key

Key一般用于消息在业务层面的唯一标识。对发送的消息设置好Key,以后可以根据这个Key来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ会创建专门的索引文件,用来存储Key与消息的映射,由于是Hash索引,应尽量使Key唯一,避免潜在的哈希冲突。

Tag和Key的主要差别是使用场景不同,Tag用在Consumer代码中,用于服务端消息过滤,Key主要用于通过命令进行查找消息RocketMQ并不能保证message id唯一,在这种情况下生产者在push消息的时候可以给每条消息设定唯一的key,消费者可以通过message key保证对消息幂等处理。

1.4、Tag

消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。Topic与Tag 都是业务上用来归类的标识,区分在于Topic是一级分类,而Tag可以理解为是二级分类。

以天猫交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建Topic_Order和Topic_Pay,其中订单消息根据商品品类以不同的Tag再进行细分,如电器类、男装类、女装类、化妆品类,最后他们都被各个不同的系统所接收。通过合理的使用Topic和Tag,可以让业务结构清晰,更可以提高效率。 您可能会有这样的疑问:到底什么时候该用Topic,什么时候该用Tag?

1)消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的Topic,无法通过Tag进行区分。

2)业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的Topic进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分。

3)消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分。

4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的Topic。

2、可靠同步发送

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

 

代码演示:

public class SyncProducer {     public static void main(String[] args) throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("sync");         //producer.setRetryTimesWhenSendFailed(2);         producer.setNamesrvAddr("localhost:9876");         producer.start();         for (int i = 0; i < 10; i++) {             Message msg = new Message("TopicTest" , "TagB" ,                     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)             );             SendResult sendResult = producer.send(msg);             System.out.printf("%s%n%n%n", sendResult.getSendStatus()+":(MsgId):"                     +sendResult.getMsgId()+":(queueId):"                     +sendResult.getMessageQueue().getQueueId()                     +"(value):"+ new String(msg.getBody()));         }         producer.shutdown();     } }

Message ID:消息的全局唯一标识(内部机制的 ID 生成是使用机器 IP 和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑 Key),由消息队列 MQ

系统自动生成,唯一标识某条消息。

SendStatus:发送的标识。成功,失败等

Queue:

 

RocketMQ收到消息后,所有主题的消息都存储在commitlog文件中,当消息到达commitlog后,将会采用异步转发到消息队列,也就是consumerqueue,Queue是数据分片的产物,数据分片可以提高消费者的效率。

 

broker.conf

# 在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数

defaultTopicQueueNums=4

3、可靠异步发送

代码演示:

public class AsyncProducer {     public static void main(String[] args) throws Exception{         //生产者实例化         DefaultMQProducer producer = new DefaultMQProducer("async");         //指定rocket服务器地址         producer.setNamesrvAddr("localhost:9876");         //启动实例         producer.start();         //发送异步失败时的重试次数(这里不重试)         producer.setRetryTimesWhenSendAsyncFailed(0);         int messageCount = 10;         final CountDownLatch countDownLatch = new CountDownLatch(messageCount);         for (int i = 0; i < messageCount; i++) {             try {                 final int index = i;                 Message msg = new Message("TopicTest", "TagC", "OrderID"+index,                         ("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET));                 //生产者异步发送                 producer.send(msg, new SendCallback() {                     @Override                     public void onSuccess(SendResult sendResult) {                         countDownLatch.countDown();                         System.out.printf("%-10d OK %s %n", index, new String(msg.getBody()));                     }                     @Override                     public void onException(Throwable e) {                         countDownLatch.countDown();                         System.out.printf("%-10d Exception %s %n", index, e);                         e.printStackTrace();                     }                 });             } catch (Exception e) {                 e.printStackTrace();             }         }         //Thread.sleep(1000);         countDownLatch.await(5, TimeUnit.SECONDS);         producer.shutdown();     } }

消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送;发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

 

4、RocketMQ中消息发送的权衡

三种发送方式的对比

 

三、RocketMQ消息消费

1、集群消费和广播消费

基本概念:消息队列RocketMQ是基于发布/订阅模型的消息系统,消息的订阅方订阅关注的Topic,以获取并消费消息。由于订阅方应用一般是分布式系统,以集群方式部署有多台机器,因此消息队列RocketMQ约定以下概念:集群——使用相同Group ID的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括Tag的使用),这些订阅者在逻辑上可以认为是一个消费节点。

集群消费:当使用集群消费模式时,消息队列RocketMQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。

广播消费:当使用广播消费模式时,消息队列RocketMQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

2、场景对比

2.1、集群消费模式:

 

适用场景&注意事项:

消费端集群化部署,每条消息只需要被处理一次。由于消费进度在服务端维护,可靠性更高。集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

2.2、广播消费模式:

 

适用场景&注意事项:

广播消费模式下不支持顺序消息。广播消费模式下不支持重置消费位点。每条消息都需要被相同逻辑的多台机器处理。消费进度在客户端维护,出现重复的概率稍大于集群模式。广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

2.3、使用集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个Group ID,用于订阅同一个Topic。

 

适用场景&注意事项:

每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。消费进度在服务端维护,可靠性高于广播模式。对于一个Group ID来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。 假设Group ID1部署了三个消费者实例C1、C2、C3,那么这三个实例将共同分担服务器发送给Group ID1的消息。同时,实例之间订阅关系必须保持一致。

3、消费方式

3.1、推模式

代码上使用 DefaultMQPushConsumer

这种模型下,系统收到消息后自动调用处理函数来处理消息,自动保存 Offset,并且加入新的消费者后会自动做负载均衡。

底层实现上,推模式还是使用的 pull 来实现的,pull 就是拉取,push 方式是 Server 端接收到消息后,主动把消息推给Client端,实时性高。但是使用Push 方式有很多弊端,首先加大 Server 端的工作量,其次不同的 Client 端处理能力不同,Client 的状态不受 Server 控制,如果 Client 不能及时处理 Server推送过来的消息,会造成各种潜在问题。

所以 RocketMQ 是通过“长轮询”的方式,同时通过 Client 端和 Server 端的配合,达到既拥有 Pull 的优点,又能达到确保实时性的目的。

3.2、拉模式

代码上使用DefaultMQPullConsumer,使用方式类似,但是更加复杂,除了像推模式一样需要设置各种参数之外,还需要处理额外三件事情:

1)获取MessageQueues并遍历(一个Topic包括多个MessageQueue),如果是特殊情况,也可以选择指定的MessageQueue来读取消息。

2)维护Offsetstore,从一个MessageQueue里拉取消息时,要传入Offset参数,随着不断的读取消息,Offset会不断增长。这个时候就需要用户把Offset存储起来,根据实际的情况存入内存、写入磁盘或者数据库中。

3)根据不同的消息状态做不同的处理。

四、深入消息发送

1、消息生产者流程

 

生产者的流程主要讲述DefaultMQProducer类的具体实现。

消息发送的主要流程:验证消息、查找路由、消息发送(包含异常机制)

验证消息:主要是要求主题名称、消息体不能为空、消息长度不能等于0,且不能超过消息的最大的长度4M(生产者对象中配置maxMessageSize=1024*1024*4)。

查找路由:客户端(生产者)会缓存topic路由信息(如果是第一次发送消息,本地没有缓存,查询NameServer尝试获取),路由信息主要包含了消息队列(queue相关信息)。

消息发送:选择消息队列,发送消息,发送成功则返回。选择消息队列两种方式(一般有两种,这里不做详细讲解,后续做详细讲解)。

2、批量消息发送

public class SimpleBatchProducer {     public static void main(String[] args) throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");         producer.setNamesrvAddr("localhost:9876");         producer.start();         //如果一次只发送不超过4M的消息,那么批处理很容易使用         //同一批的消息应该有:相同的主题,相同的waitstoremsgok,不支持调度         String topic = "TopicTest";         List<Message> messages = new ArrayList<>();         messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));         messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));         messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));         producer.send(messages);         System.out.printf("Batch over");         producer.shutdown();     } }

注意单批次不能超过消息的最大的长度4M(生产者对象中配置maxMessageSize=1024*1024*4)

3、消息重试机制

public class AsyncProducer {     public static void main(String[] args) throws Exception {         //生产者实例化         DefaultMQProducer producer = new DefaultMQProducer("async");         //指定rocket服务器地址         producer.setNamesrvAddr("192.168.0.128:9876");         //启动实例         producer.start();         //发送异步失败时的重试次数(默认值)         producer.setRetryTimesWhenSendAsyncFailed(2);         int messageCount = 10;         final CountDownLatch countDownLatch = new CountDownLatch(messageCount);         for (int i = 0; i < messageCount; i++) {             try {                 final int index = i;                 Message msg = new Message("TopicTest","TagC","OrderID"+index,                         ("Hello world "+index).getBytes(RemotingHelper.DEFAULT_CHARSET));                 //生产者异步发送                 producer.send(msg, new SendCallback() {                     @Override                     public void onSuccess(SendResult sendResult) {                         countDownLatch.countDown();                         System.out.printf("%-10d OK %s %n", index, new String(msg.getBody()));                     }                     @Override                     public void onException(Throwable e) {                         countDownLatch.countDown();                         System.out.printf("%-10d Exception %s %n", index, e);                         e.printStackTrace();                     }                 });             } catch (Exception e) {                 e.printStackTrace();             }         }         //Thread.sleep(1000);         countDownLatch.await(100, TimeUnit.SECONDS);         producer.shutdown();     } }

通过代码和源码,一般发送并忘记没有重试。注意重试的原则,一般会采用规避原则(规避原则就是上一次消息发送过程中发现错误,在某一段时间内,消息生产者不会选择该Broker上的消息队列,这样可以提高发送消息的成功率)。

规避原则:

如下图注意了,这里我们发现,有可能在实际的生产过程中,我们的RocketMQ有几台服务器构成的集群(集群后续会细讲)。

其中有可能是一个主题TopicA中的4个队列分散在Broker1、Broker2、Broker3服务器上。如果这个时候Broker2挂了,我们知道,但是生产者不知道(因为生产者客户端每隔30S更新一次路由,但是NamServer与Broker之间的心跳检测间隔是10S,所以生产者最快也需要30S才能感知Broker2挂了),所以发送到queue2的消息会失败,RocketMQ发现这次消息发送失败后,就会将Broker2排除在消息的选择范围,下次再次发送消息时就不会发送到Broker2,这样做的目的就是为了提高发送消息的成功率。

 

最新回复(0)