Linux搭建RocketMQ集群

mac2025-09-03  10

安装配置jdk 必须要有jdk环境,jdk必须要64位,1.7以上

下载安装包 地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.2/

解压文件到/usr/local

进入conf文件 可以看到有3个文件夹 2m-2s-async:多Master多Slave模式,异步复制 2m-2s-sync:多Master多Slave模式,同步双写 2m-noslave:多Master模式 这里的配置选择2m-noslave模式,我只配置了两台服务器 服务器ip分别为:172.16.120.143,172.16.120.144

创建存储路径 mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store/commitlog mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store/consumequeue mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store/index

修改配置文件 broker-a.properties配置

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #brokerClusterName=DefaultCluster #brokerName=broker-a #brokerId=0 #deleteWhen=04 #fileReservedTime=48 #brokerRole=ASYNC_MASTER #flushDiskType=ASYNC_FLUSH # 配置参考官方链接:http://rocketmq.apache.org/docs/rmq-deployment/ # 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=0 # 删除文件时间点,默认凌晨4点。24小时制,单位小时 deleteWhen=04 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # 文件保留时间,默认 72 小时。根据业务情况调整 fileReservedTime=168 # Broker 对外服务的监听端口 listenPort=10911 # nameServer地址,分号分割,这里写自己需要做集群的两台服务器的地址 namesrvAddr=172.16.120.143:9876;172.16.120.144:9876 # Details:Should be configured if having multiple addresses; Default value:InetAddress for network interface # 本机ip地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况 下可以人工配置。 brokerIP1=172.16.120.144 #存储路径 storePathRootDir==/usr/local/rocketmq-all-4.5.2-bin-release/store # commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq-all-4.5.2-bin-release/store/commitlog # 消费队列存储路径存储路径 storePathConsumerQueue=/usr/local/rocketmq-all-4.5.2-bin-release/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index #消息索引存储路径 storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #限制的消息大小 maxMessageSize=65536 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH

broker-b.properties配置

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #brokerClusterName=DefaultCluster #brokerName=broker-a #brokerId=0 #deleteWhen=04 #fileReservedTime=48 #brokerRole=ASYNC_MASTER #flushDiskType=ASYNC_FLUSH # 配置参考官方链接:http://rocketmq.apache.org/docs/rmq-deployment/ # 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=0 # 删除文件时间点,默认凌晨4点。24小时制,单位小时 deleteWhen=04 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # 文件保留时间,默认 72 小时。根据业务情况调整 fileReservedTime=168 # Broker 对外服务的监听端口 listenPort=10911 # nameServer地址,分号分割 namesrvAddr=172.16.120.143:9876;172.16.120.144:9876 # Details:Should be configured if having multiple addresses; Default value:InetAddress for network interface # 本机ip地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况 下可以人工配置。 brokerIP1=172.16.120.144 #存储路径 storePathRootDir==/usr/local/rocketmq-all-4.5.2-bin-release/store # commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq-all-4.5.2-bin-release/store/commitlog # 消费队列存储路径存储路径 storePathConsumerQueue=/usr/local/rocketmq-all-4.5.2-bin-release/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index #消息索引存储路径 storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #限制的消息大小 maxMessageSize=65536 # Broker 的角色 # - ASYNC_MASTER 异步复制Master # - SYNC_MASTER 同步双写Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盘方式 # - ASYNC_FLUSH 异步刷盘 # - SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH

修改日志配置文件(两台服务器) mkdir -p /usr/local/rocketmq-all-4.5.2-bin-release/logs cd /usr/local/rocketmq-all-4.5.2-bin-release/conf && sed -i ‘s#${user.home}#/usr/local/rocketmq-all-4.5.2-bin-release#g’ *.xml

修改占用内存大小(两台服务器) vim /usr/local/rocketmq-all-4.5.2-bin-release/bin/runbroker.sh

启动NameServer(两台服务器) cd /usr/local/rocketmq-all-4.5.2-bin-release/bin nohup sh mqnamesrv &

启动172.16.120.143的BrokerServer cd /usr/local/rocketmq-all-4.5.2-bin-release/bin nohup sh mqbroker -c /usr/local/rocketmq-all-4.5.2-bin-release/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &

启动172.16.120.144的BrokerServer cd /usr/local/rocketmq-all-4.5.2-bin-release/bin nohup sh mqbroker -c /usr/local/rocketmq-all-4.5.2-bin-release/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &

查看是否启动成功 输入jps命令,能看到namesrv和broker证明启动成功

关闭rocketmq cd /usr/local/rocketmq-all-4.5.2-bin-release/bin/ 关闭namesrv服务:sh mqshutdown namesrv 关闭broker服务:sh mqshutdown broker

简单测试

加入依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> </dependency>

创建生产者

public class Producer { public static void main(String[] args) throws MQClientException { //1、分组 DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); //2、服务器集群地址 producer.setNamesrvAddr("172.16.120.143:9876;172.16.120.144:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i=0;i<10;i++){ Thread.sleep(1000); Message message = new Message("my_topic","tagA",("mytopic"+i).getBytes()); SendResult result = producer.send(message); System.out.println(result.toString()); } }catch (Exception e){ e.printStackTrace(); } producer.shutdown(); } }

创建消费者

public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("172.16.120.143:9876;172.16.120.144:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("my_topic","tagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt messageExt:list){ String msgId = messageExt.getMsgId(); System.out.println("msgId:"+msgId+",body:"+new String(messageExt.getBody())); } //消费状态:1、消费成功;2、消费失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }

遇到的问题: 启动测试代码的时候报错:org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, my_topic

无法创建topic,Producer没有正确连接到Name Server我这里是因为没有配置jdk的环境变量,正确配置环境变量且关闭防火墙在bin目录下执行命令sh mqadmin clusterList -n localhost:9876 如果看到 证明证明已经连接到nameserver上,再次启动则没有报错了
最新回复(0)