springboot基础学习5: RabbitMQ消息队列整合

mac2025-09-01  12

一、AMQP 基本概念 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念: 1、pulisher 不用说都知道是 生产者,是一个向交换器发布消息 的客户端应用程序 2、Exchange 交换器 用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 3、Broker 接收和分发消息的应用,RabbitMQ Server就是Message Broker,可以理解为消息队列服务器实体 4、Binding 交换器和队列(queue)通过某种路由方式 进行绑定。 一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则。 5、Queue 消息队列 用来保存消息 直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投递到一个或多个队列中。消息存储在队列中,当消费者以某种方式连接到队列将消息 进行消费掉。 6、Connection 网络连接 比如TCP 连接。 7、Channel 信道 多路复用,为什么要复用呢? 如果每一次访问RabbitMQ都建立一个连接,那么在消息量大的时候建立TCP 连接的开销非常大。所以就引用了信道的概念,信道是建立在真实的TCP连接内的虚拟连接。AMQP 命令都是通过信道发送出去的。不管是发布消息、订阅队列还是接受消息。这些动作都是通过信道完成的。 8、Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 9、Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

二、AMQP 运行机制 AMQP 中的消息路由:AMQP 中消息的路由过程和java 开发者熟悉的JMS存在一些差别,AMQP中增加了Exchange 和Binding的角色,生产者把消费者发布到Exchange 上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到那个队列。 一个消息是如何从生产者到达消费者的呢? 这个使用网络图片来说明

1、首先生产者一个消息,然后准备发布到某个队列上去。这个生产者将消息发送给 消息队列服务体(broker 可以理解为消息代理),当 Broker收到消息后 将消息给到一个合适的Exchage 上去,这个交换器是和消息队列绑定的,怎么绑定? 是通过路由键 将Exchange 和 Queues 进行绑定(Bindings)。 服务器中有非常多的Exchage和 Queue。 当消息给Exchage以后根据消息 携带过来的路由键来判断 将消息交给相应的队列。 Exchage 根据绑定规则将消息交给Queue以后 ,消费者就可以连接这个队列 来取出消息 进行消费。

/** * QUEUE_NAME 队列名称 * EXCHANGE_NAME 交换器名称 * routingKey 路由键 */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

先使用一个简单的例子测试消息发送

package com.mq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * MQ 消息推送者 */ public class Send { /** 定义交换器的名字 */ private static final String EXCHANGE_NAME = "test_exchange"; private static final String QUEUE_NAME = "queue_name"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); /** * 创建一个连接 */ Connection connection = factory.newConnection(); /** * 创建一个频道 */ Channel channel = connection.createChannel(); /** .通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //声明创建队列 /** * * RabbitMQ可以给消息和RabbitMQ设置TTL。 目前有两种方法可以设置,第一种方法是通过队列设置, * 队列中所有消息都有相同的过期时间,第二种是对消息进行单独设置。每条消息TTL可以不同。如果上述两种方法同时使用, * 则消息的过期时间以两者之间TTL较小的那个数值为准。消息队列中生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。 */ Map<String, Object> argss = new HashMap<String, Object>(); argss.put("vhost", "/"); argss.put("username","root"); argss.put("password", "root"); /** 设置过期时间, // 如果不设置TTL,则表示此消息不会过期。 如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者, 否则该消息会被立即丢弃,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数, 之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。 */ argss.put("x-message-ttl",6000); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String[] routingKeys = {"error","info","warning"}; int count = 3; channel.confirmSelect(); for (int i = 0; i < count; i++) { /** * 发送的消息 */ String message = "Hello world"+i; /** 路由 */ String routingKey = routingKeys[i]; /** * //往队列中发出一条消息 * 1、先来测试消息 不是持久化的 情况 * * 这里是有 MessageProperties.PERSISTENT_TEXT_PLAIN 表示的是消息持久化,也就是 deliveryMode =2 */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey); /** * 功能描述: <br> 发送消息的时候设置消息deliverMode 将其设置为2 ,就是将消息设置为持久化, */ channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("Sent '"+routingKey +":" + message + "'"); } /** * 1、普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。 * 2、批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。 * 3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。 * * 这里是第二种模式:批量模式 */ if(!channel.waitForConfirms()){ System.out.println("send message failed."); } /** * 关闭频道和连接 */ channel.close(); connection.close(); } }

运行结果可以看到消息已经发送 RabbitMQ中队列已经创建并且消息已经进入队列。 队列中的消息 三、我们是springboot 和RabbitMQ整合使用 1、首先在pom文件中添加rabbitmq的依赖,这里使用的springboot是2.1.4.RELEASE 版本

<!--RabbitMQ 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2、编写配置文件application.yml

