RabbitMQ(七)——spring boot集成延时队列

mac2025-12-03  9

前言

上一篇博客已经总结了消费延时队列的两种方式,同时也根据实例说明了spring boot 消费消息的两种配置方式。这一篇博客在上一篇博客的基础上总结延时队列(其实就是常说的死信队列,但是不太喜欢这个名字,所以我这里称为延时队列)

什么是延时队列

简单点理解,就是在一定时长之后,消息不能在该队列中存在了。这个队列就称为延时队列,在实际中常见的订单在指定时间之后失效,背后就有延时队列的影子。如下图所示,生产者发送的消息,通过第一个交换机发送到延时队列,然后在延时队列中消息存放达到一定时长之后,消息被重新投递到第二个交换机中,然后第二个交换机将消息投递到消费者真正监听的消费队列中,其中延时队列的左右其实很简单,只是在消息达到一定时长之后重新投递消息。这里需要说明一点的是,延时队列与二个交换机的绑定是根据队列创建时指定的参数绑定的。这一点在后面的实例中会有所体现。

springboot中集成延时队列

这里直接进入实例部分,第一个交换机与延时队列的绑定,这里为了简单,就不用@RabbitListener注解实现了,毕竟这里是没有消费者的,我们在@Configuration容器中直接进行绑定。

1、引入各种消息队列已经routingKey的配置

#延时队列的生产端配置 simple.produce.exchange.name=simple.produce.exchange simple.produce.routing.key.name=simple.produce.routing.key ##真正的延时队列 simple.delay.queue.name=simple.delay.queue ##延时队列再次投放消息的队列和交换机 simple.deal.routing.key.name=simple.deal.routing.key simple.deal.exchange.name=simple.deal.exchange.name simple.deal.queue.name=simple.deal.queue.name

2、将生产者的exchange与延时队列绑定

在创建延时队列的时候,通过指定x-dead-letter-exchange、x-dead-letter-routing-key和x-message-ttl三个参数,完成延时队列与二次发送消息的交换机的绑定,各个参数看名字就知道是什么意思,这里不再赘述。

/** * 创建延时队列,并将其与二次消息投放的交换机进行绑定 * @return */ @Bean public Queue delayQueue(){ Map<String, Object> args=new HashMap(); args.put("x-dead-letter-exchange", env.getProperty("simple.deal.exchange.name")); args.put("x-dead-letter-routing-key", env.getProperty("simple.deal.routing.key.name")); args.put("x-message-ttl", 10000); return new Queue(env.getProperty("simple.delay.queue.name"),true,false,false,args); } @Bean public TopicExchange produceDelayExchange(){ return new TopicExchange(env.getProperty("simple.produce.exchange.name")); } /** * 将生产端与延时队列进行绑定 * @return */ @Bean public Binding bindProduceAndDelayQueue(){ return BindingBuilder.bind(delayQueue()).to(produceDelayExchange()) .with(env.getProperty("simple.produce.routing.key.name")); }

 3、真正的消费端,通过@RabbitListener注解绑定

/** * autor:liman * createtime:2019/10/31 * comment: 延时队列最终处理消息的队列和交换机 */ @Component public class DelayRealDealQueueListener { private static final Logger log = LoggerFactory.getLogger(DelayRealDealQueueListener.class); @Autowired private ObjectMapper objectMapper; @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${simple.deal.queue.name}", durable = "true") , exchange = @Exchange(value = "${simple.deal.exchange.name}", type = ExchangeTypes.TOPIC) , key = "${simple.deal.routing.key.name}") ,containerFactory = "singleListenerContainer" ) public void dealDelayMessage(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long delivertTag, Channel channel){ try{ String result = new String(message); log.info("开始处理真正的延时处理消息:{}",result); channel.basicAck(delivertTag,true); }catch (Exception e){ log.error("延时消息异常:{}",e.fillInStackTrace()); } } }

3、编写一个简单的生产者,用于测试

/** * autor:liman * createtime:2019/10/31 * comment: * 延迟队列的controller */ @RestController public class DelayQueueController { private static final Logger log= LoggerFactory.getLogger(DelayQueueController.class); private static final String Prefix="delay/queue"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; @Autowired private Environment env; @RequestMapping(value = Prefix+"/send",method = RequestMethod.GET) public BaseResponse sendMail(@RequestParam String message){ BaseResponse response=new BaseResponse(StatusCode.Success); try { rabbitTemplate.setExchange(env.getProperty("simple.produce.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("simple.produce.routing.key.name")); // String str="延迟队列的消息"; Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build(); rabbitTemplate.convertAndSend(msg); }catch (Exception e){ e.printStackTrace(); } log.info("发送消息完毕----"); return response; } }

 测试结果:

消息在延时队列中的时候

消息进入真正的处理队里中的时候(关闭了消息确认)

 

总结

至此,关于RabbitMQ的简单集成,几乎完成,后面会继续结合实际场景总结几篇博客

最新回复(0)