asyncio底层基于selectors实现,看似库,其实就是个框架,包含异步IO、事件循环、协程、任务等内容。
事件循环是asyncio提供的核心运行机制。
方法解释asyncio.get_event_loop()返回一个事件循环对象,是asyncio.BaseEventLoop的实例AbstractEventLoop.stop()停止运行时间循环AbstractEventLoop.run_forever()一直运行,直到stop()AbstractEventLoop.run_until_complete(future)运行直至Future对象运行完,返回Future的结果。参数可以是Future类或子类Task的对象。如果是协程对象也会被封装成Task对象。AbstractEventLoop.close()关闭事件循环AbstractEventLoop.is_running()返回事件循环的是否运行AbstractEventLoop.close()关闭事件循环AbstractEventLoop.create_task(coro)使用协程对象创建任务对象和concurrent.futures.Future类似。通过Future对象可以了解任务执行的状态数据。 事件循环来监控Future对象是否完成。
Task类是Future的子类,它的作用就是把协程包装成一个Future对象。
3.4引入asyncio,使用装饰器,将生成器函数转换成协程函数,就可以在事件循环中执行了。
import asyncio @asyncio.coroutine def sleep1(x): #协程函数 for i in range(3): print('sleep - {}'.format(i)) yield from asyncio.sleep(x) return 'result = {}'.format(1000) #事件循环 loop = asyncio.get_event_loop() # 本质就是tasks的ensure_future,把协程包装进一个Future对象中,并使用create_task返回一个task future = asyncio.ensure_future(sleep1(3)) #内部会调用ensure_future,内部会执行loop.run_forever() loop.run_until_complete(future) print('*' * 50) loop.close() print(future.result()) # 拿result值 print('==============end==============') --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-3-145b7e33264d> in <module> 16 17 #内部会调用ensure_future,内部会执行loop.run_forever() ---> 18 loop.run_until_complete(future) 19 print(2, task) 20 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep - 0 sleep - 1 sleep - 2 import asyncio @asyncio.coroutine def sleep(x): #协程函数 for i in range(3): print('sleep - {}'.format(i)) yield from asyncio.sleep(x) return 'result = {}'.format(1000) #事件循环 loop = asyncio.get_event_loop() # 本质就是tasks的ensure_future,把协程包装进一个Future对象中,并使用create_task返回一个task task = loop.create_task(sleep(3)) print(1, task) #内部会调用ensure_future,内部会执行loop.run_forever() loop.run_until_complete(task) print(2, task) print('*' * 50) loop.close() print(task.result()) # 拿result值 print('==============end==============') 1 <Task pending coro=<sleep() running at <ipython-input-9-870aaa5b9778>:4>> --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-9-870aaa5b9778> in <module> 18 19 #内部会调用ensure_future,内部会执行loop.run_forever() ---> 20 loop.run_until_complete(task) 21 print(2, task) 22 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep - 0 sleep - 1 sleep - 2 import asyncio @asyncio.coroutine def sleep(x): for i in range(3): print('sleep - {}'.format(i)) yield from asyncio.sleep(x) return 'result = {}'.format(1000) # 回调函数,参数必须是future def cb(future): print('in callback. future = {}'.format(future)) print(future.result()) # 事件循环 loop = asyncio.get_event_loop() # create_task返回一个task task = loop.create_task(sleep(3)) task.add_done_callback(cb) # 增加回调 print(1, task) # 内部会调用ensure_future,内部会执行loop.run_forever() loop.run_until_complete(task) print(2, task) # finished print('=' * 50) loop.close() print(task.result()) print('+++++++++++++end++++++++++++') 1 <Task pending coro=<sleep() running at <ipython-input-10-807656e03433>:4> cb=[cb() at <ipython-input-10-807656e03433>:13]> --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-10-807656e03433> in <module> 24 25 # 内部会调用ensure_future,内部会执行loop.run_forever() ---> 26 loop.run_until_complete(task) 27 print(2, task) # finished 28 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep - 0 sleep - 1 sleep - 2 in callback. future = <Task finished coro=<sleep() done, defined at <ipython-input-10-807656e03433>:4> result='result = 1000'> result = 1000 import asyncio @asyncio.coroutine def a(): for x in range(3): print('a.x - {}'.format(x)) yield return 'a.finished' @asyncio.coroutine def b(): for x in range(3, 6): print('b.x - {}'.format(x)) yield return 'b.finished' print(asyncio.iscoroutinefunction(a), asyncio.iscoroutinefunction(b)) t1 = a() t2 = b() print(asyncio.iscoroutine(t1), asyncio.iscoroutine(t2)) #事件大循环 loop = asyncio.get_event_loop() tasks =[t1, t2] # asyncio.wait 会迭代列表中的对象并封装成{f1, f2}, 返回一个协程对象f # 循环执行f,它内部等价yield from {f1, f2} loop.run_until_complete(asyncio.wait(tasks)) loop.close() True True True True --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-18-1b39933dff8d> in <module> 27 # 循环执行f,它内部等价yield from {f1, f2} 28 ---> 29 loop.run_until_complete(asyncio.wait(tasks)) 30 loop.close() c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running a.x - 0 b.x - 3 a.x - 1 b.x - 4 a.x - 2 b.x - 5 import asyncio @asyncio.coroutine def a(): for x in range(3): print('a.x', x) yield return 'a.finished' def b(): for x in range(3): print('b.x', x) yield return 'b.finished' print(asyncio.iscoroutinefunction(a), asyncio.iscoroutinefunction(b)) loop = asyncio.get_event_loop() fs = set() for t in (a(), b()): f = asyncio.ensure_future(t) f.add_done_callback(lambda f: print(f.result())) # 单个完成就调用回调 fs.add(f) results = loop.run_until_complete(asyncio.wait(fs)) loop.close() print(results) True False --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-20-11b0d31e31fe> in <module> 25 fs.add(f) 26 ---> 27 results = loop.run_until_complete(asyncio.wait(fs)) 28 loop.close() 29 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running a.x 0 b.x 0 a.x 1 b.x 1 a.x 2 b.x 2 a.finished b.finished import asyncio async def sleep(x): for i in range(3): print('sleep --> {}'.format(i)) await asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(sleep(3)) loop.close() --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-21-fdc2b2c3b101> in <module> 7 8 loop = asyncio.get_event_loop() ----> 9 loop.run_until_complete(sleep(3)) 10 loop.close() c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep --> 0 sleep --> 1 sleep --> 2 import asyncio @asyncio.coroutine def w(): yield async def a(): for x in range(3): print('a.x', x) await w() return 'a.finished' async def b(): for x in range(3): print('b.x', x) await w() return 'b.finished' print(asyncio.iscoroutinefunction(a), asyncio.iscoroutinefunction(b)) t1 = a() t2 = b() print(asyncio.iscoroutine(t1), asyncio.iscoroutine(t2)) def cb(future): print('in call back ++++++++++', future.result()) loop = asyncio.get_event_loop() tasks = [t1, t2] fs = set() for t in tasks: f = asyncio.ensure_future(t) f.add_done_callback(cb) fs.add(f) results = loop.run_until_complete(asyncio.wait(fs)) loop.close() print(results) True True True True --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-22-c337ba7871cd> in <module> 35 fs.add(f) 36 ---> 37 results = loop.run_until_complete(asyncio.wait(fs)) 38 loop.close() 39 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running a.x 0 b.x 0 a.x 1 b.x 1 a.x 2 b.x 2 in call back ++++++++++ a.finished in call back ++++++++++ b.finished异步HTTP客户端、服务端框架