RabbitMq消息队列

mac2022-06-30  115

一 Rabbitmq是什么

  RabbitMq是一个消息队列,是以一种队列的结构来存放message,遵循这FIFO的规则。主要可以用来在不同的进程和线程之间进行通信。

  为什么会产生消息队列?

不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

  RabbitMq官方网站:https://www.rabbitmq.com/#support

二 RabbitMq五种消息队列模式

  RabbitMQ有多种消息队列模式,主要使用五种消息队列模式,结构如图所示:

 

  其中有几个概念需要先介绍一下,Mq中相当与一个消息的生产消费过程,所以主要有消息的生产者,消息队列,消息的消费者三种组成。

  message消息类型可以是markdown等。 

1.简单队列

  在简单的队列里,p代表producer生产者,红色代表消息队列,c为consumer消费者。在最简单的队列模式中,生产者发送消息到消息队列中,消费者只要队列中有消息就取出消费。

1.1 maven配置

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>

1.2 ConnectionUtil连接

  RabbitMq的一些配置连接,封装为一个Util类。

1 public class ConnectionUtil { 2 3 public static Connection getConnection() throws Exception { 4 //定义连接工厂 5 ConnectionFactory factory = new ConnectionFactory(); 6 //设置服务地址 7 factory.setHost("localhost"); 8 //端口 9 factory.setPort(5672); 10 //设置账号信息,用户名、密码、vhost 11 factory.setVirtualHost("testhost"); 12 factory.setUsername("admin"); 13 factory.setPassword("admin"); 14 // 通过工程获取连接 15 Connection connection = factory.newConnection(); 16 return connection; 17 } 18 }

1.3 生产者发送消息

  生产者发送消息到指定到队列中,首先获取连接到mq,然后创建通道。生产发送消息发送完之后就需要断开与队列的连接。

  其中如果把队列的持久化设置为true,则队列中的消息为持久化消息,当消费者没有消费时会一直堆积在队列中。

1 public class Send { 2 3 //队列名 4 private final static String QUEUE_NAME = "q_test_01"; 5 6 public static void main(String[] argv) throws Exception { 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 // 从连接中创建通道 10 Channel channel = connection.createChannel(); 11 12 // 声明(创建)队列,第二个参数为是否持久化队列 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 15 // 消息内容 16 String message = "Hello World!"; 17 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 18 System.out.println(" [x] Sent '" + message + "'"); 19 //关闭通道和连接 20 channel.close(); 21 connection.close(); 22 } 23 }

1.4 消费者获取消息

  将获取消息实现接口,使其在程序运行时被调起并执行。在接收消息的方法中,同样首先和rabbitMq进行建立连接,创建通道并且声明队列。程序在运行到channel.basicConsume时会被阻塞,只有当有消息时,才会执行上一步中的取消息后的相关操作。

1 public class ReceivingFromRabbitmq implements ApplicationListener<ApplicationReadyEvent> { 2 @Override 3 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { 4 try { 5 // 获取到连接以及mq通道 6 Connection connection = connectionUtil.getConnection(); 7 // 从连接中创建通道 8 Channel channel = connection.createChannel(); 9 // 声明队列 10 channel.queueDeclare(queueName, queueDurable, false, false, null); 11 12 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 13 String message = new String(delivery.getBody(), "UTF-8"); 14 System.out.println(" [x] Received '" + message + "'"); 15 16 //执行获取消息后的相关操作 17 }; 18 19 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { 20 }); 21 } catch (Exception e) { 22 e.printStackTrace(); 23 } 24 } 25 }

 

2. Work模式 

  在work模式中包括一个生产者和两个消费者,然而生产者发送的消息只能被一个消费者所获取消费。

2.1 消费者1

1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者 15 //channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,false表示手动返回完成状态,true表示自动 20 channel.basicConsume(QUEUE_NAME, true, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [y] Received '" + message + "'"); 27 //休眠 28 Thread.sleep(10); 29 // 返回确认状态,注释掉表示使用自动确认模式 30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 }

2.2 消费者2

1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 声明队列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一时刻服务器只会发一条消息给消费者 15 //channel.basicQos(1); 16 17 // 定义队列的消费者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 监听队列,false表示手动返回完成状态,true表示自动 20 channel.basicConsume(QUEUE_NAME, true, consumer); 21 22 // 获取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 //下面这行注释掉表示使用自动确认模式 30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }

2.3 生产者

1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明队列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 for (int i = 0; i < 100; i++) { 14 // 消息内容 15 String message = "" + i; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent '" + message + "'"); 18 19 Thread.sleep(i * 10); 20 } 21 22 channel.close(); 23 connection.close(); 24 } 25 }

  生产者向队列中发送了一百条消息。

结果:

  两个消费者接收不同的消息,但是接收的数量是相同的。

  虽然设置了每个接收者的休眠时间不同,但是他们接收的消息的数量却是相同的,因为RabbitMq使用轮询方式进行信息分发,及默认将消息顺序的发给下一个消费者。

  同时在这种模式中,消息分发给消费者后,只有当消费者完成消费并向RabbitMq返回确认消息,Mq才会对消息进行删除。如果在执行中消费者死亡,或没有发挥确认消息,则mq会将该消息即使分发给其他消费者。

  当Rabbitmq退出或奔溃时,队列和消息会丢失,所以我们需要将队列进行持久化声明:

boolean durable = true ; channel.queueDeclare(“hello”,durable,falsefalsenull);

2.4 work模式实现“能者多劳”

  打开上述代码的注释:

// 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); //开启这行 表示使用手动确认模式 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //同时改为手动模式 // 监听队列,false表示手动返回完成状态,true表示自动 channel.basicConsume(QUEUE_NAME, false, consumer);

  测试结果,消费者1比消费者2获取的消息更多。

 

3. 订阅模式

  一次向多个消费者发送消息。

  

1、1个生产者,多个消费者2、每一个消费者都有自己的一个队列3、生产者没有将消息直接发送到队列,而是发送到了交换机4、每个队列都要绑定到交换机5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

3.1 生产者发送消息(可看作后台系统)

向交换机X发送消息。但是如果交换机没绑定队列时,消息就会丢失。因为交换机没有存储消息的能力,消息只能存放在队列中。

1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 12 13 // 消息内容 14 String message = "Hello World!"; 15 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 16 System.out.println(" [x] Sent '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }

3.2 消费者1(类似前台系统)

1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [Recv] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }

3.3 消费者2(搜索系统)

1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 获取到连接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 声明队列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 绑定队列到交换机 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一时刻服务器只会发一条消息给消费者 20 channel.basicQos(1); 21 22 // 定义队列的消费者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 监听队列,手动返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 获取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [Recv2] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }

测试结果:同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。

4. 路由模式

 

——持续更新——

 

学习自大佬:

https://blog.csdn.net/hellozpc/article/details/81436980

转载于:https://www.cnblogs.com/Mask-D/p/11202430.html

最新回复(0)