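RocketMQ consumers must follow the "subscription consistency" principle when they subscribe to topics: all consumer instances under the same consumer group must use the same processing logic. Once the subscriptions diverge, consumption becomes erratic and messages can even be lost. In most distributed applications a group has several consumer instances attached to it. Because a RocketMQ subscription is made up of Topic + Tag, keeping subscriptions consistent means that every consumer instance in the group must guarantee that: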
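- the topics it subscribes to are the same as those of the other instances;
- the tags it subscribes to within each topic are the same as well.

Put simply: suppose consumer group GroupA has two consumers, consumerA and consumerB. If consumerA subscribes to topicA with tagA while consumerB subscribes to topicB with tagB, the subscriptions are inconsistent.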
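To make the rule concrete, here is a minimal sketch of a consistency check. It is not part of RocketMQ: the class `SubscriptionConsistencyCheck`, the method `isConsistent`, and the instance names are hypothetical, and a subscription is modeled simply as a map from topic to tag expression.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a RocketMQ class: models each instance's
// subscription as topic -> tag expression and compares them.
class SubscriptionConsistencyCheck {

    /**
     * Returns true when every instance declares exactly the same
     * topic -> tag expression map. Tag expressions are compared as plain
     * strings, so "tagA || tagB" and "tagB || tagA" count as different.
     */
    static boolean isConsistent(Map<String, Map<String, String>> subscriptionsByInstance) {
        Map<String, String> reference = null;
        for (Map<String, String> subscription : subscriptionsByInstance.values()) {
            if (reference == null) {
                reference = subscription;          // first instance sets the baseline
            } else if (!reference.equals(subscription)) {
                return false;                      // any deviation breaks consistency
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The GroupA example from the text: two instances, different topic and tag.
        Map<String, Map<String, String>> groupA = new LinkedHashMap<>();
        groupA.put("consumerA", Collections.singletonMap("topicA", "tagA"));
        groupA.put("consumerB", Collections.singletonMap("topicB", "tagB"));
        System.out.println("GroupA consistent? " + isConsistent(groupA));
    }
}
```

A check like this could run at deployment time against the configuration of each instance, before any consumer is started.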
The broker keeps consumer information in a map whose key is the group name and whose value is the information about that group:

    // org.apache.rocketmq.broker.client.ConsumerManager
    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

ConsumerGroupInfo stores the topics the group subscribes to. It also uses a map, this time keyed by topic, so when consumers in the same group subscribe to different topics, they overwrite each other's entries in that map:

    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();

Now let's look at the actual source code. Starting a consumer (calling DefaultMQPushConsumer.start()) starts the MQ client:

    // DefaultMQPushConsumerImpl.start()
    // ...
    mQClientFactory.start();

The MQ client starts a heartbeat thread that periodically sends heartbeat information to the brokers:

    // MQClientInstance.startScheduledTask()
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // Remove brokers that have gone offline
                MQClientInstance.this.cleanOfflineBroker();
                // Send heartbeat information to the brokers
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

The method to focus on is sendHeartbeatToAllBrokerWithLock. Stepping into it leads to sendHeartbeatToAllBroker():

    private void sendHeartbeatToAllBroker() {
        final HeartbeatData heartbeatData = this.prepareHeartbeatData();
        final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
        final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
        // Nothing to report if there is neither a consumer nor a producer
        if (producerEmpty && consumerEmpty) {
            log.warn("sending heartbeat, but no consumer and no producer");
            return;
        }

        if (!this.brokerAddrTable.isEmpty()) {
            // Counts how many heartbeat rounds have been sent
            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, HashMap<Long, String>> entry = it.next();
                String brokerName = entry.getKey();
                HashMap<Long, String> oneTable = entry.getValue();
                if (oneTable != null) {
                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                        Long id = entry1.getKey();
                        String addr = entry1.getValue();
                        if (addr != null) {
                            if (consumerEmpty) {
                                if (id != MixAll.MASTER_ID)
                                    continue;
                            }

                            try {
                                // Send the heartbeat to this broker
                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                if (!this.brokerVersionTable.containsKey(brokerName)) {
                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                }
                                this.brokerVersionTable.get(brokerName).put(addr, version);
                                if (times % 20 == 0) {
                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                    log.info(heartbeatData.toString());
                                }
                            } catch (Exception e) {
                                if (this.isBrokerInNameServer(addr)) {
                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                                } else {
                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it",
                                        brokerName, id, addr);
                                }
                            }
                        }
                    }
                }
            }
        }
    }

The heartbeat itself is sent by this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000), which takes three arguments: a broker address, the heartbeat data, and a timeout. Let's see what heartbeatData carries:

    public class HeartbeatData extends RemotingSerializable {
        // Client ID
        private String clientID;
        // Producer information; it only holds a groupName
        private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
        // Consumer information: groupName, consume type (pull, push), message model
        // (clustering, broadcasting), and the subscribed topic, tag, and version
        private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
        // getters and setters omitted
    }

The heartbeat is sent to the broker. Next, let's look at how the broker handles it, in the ClientManageProcessor.heartBeat method of RocketMQ's broker module:

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // Decode the heartbeat
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        // Iterate over the consumers in the heartbeat
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            // Look up the subscription group config by groupName
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            // Register the consumer
            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

Moving on to registerConsumer: after it obtains the consumer group information by groupName, it updates the consumer's subscription through updateSubscription. If the subscription has changed, the broker notifies the consumers that the consumer ID list has changed:

    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
        // Get the consumer group information
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }

        boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                consumeFromWhere);
        // Update the consumer subscription
        boolean r2 = consumerGroupInfo.updateSubscription(subList);

        if (r1 || r2) {
            if (isNotifyConsumerIdsChangedEnable) {
                // Notify the consumers that the ID list has changed
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
            }
        }

        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

        return r1 || r2;
    }

The subscription update itself happens inside consumerGroupInfo.updateSubscription, so let's step into that:

    public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;

        // Step 1
        for (SubscriptionData sub : subList) {
            // Look the topic up in the subscription table
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            if (old == null) {
                // Not in the table: add the new subscription
                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                if (null == prev) {
                    updated = true;
                    log.info("subscription changed, add new topic, group: {} {}",
                        this.groupName,
                        sub.toString());
                }
            } else if (sub.getSubVersion() > old.getSubVersion()) {
                // The incoming version is newer, so replace the subscription.
                // The version is a timestamp, so the later one is the larger one.
                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
                        this.groupName,
                        old.toString(),
                        sub.toString()
                    );
                }

                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }

        // Step 2
        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, SubscriptionData> next = it.next();
            String oldTopic = next.getKey();

            boolean exist = false;
            // Compare the current table with the subscriptions in the heartbeat;
            // entries present in the table but absent from the heartbeat are removed
            for (SubscriptionData sub : subList) {
                if (sub.getTopic().equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }

            if (!exist) {
                log.warn("subscription changed, group: {} remove topic {} {}",
                    this.groupName,
                    oldTopic,
                    next.getValue().toString()
                );

                it.remove();
                updated = true;
            }
        }

        this.lastUpdateTimestamp = System.currentTimeMillis();

        return updated;
    }

The update method iterates over the subscriptions carried by the heartbeat. For each one it looks up the topic in the local (that is, the old) subscription table. If the topic is not found, the subscription is new and is added to the table. If it is found, the two versions are compared. The version is a timestamp, so the more recent subscription has the larger version and overwrites the older one. Step two then compares the updated table with the heartbeat and deletes every subscription that exists in the table but not in the heartbeat, which guarantees that the table matches the heartbeat once the update finishes. As for tags: the subscription table is keyed by topic, but the value is replaced according to the version on every update, so a different tag gets overwritten in the same way.
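The two steps can be replayed in isolation with a small model. This is a simplified sketch rather than the broker's code: `SubscriptionTableSketch` and its nested `Sub` class are hypothetical stand-ins for ConsumerGroupInfo and SubscriptionData, and the versions are hand-picked numbers instead of real timestamps.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of one group's subscription table on the broker.
class SubscriptionTableSketch {

    /** Simplified stand-in for SubscriptionData: topic, tag expression, version. */
    static final class Sub {
        final String topic;
        final String tags;
        final long version;

        Sub(String topic, String tags, long version) {
            this.topic = topic;
            this.tags = tags;
            this.version = version;
        }
    }

    final Map<String, Sub> table = new ConcurrentHashMap<>();

    /** Applies one heartbeat; returns true if a topic was added or removed. */
    boolean update(List<Sub> heartbeat) {
        boolean updated = false;
        // Step 1: add unknown topics, replace entries that carry a newer version.
        for (Sub sub : heartbeat) {
            Sub old = table.get(sub.topic);
            if (old == null) {
                table.put(sub.topic, sub);
                updated = true;
            } else if (sub.version > old.version) {
                table.put(sub.topic, sub);   // same topic, newer version: tags are replaced
            }
        }
        // Step 2: drop topics that this heartbeat does not mention.
        Iterator<Map.Entry<String, Sub>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            String topic = it.next().getKey();
            boolean exist = false;
            for (Sub sub : heartbeat) {
                if (sub.topic.equals(topic)) {
                    exist = true;
                    break;
                }
            }
            if (!exist) {
                it.remove();
                updated = true;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        SubscriptionTableSketch group = new SubscriptionTableSketch();
        group.update(Arrays.asList(new Sub("TopicA", "TagA", 1L)));  // first instance
        group.update(Arrays.asList(new Sub("TopicA", "TagB", 2L)));  // same topic, other tag
        System.out.println("TopicA tags: " + group.table.get("TopicA").tags);
        group.update(Arrays.asList(new Sub("TopicB", "TagB", 3L)));  // different topic
        System.out.println("topics: " + group.table.keySet());
    }
}
```

In this model the second heartbeat replaces the tag of TopicA without reporting a change, and the third one removes TopicA altogether, which mirrors the two kinds of inconsistency described above.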
Likewise, starting the consumer client also starts the load-balancing (rebalance) thread. Following the calls down layer by layer:

    // MQClientInstance.start()
    this.rebalanceService.start();

    // RebalanceService extends ServiceThread; RebalanceService.run() calls
    this.mqClientFactory.doRebalance();

    // Following that call further leads to RebalanceImpl.doRebalance
    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // The key method
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

doRebalance first obtains the consumer client's subscription table, then iterates over it and rebalances one topic at a time. Now look at this.rebalanceByTopic(topic, isOrder), taking the clustering consumption mode as the example:

    case CLUSTERING: {
        Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
        List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
        if (null == mqSet) {
            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
        }

        if (null == cidAll) {
            log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
        }

        if (mqSet != null && cidAll != null) {
            List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
            mqAll.addAll(mqSet);

            Collections.sort(mqAll);
            Collections.sort(cidAll);

            AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

            List<MessageQueue> allocateResult = null;
            try {
                // Allocate queues according to the strategy
                allocateResult = strategy.allocate(
                    this.consumerGroup,
                    this.mQClientFactory.getClientId(),
                    mqAll,
                    cidAll);
            } catch (Throwable e) {
                log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
                return;
            }

            Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
            if (allocateResult != null) {
                allocateResultSet.addAll(allocateResult);
            }

            boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
            if (changed) {
                log.info(
                    "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                    strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                    allocateResultSet.size(), allocateResultSet);
                this.messageQueueChanged(topic, mqSet, allocateResultSet);
            }
        }
        break;
    }

The code first obtains the queue information for the topic. It then finds the set of all consumer IDs through this.mQClientFactory.findConsumerIdList(topic, consumerGroup). Stepping into that method shows that it uses the topic to find the broker cluster information, picks one broker address at random, and then uses that address together with the groupName to fetch the IDs of all consumers. As mentioned earlier, a client sends heartbeats to every broker once it is running, so asking any one broker is enough.

Next comes strategy.allocate, the message queue allocation algorithm. RocketMQ implements five load-balancing algorithms and uses AllocateMessageQueueAveragely, the average allocation algorithm, by default. Average allocation is easy to understand: given all the queues of the topic and the list of consumer IDs obtained above, it divides the number of queues by the number of consumers to work out how many queues each consumer gets, with any remainder going to the consumers that come first in the list. For example, if topicA has 8 message queues and two consumers, consumerA and consumerB, subscribe to it, each consumer is assigned four message queues after rebalancing.

This is where a problem appears. Suppose groupA has two consumer clients, A and B, where A subscribes to topicA and B subscribes to topicB. If topicA has 8 message queues, rebalancing topicA assigns four queues each to A and B, the two clients under groupA. B never subscribed to topicA, yet it has been allocated topicA's queues, so half of topicA's messages go unconsumed and may only be consumed after a delay, once the queues have been reallocated.

Continuing on, rebalancing produces the queue set allocateResult, and processQueueTable (the table of queues being processed) is then updated from allocateResult.
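The arithmetic of average allocation, and the resulting gap when one group member does not subscribe to the topic, can be sketched as follows. This is a simplified illustration modeled on the averaging idea, not a copy of AllocateMessageQueueAveragely: the class `AverageAllocationSketch` is hypothetical, and queues are represented by plain integers rather than MessageQueue objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of average queue allocation within one consumer group.
class AverageAllocationSketch {

    /**
     * Returns the queue ids assigned to currentCid.
     * Both lists are assumed to be sorted the same way on every client.
     */
    static List<Integer> allocate(String currentCid, List<Integer> mqAll, List<String> cidAll) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize;
        if (mqAll.size() <= cidAll.size()) {
            averageSize = 1;                                   // at most one queue each
        } else if (mod > 0 && index < mod) {
            averageSize = mqAll.size() / cidAll.size() + 1;    // early consumers take the remainder
        } else {
            averageSize = mqAll.size() / cidAll.size();
        }
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> topicAQueues = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        // The broker returns every client in groupA, including B, which only subscribes to topicB.
        List<String> groupA = Arrays.asList("A", "B");

        List<Integer> pulledByA = allocate("A", topicAQueues, groupA);
        List<Integer> reservedForB = allocate("B", topicAQueues, groupA);

        System.out.println("A pulls topicA queues:      " + pulledByA);
        System.out.println("left for B (never pulled):  " + reservedForB);
    }
}
```

Because A computes its share against the full member list of the group, the queues that fall to B stay idle for as long as B has no subscription to topicA.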
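
    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

Once the PullRequest objects have been generated, they are put into pullRequestQueue, the queue of message pull requests. The client also starts the message pull thread when it starts up: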
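
    // MQClientInstance.start()
    this.pullMessageService.start();

    // Its run method
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) { // not stopped yet
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                    // Loops forever issuing pull requests; take() blocks
                    // while the pullRequestQueue blocking queue is empty.
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

The pull thread keeps taking PullRequest objects from pullRequestQueue and sends message pull requests to the broker. Now suppose GroupA contains two consumers, A and B, which subscribe to TopicA and TopicB respectively. A sends its heartbeat to the broker first, which registers the TopicA subscription, and A's PullRequest has already been added to pullRequestQueue. B then sends its heartbeat as well, which updates the subscription information under GroupA to TopicB. At this point A keeps using the PullRequest in its pullRequestQueue to send pull requests to the broker, but the broker can no longer find the TopicA subscription because B has overwritten it. The result is the "the consumer's subscription not exist" error: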

    subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
    if (null == subscriptionData) {
        LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
        response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
        response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
        return response;
    }
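
The timeline that leads to this error can be replayed with a very coarse model of the broker. Everything here is a hypothetical sketch: `PullAfterOverwriteSketch`, its `Code` enum, and the `heartbeat` and `pull` methods are illustrative names, and a heartbeat is reduced to its net effect, namely that the group's table ends up matching the topics it carried.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified broker model for one pull-time lookup.
class PullAfterOverwriteSketch {

    enum Code { SUCCESS, SUBSCRIPTION_NOT_EXIST }

    // group -> topics last reported for that group (coarse stand-in for consumerTable)
    private final Map<String, Set<String>> topicsByGroup = new HashMap<>();

    /** Net effect of a heartbeat: the group's topics become those in the heartbeat. */
    void heartbeat(String group, Set<String> topics) {
        topicsByGroup.put(group, topics);
    }

    /** Pull-time check: the request fails when the group has no entry for the topic. */
    Code pull(String group, String topic) {
        Set<String> topics = topicsByGroup.getOrDefault(group, Collections.emptySet());
        return topics.contains(topic) ? Code.SUCCESS : Code.SUBSCRIPTION_NOT_EXIST;
    }

    public static void main(String[] args) {
        PullAfterOverwriteSketch broker = new PullAfterOverwriteSketch();

        broker.heartbeat("GroupA", Collections.singleton("TopicA"));   // consumer A registers
        System.out.println("A pulls TopicA: " + broker.pull("GroupA", "TopicA"));

        broker.heartbeat("GroupA", Collections.singleton("TopicB"));   // consumer B overwrites
        System.out.println("A pulls TopicA: " + broker.pull("GroupA", "TopicA"));
    }
}
```

When A's next heartbeat arrives, the roles flip and B's pull requests for TopicB start failing instead, so with this setup the two consumers keep invalidating each other.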