实现多生产者多消费者
import multiprocessing from multiprocessing import Manager, Queue, JoinableQueue import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': # que = Queue() # que = JoinableQueue() que = Manager().Queue() consumers = producers = [] for i in range(3): consumers.append(multiprocessing.Process(target=producer, args=(que, "producer" + str(i)))) consumers[-1].start() for i in range(2): producers.append(multiprocessing.Process(target=consumer, args=(que, "consumer" + str(i)))) producers[-1].start() for c in consumers: c.join() for p in producers: p.join() """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer1生产了数据:1 producer2生产了数据:1 consumer0消费了数据:0 consumer1消费了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 Process finished with exit code 0 """ """ 注意:queue.Queue(),multiprocessing.Queue(),multiprocessing.JoinableQueue(),multiprocessing.Manager().Queue()四个队列的区别。 """实现多生产者多消费者
from concurrent.futures import ProcessPoolExecutor import time from multiprocessing import Manager def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = Manager().Queue() pool = ProcessPoolExecutor(max_workers=5) for i in range(3): pool.submit(producer, que, "producer" + str(i)) for i in range(2): pool.submit(consumer, que, "consumer" + str(i)) pool.shutdown(wait=True) print('主进程完成') """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer1生产了数据:1 producer2生产了数据:1 consumer0消费了数据:0 consumer1消费了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 主进程完成 Process finished with exit code 0 """实现多生产者多消费者
import multiprocessing from multiprocessing import Manager import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = Manager().Queue() pool = multiprocessing.Pool(processes=5) for i in range(3): pool.apply_async(producer, (que, "producer" + str(i))) for i in range(2): pool.apply_async(consumer, (que, "consumer" + str(i))) pool.close() pool.join() print('主进程完成') """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer1生产了数据:1 producer2生产了数据:1 consumer0消费了数据:0 consumer1消费了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 主进程完成 Process finished with exit code 0 """实现多生产者多消费者
import threading import queue import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = queue.Queue() consumers = producers = [] for i in range(3): consumers.append(threading.Thread(target=producer, args=(que, "consumer" + str(i)))) consumers[-1].start() for i in range(2): producers.append(threading.Thread(target=consumer, args=(que, "producer" + str(i)))) producers[-1].start() for c in consumers: c.join() for p in producers: p.join() """ 运行结果: consumer0生产了数据:0 consumer1生产了数据:0 consumer2生产了数据:0 producer0消费了数据:0 producer1消费了数据:0 consumer1生产了数据:1 producer0消费了数据:0 consumer2生产了数据:1 producer1消费了数据:1 consumer0生产了数据:1 producer1消费了数据:1 producer0消费了数据:1 Process finished with exit code 0 """实现多生产者多消费者
from concurrent.futures import ThreadPoolExecutor import queue import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = queue.Queue() pool = ThreadPoolExecutor(max_workers=5) for i in range(3): pool.submit(producer, que, "producer" + str(i)) for i in range(2): pool.submit(consumer, que, "consumer" + str(i)) pool.shutdown(wait=True) print('主线程完成') """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 consumer0消费了数据:0 producer2生产了数据:1 producer1生产了数据:1 consumer1消费了数据:1 consumer1消费了数据:1 consumer0消费了数据:1 主线程完成 Process finished with exit code 0 """生成器只能有一个生产者一个消费者。
def producer(c): c.send(None) for data in range(5): print("生产者生成发送了数据%d" % data) c.send(data) c.close() def consumer(): response = None while True: data = yield response print("消费者接受消费了数据%d" % data) response = 200 if __name__ == '__main__': c = consumer() producer(c) """ 运行结果: 生产者生成发送了数据0 消费者接受消费了数据0 生产者生成发送了数据1 消费者接受消费了数据1 生产者生成发送了数据2 消费者接受消费了数据2 生产者生成发送了数据3 消费者接受消费了数据3 生产者生成发送了数据4 消费者接受消费了数据4 Process finished with exit code 0 """multiprocessing.Pipe + multiprocessing.Process/threading.Thread只能有一个生产者一个消费者。
from multiprocessing import Pipe, Process import time from threading import Thread def producer(pipe): for i in range(5): pipe.send(i) time.sleep(0.5) print("send {0} to pipe".format(i)) def consumer(pipe): n = 5 while n > 0: result = pipe.recv() time.sleep(0.5) print("recv {0} from pipe".format(result)) n -= 1 if __name__ == '__main__': pipe = Pipe(duplex=False) print(type(pipe)) # producer_ = Process(target=producer, args=(pipe[1],)) # consumer_ = Process(target=consumer, args=(pipe[0],)) producer_ = Thread(target=producer, args=(pipe[1],)) consumer_ = Thread(target=consumer, args=(pipe[0],)) producer_.start() consumer_.start() producer_.join() consumer_.join() pipe[0].close() pipe[1].close() """ 运行结果: <class 'tuple'> send 0 to pipe recv 0 from pipe send 1 to pipe recv 1 from pipe recv 2 from pipe send 2 to pipe send 3 to pipe recv 3 from pipe recv 4 from pipe send 4 to pipe Process finished with exit code 0 """ """ 当Pipe的参数duplex=True时,此时Pipe实现全双工通信。 """