还有这种操作?异步迭代器的切片操作!!!

mac2022-06-30  31

问题引出

对于同步中的迭代器我们可以使用itertools的islice模块来实现

# -*- coding: utf-8 -*- # @Time : 2019/1/2 11:52 AM # @Author : cxa # @File : 切片.py # @Software: PyCharm from itertools import islice la = (x for x in range(20)) print(type(la)) for item in islice(la, 5, 9): # 取下标5-9的元素 print(item)

输出

<class 'generator'> 5 6 7 8

那如何对异步生成器进行类似切片的操作呢?

问题产生

我在使用mongo的异步模块motor的使用,查询得到300万条数据,然后去进行操作,

async def get_data(): data=await get_detail_datas() return data

发现返回结果data为AsyncIOMotorCursor类型,查阅资料得知该类型属于async_generator也就是下面要说的异步生成器。

异步生成器的形式

针对上面的结果可以通过async for进行遍历获取结果

async def get_data(): data=await get_detail_datas() async for item in data: print(item)

但是问题来了,如何对异步迭代器进行切片操作呢。

aiostream

aiostream提供了一组流操作符,可以将它们组合在一起以创建异步操作管道. 它可以看作是itertools的异步版本,所以可以实现异步迭代器的切片功能

要求

python >= 3.6

安装

pip3 install aiostream

使用

import asyncio from aiostream import stream async def generate_numbers(n): for x in range(n): yield x async def consume_some_numbers(n, m): zs = stream.take(generate_numbers(n), m) t=await stream.list(zs) #返回切片的结果列表 print(t) # async with zs.stream() as streamer: #迭代 # # Asynchronous iteration # async for z in streamer: # # Print 1, 9, 25, 49 and 81 # print('->', z) async def get_data(): await consume_some_numbers(10, 5) loop = asyncio.get_event_loop() loop.run_until_complete(get_data())

这里使用了aiostream,和python自带的asyncio库, 首先定义异步函数generate_numbers,作用是生成迭代器,内部含有1-n。 consume_some_numbers是具体的实现方法,stream.take第一个参数可以传入一个异步迭代器函数,第二个参数是异步迭代器方法的参数。 然后函数get_data() 去调用了consume_some_numbers。 后面通过loop创建一个事件循环,然后等结果运行完毕。

分流

async def branch(coros, limit=10): index = 0 while True: xs = stream.preserve(coros) ys = xs[index:index + limit] t = await stream.list(ys) if not t: break await asyncio.ensure_future(asyncio.wait(t)) index += limit + 1

总结

这里主要说的是异步迭代器的用法,需要有一定的异步基础才能看懂。

参考资料 https://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.take

转载于:https://www.cnblogs.com/c-x-a/p/10208179.html

最新回复(0)