此方法已用到公司商城项目,可用。
业务:订单10分钟未支付,则自动取消订单; 发货后15日,买家未收货,则自动确认收货等;
思路:采用死信队列做延迟(该业务的所有消息的延时时间都是一样的才能用)
死信概念:消费达到设置的时间未消费这变为死消息,进入配置的死信队列进行消费。
原理:将消息投放到某队列中(表面队列),改队列无消费者,消息时间到了无法消费,变为死消息,进入死信队列,进行正真的消费,来做到消息的延时。
注意点:这里死信做延迟,是把消息放到死信队列里做延迟的,也就是说放在队列里做延时的,这样消息是先进先出的,有阻塞。因为订单未支付定时取消,10分钟是死的,所以消息先进先出是没有问题的。
如果你是做商品按指定时间上架/店铺的拼团指定时间上下架活动/预售指定上下架活动/优惠卷指定时间上下架活动的,这里的死信做延迟是不满足业务的。 打个比方,如果今天我先指定商品A明天凌晨12点上架售卖,再指定商品B今晚凌晨12点上架售卖,这里今晚12点到了,B消息也无法消费,因为还没到明天凌晨12点,消息A还没消费,阻塞了消息B消费。要等A消息消费完,B才能消费。这个我们下一篇讲解如何解决,如果我还没更新,着急解决的可以+我qq 1872065148 , 我可以先帮你解决这个问题。
pmx配置
<!-- MQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>boot项目配置mq
spring: rabbitmq: host: *** port: 5672 username: *** password: *** #设置手动ack回执 listener: simple: acknowledge-mode: manual #none 不确认,auto 自动确认 manual 手动确认配置rabbitmq的队列、交换机及其转发路由
@Configuration public class RabbitMqOrderConfig { //订单消息实际消费交换机,队列,路由 private final static String EXCHANGE_NAME = "mall_exchange"; private final static String QUEUE_NAME = "mall_queue"; private final static String ROUTE_KEY = "mall_routekey"; // 订单消息延迟消费队列所绑定的交换机 private final static String DELAY_EXCHANGE_NAME = "mall_delay_exchange"; private final static String DELAY_QUEUE_NAME = "mall_delay_queue"; private final static String DEALY_ROUTE_KEY = "mall_delay_routekey"; /** * 订单消息实际消费队列所绑定的交换机 */ @Bean DirectExchange assembleDirect() { return (DirectExchange) ExchangeBuilder .directExchange(EXCHANGE_NAME) .durable(true) .build(); } /** * 订单实际消费队列 */ @Bean public Queue assembleQueue() { return new Queue(QUEUE_NAME); } /** * 将订单队列绑定到交换机 */ @Bean Binding assembleBinding(DirectExchange assembleDirect, Queue assembleQueue) { return BindingBuilder .bind(assembleQueue) .to(assembleDirect) .with(ROUTE_KEY); } /** * 订单延迟队列队列所绑定的交换机 */ @Bean DirectExchange assembleTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(DELAY_EXCHANGE_NAME) .durable(true) .build(); } /** * 订单延迟队列(死信队列) */ @Bean public Queue assembleTtlQueue() { return QueueBuilder .durable(DELAY_QUEUE_NAME) .withArgument("x-dead-letter-exchange", EXCHANGE_NAME)//到期后转发的交换机 .withArgument("x-dead-letter-routing-key", ROUTE_KEY)//到期后转发的路由键 .build(); } /** * 将订单延迟队列绑定到交换机 */ @Bean Binding assembleTtlBinding(DirectExchange assembleTtlDirect, Queue assembleTtlQueue) { return BindingBuilder .bind(assembleTtlQueue) .to(assembleTtlDirect) .with(DEALY_ROUTE_KEY); } }生产者:
@Component public class RabbitOrderSender { // 订单消息延迟消费队列所绑定的队列,交换机以及路由 private final static String DELAY_EXCHANGE_NAME = "mall_delay_exchange"; private final static String DELAY_QUEUE_NAME = "mall_delay_queue"; private final static String DEALY_ROUTE_KEY = "mall_delay_routekey"; @Autowired private AmqpTemplate amqpTemplate; public void sendDelay(Long orderId, Long expirationTime) { this.amqpTemplate.convertAndSend(DELAY_EXCHANGE_NAME ,DEALY_ROUTE_KEY , orderId, message -> { // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间 //10分钟未支付直接取消订单 message.getMessageProperties().setExpiration(String.valueOf(60*1000*10)); return message; }); } }消费者:
@Component //监听的队列名(实际消费的队列),该队列的消息都走该消费者 @RabbitListener(queues = "mall_queue") public class RabbitOrderReceiver { @Autowired private OrderService orderSerivce; @RabbitHandler public void orderDelayQueue(Long orderId, Message message, Channel channel) { //ack回执 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); orderSerivce.payChecked(orderId); } }取消订单逻辑:
OrderSerivceIml中 //这里写接口payChecked接口自改订单状态为取消的具体逻辑,省略业务投放消息:
orderImpl中注入生产者 @Autowired private RabbitOrderSender rabbitOrderSender; //这里省略相关业务逻辑代码... //投放消息到延迟的交换机中,并设置10分钟后未支付则取消订单 rabbitOrderSender.sendDelay(orderId,60*1000*10L);