一、锁
当线程安全,多线程操作时,内部会让所有线程排队处理。如:list、dict、Queue
当线程不安全时,那么我们就需要:不安全线程 + 锁 =》 排队处理
例如:
a. 创建100个线程,在列表中追加8 : 线程安全
b. 创建100个线程
v = []
锁
- 把自己的添加到列表中
- 在读取列表中的最后一个
(锁住以后可以保证最后一个是自己放进去的那一个)
解锁
1. 锁:Lock(同步锁)
# 一次放一个
# 锁 Lock
import threading
import time
v =
[]
lock =
threading.Lock()
def func(arg):
lock.acquire()
v.append(arg)
time.sleep(0.1
)
m = v[-1
]
print(arg, m)
lock.release()
for i
in range(10
):
t = threading.Thread(target=func, args=
(i, ))
t.start()
time.sleep(0.5
)
print(v)
>>>
0 0
[0]
1 1
[0, 1
]
2 2
[0, 1, 2
]
3 3
[0, 1, 2, 3
]
4 4
[0, 1, 2, 3, 4
]
5 5
[0, 1, 2, 3, 4, 5
]
6 6
[0, 1, 2, 3, 4, 5, 6
]
7 7
[0, 1, 2, 3, 4, 5, 6, 7
]
8 8
[0, 1, 2, 3, 4, 5, 6, 7, 8
]
9 9
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Lock多次“上锁”会造成死锁状态,不能继续执行(不会报错)
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
v =
[]
lock =
threading.Lock()
def func(arg):
lock.acquire()
lock.acquire() # 多次锁 就会死锁
v.append(arg)
time.sleep(arg)
m = v[-1
]
print(arg, m)
lock.release()
lock.release()
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
2. 锁:RLock(递归锁)
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
# 锁 RLock
v =
[]
lock =
threading.RLock()
def func(arg):
lock.acquire()
lock.acquire() # 可加多个锁
v.append(arg)
time.sleep(0.1
)
m = v[-1
]
print(m, arg)
lock.release()
lock.release() # 用多少锁,就要释放几次
for i
in range(10
):
t = threading.Thread(target=func, args=
(i, ))
t.start()
3. 锁:BoundedSemaphore(信号量)
一个工厂函数,返回一个新的有界信号量对象。有界信号量会确保他的值不会超过初始值;如果超出则会抛出ValueError异常。初始值默认为1。
# 一次放n个
v =
[]
lock = threading.BoundedSemaphore(3)
# 一次放n个
def func(arg):
lock.acquire()
print(arg)
time.sleep(1
)
lock.release()
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
4.锁:Condition(条件)
使得线程等待,只有满足某条件时,才释放n个线程
# 一次方指定个,并可循环指定
# notify通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件
v =
[]
lock =
threading.Condition()
i =
0
def func(arg):
ident =
threading.current_thread()
print(ident)
lock.acquire()
lock.wait() # 在此处加锁
print(arg)
num =
threading.get_ident()
print(num)
time.sleep(0.1
)
lock.release()
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
# 主线程
while True:
inp = int(input(
"请输入:"))
lock.acquire()
lock.notify(inp)
lock.release()
# wait进行条件判断 是否为真
v =
[]
lock =
threading.Condition()
def cond():
ct = threading.current_thread()
# 获取当前线程
cn = ct.getName()
# 获取当前线程名称
print(
"进入cond函数,线程%s准备好了!" %
cn)
input("按任意键继续!\n")
return True
def func(arg):
print(
"线程进来了")
lock.wait_for(cond) # 在此处加锁 当函数返回为真 才能解锁
print(arg)
time.sleep(1
)
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
5.锁:Event(事件)
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
# 一次放所有
lock =
threading.Event()
def func(arg):
ct = threading.current_thread()
# 获取当前线程
print(ct.getName())
lock.wait() # 加锁
print(arg)
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
input(">>>")
lock.set() # 此处解锁 释放所有
# wait 条件加锁
lock =
threading.Event()
def func(arg):
ct = threading.current_thread()
# 获取当前线程
print(ct.getName())
lock.wait() # 加锁
print(arg)
# 第一次循环还没有将锁解开,只能显示线程名称
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
input(">>>")
lock.set() # 开锁 释放全部
time.sleep(3)
# 睡3秒是为了给时间把上一步骤全部释放完再执行下一步
input(
"<<<")
# 第二次执行的时候线程名称和结果同时出现,因为在上一步骤中set已经将锁打开
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
# 再次加锁
lock =
threading.Event()
def func(arg):
print(
"线程进来了")
lock.wait() # 加锁
print(arg)
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
input(">>>")
lock.set()
time.sleep(1
)
lock.clear() # 再次加速
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
input(">>>")
lock.set()
总结:
线程安全,列表和字典线程安全
为什么加锁?
- 非线程安全
- 控制一段代码
二、threading.local(本地线程)
- 不同的线程来了,内部会为不同的线程创建不同的空间用于存储
作用:
内部自动为每个线程维护一个空间(字典),用于当前线程存取属于自己的值。保证线程之间的数据隔离。
v =
threading.local()
def func(arg):
# 内部会为当前吸纳从创建一个空间用于存储, phone = 自己的值
v.phone =
arg
time.sleep(2
)
print(v.phone, arg)
# 去当前线程自己空间取值
for i
in range(10
):
t = threading.Thread(target=func, args=
(i,))
t.start()
三. 线程池
# 线程池`
from concurrent.futures
import ThreadPoolExecutor
import time
def task(a1,a2):
time.sleep(1
)
print(a1, a2)
# 创建一个线程池(最多3个)
pool = ThreadPoolExecutor(3
)
for i
in range(40
):
# 去线程池中申请一个线程,让线程执行task函数
pool.submit(task, i, i+1)
四. 生产者消费者模型
生产者
队列、栈
消费者
# 生产者消费者模型
import queue
q =
queue.Queue()
q.put(3
)
q.put(2
)
q.put(1
)
while 1
:
v1 =
q.get()
print(v1)
q = queue.Queue()
# 线程安全
def producer(id):
"""
生产者
:return:
"""
while True:
time.sleep(2
)
q.put('包子')
print(
"厨师%s生产了一个包子" %
id)
for i
in range(1,4
):
t = threading.Thread(target=producer, args=
(i,))
t.start()
def consumer(id):
"""
消费者
:return:
"""
while True:
time.sleep(1
)
q.get()
print(
"顾客%s 吃了了一个包子" %
id)
for i
in range(1, 4
):
t = threading.Thread(target=consumer, args=
(i,))
t.start()
转载于:https://www.cnblogs.com/jiumo/p/9635997.html