RabbitMQ(二)--与SpringBoot的集成

mac2024-04-05  31

1、配置pom文件,添加spring-boot-starter-amqp的支持

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2、配置application.properties文件--地址、用户等信息

spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin # 手动确认消息 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.direct.acknowledge-mode=manual

3、配置交换机和队列

/** * @author user * Description: 该类初始化创建队列、转发器,并把队列绑定到转发器 */ @Configuration public class RabbitMQConfig { public final static String QUEUE_SEND_EMPLOYEE_NOTICE = "queue-employee-notice"; public final static String QUEUE_SEND_MANAGER_NOTICE = "queue-manager-notice"; public final static String EXCHANGE_DIRECT_MICRO_RESUME = "exchange-wechat-resume"; /** * Employee消息队列 * @return */ @Bean public Queue employeeQueue() { return new Queue(QUEUE_SEND_EMPLOYEE_NOTICE); } /** * Manager消息队列 * @return */ @Bean public Queue managerQueue() { return new Queue(QUEUE_SEND_MANAGER_NOTICE); } /** * 交换器 * @return */ @Bean DirectExchange resumeEchange() { return new DirectExchange(EXCHANGE_DIRECT_MICRO_RESUME); } /** * 声明绑定关系 * @param employeeQueue * @param resumeEchange * @return */ @Bean Binding employeeBinding(Queue employeeQueue, Exchange resumeEchange) { return BindingBuilder.bind(employeeQueue()).to(resumeEchange()).with(QUEUE_SEND_EMPLOYEE_NOTICE); } /** * 声明绑定关系 * @param managerQueue * @param resumeEchange * @return */ @Bean Binding managerBinding(Queue managerQueue, Exchange resumeEchange) { return BindingBuilder.bind(managerQueue()).to(resumeEchange()).with(QUEUE_SEND_MANAGER_NOTICE); } }

4、消息发送者

@Component public class MQSender { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private AmqpTemplate rabbitTemplate; /** * 发送到员工消息队列 * @param messageVO 自定义消息 * @throws InternalServiceException */ @Async public void sendEmployeeNotify(Object messageVO) throws InternalServiceException { logger.info("MQ Sender sendEmployeeNotify method enter"); new Thread(() -> { // 发送更新技能通知 rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_DIRECT_MICRO_RESUME, RabbitMQConfig.QUEUE_SEND_EMPLOYEE_NOTICE, messageVO); }).start(); } /** * 发送到经理消息队列 */ @Async public void sendManagerNotify(Object messageVO) throws InternalServiceException { logger.info("MQ Sender sendManagerNotify method enter"); new Thread(() -> { // 发送待评价通知 rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_DIRECT_MICRO_RESUME, RabbitMQConfig.QUEUE_SEND_MANAGER_NOTICE, messageVO); }).start(); } }

5、消息接收者

@Component public class MQReceiver { private Logger log = LoggerFactory.getLogger(this.getClass()); /** * 监听员工消息队列 */ @Async @RabbitHandler @RabbitListener(queues = RabbitMQConfig.QUEUE_SEND_EMPLOYEE_NOTICE) public void sendEmployeeNotify(Object messageVO, Message message, Channel channel) { log.info("enter rabbitmq receiver methods::sendEmployeeNotify"); if (messageVO == null) { return; } try { // 处理消息逻辑 // ... // 消息发送成功,从队列里面移除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { try { if (message.getMessageProperties().getRedelivered()) { // 重试3次 log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); // 最后一个参数 requeue 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } catch (Exception e1) { log.error("rabbitmq sendEmployeeNotify fail {}", e1); } log.error("rabbitmq sendEmployeeNotify fail", e); } } /** * 监听经理消息队列 */ @Async @RabbitHandler @RabbitListener(queues = RabbitMQConfig.QUEUE_SEND_MANAGER_NOTICE) public void sendManagerNotify(Object messageVO, Message message, Channel channel) { log.info("enter rabbitmq receiver methods::sendManagerNotify"); if (messageVO == null) { return; } try { // 处理消息逻辑 // ... // 消息发送成功,从队列里面移除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { try { if (message.getMessageProperties().getRedelivered()) { // 重试3次 log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); // 最后一个参数 requeue 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } catch (Exception e1) { log.error("rabbitmq sendManagerNotify fail {}", e1); } log.error("rabbitmq sendManagerNotify fail", e); } } }

6、手动确认消息

成功确认 void basicAck(long deliveryTag, boolean multiple) throws IOException; deliveryTag:该消息的index multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。 消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。 失败确认 void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; deliveryTag:该消息的index。 multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。 requeue:被拒绝的是否重新入队列。 void basicReject(long deliveryTag, boolean requeue) throws IOException; deliveryTag:该消息的index。 requeue:被拒绝的是否重新入队列。 channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。

7、测试--http://localhost:9997/users/send

@RestController @RequestMapping("/users") public class UserController { @Autowired MQSender mQSender; @GetMapping("send") public String send() { mQSender.sendEmployeeNotify("rabbit test"); return "send"; } }

 

最新回复(0)