Python进阶----进程之间通信(互斥锁,队列(参数:timeout和block),), ***生产消费者模型...

mac2022-06-30  63

Python进阶----进程之间通信(互斥锁,队列(参数:timeout和block),), ***生产消费者模型

一丶互斥锁

含义:

​ ​ ​ 每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象(串行)

目的:

 ​ ​ 来保证共享数据操作的完整性和安全性(文本数据),保证数据的公平性

区别join:

 ​ ​ 共同点: 都能实现cpu的进程串行

 ​ ​ 不同点: join是人为指定顺序, 不能保证公平性. 互斥锁能够保证公平性

### 加锁处理 from multiprocessing import Lock def task1(loc): loc.acquire() # 上锁 print('task1: 开始打印') time.sleep(random.randint(1,3)) print('task1: 结束打印') loc.release() # 解锁 def task2(loc): loc.acquire() print('task2: 开始打印') time.sleep(random.randint(1,3)) print('task2: 结束打印') loc.release() def task3(loc): loc.acquire() print('task3: 开始打印') time.sleep(random.randint(1,3)) print('task3: 结束打印') loc.release() if __name__ == '__main__': loc=Lock() # 生成锁对象 p1=Process(target=task1,args=(loc,)).start() #把锁对象作为参数传给具体的方法 p2=Process(target=task2,args=(loc,)).start() p3=Process(target=task3,args=(loc,)).start()

锁死:(下一篇细说锁死)

​ ​ ​  当一个进程或者一个线程一直调用或者占用同一锁Lock而不释放资源而导致其他进程/线程无法获得锁,就会出现的死锁状况,一直阻塞在aquire()处

### 当一个锁对象已经被上锁, 试图再次加锁, 就会造成锁死. from multiprocessing import Lock def task1(loc): print('task1') loc.acquire() print('task1: 开始打印') time.sleep(random.randint(1,3)) print('task1: 结束打印') loc.release() def task2(loc): print('task2') loc.acquire() # 第一层锁 loc.acquire() #第二层锁, 试图再次加锁,由于锁对象已经被占用(已经锁上了,还没有释放)再次上锁,就会造成锁死 (程序被卡主)~~~ loc.release() print('task2: 开始打印') time.sleep(random.randint(1,3)) print('task2: 结束打印') loc.release() def task3(loc): print('task3') loc.acquire() print('task3: 开始打印') time.sleep(random.randint(1,3)) print('task3: 结束打印') loc.release() if __name__ == '__main__': loc=Lock() p1=Process(target=task1,args=(loc,)).start() p2=Process(target=task2,args=(loc,)).start() p3=Process(target=task3,args=(loc,)).start()

 ​ ​  案例:模拟抢票(多进程串行执行够任务.)

### db.json 自己提前创建好 with open('db.json', 'w', encoding='utf-8') as f: dic={'count':1} json.dump(dic, f) ### searc方法 打印剩余票数 def search(): time.sleep(random.random()) with open('db.json', encoding='utf-8') as f: dic = json.load(f) print(f'剩余票数:{dic["count"]}') ### 模拟多用户(多进程)抢票 def get(): with open('db.json', encoding='utf-8') as f: dic = json.load(f) time.sleep(random.randint(0, 2)) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', 'w', encoding='utf-8') as f: json.dump(dic, f) print(f'用户:{os.getpid()} ,购买成功~~') else: print(f'{os.getpid()} 没票了~~~~') def task(lock): search() lock.acquire() #给抢票购买, 加锁 . 既保证了数据的安全性,也保证了数据公平性 get() lock.release()# 解锁 if __name__ == '__main__': lock = Lock() for i in range(5): p1 = Process(target=task, args=(lock,)) # 模拟5个用户进程 p1.start()

二丶进程之间的通信: 队列.

含义:

​ ​ ​ 队列就是存在于内存中一个数据容器,一种特殊的线性表

​ ​ ​ 特点:先进先出(FIFO),Queue是多进程安全的队列,自动加锁,自动阻塞

目的:

​ ​ ​ 实现进程之间的通信

multiprocessing模块:

