消息队列介绍

mac2022-06-30  42

一、前言

 

  RabbitMQ,它是干嘛用的呐?它是用来发消息的,消息队列,那它跟我们之前的学习的python的线程queue和进程的queue有什么区别呢?其实他们干的事情都是一样的。先来说说我们之前学习的python的queue吧。

线程queue:只是用于多个线程之间,进行数据同步交互的。进程queue:只是用户父进程与子进程进行交互,或者属于同一父进程下的多个子进程进行交互。

  如果是两个独立的程序,即便是python 程序,两个完全独立的python程序也依然是不用这个python的这个线程或者进程queue来通信的。

  那么问题来了,我现在两个独立的python程序,或者python跟Java程序,或者跟PHP程序,或者两台独立机器之间的也涉及到生产者消费者模型,这个时候用python的线程queue和进程queue就通信不了了。那怎么办呢?这个时候我们只能搞一个中间代理,这个中间代理就是RabbitMQ。

二、消息发送方式如图

 

 

当然类似于RabbitMQ还有很多,比如:ZeroMQ、ActiveMQ...只不过现在RabbitMQ用的最多,也是最火的一个。

三、安装说明

 

  RabbitMQ依赖的语言 erlang 下载地址:猛击这里

 

  RabbitMQ软件下载:猛击这里

 

  RabbitMQ的安装步骤:猛击这里

 

  安装python rabbitMQ modul:pip install pika 或者 easy_install pika ,源码:猛击这里

 

  RabbitMQ的使用文档:猛击这里

 Centos7安装脚本:

#!/bin/bash #2017-11-20 by junesu yum -y install glibc.i686 socat  #安装erlang wget http://www.rabbitmq.com/releases/erlang/erlang-17.4-1.el6.x86_64.rpm rpm -ivh erlang-19.04-1.e17.centos.x86_64.rpm wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-3.6.9-1.el6.noarch.rpm rpm -ivh rabbitmq-server-3.6.9-1.el6.noarch.rpm service rabbitmq-server restart #启动rabbitmq内置web插件 rabbitmq-plugins enable rabbitmq_management #停止防火墙 service firewalld stop echo "安装完成" echo "http://你的地址:15672/#/users" View Code 四、基本示例

 

4.1、send端(producer)

说明:建立socket->声明管道->声明queue->通过一个exchange发送内容至queue->关闭连接

 

import pika #通过这个实例先建立一个socket connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #声明一个管道 channel = connection.channel() #声明queue channel.queue_declare(queue="xiaosusu") #这边给queue起名字叫"xiaosusu" #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange="", routing_key="xiaosusu", #queue的名字 body="hello world") #body是你发送的内容 print("[x] Sent 'hello world'") #直接关闭连接 connection.close()

4.2、receive端(consumers)

说明:创建socket连接->创建管道->声明queue->创建回调函数callback->消费的消息->开启消费

 

## 消费者有可能在其他的机器上 import pika #建立一个socket连接 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #创建一个管道 channel = connection.channel() #You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program #was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue="xiaosusu") def callback(ch,method,properites,body): print("--->",ch,method,properites) print(" [x] Received %r" % body) channel.basic_consume(#消费的消息 callback, #如果收到消息,就调用callback函数来处理消息 queue="shuaigaogao",#queue的名字 no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') #这个start只要一启动,就一直运行,它不止收一条,而是永远收下去,没有消息就在这边卡住 channel.start_consuming()

  输出:

[*] Waiting for messages. To exit press CTRL+C ---> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket= ('::1', 54136, 0, 0)->('::1', 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.d71ee3fbc0ee4c5e8d0b28a42fdc6411', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=xiaoxiaosu'])> <BasicProperties> [x] Received b'hello world'

 从上面的输出可以看的出callback中的ch,method,properites分别是:

ch:是send端管道的内存对象的地址method:指的send端的是发给谁,发给哪个Q的一些信息,一般不怎么用properites:send端的属性,这边至的send端发过来给recive端的属性body:是send端发过来的消息 五、远程配置RabbitMQ

 

 刚刚我们配置的RabbitMQ是放在Windows的,而且是在本地的。那如果远程连接RabbitMQ Server的话,需要配置权限

5.1、创建自己的账号

首先在RabbitMQ server上创建一个账号 sudo rabbitmqctl add_user 用户名  密码

rabbitmqctl add_user junesu 123456

5.2、配置权限

需要配置权限,允许从外面访问

rabbitmqctl set_permissions -p / junesu ".*" ".*" ".*"

参数说明:

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost :The name of the virtual host to which to grant the user access, defaulting to /. user:The name of the user to grant access to the specified virtual host. conf:A regular expression matching resource names for which the user is granted configure permissions. write:A regular expression matching resource names for which the user is granted write permissions. read:A regular expression matching resource names for which the user is granted read permissions.

5.3、客户端连接的时候需要配置认证参数

 

#认证信息 credentials = pika.PlainCredentials('alex', 'alex3714') #连接信息 connection = pika.BlockingConnection(pika.ConnectionParameters( '10.211.55.5',5672,'/',credentials)) channel = connection.channel()

 

六、总结

 

RabbitMQ的默认端口是15672,在python中使用的模块是pika。consumers中如果不声明queue的话,则如果consumers先启动,则会报错。如果是producer先启动,consumers后启动则不报错。但是如果说consumer声明了,consumer先启动就不会报错。如果是producers先启动的话,则忽略。所有的socket传输都是bytes类型。消费者和生产者不一定在同一台机器上,在其他机器上运行也是可以的。consumers启动以后会一直运行下去,它会永远的收下去。producers可以运行多次,只要运行一次,consumers就会接收一次。

 

转载于:https://www.cnblogs.com/xiangjun555/articles/7867402.html

最新回复(0)