[7]python 多线程

mac2022-06-30  24

GIL 全局解析器锁

python中的线程只有获取到GIL后才能执行。 在某一时刻,只有一个线程在一个cpu上运行。无法将多个线程映射到多个cpu上运行。 线程在时间片用完、执行多条字节代、遇到IO阻塞,会释放GIL

多线程实现

通过定义函数 import threading def A(): print("thread A") def B(): print("thread B") if __name__ == "__main__": t1 = threading.Thread(target=A) t2 = threading.Thread(target=B) t1.start() t2.start() 通过继承 threading.Thread类 import threading class A(threading.Thread): def run(self): print("thread A") class B(threading.Thread): def run(self): print("thread B") if __name__ == "__main__": t1 = A() t2 = B() t1.start() t2.start()

线程间通信

通过共享变量进行通信 import threading #定义一个全局 var = 0 def A(): global var var += 1000 def B(): global var print("var=",var) if __name__ == "__main__": t1 = threading.Thread(target=A) t2 = threading.Thread(target=B) t1.start() t2.start() 通过消息队列进行通信 import threading import queue def A(q): """ producter :param queue.Queue q: """ for i in range(100): q.put('are you ok '+str(i)) def B(q): """ comsumer :param queue.Queue q: :return: """ while True: print(q.get()) if __name__ == "__main__": q = queue.Queue(1000) t1 = threading.Thread(target=A, args=(q,)) t2 = threading.Thread(target=B, args=(q,)) t1.start() t2.start() t1.join() t2.join()

Lock, Rlock 锁

解决对同一资源的并发访问

import threading n = 0 def A(): global n for i in range(1000000): n += 1 def B(): global n for i in range(1000000): n -= 1 if __name__ == "__main__": t1 = threading.Thread(target=A) t2 = threading.Thread(target=B) t1.start() t2.start() t1.join() t2.join() # n每次运行的结果不一致,正常结果应该等于0 # 原因时A,B两个线程交替运行,他们读取到的共享变量n不一定时最新的,在他们更新变量n之前,n已经被修改过 print(n)

通过使用锁,来解决并发访问共享资源

import threading n = 0 l = threading.Lock() def A(): global n for i in range(1000000): l.acquire() n += 1 # 将对共享资源的操作写在 这两个函数调用之间 l.release() def B(): global n for i in range(1000000): l.acquire() n -= 1 l.release() if __name__ == "__main__": t1 = threading.Thread(target=A) t2 = threading.Thread(target=B) t1.start() t2.start() t1.join() t2.join() print(n)

使用Rlock,能够避免在同一个线程内重复申请锁,造成死锁

import threading n = 0 l = threading.RLock() def A(): global n for i in range(1000000): l.acquire() l.acquire() # 在同一个线程内,重复申请锁,也不会造成死锁 n += 1 # 需要注意,申请了多少次锁,就要释放多次,不然线程会一直占用该锁 l.release() l.release() def B(): global n for i in range(1000000): l.acquire() n -= 1 l.release() if __name__ == "__main__": t1 = threading.Thread(target=A) t2 = threading.Thread(target=B) t1.start() t2.start() t1.join() t2.join() print(n)

使用condition ,令两个线程配合工作

# 按顺序打印 1、2、3、4、5、6... import threading class A(threading.Thread): def __init__(self,cond): """ :param threading.Condition cond: """ self.cond = cond super().__init__(name="A") def run(self): with self.cond: print(1) self.cond.notify() self.cond.wait() print(3) self.cond.notify() class B(threading.Thread): def __init__(self, cond): """ :param threading.Condition cond: """ self.cond = cond super().__init__(name="B") def run(self): with self.cond: self.cond.wait() print(2) self.cond.notify() self.cond.wait() print(4) if __name__ == '__main__': cond = threading.Condition() t1 = A(cond) t2 = B(cond) t2.start() t1.start()

信号量,合理的分配多个共享资源给线程

import threading import time def A(i, sem): """ :param threading.Semaphore sem: """ # 先申请使用信号量 sem.acquire() time.sleep(i//2) print("this is {}".format(i)) # 业务处理完毕后释放 sem.release() if __name__ == "__main__": sem = threading.Semaphore(3) for n in range(10): t = threading.Thread(target=A, args=(n,sem)) t.run()

多线程池

