一 Rabbitmq是什么
RabbitMq是一个消息队列,是以一种队列的结构来存放message,遵循这FIFO的规则。主要可以用来在不同的进程和线程之间进行通信。
为什么会产生消息队列?
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
RabbitMq官方网站:https://www.rabbitmq.com/#support
二 RabbitMq五种消息队列模式
RabbitMQ有多种消息队列模式,主要使用五种消息队列模式,结构如图所示:
其中有几个概念需要先介绍一下,Mq中相当与一个消息的生产消费过程,所以主要有消息的生产者,消息队列,消息的消费者三种组成。
message消息类型可以是markdown等。
1.简单队列
在简单的队列里,p代表producer生产者,红色代表消息队列,c为consumer消费者。在最简单的队列模式中,生产者发送消息到消息队列中,消费者只要队列中有消息就取出消费。
1.1 maven配置
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
1.2 ConnectionUtil连接
RabbitMq的一些配置连接,封装为一个Util类。
1 public class ConnectionUtil {
2
3 public static Connection getConnection()
throws Exception {
4 //定义连接工厂
5 ConnectionFactory factory =
new ConnectionFactory();
6 //设置服务地址
7 factory.setHost("localhost"
);
8 //端口
9 factory.setPort(5672
);
10 //设置账号信息,用户名、密码、vhost
11 factory.setVirtualHost("testhost"
);
12 factory.setUsername("admin"
);
13 factory.setPassword("admin"
);
14 // 通过工程获取连接
15 Connection connection =
factory.newConnection();
16 return connection;
17 }
18 }
1.3 生产者发送消息
生产者发送消息到指定到队列中,首先获取连接到mq,然后创建通道。生产发送消息发送完之后就需要断开与队列的连接。
其中如果把队列的持久化设置为true,则队列中的消息为持久化消息,当消费者没有消费时会一直堆积在队列中。
1 public class Send {
2
3 //队列名
4 private final static String QUEUE_NAME = "q_test_01"
;
5
6 public static void main(String[] argv)
throws Exception {
7 // 获取到连接以及mq通道
8 Connection connection =
ConnectionUtil.getConnection();
9 // 从连接中创建通道
10 Channel channel =
connection.createChannel();
11
12 // 声明(创建)队列,第二个参数为是否持久化队列
13 channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
14
15 // 消息内容
16 String message = "Hello World!"
;
17 channel.basicPublish("", QUEUE_NAME,
null, message.getBytes());
18 System.out.println(" [x] Sent '" + message + "'"
);
19 //关闭通道和连接
20 channel.close();
21 connection.close();
22 }
23 }
1.4 消费者获取消息
将获取消息实现接口,使其在程序运行时被调起并执行。在接收消息的方法中,同样首先和rabbitMq进行建立连接,创建通道并且声明队列。程序在运行到channel.basicConsume时会被阻塞,只有当有消息时,才会执行上一步中的取消息后的相关操作。
1 public class ReceivingFromRabbitmq
implements ApplicationListener<ApplicationReadyEvent>
{
2 @Override
3 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
4 try {
5 // 获取到连接以及mq通道
6 Connection connection =
connectionUtil.getConnection();
7 // 从连接中创建通道
8 Channel channel =
connection.createChannel();
9 // 声明队列
10 channel.queueDeclare(queueName, queueDurable,
false,
false,
null);
11
12 DeliverCallback deliverCallback = (consumerTag, delivery) ->
{
13 String message =
new String(delivery.getBody(), "UTF-8"
);
14 System.out.println(" [x] Received '" + message + "'"
);
15
16 //执行获取消息后的相关操作
17 };
18
19 channel.basicConsume(queueName,
true, deliverCallback, consumerTag ->
{
20 });
21 }
catch (Exception e) {
22 e.printStackTrace();
23 }
24 }
25 }
2. Work模式
在work模式中包括一个生产者和两个消费者,然而生产者发送的消息只能被一个消费者所获取消费。
2.1 消费者1
1 public class Recv {
2
3 private final static String QUEUE_NAME = "test_queue_work"
;
4
5 public static void main(String[] argv)
throws Exception {
6
7 // 获取到连接以及mq通道
8 Connection connection =
ConnectionUtil.getConnection();
9 Channel channel =
connection.createChannel();
10
11 // 声明队列
12 channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
13
14 // 同一时刻服务器只会发一条消息给消费者
15 //channel.basicQos(1);
16
17 // 定义队列的消费者
18 QueueingConsumer consumer =
new QueueingConsumer(channel);
19 // 监听队列,false表示手动返回完成状态,true表示自动
20 channel.basicConsume(QUEUE_NAME,
true, consumer);
21
22 // 获取消息
23 while (
true) {
24 QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
25 String message =
new String(delivery.getBody());
26 System.out.println(" [y] Received '" + message + "'"
);
27 //休眠
28 Thread.sleep(10
);
29 // 返回确认状态,注释掉表示使用自动确认模式
30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31 }
32 }
2.2 消费者2
1 public class Recv2 {
2
3 private final static String QUEUE_NAME = "test_queue_work"
;
4
5 public static void main(String[] argv)
throws Exception {
6
7 // 获取到连接以及mq通道
8 Connection connection =
ConnectionUtil.getConnection();
9 Channel channel =
connection.createChannel();
10
11 // 声明队列
12 channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
13
14 // 同一时刻服务器只会发一条消息给消费者
15 //channel.basicQos(1);
16
17 // 定义队列的消费者
18 QueueingConsumer consumer =
new QueueingConsumer(channel);
19 // 监听队列,false表示手动返回完成状态,true表示自动
20 channel.basicConsume(QUEUE_NAME,
true, consumer);
21
22 // 获取消息
23 while (
true) {
24 QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
25 String message =
new String(delivery.getBody());
26 System.out.println(" [x] Received '" + message + "'"
);
27 // 休眠1秒
28 Thread.sleep(1000
);
29 //下面这行注释掉表示使用自动确认模式
30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31 }
32 }
33 }
2.3 生产者
1 public class Send {
2
3 private final static String QUEUE_NAME = "test_queue_work"
;
4
5 public static void main(String[] argv)
throws Exception {
6 // 获取到连接以及mq通道
7 Connection connection =
ConnectionUtil.getConnection();
8 Channel channel =
connection.createChannel();
9
10 // 声明队列
11 channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
12
13 for (
int i = 0; i < 100; i++
) {
14 // 消息内容
15 String message = "" +
i;
16 channel.basicPublish("", QUEUE_NAME,
null, message.getBytes());
17 System.out.println(" [x] Sent '" + message + "'"
);
18
19 Thread.sleep(i * 10
);
20 }
21
22 channel.close();
23 connection.close();
24 }
25 }
生产者向队列中发送了一百条消息。
结果:
两个消费者接收不同的消息,但是接收的数量是相同的。
虽然设置了每个接收者的休眠时间不同,但是他们接收的消息的数量却是相同的,因为RabbitMq使用轮询方式进行信息分发,及默认将消息顺序的发给下一个消费者。
同时在这种模式中,消息分发给消费者后,只有当消费者完成消费并向RabbitMq返回确认消息,Mq才会对消息进行删除。如果在执行中消费者死亡,或没有发挥确认消息,则mq会将该消息即使分发给其他消费者。
当Rabbitmq退出或奔溃时,队列和消息会丢失,所以我们需要将队列进行持久化声明:
boolean durable =
true ;
channel.queueDeclare(“hello”,durable,false,
false,
null);
2.4 work模式实现“能者多劳”
打开上述代码的注释:
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1
);
//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
//同时改为手动模式
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME,
false, consumer);
测试结果,消费者1比消费者2获取的消息更多。
3. 订阅模式
一次向多个消费者发送消息。
1、1个生产者,多个消费者2、每一个消费者都有自己的一个队列3、生产者没有将消息直接发送到队列,而是发送到了交换机4、每个队列都要绑定到交换机5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
3.1 生产者发送消息(可看作后台系统)
向交换机X发送消息。但是如果交换机没绑定队列时,消息就会丢失。因为交换机没有存储消息的能力,消息只能存放在队列中。
1 public class Send {
2
3 private final static String EXCHANGE_NAME = "test_exchange_fanout"
;
4
5 public static void main(String[] argv)
throws Exception {
6 // 获取到连接以及mq通道
7 Connection connection =
ConnectionUtil.getConnection();
8 Channel channel =
connection.createChannel();
9
10 // 声明exchange
11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"
);
12
13 // 消息内容
14 String message = "Hello World!"
;
15 channel.basicPublish(EXCHANGE_NAME, "",
null, message.getBytes());
16 System.out.println(" [x] Sent '" + message + "'"
);
17
18 channel.close();
19 connection.close();
20 }
21 }
3.2 消费者1(类似前台系统)
1 public class Recv {
2
3 private final static String QUEUE_NAME = "test_queue_work1"
;
4
5 private final static String EXCHANGE_NAME = "test_exchange_fanout"
;
6
7 public static void main(String[] argv)
throws Exception {
8
9 // 获取到连接以及mq通道
10 Connection connection =
ConnectionUtil.getConnection();
11 Channel channel =
connection.createChannel();
12
13 // 声明队列
14 channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
15
16 // 绑定队列到交换机
17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""
);
18
19 // 同一时刻服务器只会发一条消息给消费者
20 channel.basicQos(1
);
21
22 // 定义队列的消费者
23 QueueingConsumer consumer =
new QueueingConsumer(channel);
24 // 监听队列,手动返回完成
25 channel.basicConsume(QUEUE_NAME,
false, consumer);
26
27 // 获取消息
28 while (
true) {
29 QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
30 String message =
new String(delivery.getBody());
31 System.out.println(" [Recv] Received '" + message + "'"
);
32 Thread.sleep(10
);
33
34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
35 }
36 }
37 }
3.3 消费者2(搜索系统)
1 public class Recv2 {
2
3 private final static String QUEUE_NAME = "test_queue_work2"
;
4
5 private final static String EXCHANGE_NAME = "test_exchange_fanout"
;
6
7 public static void main(String[] argv)
throws Exception {
8
9 // 获取到连接以及mq通道
10 Connection connection =
ConnectionUtil.getConnection();
11 Channel channel =
connection.createChannel();
12
13 // 声明队列
14 channel.queueDeclare(QUEUE_NAME,
false,
false,
false,
null);
15
16 // 绑定队列到交换机
17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""
);
18
19 // 同一时刻服务器只会发一条消息给消费者
20 channel.basicQos(1
);
21
22 // 定义队列的消费者
23 QueueingConsumer consumer =
new QueueingConsumer(channel);
24 // 监听队列,手动返回完成
25 channel.basicConsume(QUEUE_NAME,
false, consumer);
26
27 // 获取消息
28 while (
true) {
29 QueueingConsumer.Delivery delivery =
consumer.nextDelivery();
30 String message =
new String(delivery.getBody());
31 System.out.println(" [Recv2] Received '" + message + "'"
);
32 Thread.sleep(10
);
33
34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
false);
35 }
36 }
37 }
测试结果:同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
4. 路由模式
——持续更新——
学习自大佬:
https://blog.csdn.net/hellozpc/article/details/81436980
转载于:https://www.cnblogs.com/Mask-D/p/11202430.html