现代操作系统都支持多任务,就是说操作系统可以运行多个任务。
早期cpu是单核的,单核cpu实现多任务原理:操作系统轮流让各个任务交替执行,QQ执行2us,再切换到微信,执行2us,再切换…,表面看是每个任务反复执行,但是cpu调度执行速度太快了,导致我们感觉就像是所有任务在同时执行一样。
多核cpu实现多任务原理:真正的并行执行多任务只能是在多核CPU上执行,但是由于任务数量远远多于CPU核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。
并发:看上去一起执行,任务数多于cpu核心数 并行:真正一起执行,任务数小于等于cpu核心数
实现多任务的方式: 1、多进程模式 2、多线程模式 3、协程模式 4、多进程+多线程模式
对于多任务来说,一个任务就是一个进程。 进程是系统中程序执行和资源分配的基本单位,每个进程都有自己的数据段、代码段、堆栈段。
单任务现象:
from time import sleep def run(): while True: print('abcde') sleep(1.2) if __name__ == '__main__': while True: print('12345') sleep(1) #结果是,只能执行主函数里的死循环,run函数执行不到,这就是单任务 #只有上面的死循环结束,才能执行run函数 run()多任务现象:
#启动进程实现多任务 # multiprocessing 库:跨平台版本的实现多进程的模块,提供了Process类来代表一个进程对象 from multiprocessing import Process from time import sleep import os #子进程需要执行的代码 def run(str): while True: #os.getpid()获取当前进程的id号,os.getppid()获取当前进程的父进程的id号 print('abcde-%s-%s-%s'%(str,os.getpid(),os.getppid())) sleep(1.2) if __name__ == '__main__': print('主(父)进程启动-%s'%(os.getpid())) #创建一个进程(子进程) #target说明进程要执行的任务,args可以给进程传数据 p = Process(target=run,args=('nice',)) #启动进程 p.start() while True: print('12345') sleep(1)父子进程的先后顺序:
from multiprocessing import Process from time import sleep def run(str): print('子进程启动') sleep(3) print('子进程结束') if __name__ == '__main__': print('父进程启动') p = Process(target=run,args=('nice',)) p.start()#以后会在这里创建多个子进程,父进程不干活 #父进程的结束不会影响子进程 p.join()#让父进程等待子进程结束再执行父进程 print('父进程结束')全局变量在多个进程中不能共享:
from multiprocessing import Process num = 100#全局变量 def run(): print('子进程启动') global num#这句话相当于在子进程中写了一句num= 100 num +=1 print(num)#101 print('子进程结束') def run1(): print('子进程启动') global num#这句话相当于在子进程中写了一句num= 100 num -=1 print(num)#99,说明不同进程间也不影响 print('子进程结束') if __name__ == '__main__': print('父进程启动') p = Process(target=run) p.start() p.join() #在子进程中修改全局变量,对父进程中的全局变量没有影响 #在创建子进程时,对全局变量做了备份,父进程与子进程中的num是完全不同的两个变量 p2 = Process(target=run1) p2.start() p2.join() print('父进程结束---%s'%num)#父进程结束---100启动大量子进程:
from multiprocessing import Pool import os,time,random def run(name): print('子进程%d启动--%s'%(name,os.getpid())) start = time.time() time.sleep(random.choice([1,2,3])) end = time.time() print('子进程%d结束--%s--耗时%.2f'%(name,os.getpid(),end-start)) if __name__ == '__main__': print('父进程启动') #创建多个进程,Pool为进程池 pp = Pool(2)#表示可以同时执行的进程数量,不写默认大小为cpu核心数 for i in range(4): pp.apply_async(run,args=(i,))#创建进程,放入进程池统一管理 #用pool,在调用join之前,必须先调用close,并且调用close之后就不能再添加新的进程了 # pp.apply_async(run2,) # pp.apply_async(run3)每个进程不同功能时,可以这么添加,不写循环 pp.close() #进程池对象调用join,会等待进程池中所有的子进程结束完毕后再执行父进程 pp.join() print('父进程结束')拷贝文件:
import os from multiprocessing import Pool import time #实现文件的拷贝 def copyFile(rPath,wPath): fr = open(rPath,'rb') fw = open(wPath,'wb') context = fr.read() fw.write(context) fr.close() fw.close() path = r'' toPath = r'' #读取path下的所有文件 fileList = os.listdir(path) start = time.time() #启用for循环,处理每一个文件 for filename in fileList: copyFile(os.path.join(path,filename),os.path.join(toPath,filename)) end = time.time() print(end-start)多进程中拷贝文件:
import os from multiprocessing import Pool import time #实现文件的拷贝 def copyFile(rPath,wPath): fr = open(rPath,'rb') fw = open(wPath,'wb') context = fr.read() fw.write(context) fr.close() fw.close() path = r'' toPath = r'' if __name__ == '__main__': # 读取path下的所有文件 fileList = os.listdir(path) start = time.time() pp = Pool(2) for fileName in fileList: pp.apply_async(copyFile,args=(os.path.join(path,fileName),os.path.join(toPath,fileName))) pp.close() pp.join() end = time.time() print(end-start)封装进程对象:
from yangjiaProcess import YangjiaProcess if __name__ == '__main__': print('父进程启动') #创建子进程 p = YangjiaProcess('test') #自动调用p进程对象的run方法 p.start() p.join() print('父进程结束')yangjiaProcess:
from multiprocessing import Process import time,os class YangjiaProcess(Process): def __init__(self,name): Process.__init__(self) self.name = name def run(self): print('子进程(%s--%d)启动'%(self.name,os.getppid())) #子进程的功能 time.sleep(3) print('子进程(%s--%d)结束'%(self.name,os.getppid()))进程间通信:
from multiprocessing import Process,Queue import os,time def write(q): print('启动写子进程%d'%(os.getpid())) for chr in ['A','B','C','D']: q.put(chr) time.sleep(1) print('结束写子进程%d' % (os.getpid())) def read(q): print('启动读子进程%d' % (os.getpid())) while True: value = q.get(True) print('value='+value) print('结束读子进程%d' % (os.getpid())) if __name__ == '__main__': #父进程创建队列,并让子进程引用 q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() # pr进程里面是个死循环,无法等待其结束,只能强行结束 pr.terminate() print('父进程结束')在一个进程的内部,要同时干多件事,就需要运行多个‘子任务’,我们把进程内的这些‘子任务’叫做线程。
线程通常叫做轻型的进程,线程是共享内存空间,并发执行的多任务,每一个线程都共享一个进程的资源。
线程是最小的执行单元,进程至少由一个线程组成,如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。
线程的模块: _thread模块,是底层模块 threading模块,对_thread模块进行了封装,是高级模块 1、启动一个线程
#启动一个线程 import threading,time a=10 def run(num): print('子线程%s开始'%(threading.current_thread().name)) time.sleep(1) print('打印%d'%num) time.sleep(1) print(a) print('子线程%s结束'% (threading.current_thread().name)) if __name__ == '__main__': #任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程 #current_thread():返回当前线程的实例;.name:返回当前线程的名称 print('主线程%s启动'%(threading.current_thread().name)) #创建子线程-------------------线程的名称 t = threading.Thread(target=run,name='runThread',args=(1,)) t.start() #等待线程结束 t.join() print('主线程%s结束' % (threading.current_thread().name))2、线程间共享数据 多线程和多进程最大的不同在于,多进程中同一个变量各自有一份拷贝在每个进程中,互不影响,即进程间不共享变量,而多线程中所有变量都由所有线程共享,所以任何一个变量都可以被任意一个线程修改,因此线程之间共享数据最大的危险在于多个线程同时修改一个变量,容易把内容改乱。
import threading,time num = 100 def run(n): global num for i in range(1000000): num = num + n num = num - n if __name__ == '__main__': t1 = threading.Thread(target=run,args=(6,)) t2 = threading.Thread(target=run,args=(9,)) t1.start() t2.start() t1.join() t2.join() print('num=',num)3、进程锁解决数据混乱
import threading #创建锁对象 lock = threading.Lock() num = 100 def run(n): global num for i in range(10000): #加锁,确保了这段代码只能由一个线程从头到尾的完整执行,阻止了多线程的并发执行,包含锁的代码实际上只能按单线程模式执行,所以效率大大降低了。 #又可以存在多个锁,而且不同线程可以存在不同的锁,并试图获取其他的锁,可能造成死锁,导致多个线程挂起,只能靠操作系统强制终止。 ''' lock.acquire() try: num = num + n num = num - n finally: #修改完一定要释放锁 lock.release() ''' #与上面代码功能相同,with lock可以自动上锁和解锁 with lock: num = num + n num = num - n if __name__ == '__main__': t1 = threading.Thread(target=run,args=(6,)) t2 = threading.Thread(target=run,args=(9,)) t1.start() t2.start() t1.join() t2.join() print('num=',num)4、ThreadLocal
创建一个全局的ThreadLocal对象,每个线程有独立的存储空间,每个线程对ThreadLocal对象都可以读写,但互不影响。
作用:为每个线程绑定一个数据库链接、HTTP请求、用户身份信息等。这样一个线程的所有调用到的处理函数都可以非常方便的访问这些资源。
import threading num = 100 #创建一个全局的ThreadLocal对象,每个线程有独立的存储空间,每个线程对ThreadLocal对象都可以读写,但互不影响 local = threading.local() def run(x,n): x = x + n x = x - n def func(n): #每个线程都有local.x,就是线程的局部变量 local.x = num for i in range(10000): run(local.x,n) print('%s---%d'%(threading.current_thread().name,local.x)) if __name__ == '__main__': t1 = threading.Thread(target=func,args=(6,)) t2 = threading.Thread(target=func,args=(9,)) t1.start() t2.start() t1.join() t2.join() print('num=',num)到此为止可以改网络编程的TCP,写一个聊天室了。
5、信号量控制线程数量
import threading,time sem = threading.Semaphore(2)#控制一次执行几个线程 def run(): with sem: for i in range(10): print('%s--%d'%(threading.current_thread().name,i)) time.sleep(1) if __name__ == '__main__': for i in range(5): threading.Thread(target=run).start()6、凑够一定数量才能一起执行
import threading,time bar = threading.Barrier(3)#凑够一定数量才能一起执行 def run(): for i in range(5): print('%s-start-%d'%(threading.current_thread().name,i)) time.sleep(1) bar.wait() print('%s-end-%d' % (threading.current_thread().name, i)) if __name__ == '__main__': for i in range(5): threading.Thread(target=run).start()7、定时线程
import threading def run(): print('123456') t = threading.Timer(5,run)#5秒钟之后执行run t.start() t.join() print('父线程结束')8、线程通信
import threading,time def func(): #事件对象 event = threading.Event() def run(): for i in range(5): #阻塞,等待事件的触发 event.wait() #重置 event.clear() print('123456!%d'%i) t = threading.Thread(target=run).start() return event e = func() #触发事件 for i in range(5): time.sleep(1) e.set()9、生产者与消费者
import threading,time,queue,random #生产者生产数据放到队列里,消费者拿走数据,处理数据 #生产者 def product(id,q): while True: num = random.randint(0,10000) q.put(num) print('生产者%d生成了%d数据放入了队列'%(id,num)) time.sleep(2) #任务完成 q.task_done() #消费者 def customer(id,q): while True: item = q.get() if item is None: break print('消费者%d消费量%d数据'%(id,item)) time.sleep(2) if __name__ == '__main__': #消息队列 q = queue.Queue() #启动生产者 for i in range(4): threading.Thread(target=product,args=(i,q)).start() #启动消费者 for i in range(3): threading.Thread(target=customer,args=(i,q)).start()10、线程调度
import threading,time cond = threading.Condition()#线程条件变量 def run1(): with cond: for i in range(0,10,2): print(threading.current_thread().name,i) # time.sleep(1) cond.wait() cond.notify() def run2(): with cond: for i in range(1,10,2): print(threading.current_thread().name,i) # time.sleep(1) cond.notify() cond.wait() threading.Thread(target=run1).start() threading.Thread(target=run2).start()子程序(子函数):在所有语言中都是层级调用,是通过栈实现的,一个线程就是执行一个子程序,子程序的调用总是一个入口,一次返回,调用的顺序是明确的。
概述:协程看上去也是子程序,但执行过程中,在子程序的内部可以中断,然后转而执行别的子程序,不是函数调用。
与线程相比,协程的执行效率极高,因为只有一个线程,也不存在同时写变量的冲突,在协程中共享资源不加锁,只需要判断状态。
协成原理:python对协程的支持是通过generator实现的 协程的最简单风格,控制函数的阶段执行,节约线程或进程的切换,返回值是一个生成器。 1、协程原理
def run(): print(1) yield 10 print(2) yield 20 print(3) yield 30 m = run() print(next(m)) print(next(m)) print(next(m))2、数据传输
def run(): #空变量,存储的作用data始终为空 data = '' r = yield data print(1,r,data) r = yield data print(2,r,data) r = yield data print(3, r, data) r = yield data m = run() #启动m print(m.send(None)) print(m.send('a')) print(m.send('b')) print(m.send('c'))3、生产者和消费者
#生产者 def product(c): c.send(None) for i in range(5): print('生产者产生数据%d'%i) r = c.send(str(i)) print('消费者消费了数据%s'%r) c.close() #消费者 def customer(): data = '' while True: n = yield data if not n: return print('消费者消费了%s'%n) data = '200' c = customer() product(c)