Asyncio

mac2025-07-12  7

asyncio

asyncio底层基于selectors实现,看似库,其实就是个框架,包含异步IO、事件循环、协程、任务等内容。

多线程版本
import threading import time def a(): for i in range(3): time.sleep(0.2) print(i) def b(): for i in 'abc': time.sleep(0.2) print(i) threading.Thread(target=a, name='a').start() threading.Thread(target=b, name='b').start() 0 a 1 b 2 c
多进程版本
import multiprocessing import time def a(): for i in range(3): time.sleep(0.2) print(i) def b(): for i in 'abc': time.sleep(0.2) print(i) if __name__ == '__main__': multiprocessing.Process(target=a, name='a').start() multiprocessing.Process(target=b, name='b').start()
生成器版本
def a(): for i in range(3): time.sleep(0.2) print(i) yield def b(): for i in 'abc': time.sleep(0.2) print(i) yield x = a() y = b() for i in range(3): next(x) next(y) 0 a 1 b 2 c

事件循环

事件循环是asyncio提供的核心运行机制。

方法解释asyncio.get_event_loop()返回一个事件循环对象,是asyncio.BaseEventLoop的实例AbstractEventLoop.stop()停止运行时间循环AbstractEventLoop.run_forever()一直运行,直到stop()AbstractEventLoop.run_until_complete(future)运行直至Future对象运行完,返回Future的结果。参数可以是Future类或子类Task的对象。如果是协程对象也会被封装成Task对象。AbstractEventLoop.close()关闭事件循环AbstractEventLoop.is_running()返回事件循环的是否运行AbstractEventLoop.close()关闭事件循环AbstractEventLoop.create_task(coro)使用协程对象创建任务对象

协程

协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式。进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,自然也没有多线程切换带来的开销。协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度协程也不需要使用锁机制,因为是在同一个线程中执行多CPU下,可以使用多进程和协程配合,既能进程并发,又能发挥协程在单线程中的优势Python中协程是基于生成器的asyncio.iscorountine(obj)判断是不是协程对象asyncio.iscorountinefunction(func)判断是不是协程函数

Future

和concurrent.futures.Future类似。通过Future对象可以了解任务执行的状态数据。 事件循环来监控Future对象是否完成。

Task任务

Task类是Future的子类,它的作用就是把协程包装成一个Future对象。

协程的使用

3.4引入asyncio,使用装饰器,将生成器函数转换成协程函数,就可以在事件循环中执行了。

