Python Process 进程间通信

mac2025-04-15  3

队列

概念介绍

创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

方法介绍

Queue([maxsize]):创建共享的进程队列。maxsize是队列中允许的最大项数。省略此参数,则无大小限制

Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ):返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常中。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait() :同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) :将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Full异常。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() :返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

q.empty() :如果调用此方法时 q为空,返回True。同q.qsize()不可靠,原因也同样。

q.full() :如果q已满,返回为True. 同q.qsize()不可靠,原因也同样。

代码实例

1、单独实例用法

from multiprocessing import Queue q=Queue(3) # put ,get ,put_nowait,get_nowait,full,empty简单实例 q.put(3) q.put(3) q.put(3) # q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。 try: q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。 except: print('队列已经满了') # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。 print(q.full()) # True 满了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。 try: q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。 except: print('队列已经空了') print(q.empty()) # True 空了

2 批量数据放入队列再批量数据

import os import time import multiprocessing # 向queue中输入数据的函数 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中输出数据的函数 def outputQ(queue): info = queue.get() print ('%s%s%s'%(str(os.getpid()), '(get):',info)) # Main if __name__ == '__main__': multiprocessing.freeze_support() record1 = [] # 存储输入进程 record2 = [] # 存储输出进程 queue = multiprocessing.Queue(3) # 输入进程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 输出进程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()

3 生产者消费者模型

from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到结束信号则结束 time.sleep(random.randint(1,3)) print('%s 购买了 %s' %(os.getpid(),res)) def worker(q): for i in range(10): time.sleep(random.randint(1,3)) res='汽车%s' %i q.put(res) print('生产了 %s' %(os.getpid(),res)) q.put(None) #发送结束信号 if __name__ == '__main__': q=Queue() # 生产进程:即工人 p1=Process(target=producer,args=(q,)) # 消费进程:即顾客 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() # 也可在主进程加入结束信号,但是要在生产进程结束后,再加入信号 # 如果有多个消费进程,就发送几个结束信号 p1.join() q.put(None) #发送结束信号 print('主进程')

4 可连接的共享队列

常用的两个方法:

q.task_done():使用者使用此方法发出信号,表示q.get()返回的项目已经被处理

q.join():生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。

下面说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列并等待它们被处理。

from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('%s 购买了 %s' %(os.getpid(),res)) q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('生产了 %s' %(os.getpid(),res)) q.join() #生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。 if __name__ == '__main__': q=JoinableQueue() #生产者进程:即工人 p1=Process(target=producer,args=('宝马',q)) p2=Process(target=producer,args=('奔驰',q)) p3=Process(target=producer,args=('法拉利',q)) #消费进程:即顾客 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #开始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主进程') # 主进程等--->p1,p2,p3等--->c1,c2 # p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据 # 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了,所以设置成守护进程就可以了。
最新回复(0)