也就是只有一个master节点,称不上是集群,一旦这个master节点宕机,那么整个服务就不可用。
多个master节点组成集群,单个master节点宕机或者重启对应用没有影响。
优点:所有模式中性能最高;
缺点:单个master节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
注意:使用同步刷盘可以保证消息不丢失,同时Topic相对应的queue应该分布在集群中各个节点,而不是只在某各节点上,否则该节点宕机会对订阅该topic的应用造成影响。
在多master模式的基础上,每个master节点都有至少一个对应的slave;master节点可读可写,但是slave只能读不能写,类似于mysql的主备模式。
优点:一般情况下都是master消费,在master宕机或超过负载时,消费者可以从slave读取消息,消息的实时性不会受影响,性能几乎和多master一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
同多master多slave异步复制模式类似,区别在于master和slave之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息响应时间会略长,性能相比异步复制低10%左右。
同步方式:同步双写和异步复制(指的一组master和slave之间数据的同步)。
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储进入磁盘)。
注意:对数据要求较高的场景,建议的持久化策略是主从broker采用同步复制方式和异步刷盘方式。通过同步复制方式,保存数据热备份,通过异步刷盘方式,保证RocketMQ高吞吐量。
RocketMQ提供了初始的集群部署模式下的配置文件,如下图:
服务器相关配置信息:
注意,默认RocketMQ会吃8G,所以需要修改默认加载内存设置。
修改broker启动脚本runbroker.sh里面的jvm参数
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"改为
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m"
NameServer集群:
192.168.56.102
192.168.56.103
Broker服务器:
192.168.56.102(主A)
192.168.56.103(主B)
注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-noslave/)需要修改(同时修改nameserver地址为集群地址):
192.168.56.102(主A)
broker-a.properties 增加:
brokerIP1=192.168.56.102
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.103 (主B)
broker-b.properties 增加:
brokerIP1=192.168.56.103
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
启动步骤:(记得关闭防火墙或者要开通9876端口)
第一步:启动NameServer集群,这里使用102和103两台作为集群即可。
1)在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下:然后执行:
nohup sh mqnamesrv &
2)在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下:然后执行:
nohup sh mqnamesrv &
第二步:启动双主集群,顺序是先启动主,然后启动从。
3)启动主A:102服务器进入至MQ文件夹/bin下,执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):
nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log
4)启动主B:103服务器进入至MQ文件夹\bin下,执行以下命令:
nohup sh mqbroker -c ../conf/2m-noslave/broker-b.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log
5)每台服务器查看日志:
tail -f ~/logs/rocketmqlogs/broker.log
6)如果是要启动控制台,则需要重新打包:
进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。
rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成。在把编译后的jar包丢上服务器:
启动命令:
nohup java -jar rocketmq-console-ng-1.0.1.jar &
服务器相关配置信息:
NameServer集群:
192.168.56.102
192.168.56.103
Broker服务器:
192.168.56.102(主A)
192.168.56.103(主B)
192.168.56.104(从A)
192.168.56.105(从B)
注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-sync/)需要修改(同时修改nameserver地址为集群地址):
192.168.56.102(主A)的broker-a.properties增加:
brokerIP1=192.168.56.102
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.103(主B)的broker-b.properties增加:
brokerIP1=192.168.56.103
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.104从A)的broker-a-s.properties增加:
brokerIP1=192.168.56.104
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.105(从B)broker-b-s.properties增加:
brokerIP1=192.168.56.105
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
启动步骤:(记得关闭防火墙或者要开通9876端口)
第一步:启动NameServer集群,这里使用102和103两台作为集群即可。
1)在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下,然后执行:
nohup sh mqnamesrv &
2)在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下,然后执行:
nohup sh mqnamesrv &
第二步:启动双主双从同步集群,顺序是先启动主,然后启动从。
3)启动主A:,102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true &
4)启动主B,103服务器进入至MQ文件夹\bin下,执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true &
5)启动从A,104服务器进入至MQ文件夹\bin下,执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true &
6)启动从B,105服务器进入至MQ文件夹\bin下,执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true &
每台服务器查看日志:
tail -f ~/logs/rocketmqlogs/broker.log
如果是要启动控制台,则需要重新打包:
进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。
例如:rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true编译生成jar包,在把编译后的jar包丢上服务器:
启动命令:nohup java -jar rocketmq-console-ng-1.0.1.jar &
服务器相关配置信息:
NameServer集群:
192.168.56.102
192.168.56.103
Broker服务器:
192.168.56.102(主A)
192.168.56.103(主B)
192.168.56.104(从A)
192.168.56.105(从B)
注意:因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-async/)需要修改(同时修改nameserver地址为集群地址):
192.168.56.102(主A)的broker-a.properties增加:
brokerIP1=192.168.56.102
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.103(主B)的broker-b.properties增加:
brokerIP1=192.168.56.103
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.104(从A)的broker-a-s.properties增加:
brokerIP1=192.168.56.104
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
192.168.56.105(从B)的broker-b-s.properties增加:
brokerIP1=192.168.56.105
namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
启动步骤:(记得关闭防火墙或者要开通9876端口)
第一步:启动NameServer集群,这里使用102和103两台作为集群即可。
1)在机器A,启动第1台NameServer: 102服务器进入至MQ文件夹/bin下,然后执行:
nohup sh mqnamesrv &
2)在机器B,启动第2台NameServer: 103服务器进入至MQ文件夹/bin下,然后执行:
nohup sh mqnamesrv &
第二步:启动双主双从同步集群,顺序是先启动主,然后启动从。
3)启动主A: 102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):
nohup sh mqbroker -c ../conf/2m-2s-async/broker-a.properties autoCreateTopicEnable=true &
4)启动主B:103服务器进入至MQ文件夹\bin下:执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties autoCreateTopicEnable=true &
5)启动从A: 104服务器进入至MQ文件夹\bin下:执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties autoCreateTopicEnable=true &
6)启动从B:105服务器进入至MQ文件夹\bin下:执行以下命令:
nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties autoCreateTopicEnable=true &
每台服务器查看日志:
tail -f ~/logs/rocketmqlogs/broker.log
如果是要启动控制台,则需要重新打包:
进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔):
示例:rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成,在把编译后的jar包丢上服务器:
执行命令:nohup java -jar rocketmq-console-ng-1.0.1.jar &
RocketMQ主从同步(HA)实现过程如下:
1)主服务器启动,并在特定端口上监听从服务器的连接。
2)从服务器主动连接主服务器,主服务器接受客户端的连接,并建立相关TCP连接。
3)从服务器主动向服务器发送待拉取消息偏移,主服务器解析请求并返回消息给从服务器。
4)从服务器保存消息并继续发送新的消息同步请求。
核心实现:
从服务器在启动的时候主动向主服务器建立TCP长连接,然后获取服务器的commitlog最大偏移,以此偏移向主服务器主动拉取消息,主服务器根据偏移量,与自身commitlog文件的最大偏移进行比较,如果大于从服务器commitlog偏移,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。
RocketMQ读写分离与他中间件的实现方式完全不同,RocketMQ是消费者首先服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。
那消息服务端是根据何种规则来建议哪个消息消费队列该从哪台Broker服务器上拉取消息呢?
一般都是从主服务器拉取,如果主阶段拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时从从服务器拉取。
如果主服务器繁忙则建议下 次从从服务器拉取消息,设置suggestWhichBrokerld配置文件中whichBrokerWhenConsumeSlowly属性,默认为 1。如果一个Master拥有多台Slave服务器,参与消息拉取负载的从服务器只会是其中一个。
具体代码实现参见rocket-with-spring,Git地址:https://gitee.com/hankin_chj/rocketmq-platform.git
<!--RocketMQ--><dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version></dependency>
<!-- 生产者配置 --><bean id="rocketMQProducer" class="com.chj.producer.RocketMQProducer" init-method="init" destroy-method="destroy"> <property name="producerGroup" value="ProducerGroup" /> <property name="namesrvAddr" value="192.168.0.128:9876" /></bean>
发送入口代码:
@Controller @RequestMapping("/rocket") public class RocketController { @Autowired @Qualifier("rocketMQProducer") private RocketMQProducer producer; /** * 消息发送 */ @ResponseBody @RequestMapping("spring") public String queueSender(@RequestParam("message")String message){ String opt=""; try { Message msg = new Message("rocket-spring-topic", "TAG1", message.getBytes()); SendResult result = producer.getDefaultMQProducer().send(msg); if(result.getSendStatus() !=null && result.getSendStatus().equals("SEND_OK")){ opt = "suc"; }else{ opt = "err"; } } catch (Exception e) { opt = e.getCause().toString(); } return opt; } @ResponseBody @RequestMapping("springb") public String topicSender(@RequestParam("message")String message){ String opt = ""; try { Message msg = new Message("rocket-spring-topic-b", "TAG1", message.getBytes()); SendResult result = producer.getDefaultMQProducer().send(msg); System.out.println("SendStatus:"+result.getSendStatus()); if(result.getSendStatus() !=null && result.getSendStatus().equals("SEND_OK")){ opt = "suc"; }else{ opt = "err"; } } catch (Exception e) { opt = e.getCause().toString(); } return opt; } }生产者代码封装:
public class RocketMQProducer { private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class); private DefaultMQProducer defaultMQProducer; private String producerGroup; private String namesrvAddr; public void init() throws MQClientException { this.defaultMQProducer = new DefaultMQProducer(this.producerGroup); defaultMQProducer.setNamesrvAddr(this.namesrvAddr); defaultMQProducer.start(); logger.info("rocketMQ初始化生产者完成[producerGroup:" + producerGroup + "]"); } public void destroy() { defaultMQProducer.shutdown(); logger.info("rocketMQ生产者[producerGroup: " + producerGroup + "]已停止"); } public DefaultMQProducer getDefaultMQProducer() { return defaultMQProducer; } public void setProducerGroup(String producerGroup) { this.producerGroup = producerGroup; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } }<!-- 消费者监听1 --><bean id="messageListeners" class="com.chj.listener.MessageListenerImpl"></bean><!-- 消费者监听2 --><bean id="bmessageListeners" class="com.chj.listener.BMessageListenerImpl"></bean><!-- 消费者配置1 --><bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown"> <property name="consumerGroup" value="ConsumerGroup" /> <property name="namesrvAddr" value="192.168.0.128:9876" /> <property name="messageListener" ref="messageListeners" /> <property name="subscription"> <map> <entry key="rocket-spring-topic" value="TAG1" /> </map> </property></bean><!-- 消费者配置2 --><bean id="rocketmqConsumer2" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown"> <property name="consumerGroup" value="ConsumerGroup2" /> <property name="namesrvAddr" value="192.168.0.128:9876" /> <property name="messageListener" ref="bmessageListeners" /> <property name="subscription"> <map> <entry key="rocket-spring-topic-b" value="TAG1" /> </map> </property></bean>
监听消息代码:
public class MessageListenerImpl implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(">>>>" + new String(msg.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 如果没有异常会认为都成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }具体代码实现参见rocket-with-springboot,Git地址:https://gitee.com/hankin_chj/rocketmq-platform.git
跟Spring和原生非常类似,maven配置如下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version></dependency>
application.properties配置文件:
#============== rocket ===================rocketmq.namesrvaddr = 127.0.0.1:9876
生产者代码:
@Component public class MQProducer { private static final Logger LOGGER = LoggerFactory.getLogger(MQProducer.class); @Value("${rocketmq.namesrvaddr}") private String nameservAddr; private final DefaultMQProducer producer = new DefaultMQProducer("TestProducer"); /* * 初始化 */ @PostConstruct public void start(){ try { LOGGER.info("MQ:启动生产者"); producer.setNamesrvAddr(nameservAddr); producer.start(); }catch (MQClientException e){ LOGGER.error("MQ:启动生产者失败:{}-{}",e.getResponseCode(),e.getErrorMessage()); throw new RuntimeException(e.getErrorMessage(),e); } } /* *发送消息 */ public void sendMessage(String data,String topic,String tags,String keys){ try { byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET); Message message = new Message(topic,tags,keys,messageBody); producer.send(message, new SendCallback() { public void onSuccess(SendResult sendResult) { LOGGER.info("MQ: 生产者发送消息{}",sendResult); } public void onException(Throwable e) { LOGGER.error(e.getMessage(),e); } }); }catch (Exception e){ LOGGER.error(e.getMessage(),e); } } @PreDestroy public void stop(){ if(producer !=null){ producer.shutdown(); LOGGER.info("MQ:关闭生产者"); } } }消费者代码:
@Component public class MQPushConsumer implements MessageListenerConcurrently { private static final Logger LOGGER = LoggerFactory.getLogger(MQPushConsumer.class); @Value("${rocketmq.namesrvaddr}") private String nameservAddr; private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer"); /* * 初始化 */ @PostConstruct public void start(){ try { LOGGER.info("MQ:启动消费者"); consumer.setNamesrvAddr(nameservAddr); //消息队列从头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //集群消费模式 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest","*"); //注册消息监听器 consumer.registerMessageListener(this); consumer.start(); }catch (MQClientException e){ LOGGER.error("MQ:启动消费者失败:{}-{}",e.getResponseCode(),e.getErrorMessage()); throw new RuntimeException(e.getErrorMessage(),e); } } public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { int index =0; try { for(;index<msgs.size();index++){ MessageExt msg = msgs.get(index); String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.printf("消费者监听: queueID:%d:Messages:%s %n", msgs.get(index).getQueueId(),messageBody); } }catch (Exception e){ LOGGER.error(e.getMessage(),e); }finally { if(index <msgs.size()){ context.setAckIndex(index+1); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } @PreDestroy public void stop(){ if(consumer !=null){ consumer.shutdown(); LOGGER.error("MQ:关闭消费者"); } } }
