第三章 主从同步(HA)机制学习笔记

mac2025-10-11  13

一、RocketMQ集群部署模式

1、集群部署模式

1.1、单master模式

也就是只有一个master节点,称不上是集群,一旦这个master节点宕机,那么整个服务就不可用。

1.2、多master模式

多个master节点组成集群,单个master节点宕机或者重启对应用没有影响。

优点:所有模式中性能最高;

缺点:单个master节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。

注意:使用同步刷盘可以保证消息不丢失,同时Topic相对应的queue应该分布在集群中各个节点,而不是只在某各节点上,否则该节点宕机会对订阅该topic的应用造成影响。

1.3、多master多slave异步复制模式

在多master模式的基础上,每个master节点都有至少一个对应的slave;master节点可读可写,但是slave只能读不能写,类似于mysql的主备模式。

优点:一般情况下都是master消费,在master宕机或超过负载时,消费者可以从slave读取消息,消息的实时性不会受影响,性能几乎和多master一样。

缺点:使用异步复制的同步方式有可能会有消息丢失的问题。

1.4、多master多slave同步双写模式

同多master多slave异步复制模式类似,区别在于master和slave之间的数据同步方式。

优点:同步双写的同步模式能保证数据不丢失。

缺点:发送单个消息响应时间会略长,性能相比异步复制低10%左右。

同步方式:同步双写和异步复制(指的一组master和slave之间数据的同步)。

刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储进入磁盘)。

注意:对数据要求较高的场景,建议的持久化策略是主从broker采用同步复制方式和异步刷盘方式。通过同步复制方式,保存数据热备份,通过异步刷盘方式,保证RocketMQ高吞吐量。

2、安装部署过程

RocketMQ提供了初始的集群部署模式下的配置文件,如下图:

 

2.1、双主集群安装

服务器相关配置信息:

注意,默认RocketMQ会吃8G,所以需要修改默认加载内存设置。

修改broker启动脚本runbroker.sh里面的jvm参数

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"改为

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m"

NameServer集群:

192.168.56.102        

192.168.56.103

Broker服务器:

192.168.56.102(主A)

192.168.56.103(主B)

注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-noslave/)需要修改(同时修改nameserver地址为集群地址):

192.168.56.102(主A)

broker-a.properties 增加:

brokerIP1=192.168.56.102

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.103 (主B)

broker-b.properties 增加:

brokerIP1=192.168.56.103

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

启动步骤:(记得关闭防火墙或者要开通9876端口)

第一步:启动NameServer集群,这里使用102和103两台作为集群即可。

1)在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下:然后执行:

nohup sh mqnamesrv &

2)在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下:然后执行:

nohup sh mqnamesrv &

第二步:启动双主集群,顺序是先启动主,然后启动从。

3)启动主A:102服务器进入至MQ文件夹/bin下,执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):

nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log

4)启动主B:103服务器进入至MQ文件夹\bin下,执行以下命令:

nohup sh mqbroker -c ../conf/2m-noslave/broker-b.properties  autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log

5)每台服务器查看日志:

tail -f ~/logs/rocketmqlogs/broker.log

6)如果是要启动控制台,则需要重新打包:

进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。

rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成。在把编译后的jar包丢上服务器:

启动命令:

nohup java -jar rocketmq-console-ng-1.0.1.jar &

2.2、双主双从同步集群安装

服务器相关配置信息:

NameServer集群:

192.168.56.102

192.168.56.103

Broker服务器:

192.168.56.102(主A)

192.168.56.103(主B)

192.168.56.104(从A)

192.168.56.105(从B)

注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-sync/)需要修改(同时修改nameserver地址为集群地址):

192.168.56.102(主A)的broker-a.properties增加:

brokerIP1=192.168.56.102

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.103(主B)的broker-b.properties增加:

brokerIP1=192.168.56.103

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.104从A)的broker-a-s.properties增加:

brokerIP1=192.168.56.104

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.105(从B)broker-b-s.properties增加:

brokerIP1=192.168.56.105

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

启动步骤:(记得关闭防火墙或者要开通9876端口)

第一步:启动NameServer集群,这里使用102和103两台作为集群即可。