import asyncio @asyncio.coroutine def sleep1(x): #协程函数 for i in range(3): print('sleep - {}'.format(i)) yield from asyncio.sleep(x) return 'result = {}'.format(1000) #事件循环 loop = asyncio.get_event_loop() # 本质就是tasks的ensure_future,把协程包装进一个Future对象中,并使用create_task返回一个task future = asyncio.ensure_future(sleep1(3)) #内部会调用ensure_future,内部会执行loop.run_forever() loop.run_until_complete(future) print('*' * 50) loop.close() print(future.result()) # 拿result值 print('==============end==============') --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-3-145b7e33264d> in <module> 16 17 #内部会调用ensure_future,内部会执行loop.run_forever() ---> 18 loop.run_until_complete(future) 19 print(2, task) 20 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep - 0 sleep - 1 sleep - 2 import asyncio @asyncio.coroutine def sleep(x): #协程函数 for i in range(3): print('sleep - {}'.format(i)) yield from asyncio.sleep(x) return 'result = {}'.format(1000) #事件循环 loop = asyncio.get_event_loop() # 本质就是tasks的ensure_future,把协程包装进一个Future对象中,并使用create_task返回一个task task = loop.create_task(sleep(3)) print(1, task) #内部会调用ensure_future,内部会执行loop.run_forever() loop.run_until_complete(task) print(2, task) print('*' * 50) loop.close() print(task.result()) # 拿result值 print('==============end==============') 1 <Task pending coro=<sleep() running at <ipython-input-9-870aaa5b9778>:4>> --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-9-870aaa5b9778> in <module> 18 19 #内部会调用ensure_future,内部会执行loop.run_forever() ---> 20 loop.run_until_complete(task) 21 print(2, task) 22 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep - 0 sleep - 1 sleep - 2 import asyncio @asyncio.coroutine def sleep(x): for i in range(3): print('sleep - {}'.format(i)) yield from asyncio.sleep(x) return 'result = {}'.format(1000) # 回调函数,参数必须是future def cb(future): print('in callback. future = {}'.format(future)) print(future.result()) # 事件循环 loop = asyncio.get_event_loop() # create_task返回一个task task = loop.create_task(sleep(3)) task.add_done_callback(cb) # 增加回调 print(1, task) # 内部会调用ensure_future,内部会执行loop.run_forever() loop.run_until_complete(task) print(2, task) # finished print('=' * 50) loop.close() print(task.result()) print('+++++++++++++end++++++++++++') 1 <Task pending coro=<sleep() running at <ipython-input-10-807656e03433>:4> cb=[cb() at <ipython-input-10-807656e03433>:13]> --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-10-807656e03433> in <module> 24 25 # 内部会调用ensure_future,内部会执行loop.run_forever() ---> 26 loop.run_until_complete(task) 27 print(2, task) # finished 28 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep - 0 sleep - 1 sleep - 2 in callback. future = <Task finished coro=<sleep() done, defined at <ipython-input-10-807656e03433>:4> result='result = 1000'> result = 1000 import asyncio @asyncio.coroutine def a(): for x in range(3): print('a.x - {}'.format(x)) yield return 'a.finished' @asyncio.coroutine def b(): for x in range(3, 6): print('b.x - {}'.format(x)) yield return 'b.finished' print(asyncio.iscoroutinefunction(a), asyncio.iscoroutinefunction(b)) t1 = a() t2 = b() print(asyncio.iscoroutine(t1), asyncio.iscoroutine(t2)) #事件大循环 loop = asyncio.get_event_loop() tasks =[t1, t2] # asyncio.wait 会迭代列表中的对象并封装成{f1, f2}, 返回一个协程对象f # 循环执行f,它内部等价yield from {f1, f2} loop.run_until_complete(asyncio.wait(tasks)) loop.close() True True True True --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-18-1b39933dff8d> in <module> 27 # 循环执行f,它内部等价yield from {f1, f2} 28 ---> 29 loop.run_until_complete(asyncio.wait(tasks)) 30 loop.close() c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running a.x - 0 b.x - 3 a.x - 1 b.x - 4 a.x - 2 b.x - 5 import asyncio @asyncio.coroutine def a(): for x in range(3): print('a.x', x) yield return 'a.finished' def b(): for x in range(3): print('b.x', x) yield return 'b.finished' print(asyncio.iscoroutinefunction(a), asyncio.iscoroutinefunction(b)) loop = asyncio.get_event_loop() fs = set() for t in (a(), b()): f = asyncio.ensure_future(t) f.add_done_callback(lambda f: print(f.result())) # 单个完成就调用回调 fs.add(f) results = loop.run_until_complete(asyncio.wait(fs)) loop.close() print(results) True False --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-20-11b0d31e31fe> in <module> 25 fs.add(f) 26 ---> 27 results = loop.run_until_complete(asyncio.wait(fs)) 28 loop.close() 29 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running a.x 0 b.x 0 a.x 1 b.x 1 a.x 2 b.x 2 a.finished b.finished import asyncio async def sleep(x): for i in range(3): print('sleep --> {}'.format(i)) await asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(sleep(3)) loop.close() --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-21-fdc2b2c3b101> in <module> 7 8 loop = asyncio.get_event_loop() ----> 9 loop.run_until_complete(sleep(3)) 10 loop.close() c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running sleep --> 0 sleep --> 1 sleep --> 2 import asyncio @asyncio.coroutine def w(): yield async def a(): for x in range(3): print('a.x', x) await w() return 'a.finished' async def b(): for x in range(3): print('b.x', x) await w() return 'b.finished' print(asyncio.iscoroutinefunction(a), asyncio.iscoroutinefunction(b)) t1 = a() t2 = b() print(asyncio.iscoroutine(t1), asyncio.iscoroutine(t2)) def cb(future): print('in call back ++++++++++', future.result()) loop = asyncio.get_event_loop() tasks = [t1, t2] fs = set() for t in tasks: f = asyncio.ensure_future(t) f.add_done_callback(cb) fs.add(f) results = loop.run_until_complete(asyncio.wait(fs)) loop.close() print(results) True True True True --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-22-c337ba7871cd> in <module> 35 fs.add(f) 36 ---> 37 results = loop.run_until_complete(asyncio.wait(fs)) 38 loop.close() 39 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running a.x 0 b.x 0 a.x 1 b.x 1 a.x 2 b.x 2 in call back ++++++++++ a.finished in call back ++++++++++ b.finished

