流畅python学习笔记:第十七章:并发处理

mac2022-06-30  71

第十七章:并发处理

本章主要讨论Python3引入的concurrent.futures模块。在python2.7中需要用pip install futures来安装。concurrent.futures 是python3新增加的一个库,用于并发处理,提供了多线程和多进程的并发功能 类似于其他语言里的线程池(也有一个进程池),他属于上层的封装,对于用户来说,不用在考虑那么多东西了。

使用方法: 1 Executor:两个子类ThreadPoolExecutor和ProcessPoolExecutor分别是线程和进程 submit(fn,*args,**kwargs): fn是需要异步执行的函数,args,kwargs为给函数传递的参数

2 map(func, *iterables, timeout=None)  此map函数和Python自带的map函数功能类似,只不过concurrent模块的map函数从迭代器获得参数后异步执行。并且,每一个异步操作,能用timeout参数来设置超时时间,timeout的值可以是int或float型,如果操作timeout的话,会raisesTimeoutError。如果timeout参数不指定的话,则不设置超时间。  func:为需要异步执行的函数  iterables:可以是一个能迭代的对象,例如列表等。每一次func执行,会从iterables中取参数。  timeout:设置每次异步操作的超时时间 

 

3 Future: Future实例是由Executor.submit()创建的。Future提供了丰富的方法来处理调用。

Future.cancel: 用cancel(),可以终止某个线程和进程的任务,返回状态为 True False

Future.cancelled():判断是否真的结束了任务。

Future.running():判断是否还在运行

Future.done():判断是正常执行完毕的。

Future.result(timeout=None): 针对result结果做超时的控制。

4 Wait: wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE,默认设置为ALL_COMPLETED。三个参数的意义分别如下:

FIRST_COMPLETED - Return when any future finishes or is                   cancelled. FIRST_EXCEPTION - Return when any future finishes by raising an                   exception. If no future raises an exception                   then it is equivalent to ALL_COMPLETED. ALL_COMPLETED -   Return when all futures finish or are cancelled.

 

下面来看一个实际的例子:

def caculate_value_by_wait(x):     time.sleep(1)     print 'The value of x*x=%d' % (x*x)if __name__=="__main__":     num=[1,2,3,4,5,6]     start_time=time.clock()     for n in num:         caculate_value_by_wait(n)     (1)     print 'The toal time is %d' % (time.clock()-start_time)     start_time1=time.clock()     with futures.ThreadPoolExecutor(max_workers=6) as executor: (2)         for n in num:             executor.submit(caculate_value_by_wait,n)     print 'Thread pool consume time is %d' % (time.clock()-start_time1)     start_time2=time.clock()     with futures.ProcessPoolExecutor(max_workers=6) as executor: (3)         for n in num:             executor.submit(caculate_value_by_wait,n)     print 'Process pool consume time is %d' % (time.clock()-start_time2)

 

在这个例子中,分别用线性,多线程和多进程执行了caculate_value_by_wait。执行结果如下:在caculate_value_by_wait中每一次操作都会等待1秒。因此线性的执行总的时间为6秒。而多线程和多进程执行则总共耗时1秒

E:\python2.7.11\python.exe E:/py_prj/fluent_python/chapter17.py

The value of x*x=1

The value of x*x=4

The value of x*x=9

The value of x*x=16

The value of x*x=25

The value of x*x=36

The toal time is 6

The value of x*x=4

The value of x*x=1

The value of x*x=9

The value of x*x=16

The value of x*x=25The value of x*x=36

Thread pool consume time is 1

The value of x*x=1

The value of x*x=4

The value of x*x=9

The value of x*x=16

The value of x*x=25

The value of x*x=36

Process pool consume time is 1

如果是用map函数来改造的话,可以写成如下:

with futures.ProcessPoolExecutor(max_workers=6) as executor:     executor.map(caculate_value_by_wait,num)   在上面的多线程或者多进程中,我们还可以进一步对每个线程进行监控。方法就是用Future。代码如下 def caculate_value_by_wait(x):     time.sleep(1)     return x*xif __name__=="__main__":     num=[1,2,3,4,5,6]     with futures.ThreadPoolExecutor(max_workers=6) as executor:         future_task=[executor.submit(caculate_value_by_wait,n) for n in num]   (1)         for f in future_task:             if f.running():    (2)                 print '%s is running' % str(f)         for f in as_completed(future_task):   (3)             try:                 ret=f.done()                 (4)                 if ret:                     f_ret=f.result()          (5)                     print '%s done,result is %s' % (str(f),str(f_ret))             except BaseException,e:                 f.cancel()                 print e (1)  future_task得到所有运行的实例对象 (2)  判断线程是否在运行 (3)  得到完成线程的列表 (4)  判断是否真的完成,是返回True,否则返回False (5)  得到各个线程返回的对象 得到的结果如下: E:\python2.7.11\python.exe E:/py_prj/fluent_python/chapter17.py <Future at 0x17cfed0 state=running> is running <Future at 0x17d9050 state=running> is running <Future at 0x17d9210 state=running> is running <Future at 0x17d93d0 state=running> is running <Future at 0x17d9590 state=running> is running <Future at 0x17d9750 state=running> is running <Future at 0x17d9210 state=finished returned int> done,result is 9 <Future at 0x17cfed0 state=finished returned int> done,result is 1 <Future at 0x17d93d0 state=finished returned int> done,result is 16 <Future at 0x17d9050 state=finished returned int> done,result is 4 <Future at 0x17d9750 state=finished returned int> done,result is 36 <Future at 0x17d9590 state=finished returned int> done,result is 25

 

再来看下wait的用法:

if __name__=="__main__":     num=[1,2,3,4,5,6]     with futures.ThreadPoolExecutor(max_workers=6) as executor:         future_task=[executor.submit(caculate_value_by_wait,n) for n in num]         for f in future_task:             if f.running():                 print '%s is running' % str(f)         results=wait(future_task)   (1)         done=results[0]   (2)         not_done=results[1]     (3)         print 'The threads that have finished %s' % done         print 'The threads that not have finished %s' % not_done         for x in done:             print x         for y in not_done:             print y

(1)    得到所有的线程

(2)    得到已完成的线程

(3)    得到未完成的线程

运行结果如下:

E:\python2.7.11\python.exe E:/py_prj/fluent_python/chapter17.py

<Future at 0x177def0 state=running> is running

<Future at 0x1788070 state=running> is running

<Future at 0x1788230 state=running> is running

<Future at 0x17883f0 state=running> is running

<Future at 0x17885b0 state=running> is running

<Future at 0x1788770 state=running> is running

The threads that have finished set([<Future at 0x1788230 state=finished returned int>, <Future at 0x1788070 state=finished returned int>, <Future at 0x177def0 state=finished returned int>, <Future at 0x1788770 state=finished returned int>, <Future at 0x17885b0 state=finished returned int>, <Future at 0x17883f0 state=finished returned int>])

The threads that not have finished set([])

<Future at 0x1788230 state=finished returned int>

<Future at 0x1788070 state=finished returned int>

<Future at 0x177def0 state=finished returned int>

<Future at 0x1788770 state=finished returned int>

<Future at 0x17885b0 state=finished returned int>

<Future at 0x17883f0 state=finished returned int>

转载于:https://www.cnblogs.com/zhanghongfeng/p/7367937.html

最新回复(0)