server: port: 8089 tomcat: uri-encoding: UTF-8 spring: banner: charset: UTF-8 http: encoding: charset: UTF-8 enabled: true force: true messages: encoding: UTF-8 datasource: url: jdbc:mysql://127.0.0.1:3306/lin?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true username: root password: 123 driver-class-name: com.mysql.cj.jdbc.Driver application: name: spring-boot-rabbimq rabbitmq: # 这里不能配置 本地ip 地址,如果要配置ip地址会启动报错 host: localhost port: 5672 username: guest password: guest virtual-host: / logging: level: org.springframework.web: trace

3、消息发送者

/** * @ClassName: HelloSender * @Description: 消息发送 * @Author: lin * @Date: 2019/3/24 20:27 * History: * @<version> 1.0 */ @Component public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void sendMessage(){ String context = "hello" + new Date(); System.out.println("SendMessage:" + context); this.rabbitTemplate.convertAndSend("hello", context); } }

4、消息接收者

package org.learn.boot.rabbitmq.hello; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @ClassName: HelloReceiver * @Description: 消息接收者 * @Author: lin * @Date: 2019/3/24 23:32 * History: * @<version> 1.0 */ @Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello){ System.out.println("Receiver:"+ hello); } }

5、消息监听者

/** * 监听消息队列 ,通过注解的方式 * @ClassName: BankService * @Description: * @Author: lin * @Date: 2019/9/27 7:42 * History: * @<version> 1.0 */ @Service public class BankService { /** * 这样就表示这个队列一有消息,下面这里就会接收到消息进行打印 */ @RabbitListener(queues = "lin.news") public void receiver(Bank bank){ System.out.println("收到消息:"+ bank); } }

5、测试消息发送

@Test public void hello(){ helloSender.sendMessage(); }

可以看到消息已经发送和接收到了

6、Exchange 有几种类型 a、direct 这种就是一对一的方式,也是说路由键哈Binding的一致,交换器就会将消息发送到对应的队列中去,而不会发送到其他的队列中去。如下 会将消息发送到 lin.news这个指定的路由键中去。而和这个路由键绑定的队列 是lin.news 所以消息发送到这个队列中来 测试代码

/** * 1、测试点对点 */ @Test public void testDirect(){ //Messge需要自己构造一个,定义消息体内容和消息头 //rabbitTemplate.send(exchange, routingKey, message); // object 默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq //rabbitTemplate.convertAndSend(exchange, routingKey, message); Map<String, Object> map =new HashMap<>(16); map.put("msg","点对点消息测试"); map.put("data", Arrays.asList("test",123,true)); //如果不进行序列化处理,那么对象会被默认的序列化以后发送出去 rabbitTemplate.convertAndSend("exchange.direct", "lin.news", new Bank("2324","招商银行")); }

进行队列监听 测试结果 2、测试fanout模式,这种方式就是不管路由键是什么 消息会全部发送过去。 先将队列中的消息清空 测试代码

@Test public void testFanout(){ rabbitTemplate.convertAndSend("exchange.fanout","liu.news", new Bank("13f3452", "工商银行")); }

可以看到所有队列都有了消息,所以这种方式是不管路由键是什么,它会将消息发送到所有的队列中去 获取消息 3、topic模式 根据匹配规则来发送消息,指定路由键是lin.# 那么 和这个路由键对应的队列匹配。 首先是和lin.* 这个匹配 ,然后在和 *.new 进行匹配不过liu.news和上面的路由键lin.#不匹配所有 消息不会进入该队列中来。

/** * 2、测试Topic模式 */ @Test public void testTopic(){ //Messge需要自己构造一个,定义消息体内容和消息头 //rabbitTemplate.send(exchange, routingKey, message); // object 默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq //rabbitTemplate.convertAndSend(exchange, routingKey, message); Map<String, Object> map =new HashMap<>(16); map.put("msg","topic消息测试"); map.put("data", Arrays.asList("test",55,true)); //如果不进行序列化处理,那么对象会被默认的序列化以后发送出去 rabbitTemplate.convertAndSend("exchange.topic", "lin.#", new Bank("777","中国银行")); }

测试几个如下,可以看到消息已经进入了下面的队列 获取消息 可以知道了消息确实进入了相应的队列。 再来测试就是将消息发送来 liu.news 这个路由键中

@Test public void testTopic02(){ //Messge需要自己构造一个,定义消息体内容和消息头 //rabbitTemplate.send(exchange, routingKey, message); // object 默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq //rabbitTemplate.convertAndSend(exchange, routingKey, message); Map<String, Object> map =new HashMap<>(16); map.put("msg","topic消息测试"); map.put("data", Arrays.asList("test",88,true)); //如果不进行序列化处理,那么对象会被默认的序列化以后发送出去 rabbitTemplate.convertAndSend("exchange.topic", "*liu.news", new Bank("888","建设银行")); }

查看队列消息 获取消息,可以看到消息已经发送到这个队列中了 从上面的测试来看,消息投递需要根据 选择的方式。不同的方式消息将发送到不同的队列中去。 springboot 学习代码:https://github.com/liu92/learn 参考http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

最新回复(0)