RabbitMQ学习(三)——重试机制、解决消息幂等性(防止重复消费)

mac2024-01-31  46

思考:

1. 引入消息队列之后如何保证高可用性2. 如何保证消息不被重复消费呢?3. 如何保证消息的可靠性传输(如何处理消息丢失的问题)?4. 我该怎么保证从消息队列里拿到的数据按顺序执行?5. 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?6. 如果让你来开发一个消息队列中间件,你会怎么设计架构?

一、重试机制(自动补偿机制)

rabbitmq默认情况下,如果消费者程序出现异常情况下,会自动实现补偿机制

重试机制原理:@RabbitListtener 底层实现,使用AOP进行拦截,如果程序没有抛出异常,自动提交事务:如果AOP使用异常通知拦截 获取异常信息的话,自动实现补偿机制,该消息会一直缓存到RabbitMQ服务器端存放,一直重试扫不抛出异常为止

修改重试机制策略:一般默认情况下,间隔5秒重试一次

 

问题1:如果消费端 程序业务出现异常,消息会消费成功吗?

问题2:如何合适选择重试机制?

            情况1:消费者获取到消息之后,调用第三方接口,但接口暂时无法访问,是否需要重试?——需要重试,因为接口可能网络引起的暂时无法访问

            情况2:消费者获取到消息后,抛出数据转换异常,是否需要重试?——不需要,应该采用日志记录+定时任务job健康检查+人工 进行补偿

举个代码例子演示重试机制:

@RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg); JSONObject jsonObject = JSONObject.parseObject(msg); // 获取email参数 String email = jsonObject.getString("email"); // 请求地址 String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; JSONObject result = HttpClientUtils.httpGet(emailUrl);//根据配置文件的重试次数去重试几次该方法连接 if (result == null) { // 因为网络原因,造成无法访问,继续重试。 throw new Exception("调用接口失败!"); } //如上图配置重试间隔是3s,重试次数是5次。也就是15秒后将不再执行重试 System.out.println("执行结束....");

二、解决消息幂等性(防止重复消费)

 MQ 有个基本原则:数据不能多一条,也不能少一条。不能多,就是这一篇-----不能重复消费和保证消息幂等性,不能少,就是下一篇的保证消息可靠性传输。

(1)重复消费产生原因:

网络延迟传输中,消费出现异常或者 消息延迟消费,会造成MQ进行重试补偿,在重试过程中可能会造成重复消费,重复消费问题就是幂等性问题

(2)重复消费消息的几种场景

场景1:消费者干的事是拿一条数据往数据库写一条,如果消息重复两次就写了两条,导致数据出错。

         解决场景1:同一条消息消息到第二次时判断一下是否已消费过,若是则直接扔掉,一条数据出现两次但是数据库只有一条,这就保证了系统的幂等性

(3)何为幂等性?(官方定义):

一次和多次请求某资源对于资源本身应该有相同的结果(网络超时除外)。也就是,任意多次执行对资源本身产生的影响均与一次执行的结果相同

(4)解决幂等性(重复消费)的几种业务场景

业务场景1:从生产者拿到个数据后要写库,先根据主键查一下,如果这个数据有了就别插了直接update

业务场景2:如果是写redis的都没问题,因为每次都是set,redis天然的幂等性

业务场景3:需要让生产者发送每条数据的时候加上一个全局唯一的id,消费的时候先根据id去比如redis查一下判断是否消费过,若没有则处理然后这个id写redis,若消费过就不处理

业务场景4:如果数据库有唯一建约束了,插入只会报错,不会导致数据库出现脏数据,本身幂等了

(5)解决决幂等性(重复消费)的几种方法

方法1:唯一ID + 指纹码机制,利用数据库主键去重

思路:根据消息生成一个全局唯一ID,然后加上一个指纹码。指纹码可以系统生成也可以根据某些规则自定义拼接,目的是确定本次才做唯一,将ID+指纹码作为拼接好的值作为主键就可以去重了,在消费消息前先去数据库查看这条消息指纹码是否存在,没有就插入有就忽视。

高并发写数据库性能瓶颈:可以跟进ID进行分库分表策略,采用一些路由算法进行分流,要保证ID通过这种算法消息即使投递多次都落在同一数据库分片上,这样就由单台数据库幂等变成多库的幂等。

方法1:利用Redis的原子性去实现

redis是单线程的,但是性能好也有很多原子性的命令,比如setnx命令,在接收到消息后将消息ID作为key去执行setnx命令,如果执行成功则表示没有执行过这条消息,可以进行消费(setnx命令特点:当且仅当key不存在,将key值设为value值;若key已存在该命令不做任何操作)

方法3:使用全局ID区分消息,解决幂等性(常用)

对于方法3还是以生产者和消费者代码举例:

生产者:在请求头设置消息id(messageId),可以用随机ID比如UUID.randomUUID(),也可以用业务逻辑唯一ID

@Component public class FanoutProducer { @Autowired private AmqpTemplate amqpTemplate; public void send(String queueName) { String msg = "my_fanout_msg:" + System.currentTimeMillis(); Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); amqpTemplate.convertAndSend(queueName, message); } }

消费者:可以通过判断该messageId是不是消费过,进行决定是否消费

疑问:高并发场景下该方案有没有影响?

           没有。重试机制是有间隔性的,并且有顺序性

@Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message) throws Exception { if(messageId){ .... } System.out.println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // int i = 1 / 0; } }

 

最新回复(0)