​ ​ ​ 模块支持两种形式:队列(自动加锁,自动阻塞)和管道(需要自己手动加锁),这两种方式都是用于进程间消息传递

### 队列Queue基本用法 # 1.放值 put(值,block=False,timeout=X) block是否阻塞, timeout是否超时 # 2.取值 get() #get完队列里的所有数据时,程序卡出. 如果队列中有新的数据时,会继续执行 # 3.maxsize 队列中允许最大存放数 # 4.empty():调用此方法时q为空则返回True,该结果不可靠, # 5.full():调用此方法时q已满则返回True,该结果不可靠, # 6.qsize():返回队列中目前项目的正确数量,结果也不可靠, # 7.get_nowait() 和 put_nowait() 同 block=False 不阻塞,不等待 from multiprocessing import Queue q=Queue(3) # 设置队列里最大的元素个数 q.put('1') q.put('2') q.put('3') q.put('4') # 夯住 ,只能放3个,不允许继续添加,程序卡在此处. 下面的程序不再执行 print(q.get()) print(q.get()) print(q.get()) print(q.get()) #### 夯住 只能取3个,程序卡在此处. 如果队列中有新的数据时,会继续执行 # 原理同上 # timeout 超时抛出异常(Full or Empty) , block默认阻塞,block=Fasle不会阻塞 q=Queue(3) q.put(1) q.put(3) q.put(2) q.put(4,block=False,timeout=3) print(q.get()) print(q.get()) print(q.get()) q.get(block=False,timeout=3)

三丶进程之间的通信实例

### 队列模拟进程之间 ,30个进程,队列只获取10个. from multiprocessing import Process from multiprocessing import Queue import os def task(q): try: q.put(os.getpid(),block=False) except Exception: return if __name__ == '__main__': q=Queue(10) # 生成Queue队列 for i in range(30): Process(target=task,args=(q,)).start() for j in range(1,11): print(f'第{j}用户:{q.get()}')

四丶生产者消费者模型(常用于并发)

含义:

​ ​ ​ 完完全全的实现进程之间的通信.有三个主体:生产者,消费者,存数据的容器(队列).

好处:

​ ​ ​ 1.平衡生产者与消费者之间的速度差 ​ ​ ​ 2.程序解开耦合 ​ ​ ​ 3.支持并发

构成:

​ ​ ​  三二一原则: ​ ​ ​  ​ ​ ​  三种关系 (生产者与生产者(互斥) , 生产者与消费者(同步与互斥) ,消费者与消费者(互斥)) ​ ​ ​  ​ ​ ​  两个角色(生产者和消费者) ​ ​ ​  ​ ​ ​  一个场所 (队列缓冲区)

No BB see 代码:

# -*-coding:utf-8-*- # Author:Ds ### 合理的去调控多个进程去生成数据以及提取数据,中间有个必不可少的环节容器队列. from multiprocessing import Process from multiprocessing import Queue import time import random # 生产者 def Producer(name,q): for el in range(1,11): time.sleep(random.randint(1,2)) # 随机 res=f'生产者:{name} , 生产的---第 {el} 号包子 ' q.put(res) #放到队列容器中 print(f'\033[0;35m {res} \033[0m') # 消费者 def Consumer(name,q): while 1: # 循环从队列里面取出数据, 如果队列中不存在,就会卡住,等待数据. 一但队列中有了数据,等待的消费者进程就会获得数据. try: time.sleep(random.randint(1,3)) # 增加随机性 ret=q.get(timeout=5) # 从队列中取数据,并设置超时. 一旦生成者不再往队列中添加数据,5秒之后消费者直接抛出empty异常 print(f'消费者{name}: 吃了 {ret}') except Exception: return ### if __name__ == '__main__': q=Queue() # 实例化队列对象 # 2 生产者对象 for i in range(1,3): Process(target=Producer,args=(i,q)).start() # args() 接收参数队列对象,确保使用的是同一个队列 # 3 个消费者对象 for j in range(1,4): Process(target=Consumer,args=(j,q)).start()

转载于:https://www.cnblogs.com/dengl/p/11232840.html

最新回复(0)