python--基础知识点--生产者、消费者

mac2025-01-30  23

1、使用进程

1.1 使用multiprocessing.Process() + 列表 + multiprocessing.Manager().Queue()

  实现多生产者多消费者

import multiprocessing from multiprocessing import Manager, Queue, JoinableQueue import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': # que = Queue() # que = JoinableQueue() que = Manager().Queue() consumers = producers = [] for i in range(3): consumers.append(multiprocessing.Process(target=producer, args=(que, "producer" + str(i)))) consumers[-1].start() for i in range(2): producers.append(multiprocessing.Process(target=consumer, args=(que, "consumer" + str(i)))) producers[-1].start() for c in consumers: c.join() for p in producers: p.join() """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer1生产了数据:1 producer2生产了数据:1 consumer0消费了数据:0 consumer1消费了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 Process finished with exit code 0 """ """ 注意:queue.Queue(),multiprocessing.Queue(),multiprocessing.JoinableQueue(),multiprocessing.Manager().Queue()四个队列的区别。 """
1.2、使用ProcessPoolExecutor() + multiprocessing.Manager().Queue()

  实现多生产者多消费者

from concurrent.futures import ProcessPoolExecutor import time from multiprocessing import Manager def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = Manager().Queue() pool = ProcessPoolExecutor(max_workers=5) for i in range(3): pool.submit(producer, que, "producer" + str(i)) for i in range(2): pool.submit(consumer, que, "consumer" + str(i)) pool.shutdown(wait=True) print('主进程完成') """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer1生产了数据:1 producer2生产了数据:1 consumer0消费了数据:0 consumer1消费了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 主进程完成 Process finished with exit code 0 """
1.3 使用multiprocessing.Pool() + multiprocessing.Manager().Queue()

  实现多生产者多消费者

import multiprocessing from multiprocessing import Manager import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = Manager().Queue() pool = multiprocessing.Pool(processes=5) for i in range(3): pool.apply_async(producer, (que, "producer" + str(i))) for i in range(2): pool.apply_async(consumer, (que, "consumer" + str(i))) pool.close() pool.join() print('主进程完成') """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer1生产了数据:1 producer2生产了数据:1 consumer0消费了数据:0 consumer1消费了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 主进程完成 Process finished with exit code 0 """

2、使用线程

2.1、使用threading.Thread() + [ ] + queue.Queue()

  实现多生产者多消费者

import threading import queue import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = queue.Queue() consumers = producers = [] for i in range(3): consumers.append(threading.Thread(target=producer, args=(que, "consumer" + str(i)))) consumers[-1].start() for i in range(2): producers.append(threading.Thread(target=consumer, args=(que, "producer" + str(i)))) producers[-1].start() for c in consumers: c.join() for p in producers: p.join() """ 运行结果: consumer0生产了数据:0 consumer1生产了数据:0 consumer2生产了数据:0 producer0消费了数据:0 producer1消费了数据:0 consumer1生产了数据:1 producer0消费了数据:0 consumer2生产了数据:1 producer1消费了数据:1 consumer0生产了数据:1 producer1消费了数据:1 producer0消费了数据:1 Process finished with exit code 0 """
2.2 使用ThreadPoolExecutor() + queue.Queue()

  实现多生产者多消费者

from concurrent.futures import ThreadPoolExecutor import queue import time def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) que.put(i) time.sleep(0.5) def consumer(que, name): while True: if not que.empty(): data = que.get() que.task_done() print(name + "消费了数据:%d" % data) else: time.sleep(0.6) if que.empty(): break time.sleep(0.5) if __name__ == '__main__': que = queue.Queue() pool = ThreadPoolExecutor(max_workers=5) for i in range(3): pool.submit(producer, que, "producer" + str(i)) for i in range(2): pool.submit(consumer, que, "consumer" + str(i)) pool.shutdown(wait=True) print('主线程完成') """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 consumer0消费了数据:0 producer2生产了数据:1 producer1生产了数据:1 consumer1消费了数据:1 consumer1消费了数据:1 consumer0消费了数据:1 主线程完成 Process finished with exit code 0 """

3、使用生成器

  生成器只能有一个生产者一个消费者。

def producer(c): c.send(None) for data in range(5): print("生产者生成发送了数据%d" % data) c.send(data) c.close() def consumer(): response = None while True: data = yield response print("消费者接受消费了数据%d" % data) response = 200 if __name__ == '__main__': c = consumer() producer(c) """ 运行结果: 生产者生成发送了数据0 消费者接受消费了数据0 生产者生成发送了数据1 消费者接受消费了数据1 生产者生成发送了数据2 消费者接受消费了数据2 生产者生成发送了数据3 消费者接受消费了数据3 生产者生成发送了数据4 消费者接受消费了数据4 Process finished with exit code 0 """

4. 使用multiprocessing.Pipe + multiprocessing.Process/threading.Thread

  multiprocessing.Pipe + multiprocessing.Process/threading.Thread只能有一个生产者一个消费者。

from multiprocessing import Pipe, Process import time from threading import Thread def producer(pipe): for i in range(5): pipe.send(i) time.sleep(0.5) print("send {0} to pipe".format(i)) def consumer(pipe): n = 5 while n > 0: result = pipe.recv() time.sleep(0.5) print("recv {0} from pipe".format(result)) n -= 1 if __name__ == '__main__': pipe = Pipe(duplex=False) print(type(pipe)) # producer_ = Process(target=producer, args=(pipe[1],)) # consumer_ = Process(target=consumer, args=(pipe[0],)) producer_ = Thread(target=producer, args=(pipe[1],)) consumer_ = Thread(target=consumer, args=(pipe[0],)) producer_.start() consumer_.start() producer_.join() consumer_.join() pipe[0].close() pipe[1].close() """ 运行结果: <class 'tuple'> send 0 to pipe recv 0 from pipe send 1 to pipe recv 1 from pipe recv 2 from pipe send 2 to pipe send 3 to pipe recv 3 from pipe recv 4 from pipe send 4 to pipe Process finished with exit code 0 """ """ 当Pipe的参数duplex=True时,此时Pipe实现全双工通信。 """

5.使用 asyncio.Queue + asyncio

import asyncio from asyncio import Queue async def producer(que, name): for i in range(2): if not que.full(): print(name + "生产了数据:%d" % i) await que.put(i) await asyncio.sleep(0.5) async def consumer(que, name): while True: if not que.empty(): data = await que.get() que.task_done() print(name + "消费了数据:%d" % data) else: await asyncio.sleep(5) if que.empty(): break await asyncio.sleep(0.5) if __name__ == '__main__': que = Queue() loop = asyncio.get_event_loop() tasks_producer = [asyncio.ensure_future(producer(que, "producer" + str(i))) for i in range(3)] tasks_consumer = [asyncio.ensure_future(consumer(que, "consumer" + str(i))) for i in range(2)] tasks = tasks_producer + tasks_consumer results = loop.run_until_complete(asyncio.wait(tasks)) loop.close() """ 运行结果: producer0生产了数据:0 producer1生产了数据:0 producer2生产了数据:0 consumer0消费了数据:0 consumer1消费了数据:0 producer0生产了数据:1 producer2生产了数据:1 consumer1消费了数据:0 producer1生产了数据:1 consumer0消费了数据:1 consumer1消费了数据:1 consumer0消费了数据:1 Process finished with exit code 0 """
最新回复(0)