RabbitMQ与SpringBoot整合实战

mac2025-07-07  8

SpringBoot整合RabbitMQ SpringBoot与RabbitMQ集成非常筒単,不需要做任何的额外设置只需要两步即可: step1:引入相关依赖:spring-boot-starter-amqp step2:対application.properties迸行配置

生产端核心配置

消费端核心配置

SpringBoot整合RabbitMQ实战 1.首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New --> Spring Starter Project

2.添加依赖

    <dependency>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-amqp</artifactId>     </dependency> 然后复制一份工程,重命名为rabbitmq-springboot-consumer,修改pom相关artifactId,name及description

生产端工程 添加生产端配置

# rabbitmq连接基本配置 spring.rabbitmq.addresses=192.168.0.113:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000

# 开启confirm机制 spring.rabbitmq.publisher-confirms=true # 开启return模式 spring.rabbitmq.publisher-returns=true # 配合return机制使用,表示接收路由不可达的消息 spring.rabbitmq.template.mandatory=true 创建配置类

@Configuration @ComponentScan({"com.rxy.springboot.*"}) public class MainConfig {

} 消息的confirm和return机制 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效 生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等 创建生产端处理类

import java.util.Map; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;

@Component public class RabbitSender {

    //自动注入RabbitTemplate模板类     @Autowired     private RabbitTemplate rabbitTemplate;     //确认机制     final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {         /**          * correlationData: 回调的相关数据,包含了消息ID          * ack: ack结果,true代表ack,false代表nack          * cause: 如果为nack,返回原因,否则为null          */         @Override         public void confirm(CorrelationData correlationData, boolean ack, String cause) {             System.err.println("correlationData: " + correlationData);             System.err.println("ack: " + ack);             if(!ack){                 //做一些补偿机制等                 System.err.println("异常处理....");             }         }     };     //返回机制     final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {         @Override         public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,                                      String exchange, String routingKey) {             System.err.println("return exchange: " + exchange + ", routingKey: "                      + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);         }     };          //发送消息方法调用: 构建Message消息     public void send(Object message, Map<String, Object> properties) throws Exception {         MessageHeaders messageHeaders = new MessageHeaders(properties);         //注意导包         Message msg = MessageBuilder.createMessage(message, messageHeaders);         rabbitTemplate.setConfirmCallback(confirmCallback);         rabbitTemplate.setReturnCallback(returnCallback);         //id + 时间戳 ,保证全局唯一 ,这个是实际消息的ID         //在做补偿性机制的时候通过ID来获取到这条消息进行重发         String id = "1234567890";         CorrelationData correlationData = new CorrelationData(id);         //exchange, routingKey, object, correlationData         rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);     } } 在管控台创建topic交换机exchange-1和队列queue-1,并建立绑定关系为springboot.# 测试方法

@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqSpringbootProducerApplicationTests {

    @Test     public void contextLoads() {     }

    @Autowired     private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");          @Test     public void testSender1() throws Exception {          Map<String, Object> properties = new HashMap<>();          properties.put("number", "12345");          properties.put("send_time", simpleDateFormat.format(new Date()));          rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);     } } 运行测试方法,打印如下内容

correlationData: CorrelationData [id=1234567890] ack: true 将发送消息convertAndSend的routingKey修改为spring.hello,再次运行测试方法,打印如下内容

return exchange: exchange-1, routingKey: spring.abc, replyCode: 312, replyText: NO_ROUTE correlationData: CorrelationData [id=1234567890] ack: true 消费端工程 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列(不推荐)、根据业务记录日志等处理 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况 @RabbitMQListener注解 消费端监听@RabbitMQListener注解,这个在实际工作中非常的好用。 @RabbitListener是一个组合注解,里面可以注解配置: @QueueBinding、@Queue、 @Exchange直接通过这个组合注解一次性搞定消费端交换机、 队列、绑定、路由、并且配置监听功能等 由于类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置

消费端配置

spring.rabbitmq.addresses=192.168.0.113:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000

# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作) spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL # 设置当前消费者数量(线程数) spring.rabbitmq.listener.simple.concurrency=5 # 设置消费者最大并发数量 spring.rabbitmq.listener.simple.max-concurrency=10 消费端消息处理类

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel; import com.rxy.springboot.entity.Order;

@Component public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(             value = @Queue(value = "queue-1",              durable="true"),             exchange = @Exchange(value = "exchange-1",              durable="true",              type= "topic",              ignoreDeclarationExceptions = "true"),             key = "springboot.*"             )     )     @RabbitHandler     public void onMessage(Message message, Channel channel) throws Exception {         System.err.println("--------------------------------------");         System.err.println("消费端Payload: " + message.getPayload());         Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);         //手工ACK         channel.basicAck(deliveryTag, false);     } } 前面生产端已经发送了一条消息到queue-1队列,可以再运行发送一条,然后运行消费端工程启动类Application,打印如下

-------------------------------------- 消费端Payload: Hello RabbitMQ For Spring Boot! -------------------------------------- 消费端Payload: Hello RabbitMQ For Spring Boot! 其实@RabbitListener会自动声明队列、交换机及绑定关系,可以在管控台删除对应的队列和交换机,然后重新运行进行测试

使用配置方式 将队列、交换机、绑定关系使用配置方式,并且消息体内容使用java对象

首先增加相关的配置 spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-2 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* 另外两个工程都增加实体类Order,注意要发送java对象,必须实现序列化接口

public class Order implements Serializable {     private String id;     private String name; } 消费类增加Order对象消息的处理方法 @Payload: 接收消息的消息体对象 @Headers: 接收消息的属性 AmqpHeaders: 抽象类,里面包含了消息的常用属性key     @RabbitListener(bindings = @QueueBinding(             value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",              durable="${spring.rabbitmq.listener.order.queue.durable}"),             exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",              durable="${spring.rabbitmq.listener.order.exchange.durable}",              type= "${spring.rabbitmq.listener.order.exchange.type}",              ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),             key = "${spring.rabbitmq.listener.order.key}"             )     )     @RabbitHandler     public void onOrderMessage(@Payload Order order,              Channel channel,              @Headers Map<String, Object> headers) throws Exception {         System.err.println("--------------------------------------");         System.err.println("消费端order: " + order.getId());         Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);         //手工ACK         channel.basicAck(deliveryTag, false);     } 生产端发送方法,直接发送java对象

    //发送消息方法调用: 构建自定义对象消息     public void sendOrder(Order order) throws Exception {         rabbitTemplate.setConfirmCallback(confirmCallback);         rabbitTemplate.setReturnCallback(returnCallback);         //id + 时间戳 全局唯一          CorrelationData correlationData = new CorrelationData("0987654321");         rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);     } 生产端测试方法

    @Test     public void testSender2() throws Exception {          Order order = new Order("001", "第一个订单");          rabbitSender.sendOrder(order);     } 运行说明 先启动消费端,然后启动生产端

# 生产端打印 correlationData: CorrelationData [id=0987654321] ack: true # 消费端打印 -------------------------------------- 消费端order: 001  

最新回复(0)