由于RocketMQ操作CommitLog、ConsumeQueue文件,都是基于内存映射方法并在启动的时候,会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要一种机制来删除已过期的文件。
RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会管这个这个文件上的消息是否被全部消费。默认每个文件的过期时间为72小时。通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。接下来详细分析RocketMQ是如何设计与实现上述机制的。 RocketMQ 会每隔10s调度一次cleanFilesPeriodically,已检测是否需要清除过期文件。执行频率可以通过设置cleanResourceInterval,默认为10s。
整个执行过程分为两个大的步骤,第一个步骤:尝试删除过期文件;第二个步骤:重试删除被hange(由于被其他线程引用在第一阶段未删除的文件),在这里再重试一次。
Step2:RocketMQ在如下三种情况任意满足之一的情况下将继续执行删除文件操作。
到了删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。 判断磁盘空间是否充足,如果不充足,则返回true,表示应该触发过期文件删除操作。 预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。
代码@1:获取maxUsedSpaceRatio,表示commitlog、consumequeue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件。
代码@2:通过File#getTotalSpace()获取commitlog所在磁盘分区总的存储容量,通过File#getFreeSpace()获取commitlog目录所在磁盘文件剩余容量并得出当前该分区的物理磁盘使用率physicRatio 。
代码@3:RocketMQ另外提供了两个与磁盘空间使用率相关的系统级参数:
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-Drocketmq.broker.diskSpaceWarningLevelRatio=0.90:如果磁盘分区使用率超过该阔值,将设置磁盘不可写,此时会拒绝新消息的写入。 -Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85:如果磁盘分区使用超过该阔值,建议立即执行过期文件清除,但不会拒绝新消息的写入。 判断磁盘是否可用,用当前已使用物理磁盘率maxUsedSpaceRatio、diskSpaceWarningLevelRatio、diskSpaceCleanForciblyRatio,如果当前磁盘使用率达到上述阔值,将返回true表示磁盘已满,需要进行过期文件删除操作。
Step3:然后根据文件的最后一次更新时间与当前时间做比较,判断是否过期,如果已过期,调用MappedFile的destory。 MappedFile#shutdown 如果available为true,表示第一次执行shutdown方法,首先设置available为false,并记录firstShutdownTimestamp 时间戳,如果当前该文件被其他线程引用,则本次不强制删除,如果没有其他线程在使用该文件,则清除MappedFile相关资源,并最终执行File#delete()方法清除文件。在拒绝被删除保护期内(destroyMapedFileIntervalForcibly)每执行一次清理任务,将引用次数减去1000,引用数小于1后,该文件最终将被删除。 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
高可用HA
slave向master发送拉消息请求;解析请求偏移量,从消息文件中检索,把mq数据传给slave
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
RocketMQ 主从同步读写分离机制
RocketMQ根据MessageQueue查找Broker地址的唯一依据便是brokerName,从RocketMQ的Broker组织实现来看,同一组Broker(M-S)服务器,其brokerName相同,主服务器的brokerId为0,从服务器的brokerId大于0,那RocketMQ根据brokerName如何定位到哪一台Broker上来呢?
首先从pullFromWhichNodeTable缓存表中获取该消息消费队列的brokerId,如果找到,则返回,否则返回brokerName的主节点。由此可以看出pullFromWhichNodeTable中存放的是消息队列建议从从哪个Broker服务器拉取消息的缓存表,其存储结构:MessageQueue:AtomicLong,那该信息从何而来呢?
原来消息消费拉取线程PullMessageService根据PullRequest请求从主服务器拉取消息后会返回下一次建议拉取的brokerId,消息消费者线程在收到消息后,会根据主服务器的建议拉取brokerId来更新pullFromWhichNodeTable,消息消费者线程更新pullFromWhichNodeTable的代码如下:
那服务端是如何计算下一次拉取建议从哪台Broker服务器拉取消息呢?
当 GetResult 的 suggestPullingFromSlave 为真是,将会直接返回消息消费组的配置信息whichBrokerWhenConsumeSlowly,默认为1,可以通过客户端命令updateSubGroup配置当主服务器繁忙时,建议从哪个从服务器读取消息。
注意:RocketMQ 读写分离不按套路出牌,并不是主服务器只负责消息发送,消息从服务器主要负责消息拉取,而是只有当主服务器消息拉取出现堆积时才将拉取任务转向从服务器。