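The previous chapter covered how RocketMQ sends ordinary messages. This chapter covers sending transactional messages.
1. Producer startup
TransactionMQProducer extends DefaultMQProducer. Its startup flow is the same as DefaultMQProducer's, except that it additionally initializes the thread pool used to check the local transaction state of transactional messages.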
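To make the sizing of that check thread pool concrete, here is a minimal sketch using only the JDK. It assumes the same shape as the defaults shown in this chapter (one core thread, one maximum thread, a bounded LinkedBlockingQueue) and the JDK's default rejection policy. The class and method names (CheckPoolSketch, countRejected) are hypothetical and not part of RocketMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CheckPoolSketch {
    /** Submits `submissions` blocking tasks and returns how many were rejected. */
    static int countRejected(int queueCapacity, int submissions) {
        // Same shape as the check pool: 1 core thread, 1 max thread, bounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 60_000, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    // Each task blocks until released, so the single worker stays busy.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Thrown by the default AbortPolicy once the queue is full.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```

With a single worker, the pool holds at most one running task plus queueCapacity waiting tasks. Anything beyond that is rejected, which is why the queue bound matters when many check requests arrive at once.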
    public class TransactionMQProducer extends DefaultMQProducer {
        private TransactionCheckListener transactionCheckListener;
        private int checkThreadPoolMinSize = 1;
        private int checkThreadPoolMaxSize = 1;
        private int checkRequestHoldMax = 2000;

        public TransactionMQProducer() {
        }

        public TransactionMQProducer(final String producerGroup) {
            super(producerGroup);
        }

        public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
            super(producerGroup, rpcHook);
        }

        @Override
        public void start() throws MQClientException {
            // Set up the transaction-check thread pool, then run the normal startup.
            this.defaultMQProducerImpl.initTransactionEnv();
            super.start();
        }
    }

The method that initializes the check thread pool:
    public void initTransactionEnv() {
        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
        // Bounded queue that holds pending check requests.
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }

2. Executing the local transaction (LocalTransactionExecuter) and checking the local transaction (TransactionCheckListener)
    public interface LocalTransactionExecuter {
        LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
    }

    public interface TransactionCheckListener {
        /**
         * Gets (checks) the local transaction state.
         *
         * @param msg the message
         * @return the transaction state
         */
        LocalTransactionState checkLocalTransactionState(final MessageExt msg);
    }

    public enum LocalTransactionState {
        COMMIT_MESSAGE,   // commit the transaction
        ROLLBACK_MESSAGE, // roll back the transaction
        UNKNOW,           // unknown
    }

3. The producer calls the transactional send method
    public class TransactionMQProducer extends DefaultMQProducer {
        ...
        @Override
        public TransactionSendResult sendMessageInTransaction(final Message msg,
            final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
            // First verify that a local transaction check listener has been set.
            if (null == this.transactionCheckListener) {
                throw new MQClientException("localTransactionBranchCheckListener is null", null);
            }
            return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
        }
        ...
    }

The call is delegated to DefaultMQProducerImpl.sendMessageInTransaction. It first sends a half message to the broker. If that succeeds, it executes the local transaction, and the resulting local transaction state decides whether the follow-up request is a commit or a rollback.
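The decision described above can be sketched without any RocketMQ dependency. This is a simplified model, not the library's code: the enums only mirror the names used in this chapter, and TransactionFlowSketch and resolve are hypothetical names introduced for illustration.

```java
import java.util.function.Supplier;

final class TransactionFlowSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /**
     * Returns the state the producer would report to the broker,
     * given the half-message send status and the local transaction.
     */
    static LocalState resolve(SendStatus halfStatus, Supplier<LocalState> localTransaction) {
        if (halfStatus != SendStatus.SEND_OK) {
            // The half message was not stored cleanly: never run the local
            // transaction, and ask the broker to roll back.
            return LocalState.ROLLBACK_MESSAGE;
        }
        try {
            LocalState state = localTransaction.get();
            // A null result is treated as "unknown".
            return state == null ? LocalState.UNKNOW : state;
        } catch (RuntimeException e) {
            // An exception leaves the state unknown; the broker's check-back
            // is expected to resolve it later.
            return LocalState.UNKNOW;
        }
    }
}
```

For example, `resolve(SendStatus.SEND_OK, () -> LocalState.COMMIT_MESSAGE)` yields COMMIT_MESSAGE, while any non-OK send status yields ROLLBACK_MESSAGE without invoking the supplier.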
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
        // The transaction executor is required.
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // Validate the message.
        Validators.checkMessage(msg, this.defaultMQProducer);

        // Send the half message.
        SendResult sendResult;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        // Handle the result of sending the half message.
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            // The half message was sent successfully: run the local transaction.
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        // Transaction id. Not used by the open-source version at present; presumably used by ONS.
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    // Execute the local transaction.
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // Sending the half message failed: mark the local transaction state as rollback.
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        // End the transaction: send COMMIT / ROLLBACK to the broker.
        try {
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        // Return the transactional send result.
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

Note that the first (half) message is sent synchronously, because the producer needs to know its delivery status right away. Once the half message has been sent successfully and the local transaction has run, the producer inspects the local transaction state to decide what the second, confirming request should carry. That logic lives in endTransaction().
    public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // Decode the message id.
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        // Build the request; the transaction id is needed.
        String transactionId = sendResult.getTransactionId();
        // The confirmation must go to the same broker that stored the half message.
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // Send COMMIT / ROLLBACK. Note: the communication mode is one-way.
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

If the local transaction succeeded, a commit is sent. If it failed, a rollback is sent. If the state is unknown, an "unknown" state is sent to the broker. Note that this request is sent one-way: the producer does not wait for a response, so a lost confirmation can only be recovered by the broker checking back later.
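The commit/rollback type carried in the request is a small bit field inside the message's sysFlag. The sketch below shows how such a two-bit field can be read and rewritten. The constant values are an assumption meant to mirror MessageSysFlag as I understand it and should be verified against your RocketMQ version. TransactionFlagSketch is a hypothetical name.

```java
final class TransactionFlagSketch {
    // Assumed values (two bits at positions 2 and 3); verify against MessageSysFlag.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // also serves as the field mask

    /** Extracts the transaction type from a sysFlag value. */
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Clears the transaction bits and sets them to the given type, keeping all other bits. */
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

Because only the two transaction bits are touched, unrelated flags stored in the same integer survive the transition from prepared to committed or rolled back.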
    public void endTransactionOneway(
        final String addr,
        final EndTransactionRequestHeader requestHeader,
        final String remark,
        final long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
        request.setRemark(remark);
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }

1. The broker receives the message and handles it in the sendMessage method of SendMessageProcessor
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext sendMessageContext,
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        response.setOpaque(request.getOpaque());
        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("receive SendMessage request command, {}", request);

        // Refuse requests until the broker is allowed to accept them.
        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();
        int queueIdInt = requestHeader.getQueueId();
        final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        final MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

        PutMessageResult putMessageResult = null;
        final Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        // Check whether this is a transactional message; if so, use the transactional path.
        final String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            // Handle the prepare (half) message.
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }

        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader,
            sendMessageContext, ctx, queueIdInt);
    }

Next, the logic that handles the first message: the prepareMessage() method of TransactionalMessageService
    public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.putHalfMessage(messageInner);
    }

The putHalfMessage() method of TransactionalMessageBridge then calls the store's save method:
    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        return store.putMessage(parseHalfMessageInner(messageInner));
    }

    // Saves the message's real topic and real queueId as properties,
    // then replaces them with the internal half topic and queue 0.
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }

2. The broker handles the second confirmation for the prepare message: on commit, the message is made visible to consumers; on rollback, the transactional message stored on the broker is marked as deleted.
The request is handled in EndTransactionProcessor:
    public class EndTransactionProcessor implements NettyRequestProcessor {
        private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
        private final BrokerController brokerController;

        public EndTransactionProcessor(final BrokerController brokerController) {
            this.brokerController = brokerController;
        }

        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final EndTransactionRequestHeader requestHeader =
                (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
            LOGGER.info("Transaction request:{}", requestHeader);
            if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
                return response;
            }

            if (requestHeader.getFromTransactionCheck()) {
                // The request is the producer's answer to a broker check-back.
                switch (requestHeader.getCommitOrRollback()) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                        LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                                + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                        return null;
                    }
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                        LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                                + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                        break;
                    }
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                        LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                                + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                        break;
                    }
                    default:
                        return null;
                }
            } else {
                // The request comes straight from the producer's send path.
                switch (requestHeader.getCommitOrRollback()) {
                    case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                        LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
                                + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                        return null;
                    }
                    case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                        break;
                    }
                    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                        LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                                + "RequestHeader: {} Remark: {}",
                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                            requestHeader.toString(),
                            request.getRemark());
                        break;
                    }
                    default:
                        return null;
                }
            }

            OperationResult result = new OperationResult();
            // Commit.
            if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
                // Use the request header to recover the original message from the commit log.
                result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
                if (result.getResponseCode() == ResponseCode.SUCCESS) {
                    // Verify that the stored message matches this request.
                    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                    if (res.getCode() == ResponseCode.SUCCESS) {
                        // Build a new message with the real topic and queueId so that it becomes visible.
                        MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                        msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                        // Store the newly built message.
                        RemotingCommand sendResult = sendFinalMessage(msgInner);
                        if (sendResult.getCode() == ResponseCode.SUCCESS) {
                            // Mark the prepare message as deleted so that it is no longer considered pending.
                            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                        }
                        return sendResult;
                    }
                    return res;
                }
            } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
                // Rollback.
                result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
                if (result.getResponseCode() == ResponseCode.SUCCESS) {
                    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                    if (res.getCode() == ResponseCode.SUCCESS) {
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return res;
                }
            }
            response.setCode(result.getResponseCode());
            response.setRemark(result.getResponseRemark());
            return response;
        }
    }

Next, look at how the commit is handled. It mainly takes the offset of the previously stored prepare message and looks that message up:
    private OperationResult getHalfMessageByOffset(long commitLogOffset) {
        OperationResult response = new OperationResult();
        // Find the message using the offset of the earlier prepare message.
        MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
        if (messageExt != null) {
            response.setPrepareMessage(messageExt);
            response.setResponseCode(ResponseCode.SUCCESS);
        } else {
            response.setResponseCode(ResponseCode.SYSTEM_ERROR);
            response.setResponseRemark("Find prepared transaction message failed");
        }
        return response;
    }

The message's real topic and queueId are then restored from the prepare message:
    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        // When the half message was stored, its topic was replaced and the real
        // topic was saved inside the message; read it back here.
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);
        // Get the transaction id.
        msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        msgInner.setSysFlag(msgExt.getSysFlag());
        TopicFilterType topicFilterType =
            (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG
                ? TopicFilterType.MULTI_TAG
                : TopicFilterType.SINGLE_TAG;
        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        // The real topic/queueId properties are only needed on the half message.
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
        return msgInner;
    }

The method that marks the prepare message as deleted is implemented as follows:
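    @Override
    public boolean deletePrepareMessage(MessageExt msgExt) {
        // Write an op message tagged as "remove" for this prepare message.
        if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
            log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}",
                msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
            return true;
        } else {
            log.error("Transaction op message write failed. messageId is {}, queueId is {}",
                msgExt.getMsgId(), msgExt.getQueueId());
            return false;
        }
    }

There is a neat idea here. A prepare message that has been sent must first of all be invisible to external consumers, so once it reaches the broker its topic and queueId are replaced. The second step, the confirmation, is either a commit or a rollback, but the original message has already been stored and its history has to be kept. So on commit, a new message is generated from the original prepare message and delivered as a normal message. For the prepare message itself, whether the outcome is rollback or commit, a new delete message is written to mark it. As a result, the transaction check-back only has to see whether a prepared message has a matching delete message.
When the broker starts, it also starts the check-back service thread. Its implementation is in the run method of TransactionalMessageCheckService: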
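The append-only idea can be illustrated with a small in-memory model. This is only a sketch of the concept, assuming one list for half messages, one set of "delete" markers and one list for visible messages. It is not how the broker stores data, and HalfMessageLogSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class HalfMessageLogSketch {
    private final List<String> halfQueue = new ArrayList<>(); // append-only; offset = index
    private final Set<Integer> opOffsets = new HashSet<>();   // half offsets that have a delete marker
    private final List<String> realTopic = new ArrayList<>(); // messages visible to consumers

    /** Stores a half message and returns its offset. */
    int prepare(String body) {
        halfQueue.add(body);
        return halfQueue.size() - 1;
    }

    /** Commit: re-create the message in the real topic, then mark the half message. */
    void commit(int halfOffset) {
        realTopic.add(halfQueue.get(halfOffset));
        opOffsets.add(halfOffset);
    }

    /** Rollback: nothing becomes visible; the half message is only marked. */
    void rollback(int halfOffset) {
        opOffsets.add(halfOffset);
    }

    /** Half messages without a delete marker still need a check-back. */
    List<Integer> pendingOffsets() {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < halfQueue.size(); i++) {
            if (!opOffsets.contains(i)) {
                pending.add(i);
            }
        }
        return pending;
    }

    List<String> visible() {
        return new ArrayList<>(realTopic);
    }
}
```

Nothing is ever rewritten in place: committing and rolling back both append, and "pending" is simply the set of half messages that have no marker yet.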
    public class TransactionalMessageCheckService extends ServiceThread {
        @Override
        public void run() {
            log.info("Start transaction check service thread!");
            long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
            while (!this.isStopped()) {
                this.waitForRunning(checkInterval);
            }
            log.info("End transaction check service thread!");
        }

        // Shown here for reference: this method is defined in the parent class ServiceThread.
        protected void waitForRunning(long interval) {
            if (hasNotified.compareAndSet(true, false)) {
                this.onWaitEnd();
                return;
            }
            // entry to wait
            waitPoint.reset();
            try {
                waitPoint.await(interval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            } finally {
                hasNotified.set(false);
                this.onWaitEnd();
            }
        }

        // Called after every wait; this is where one round of checking runs.
        @Override
        protected void onWaitEnd() {
            // Timeout.
            final long timeout = this.brokerController.getBrokerConfig().getTransactionTimeOut();
            // Maximum number of checks.
            final int checkMax = this.brokerController.getBrokerConfig().getTransactionCheckMax();
            // Current time.
            final long begin = System.currentTimeMillis();
            log.info("Begin to check prepare message, begin time:{}", begin);
            // Start checking.
            this.brokerController.getTransactionalMessageService().check(timeout, checkMax,
                this.brokerController.getTransactionalMessageCheckListener());
            log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
        }
    }
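The wait-then-run pattern used by this service thread can be sketched with plain JDK classes. The sketch assumes a standard CountDownLatch that is replaced on each wait (the code above resets its own latch instead), and PeriodicWaiterSketch, wakeup and rounds are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicWaiterSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    final AtomicInteger rounds = new AtomicInteger(); // how many times onWaitEnd ran

    /** Asks the waiting thread to run a round now instead of waiting out the interval. */
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    /** Waits up to intervalMillis (or returns at once if already notified), then runs one round. */
    void waitForRunning(long intervalMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        // A wakeup that lands between the check above and this line is not lost
        // for good; in the worst case the round is delayed by one interval.
        waitPoint = new CountDownLatch(1);
        try {
            waitPoint.await(intervalMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    /** Stand-in for one round of transaction checking. */
    void onWaitEnd() {
        rounds.incrementAndGet();
    }
}
```

Either path ends in exactly one call to onWaitEnd, so the loop in run() performs one check round per iteration whether it was woken early or simply timed out.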