1)在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下,然后执行:

nohup sh mqnamesrv &

2)在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下,然后执行:

nohup sh mqnamesrv &

第二步:启动双主双从同步集群,顺序是先启动主,然后启动从。

3)启动主A:,102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true &

4)启动主B,103服务器进入至MQ文件夹\bin下,执行以下命令:

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true &

5)启动从A,104服务器进入至MQ文件夹\bin下,执行以下命令:

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true &

6)启动从B,105服务器进入至MQ文件夹\bin下,执行以下命令:

nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true &

每台服务器查看日志:

tail -f ~/logs/rocketmqlogs/broker.log

如果是要启动控制台,则需要重新打包:

进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。

例如:rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true编译生成jar包,在把编译后的jar包丢上服务器:

启动命令:nohup java -jar rocketmq-console-ng-1.0.1.jar &

2.3、双主双从异步集群安装

服务器相关配置信息:

NameServer集群:

192.168.56.102        

192.168.56.103

Broker服务器:

192.168.56.102(主A)

192.168.56.103(主B)

192.168.56.104(从A)

192.168.56.105(从B)

注意因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-async/)需要修改(同时修改nameserver地址为集群地址):

192.168.56.102(主A)的broker-a.properties增加:

brokerIP1=192.168.56.102

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.103(主B)的broker-b.properties增加:

brokerIP1=192.168.56.103

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.104(从A)的broker-a-s.properties增加:

brokerIP1=192.168.56.104

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

192.168.56.105(从B)的broker-b-s.properties增加:

brokerIP1=192.168.56.105

namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

启动步骤:(记得关闭防火墙或者要开通9876端口)

第一步:启动NameServer集群,这里使用102和103两台作为集群即可。

1)在机器A,启动第1台NameServer: 102服务器进入至MQ文件夹/bin下,然后执行:

nohup sh mqnamesrv &

2)在机器B,启动第2台NameServer: 103服务器进入至MQ文件夹/bin下,然后执行:

nohup sh mqnamesrv &

第二步:启动双主双从同步集群,顺序是先启动主,然后启动从。

3)启动主A: 102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):

nohup sh mqbroker -c ../conf/2m-2s-async/broker-a.properties autoCreateTopicEnable=true &

4)启动主B:103服务器进入至MQ文件夹\bin下:执行以下命令:

nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties autoCreateTopicEnable=true &

5)启动从A: 104服务器进入至MQ文件夹\bin下:执行以下命令:

nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties autoCreateTopicEnable=true &

6)启动从B:105服务器进入至MQ文件夹\bin下:执行以下命令:

nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties autoCreateTopicEnable=true &

每台服务器查看日志:

tail -f ~/logs/rocketmqlogs/broker.log

如果是要启动控制台,则需要重新打包:

进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔):

示例:rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876

进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成,在把编译后的jar包丢上服务器:

执行命令:nohup java -jar rocketmq-console-ng-1.0.1.jar &

3、主从复制原理

RocketMQ主从同步(HA)实现过程如下: 

1)主服务器启动,并在特定端口上监听从服务器的连接。

2)从服务器主动连接主服务器,主服务器接受客户端的连接,并建立相关TCP连接。

3)从服务器主动向服务器发送待拉取消息偏移,主服务器解析请求并返回消息给从服务器。

4)从服务器保存消息并继续发送新的消息同步请求。

核心实现:

从服务器在启动的时候主动向主服务器建立TCP长连接,然后获取服务器的commitlog最大偏移,以此偏移向主服务器主动拉取消息,主服务器根据偏移量,与自身commitlog文件的最大偏移进行比较,如果大于从服务器commitlog偏移,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。

4、读写分离机制

RocketMQ读写分离与他中间件的实现方式完全不同,RocketMQ是消费者首先服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。

那消息服务端是根据何种规则来建议哪个消息消费队列该从哪台Broker服务器上拉取消息呢?

一般都是从主服务器拉取,如果主阶段拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时从从服务器拉取。