TCP Echo Server举例

import asyncio # 处理的回调 from asyncio.streams import StreamReader, StreamWriter async def handle(reader:StreamReader, writer:StreamWriter): while True: # 和socketserver类似,一个连接一个handle data = await reader.read(1024) if not data or data == b'quit': print('quit!!!!!!!!!!!!!!!!') break print(type(reader), type(writer)) client = writer.get_extra_info('peername') message = '{} your msg --> {}'.format(client, data.decode()).encode() writer.write(message) await writer.drain() # 注意不是flush方法 writer.close() loop = asyncio.get_event_loop() ip = '127.0.0.1' port = 9999 # 返回一个协程对象 coro = asyncio.start_server(handle, ip, port, loop=loop) print(asyncio.iscoroutine(coro), coro) print('*' * 50) server = loop.run_until_complete(coro) # 返回server print(server) try: loop.run_forever() except KeyboardInterrupt: pass finally: server.close() loop.run_until_complete(server.wait_closed()) loop.close() True <generator object start_server at 0x000001808F9E0C50> ************************************************** --------------------------------------------------------------------------- RuntimeError Traceback (most recent call last) <ipython-input-23-f9b193695d05> in <module> 27 28 print('*' * 50) ---> 29 server = loop.run_until_complete(coro) # 返回server 30 print(server) 31 c:\programdata\miniconda3\lib\asyncio\base_events.py in run_until_complete(self, future) 453 future.add_done_callback(_run_until_complete_cb) 454 try: --> 455 self.run_forever() 456 except: 457 if new_task and future.done() and not future.cancelled(): c:\programdata\miniconda3\lib\asyncio\base_events.py in run_forever(self) 407 self._check_closed() 408 if self.is_running(): --> 409 raise RuntimeError('This event loop is already running') 410 if events._get_running_loop() is not None: 411 raise RuntimeError( RuntimeError: This event loop is already running

aiohttp库

异步HTTP客户端、服务端框架

HTTP Server

from aiohttp import web asyncio def indexhandle(request:web.Request): return web.Response(text=request.path, status=201) asyncio def handle(request:web.Request): print(request.match_info) print(request.query_string) # http://127.0.0.1:8080/1?name=12314 return web.Response(text=request.match_info.get('id', '0000'), status=200) app = web.Application() app.router.add_get('/', indexhandle) app.router.add_get('/{id}', handle) web.run_app(app, host='0.0.0.0', port=9966)

HTTP Client

import asyncio from aiohttp import ClientSession async def get_html(url:str): async with ClientSession() as session: async with session.get(url) as res: print(res.status) print(await res.text()) url = 'http://127.0.0.1:8080/2?age=20&name=jerry' loop = asyncio.get_event_loop() loop.run_until_complete(get_html(url)) loop.close()
最新回复(0)