centos7上使用消息中间键RabbitMQ

mac2024-10-15  50

1.安装和配置

安装RabbitMQ之前需要erlang运行环境,需要注意erlang和RabbitMQ的版本对应关系,具体可以参考RabbitMQ官网的版本对照说明:https://www.rabbitmq.com/which-erlang.html,本文安装的erlang版本是19.3.x, RabbitMQ版本是3.6.8 安装步骤:

使用yum安装erlang

yum install erlang

安装完成后可以检测是否安装成功,使用如下命令:

erl

安装成功会输出版本信息

Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false] Eshell V5.10.4 (abort with ^G)

然后就可以安装RabbitMQ了 首先下载一个RabbitMQ,可以在本地下载上传到服务器,也可以直接下载,命令如下:

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.8/rabbitmq-server-3.6.8-1.el7.noarch.rpm

然后使用如下命令:

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum install rabbitmq-server-3.6.8-1.el7.noarch.rpm rpm -i --nodeps rabbitmq-server-3.6.8-1.el7.noarch.rpm

启动RabbitMQ

/sbin/service rabbitmq-server start

创建用户admin

rabbitmqctl add_user admin admin

给用户admin授权

rabbitmqctl set_permissions -p "/" admin '.*' '.*' '.*'

给用户admin赋予administrator角色

rabbitmqctl set_user_tags admin administrator

开启RabbitMQ控制台

rabbitmq-plugins enable rabbitmq_management

然后可以远程访问http://服务器外网ip:15672方法RabbitMQ控制台

2.用最原生的方式连接RabbitMQ测试简单的生产者和消费者

pom 依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency>

连接测试代码

package cn.ckh2019.test; import cn.ckh2019.pawnshop.service.cms.PawnshopServiceCmsApplication; import com.rabbitmq.client.*; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author Chen Kaihong * 2019-10-31 15:02 */ public class RabbitMQTest { static final String QUEUE = "queue-01"; /** * Producer */ @Test public void test1() { ConnectionFactory factory = new ConnectionFactory(); //设置连接信息 factory.setHost("129.211.17.8"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { System.out.println(123); connection = factory.newConnection(); System.out.println(456); channel = connection.createChannel(); /** * queue: 队列名称, 没有则创建 * durable: 是否持久化 * exclusive: 是否都占连接 * autoDelete: 自动删除 * arguments: 参数 */ channel.queueDeclare(QUEUE, true, false, false, null); /** * exchange: 交换机, * routingKey: 路由key,交换机根据路由key将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称 * props: 消息的属性 * body: 消息内容 */ channel.basicPublish("",QUEUE,null,"ckh".getBytes()); System.out.println("发送完成"); } catch (Exception e){ e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * Consumer */ @Test public void test2() { ConnectionFactory factory = new ConnectionFactory(); //设置连接信息 factory.setHost("129.211.17.8"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, true, false, false, null); /** * 监听队列 * queue: 队列名称 * autoAsk: 自动回复 * callback: 消费方法 */ channel.basicConsume(QUEUE,true, new DefaultConsumer(channel){ /** * * @param consumerTag 消费者标签 * @param envelope 信封 * @param properties 消息的属性 * @param body 消息内容 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String exchange = envelope.getExchange(); //消息id, mq在channel 中用来标识消息的id long deliveryTag = envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("msg =======> " + msg); } }); } catch (Exception e){ } finally { } } }
最新回复(0)