如果主服务器繁忙则建议下 次从从服务器拉取消息,设置suggestWhichBrokerld配置文件中whichBrokerWhenConsumeSlowly属性,默认为 1。如果一个Master拥有多台Slave服务器,参与消息拉取负载的从服务器只会是其中一个。

二、与Spring集成

具体代码实现参见rocket-with-spring,Git地址:https://gitee.com/hankin_chj/rocketmq-platform.git

1、pom文件

<!--RocketMQ--><dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-client</artifactId>     <version>4.4.0</version></dependency>

2、生产者

2.1、生产者配置信息:applicationContext.xml

<!-- 生产者配置 --><bean id="rocketMQProducer" class="com.chj.producer.RocketMQProducer" init-method="init" destroy-method="destroy">     <property name="producerGroup" value="ProducerGroup" />     <property name="namesrvAddr" value="192.168.0.128:9876" /></bean>

2.2、生产者代码实现

发送入口代码:

@Controller @RequestMapping("/rocket") public class RocketController {    @Autowired    @Qualifier("rocketMQProducer")    private RocketMQProducer producer;    /**     * 消息发送     */    @ResponseBody    @RequestMapping("spring")    public String queueSender(@RequestParam("message")String message){       String opt="";       try {          Message msg = new Message("rocket-spring-topic", "TAG1", message.getBytes());          SendResult result = producer.getDefaultMQProducer().send(msg);          if(result.getSendStatus() !=null && result.getSendStatus().equals("SEND_OK")){             opt = "suc";          }else{             opt = "err";          }       } catch (Exception e) {          opt = e.getCause().toString();       }       return opt;    }    @ResponseBody    @RequestMapping("springb")    public String topicSender(@RequestParam("message")String message){       String opt = "";       try {          Message msg = new Message("rocket-spring-topic-b", "TAG1", message.getBytes());          SendResult result = producer.getDefaultMQProducer().send(msg);          System.out.println("SendStatus:"+result.getSendStatus());          if(result.getSendStatus() !=null && result.getSendStatus().equals("SEND_OK")){             opt = "suc";          }else{             opt = "err";          }       } catch (Exception e) {          opt = e.getCause().toString();       }       return opt;    } }

生产者代码封装:

public class RocketMQProducer {     private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);     private DefaultMQProducer defaultMQProducer;     private String producerGroup;     private String namesrvAddr;     public void init() throws MQClientException {         this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);         defaultMQProducer.setNamesrvAddr(this.namesrvAddr);         defaultMQProducer.start();         logger.info("rocketMQ初始化生产者完成[producerGroup:" + producerGroup + "]");     }     public void destroy() {         defaultMQProducer.shutdown();         logger.info("rocketMQ生产者[producerGroup: " + producerGroup + "]已停止");     }     public DefaultMQProducer getDefaultMQProducer() {         return defaultMQProducer;     }     public void setProducerGroup(String producerGroup) {         this.producerGroup = producerGroup;     }     public void setNamesrvAddr(String namesrvAddr) {         this.namesrvAddr = namesrvAddr;     } }

3、消费者

3.1、applicationContext.xml中使用监听器的方式

<!-- 消费者监听1 --><bean id="messageListeners" class="com.chj.listener.MessageListenerImpl"></bean><!-- 消费者监听2 --><bean id="bmessageListeners" class="com.chj.listener.BMessageListenerImpl"></bean><!-- 消费者配置1 --><bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"      init-method="start" destroy-method="shutdown">     <property name="consumerGroup" value="ConsumerGroup" />     <property name="namesrvAddr" value="192.168.0.128:9876" />     <property name="messageListener" ref="messageListeners" />     <property name="subscription">         <map>             <entry key="rocket-spring-topic" value="TAG1" />         </map>     </property></bean><!-- 消费者配置2 --><bean id="rocketmqConsumer2" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"      init-method="start" destroy-method="shutdown">     <property name="consumerGroup" value="ConsumerGroup2" />     <property name="namesrvAddr" value="192.168.0.128:9876" />     <property name="messageListener" ref="bmessageListeners" />     <property name="subscription">         <map>             <entry key="rocket-spring-topic-b" value="TAG1" />         </map>     </property></bean>

3.2、消费者代码实现