import concurrent.futures # 使用线程池,能够限制创建的线程数量 # 能够查看线程的执行状态,获取线程返回的结果 def A(no): str = "this is {}".format(no) print(str) return "result :"+str if __name__ == '__main__': ex = concurrent.futures.ThreadPoolExecutor(max_workers=2) #max_workers,限制能够同时运行的线程数量 t1 = ex.submit(A,1) # 提交线程任务,方法会立即放回 t2 = ex.submit(A,2) print(t1.cancel()) #取消线程,只有在线程未开始执行时,才能取消 print(t1.done()) #查看线程是否执行完毕 print(t1.result()) #查看线程放回的结果

通过 as_completed 获取已完成任务的线程

import concurrent.futures import time def A(no): str = "this is {no}\n".format(no=no) #print(str) time.sleep(no) return str if __name__ == '__main__': ex = concurrent.futures.ThreadPoolExecutor(3) pool = {ex.submit(A,i) for i in range(4)} #1、 as_completed 获取所有已完成线程的返回结果,如果线程未完成,会一直阻塞。as_completed调用后会返回一个生成器 for item in concurrent.futures.as_completed(pool): print(item.result())

通过 ThreadPoolExecutor 的map方法 ,批量提交线程,并且获取结果

import concurrent.futures import time def A(no): str = "this is {no}\n".format(no=no) time.sleep(no) return str if __name__ == '__main__': ex = concurrent.futures.ThreadPoolExecutor(3) for result in ex.map(A,range(3)): # 能够通过遍历 map的返回值,获取线程执行结果,遇到没有执行完的线程,会阻塞 print(result)

多进程编成

python 中的多线程,在同一个时间点中只有一个线程占用cpu运行,无法多个线程映射到多个cpu上。 多进程能解决pyhon多线程的缺点,适合于计算密集型的场景。但是多进程的创建和切换系统开销比多线程大,在io密集型的场景中,多线程更加合适。

# 比较多线程 和 多进程在计算密集型场景下的执行时间 import concurrent.futures import time def fib(n): if n <= 2: return 1 else: return fib(n-1) + fib(n-2) def thread(): pre_time = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=20) as ex: pool = [ex.submit(fib, i) for i in range(30,35)] for item in concurrent.futures.as_completed(pool): pass after_time = time.time() print("thread execute {}".format(after_time-pre_time)) def process(): pre_time = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=20) as ex: for item in ex.map(fib, range(30,35)): pass after_time = time.time() print("process execute {}".format(after_time-pre_time)) if __name__ == '__main__': thread() #输出 thread execute 2.5126142501831055 process() #输出 process execute 1.0493500232696533

进程编程

# 创建进程 import multiprocessing def A(i): print("this is process {}".format(i)) if __name__ == "__main__": p = multiprocessing.Process(target=A, args=(1,)) p.start() #启动进程 p.join() # 等待子进程完成 # 创建进程池 import multiprocessing def A(i): return "this is process {}".format(i) if __name__ == "__main__": # 创建线程池 pool = multiprocessing.Pool(5) # 提交进程任务任务 result = pool.apply_async(A,(1,)) # 关闭进程池,不能继续提交任务 pool.close() # 等待进程都处理完毕 pool.join() # 获取进程的返回值 print(result.get()) #imap

进程间通信

管道 import multiprocessing import time import os def A(p): """ :param multiprocessing.Connection p: """ p.send("are you ok\n") pass def B(p): """ :param multiprocessing.Connection p: """ print("B recv: ",p.recv()) if __name__ == "__main__": # 使用 multiprocessing.Pipe 创建管道 pr, pw = multiprocessing.Pipe() p1 = multiprocessing.Process(target=A, args=(pw,)) p2 = multiprocessing.Process(target=B, args=(pr,)) p1.start() p2.start() p1.join() p2.join() 消息队列 import multiprocessing import time import os def A(q): for i in range(20): q.put(i) print(i) time.sleep(0.5) def B(q): while True: i = q.get() print("pid = {},get i = {}".format(os.getpid(), i)) if __name__ == "__main__": # 使用消息队列通信,必须使用Manager创建队列 q = multiprocessing.Manager().Queue(10) pool = multiprocessing.Pool(4) result = [] result.append(pool.apply_async(A,(q,))) result.extend([pool.apply_async(B,(q,)) for i in range(3)]) pool.close() pool.join() 共享内存 可以通过Manager创建,Array、dict、list等共享内存 import multiprocessing import time import os def A(d): """ :param dict d: """ d.update({'name':'tom'}) def B(d): """ :param dict d: """ print(d.get('name','cat')) if __name__ == "__main__": # 使用 multiprocessing.Pipe 创建管道 d = multiprocessing.Manager().dict() p1 = multiprocessing.Process(target=A, args=(d,)) p2 = multiprocessing.Process(target=B, args=(d,)) p1.start() p2.start() p1.join() p2.join()
最新回复(0)