每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象(串行)
来保证共享数据操作的完整性和安全性(文本数据),保证数据的公平性
共同点: 都能实现cpu的进程串行
不同点: join是人为指定顺序, 不能保证公平性. 互斥锁能够保证公平性
### 加锁处理 from multiprocessing import Lock def task1(loc): loc.acquire() # 上锁 print('task1: 开始打印') time.sleep(random.randint(1,3)) print('task1: 结束打印') loc.release() # 解锁 def task2(loc): loc.acquire() print('task2: 开始打印') time.sleep(random.randint(1,3)) print('task2: 结束打印') loc.release() def task3(loc): loc.acquire() print('task3: 开始打印') time.sleep(random.randint(1,3)) print('task3: 结束打印') loc.release() if __name__ == '__main__': loc=Lock() # 生成锁对象 p1=Process(target=task1,args=(loc,)).start() #把锁对象作为参数传给具体的方法 p2=Process(target=task2,args=(loc,)).start() p3=Process(target=task3,args=(loc,)).start() 当一个进程或者一个线程一直调用或者占用同一锁Lock而不释放资源而导致其他进程/线程无法获得锁,就会出现的死锁状况,一直阻塞在aquire()处
### 当一个锁对象已经被上锁, 试图再次加锁, 就会造成锁死. from multiprocessing import Lock def task1(loc): print('task1') loc.acquire() print('task1: 开始打印') time.sleep(random.randint(1,3)) print('task1: 结束打印') loc.release() def task2(loc): print('task2') loc.acquire() # 第一层锁 loc.acquire() #第二层锁, 试图再次加锁,由于锁对象已经被占用(已经锁上了,还没有释放)再次上锁,就会造成锁死 (程序被卡主)~~~ loc.release() print('task2: 开始打印') time.sleep(random.randint(1,3)) print('task2: 结束打印') loc.release() def task3(loc): print('task3') loc.acquire() print('task3: 开始打印') time.sleep(random.randint(1,3)) print('task3: 结束打印') loc.release() if __name__ == '__main__': loc=Lock() p1=Process(target=task1,args=(loc,)).start() p2=Process(target=task2,args=(loc,)).start() p3=Process(target=task3,args=(loc,)).start() 案例:模拟抢票(多进程串行执行够任务.)
### db.json 自己提前创建好 with open('db.json', 'w', encoding='utf-8') as f: dic={'count':1} json.dump(dic, f) ### searc方法 打印剩余票数 def search(): time.sleep(random.random()) with open('db.json', encoding='utf-8') as f: dic = json.load(f) print(f'剩余票数:{dic["count"]}') ### 模拟多用户(多进程)抢票 def get(): with open('db.json', encoding='utf-8') as f: dic = json.load(f) time.sleep(random.randint(0, 2)) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', 'w', encoding='utf-8') as f: json.dump(dic, f) print(f'用户:{os.getpid()} ,购买成功~~') else: print(f'{os.getpid()} 没票了~~~~') def task(lock): search() lock.acquire() #给抢票购买, 加锁 . 既保证了数据的安全性,也保证了数据公平性 get() lock.release()# 解锁 if __name__ == '__main__': lock = Lock() for i in range(5): p1 = Process(target=task, args=(lock,)) # 模拟5个用户进程 p1.start() 队列就是存在于内存中一个数据容器,一种特殊的线性表
特点:先进先出(FIFO),Queue是多进程安全的队列,自动加锁,自动阻塞
实现进程之间的通信
模块支持两种形式:队列(自动加锁,自动阻塞)和管道(需要自己手动加锁),这两种方式都是用于进程间消息传递
### 队列Queue基本用法 # 1.放值 put(值,block=False,timeout=X) block是否阻塞, timeout是否超时 # 2.取值 get() #get完队列里的所有数据时,程序卡出. 如果队列中有新的数据时,会继续执行 # 3.maxsize 队列中允许最大存放数 # 4.empty():调用此方法时q为空则返回True,该结果不可靠, # 5.full():调用此方法时q已满则返回True,该结果不可靠, # 6.qsize():返回队列中目前项目的正确数量,结果也不可靠, # 7.get_nowait() 和 put_nowait() 同 block=False 不阻塞,不等待 from multiprocessing import Queue q=Queue(3) # 设置队列里最大的元素个数 q.put('1') q.put('2') q.put('3') q.put('4') # 夯住 ,只能放3个,不允许继续添加,程序卡在此处. 下面的程序不再执行 print(q.get()) print(q.get()) print(q.get()) print(q.get()) #### 夯住 只能取3个,程序卡在此处. 如果队列中有新的数据时,会继续执行 # 原理同上 # timeout 超时抛出异常(Full or Empty) , block默认阻塞,block=Fasle不会阻塞 q=Queue(3) q.put(1) q.put(3) q.put(2) q.put(4,block=False,timeout=3) print(q.get()) print(q.get()) print(q.get()) q.get(block=False,timeout=3) 完完全全的实现进程之间的通信.有三个主体:生产者,消费者,存数据的容器(队列).
1.平衡生产者与消费者之间的速度差 2.程序解开耦合 3.支持并发
三二一原则: 三种关系 (生产者与生产者(互斥) , 生产者与消费者(同步与互斥) ,消费者与消费者(互斥)) 两个角色(生产者和消费者) 一个场所 (队列缓冲区)
转载于:https://www.cnblogs.com/dengl/p/11232840.html