监听消息代码:

public class MessageListenerImpl implements MessageListenerConcurrently {     @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {         for (MessageExt msg : msgs) {             try {                 System.out.println(">>>>" + new String(msg.getBody(), "UTF-8"));             } catch (UnsupportedEncodingException e) {                 e.printStackTrace();                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;             }         }         // 如果没有异常会认为都成功消费         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } }

三、与SpringBoot集成

具体代码实现参见rocket-with-springboot,Git地址:https://gitee.com/hankin_chj/rocketmq-platform.git

1、配置信息

跟Spring和原生非常类似,maven配置如下:

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-client</artifactId>     <version>4.4.0</version></dependency>

application.properties配置文件:

#============== rocket ===================rocketmq.namesrvaddr = 127.0.0.1:9876

2、代码示例

@RestController @RequestMapping("/rocket") public class RocketController {     protected final Logger logger = LoggerFactory.getLogger(this.getClass());     @Autowired     private MQProducer mqProducer;     @RequestMapping(value = "/send")     public String sendrocket(@RequestParam(required = false) String data,@RequestParam(required = false) String tag) {         try {             logger.info("rocket的消息={}", data);             mqProducer.sendMessage(data,"TopicTest", tag, null);             return "发送rocket成功";         } catch (Exception e) {             logger.error("发送rocket异常:", e);             return "发送rocket失败";         }     } }

生产者代码:

@Component public class MQProducer {     private  static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class);     @Value("${rocketmq.namesrvaddr}")     private String nameservAddr;     private final DefaultMQProducer producer = new DefaultMQProducer("TestProducer");     /*     * 初始化      */     @PostConstruct     public void  start(){         try {             LOGGER.info("MQ:启动生产者");             producer.setNamesrvAddr(nameservAddr);             producer.start();         }catch (MQClientException e){             LOGGER.error("MQ:启动生产者失败:{}-{}",e.getResponseCode(),e.getErrorMessage());             throw  new RuntimeException(e.getErrorMessage(),e);         }     }     /*     *发送消息      */     public void sendMessage(String data,String topic,String tags,String keys){         try {             byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);             Message message = new Message(topic,tags,keys,messageBody);             producer.send(message, new SendCallback() {                 public void onSuccess(SendResult sendResult) {                     LOGGER.info("MQ: 生产者发送消息{}",sendResult);                 }                 public void onException(Throwable e) {                     LOGGER.error(e.getMessage(),e);                 }             });         }catch (Exception e){             LOGGER.error(e.getMessage(),e);         }     }     @PreDestroy     public void stop(){         if(producer !=null){             producer.shutdown();             LOGGER.info("MQ:关闭生产者");         }     } }

消费者代码:

@Component public class MQPushConsumer implements MessageListenerConcurrently {     private  static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumer.class);     @Value("${rocketmq.namesrvaddr}")     private String nameservAddr;     private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");     /*      * 初始化      */     @PostConstruct     public void  start(){         try {             LOGGER.info("MQ:启动消费者");             consumer.setNamesrvAddr(nameservAddr);             //消息队列从头开始消费             consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);             //集群消费模式             consumer.setMessageModel(MessageModel.CLUSTERING);             consumer.subscribe("TopicTest","*");             //注册消息监听器             consumer.registerMessageListener(this);             consumer.start();         }catch (MQClientException e){             LOGGER.error("MQ:启动消费者失败:{}-{}",e.getResponseCode(),e.getErrorMessage());             throw  new RuntimeException(e.getErrorMessage(),e);         }     }     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {         int index =0;         try {             for(;index<msgs.size();index++){                 MessageExt msg = msgs.get(index);                 String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);                 System.out.printf("消费者监听: queueID:%d:Messages:%s %n",  msgs.get(index).getQueueId(),messageBody);             }         }catch (Exception e){             LOGGER.error(e.getMessage(),e);         }finally {             if(index <msgs.size()){                 context.setAckIndex(index+1);             }         }         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     }     @PreDestroy     public void stop(){         if(consumer !=null){             consumer.shutdown();             LOGGER.error("MQ:关闭消费者");         }     } }

 

最新回复(0)