RocketMQ Study Notes

mac, 2024-11-22

1. The apache/rocketmq-spring version
    1.1 Minimal pom.xml dependencies
    1.2 API usage
    1.3
    1.4
    1.5
2. The official apache/rocketmq version
    2.1 Minimal pom.xml dependencies
    2.2 API

1. The apache/rocketmq-spring version

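This version wraps the official client and integrates it with Spring Boot, among other features. The trade-off may be a more complex dependency tree and an uncertain maintenance cycle. (No offense to its authors: when I used it myself I ran into behavior I could not pin down, dependency version conflicts, and a few other thorny problems.)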

1.1 Minimal pom.xml dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
</dependencies>

Note, however, that rocketmq-spring-boot-starter itself depends on the following:

<parent>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-parent</artifactId>
    <version>2.0.3</version>
    <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>

<artifactId>rocketmq-spring-boot-starter</artifactId>
<packaging>jar</packaging>

<name>RocketMQ Spring Boot Starter</name>
<description>SRocketMQ Spring Boot Starter</description>
<url>https://github.com/apache/rocketmq-spring</url>

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
</dependencies>

In turn, rocketmq-spring-boot depends on:

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

1.2 API usage

On the producer side, messages are sent through RocketMQTemplate.

Consumer

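import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

// Listens on the topic configured under demo.rocketmq.topic, in consumer group "string_consumer".
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer")
public class StringConsumer implements RocketMQListener<String> {

    // Called once for every message delivered to this listener.
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumer received: %s \n", message);
    }
}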
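The listener takes its topic from the ${demo.rocketmq.topic} placeholder, and the starter also has to be told where the NameServer is. A minimal application.properties sketch, using the property names documented in the rocketmq-spring README (worth checking against the starter version actually in use). All three values are assumed placeholders, not taken from the original setup:

```properties
# NameServer address used by the starter (assumed value)
rocketmq.name-server=127.0.0.1:9876
# Producer group for the auto-configured RocketMQTemplate (assumed value)
rocketmq.producer.group=demo-producer-group
# Resolves the ${demo.rocketmq.topic} placeholder in StringConsumer (assumed value)
demo.rocketmq.topic=string-topic
```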

1.3

1.4

1.5

2. The official apache/rocketmq version

2.1 Minimal pom.xml dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>4.3.0</version>
    </dependency>
</dependencies>

2.2、API

Producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Producer group name. Producer instances that send the same kind of
        // messages normally share one group name.
        // DefaultMQProducer producer = new DefaultMQProducer("group-1");
        DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");

        // NameServer address
        producer.setNamesrvAddr("192.168.1.79:9876");

        // Start the producer instance
        producer.start();

        // Build a message (topic, tag, body); the body is sent as a byte array.
        // In testing, sending to a topic that does not exist on the server failed with:
        //   No route info of this topic, TL
        // producer.createTopic("accessKey", "TL", 2); // creates the topic with the given number of queues
        Message msg = new Message("BenchmarkTest", "TagA",
                ("消息 body").getBytes(RemotingHelper.DEFAULT_CHARSET));

        // Send the message to the broker and wait for the result
        SendResult sendResult = producer.send(msg);
        System.out.println("Send status: " + sendResult.getSendStatus());

        // Done sending; shut the producer down
        producer.shutdown();
    }
}
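The producer comment notes that the message body travels as a byte array, which the example builds with RemotingHelper.DEFAULT_CHARSET (UTF-8). A minimal JDK-only sketch of that round trip follows, assuming both sides agree on UTF-8. The class BodyCodecSketch and its methods are hypothetical helpers for illustration and are not part of RocketMQ:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RocketMQ: shows the String <-> byte[] round trip.
final class BodyCodecSketch {

    // Encode text into the bytes that would be used as the message body (assumes UTF-8).
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode body bytes back into text, as a consumer would do with the received body.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two CJK characters followed by " body", written with escapes to keep the source ASCII.
        String original = "\u6d88\u606f body";
        byte[] body = encode(original);
        System.out.println("body length in bytes: " + body.length);
        System.out.println("round trip intact: " + original.equals(decode(body)));
    }
}
```

Naming the charset explicitly on both sides avoids relying on the platform default, which can differ between the producer and consumer machines.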

Consumer

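import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Create the consumer with its consumer group name
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-1");
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncGroup");
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OnewayProducer_group");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("AsyncProducerGroup");

        // NameServer address
        consumer.setNamesrvAddr("192.168.1.79:9876");

        // Subscribe to a topic, either to specific tags or to all of them ("*").
        // The topic has to match the one the producer sends to.
        // consumer.subscribe("BenchmarkTest", "*");
        consumer.subscribe("AsyncTopic", "*");

        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // A batch may hold more than one message; print each body as text.
                for (MessageExt msg : msgs) {
                    System.out.printf("Received message: %s%n",
                            new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();
        System.out.println("Consumer Started.");
    }
}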
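The second argument to consumer.subscribe is a tag expression: "*" selects every tag, and several tags can be combined with ||, for example "TagA || TagB". The sketch below is a simplified, JDK-only model of that selection rule, meant only to show which messages a given expression would pick up. TagFilterSketch and matches are hypothetical names, and this is not RocketMQ's own filtering code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of tag-expression matching; not RocketMQ's own code.
final class TagFilterSketch {

    // Returns true when a message carrying `tag` would be selected by `expression`.
    static boolean matches(String expression, String tag) {
        // A missing, empty, or "*" expression is treated as "subscribe to every tag".
        if (expression == null || expression.trim().isEmpty() || expression.trim().equals("*")) {
            return true;
        }
        if (tag == null) {
            return false;
        }
        // Split "TagA || TagB" into the individual tag names.
        Set<String> wanted = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                wanted.add(name);
            }
        }
        return wanted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));
        System.out.println(matches("TagA || TagB", "TagB"));
        System.out.println(matches("TagA", "TagC"));
    }
}
```

With the producer example tagging its message "TagA", a subscription of "*" or "TagA" on the same topic would select it, while "TagB" would not.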