监控日志展示如下:
[2019-10-30 14:31:23.339 INFO ] [ConsumeMessageThread_7] (com.xxx.service.mq.MQConsumerService:93) - 消费消息:msgId=0A064C3E000179C63692734B339201B0 topic=topic_xxx tag=yyy reconsumeTimes=12reconsumeTimes 代表消费重试次数。 同时日志中频繁显示同一条msgId。
通过debug发现在执行业务代码时,抛出的异常被mq捕获,如下:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run try { this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // RemotingHelper.exceptionSimpleDesc(e), // ConsumeMessageOrderlyService.this.consumerGroup, // msgs, // messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; }当发生异常时messageListener.consumeMessage方法的返回值为null 当status为null时,status会被赋值为ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT状态(即隔一段时间后重试)。
上文造成消息重试的异常信息如下
java.lang.NoSuchMethodError: com.soaclient.dto.xxx.LogQueryDto.getMqTypeIn()Ljava/util/List;由于该异常为Error级别,而业务代码异常捕获级别为Exception,导致异常没有被捕获。
1、修改业务代码异常 2、修改异常捕获级别为throwable级别,并输出异常日志 3、增加重试次数限制,当大于一定次数时,则不再重试。 如果不加重试,针对顺序消费的情况,可能会出现消费被阻塞的情况。 而无序消费,默认最多重试16次。
当为无序消费时,代码如下:
try { ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), // ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; }消息队列 MQ > 高级特性 > 消息重试 https://help.aliyun.com/document_detail/43490.html?spm=a2c4g.11186623.6.555.35b96c99oNQKPt