消息队列
消息队列”是在消息的传输过程中保存消息的容器。
消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
相当于水管,有一个入口和出口,水从入口流入,从出口流出,这就是一个消息队列。左侧线程或者进程往队列里面添加数据,它的任务就结束了,右侧线程或者进程只要依次从出口处读取数据就可以了。
消息队列的思想
比如在京东下单,并付完钱,相当于把消息堆在了水管里面,会返回一个结果给客户,告知客户已经购买此商品,后台会有线程去接收并处理这个订单消息,然后去库房发货、走物流,知道最后接收货物并签收,这个流程就算结束了。所以,在异步处理问题的时候,都会用到消息队列的这种思想。操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。
使用multiprocessing里面的Queue来实现消息队列。
语法格式:
from multiprocessing
import Queue
q =
Queue
q.put(data)
data = q.get(data)
例子:
from multiprocessing
import Queue, Process
# 写数据的进程
def write(q):
for i
in [
'a',
'b',
'c',
'd']:
q.put(i) # 把消息放入队列
print (
'put {0} to queue'.format(i))
# 读取数据的进程
def read(q):
while 1
:
result = q.get()
# 从队列中读取消息
print (
"get {0} from queue".format(result))
def main():
# 父进程创建Queue,并传给各个子进程
q =
Queue()
pw = Process(target=write,args=(q,))
# 使用多进程,传入的参数是消息队列
pr = Process(target=read,args=
(q,))
pw.start() # 启动子进程,写入数据
pr.start()
# 启动子进程,读取数据
pw.join()
# 等待pw进程结束
pr.terminate()
#停止
# 相当于join,等pr完成以后,while是一个死循环,这里强制结束,因为读取数据的进程应该是一直监听是否有数据产生,有就会去读取。
if __name__ ==
'__main__':
main()
结果:
put a to queue
get a from queue
put b to queue
get b from queue
put c to queue
get c from queue
put d to queue
get d from queue
使用multiprocessing里面的PIPE来实现消息队列。
1、Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2只负责发送消息。
2、send和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。
例子:
import time
from multiprocessing
import Pipe, Process
# 发送消息的进程
def proc1(pipe):
for i
in xrange(1, 10
):
pipe.send(i)
print (
"send {0} to pipe".format(i))
time.sleep(1
)
# 接收消息的进程
def proc2(pipe):
n = 9
while n >
0:
result =
pipe.recv()
print (
"recv {0} from pipe".format(result))
n -= 1
def main():
pipe = Pipe(duplex=False)
# 设置半双工模式,p1只负责发送消息,p2只负责接收消息,pipe是一个tuple类型
p1 = Process(target=proc1, args=(pipe[1
],))
p2 = Process(target=proc2, args=(pipe[0],))
#接收写0
p1.start()
p2.start()
p1.join()
p2.join()
pipe[0].close()
pipe[1
].close()
if __name__ ==
'__main__':
main()
结果:
send 1
to pipe
recv 1
from pipe
recv 2
from pipe
send 2
to pipe
send 3
to pipe
recv 3
from pipe
recv 4
from pipe
send 4
to pipe
send 5
to pipe
recv 5
from pipe
recv 6
from pipe
send 6
to pipe
recv 7
from pipe
send 7
to pipe
recv 8
from pipe
send 8
to pipe
send 9
to pipe
recv 9
from pipe
Python提供了Queue模块来专门实现消息队列
Queue对象
Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:
Queue.qsize():返回消息队列的当前空间。返回的值不一定可靠。
Queue.empty():判断消息队列是否为空,返回True或False。同样不可靠。
Queue.full():类似上边,判断消息队列是否满
Queue.put(item, block=True, timeout=None):往消息队列中存放消息。block可以控制是否阻塞,timeout指定阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。
Queue.put_nowait(item):相当于put(item, False).
Queue.get(block=True, timeout=None):获取一个消息,其他同put。
以下两个函数用来判断消息对应的任务是否完成。
Queue.task_done():接受消息的线程通过调用这个函数来说明消息对应的任务已完成。
Queue.join(): 实际上意味着等到队列为空,再执行别的操作
例子:
from multiprocessing
import Queue
from threading
import Thread
import time
"""
一个生产者和两个消费者,
采用多线程继承的方式,
一个消费偶数,一个消费奇数。
"""
class Proceducer(Thread):
def __init__(self, queue):
super(Proceducer, self).__init__()
self.queue =
queue
def run(self):
try:
for i
in xrange(1, 10
):
print (
"put {0} to queue".format(i))
self.queue.put(i)
except Exception as e:
print (
"put data error")
raise e
class Consumer_even(Thread):
def __init__(self, queue):
super(Consumer_even, self).__init__()
self.queue =
queue
def run(self):
try:
while not self.queue.empty():
# 判断队列是否为空
number = self.queue.get(block=True, timeout=3)
# 从队列中获取消息,block=True表示阻塞,设置超时未3s
if number % 2 == 0:
# 如果获取的消息是偶数
print(
"get {0} from queue EVEN, thread name is {1}".format(number, self.getName()))
else:
self.queue.put(number) # 如果获取的消息不是偶数,就接着把它放回队列中
time.sleep(1
)
except Exception as e:
raise e
class Consumer_odd(Thread):
def __init__(self, queue):
super(Consumer_odd, self).__init__()
self.queue =
queue
def run(self):
try:
while not self.queue.empty():
number = self.queue.get(block=True, timeout=3
)
if number % 2 != 0:
# 如果获取的消息是奇数
print(
"get {0} from queue ODD, thread name is {1}".format(number, self.getName()))
else:
self.queue.put(number)
time.sleep(1
)
except Exception as e:
raise e
def main():
queue =
Queue()
p = Proceducer(queue=
queue)
# 开始产生消息
print(
"开始产生消息")
p.start()
p.join() # 等待生产消息的进程结束
time.sleep(1)
# 消息生产完成之后暂停1s
c1 = Consumer_even(queue=
queue)
c2 = Consumer_odd(queue=
queue)
# 开始消费消息
print(
"开始消费消息")
c1.start()
c2.start()
c1.join()
c2.join()
print (
"消息消费完成")
if __name__ ==
'__main__':
main()
结果:
开始产生消息
put 1
to queue
put 2
to queue
put 3
to queue
put 4
to queue
put 5
to queue
put 6
to queue
put 7
to queue
put 8
to queue
put 9
to queue
开始消费消息
get 1
from queue ODD, thread name
is Thread-3
get 2
from queue EVEN, thread name
is Thread-2
get 3
from queue ODD, thread name
is Thread-3
get 4
from queue EVEN, thread name
is Thread-2
get 5
from queue ODD, thread name
is Thread-3
get 6
from queue EVEN, thread name
is Thread-2
get 7
from queue ODD, thread name
is Thread-3
get 8
from queue EVEN, thread name
is Thread-2
get 9
from queue ODD, thread name
is Thread-3
消息消费完成
Celery异步分布式
什么是celery
Celery是一个python开发的异步分布式任务调度模块。
几个概念
broker:
brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列) ,常见的 brokers 有 rabbitmq、redis、Zookeeper 等。
backend:
顾名思义就是结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了 ,常见的 backend 有 redis、Memcached 甚至常用的数据都可以。
worker:
就是 Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行。
task:
就是我们想在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由workers进行处理。
Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等。
这里我们用redis当做celery的broker和backend。
连接url的格式为
redis://:password@hostname:port/
db_number
例如:
BROKER_URL =
'redis://localhost:6379/0'
安装celery
pip install celery
pip install redis
pip install redis-py-with-geo
# 没有安装这个会报错
File "/usr/lib/python2.7/site-packages/kombu/transport/redis.py", line 671,
in _receive
while c.connection.can_read(timeout=
0):
TypeError: can_read() got an unexpected keyword argument 'timeout'
在服务器上安装redis并启动redis,我安装的redis指定端口为5000。
例子:
vi tasks.py
#/usr/bin/env python
#-*- coding:utf-8 -*-
from celery
import Celery
broker=
"redis://110.106.106.220:5000/5"
backend=
"redis://110.106.106.220:5000/6"
app = Celery(
"tasks", broker=broker, backend=
backend)
@app.task
def add(x, y):
return x+y
现在broker、backend、task都有了,接下来我们就运行worker进行工作,在tasks.py目录运行:
celery -A tasks worker -l info
启动后可以看到如下信息:
[root@izwz920j4zsv1q15yhii1qz scripts]
# celery -A celery_test worker -l info
/usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You
're running the worker with superuser privileges: this is absolutely not recommended!
Please specify a different user using the -
u option.
User information: uid=0 euid=0 gid=0 egid=
0
uid=uid, euid=euid, gid=gid, egid=
egid,
-------------- celery@izwz920j4zsv1q15yhii1qz v4.1.1
(latentcall)
---- **** -----
--- * *** * -- Linux-3.10.0-693.2.2.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2018-05-25 14:28:38
-- * - **** ---
- ** ----------
[config]
- ** ---------- .> app: celery_test:0x25a6450
- ** ---------- .> transport: redis://110.106.106.220:5000/5
- ** ---------- .> results: redis://110.106.106.220:5000/6
- *** --- * --- .> concurrency: 1
(prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks
in this worker)
--- ***** -----
--------------
[queues]
.> celery exchange=celery(direct) key=
celery
[tasks]
. tasks.add
[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching
for neighbors
[2018-05-25 14:28:39,475: INFO/
MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/MainProcess] celery@izwz920j4zsv1q15yhii1qz ready.
意思就是运行 tasks 这个任务集合的 worker 进行工作(当然此时broker中还没有任务,worker此时相当于待命状态),最后一步,就是触发任务,最简单方式就是再写一个脚本然后调用那个被装饰成 task 的函数。
vi trigger.py
from tasks
import add
result = add.delay(4, 4)
#不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
# 是否处理
time.sleep(1
)
print 'task done: {0}'.format(result.get())
# 获取结果
print(result.task_id)
delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。
运行trigger.py之后可以看到如下信息:
[root@izwz920j4zsv1q15yhii1qz scripts]
# python trigger.py
task done: 8
celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2
celery的任务状态
在之前启动tasks.py的窗口可以看到如下信息:
[2018-05-25 14:28:38,431: INFO/MainProcess] Connected to redis://110.106.106.220:5000/5
[2018-05-25 14:28:38,443: INFO/MainProcess] mingle: searching
for neighbors
[2018-05-25 14:28:39,475: INFO/
MainProcess] mingle: all alone
[2018-05-25 14:28:39,528: INFO/
MainProcess] celery@izwz920j4zsv1q15yhii1qz ready.
[2018-05-25 14:33:30,340: INFO/MainProcess] Received task: tasks.add[d64def11-6b77-443f-84c2-
0cbd850972f2]
[2018-05-25 14:33:30,373: INFO/ForkPoolWorker-1] Task tasks.add[d64def11-6b77-443f-84c2-0cbd850972f2] succeeded
in 0.0313169739966s: 8
[2018-05-25 14:33:47,082: INFO/MainProcess] Received task: tasks.add[5ae26e89-5d91-496e-8e1c-
e0504fbbd39a]
[2018-05-25 14:33:47,086: INFO/ForkPoolWorker-1] Task tasks.add[5ae26e89-5d91-496e-8e1c-e0504fbbd39a] succeeded
in 0.00259069999447s: 8
在redis中查看:
110.106.106.220:5000[5]> select 5
OK
110.106.106.220:5000[5]> keys *
1)
"_kombu.binding.celeryev"
2)
"_kombu.binding.celery.pidbox"
3)
"_kombu.binding.celery"
110.106.106.220:5000[5]> select 6
OK
110.106.106.220:5000[6]> keys *
1)
"celery-task-meta-5ae26e89-5d91-496e-8e1c-e0504fbbd39a"
2)
"celery-task-meta-d64def11-6b77-443f-84c2-0cbd850972f2"
110.106.106.220:5000[6]> get celery-task-meta-d64def11-6b77-443f-84c2-
0cbd850972f2
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"d64def11-6b77-443f-84c2-0cbd850972f2\", \"children\": []}"
110.106.106.220:5000[6]> get celery-task-meta-5ae26e89-5d91-496e-8e1c-
e0504fbbd39a
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 8, \"task_id\": \"5ae26e89-5d91-496e-8e1c-e0504fbbd39a\", \"children\": []}"
转载于:https://www.cnblogs.com/yangjian319/p/9089167.html