Throttling Concurrent Requests with aiohttp

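The script below is a complete working example: it reads keywords from `namelist.txt`, signs each one with an MD5 token, and posts them to the target API through a rotating proxy, saving hits to MongoDB. Concurrency is bounded twice: a `Semaphore(1000)` caps in-flight fetches, while a home-grown `limited_as_completed` keeps at most 2000 coroutines scheduled at a time, topping up the window as tasks finish.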

```python
# -*- coding: utf-8 -*-
# @Time : 2018/12/26 9:55 PM
# @Author : cxa
# @Software: PyCharm
import asyncio
import hashlib
import pathlib
from itertools import islice

import aiohttp
import ujson
from retrying import retry

from db.mongohelper import save_data
from logger.log import crawler
from utils import proxy_helper

# Use uvloop's faster event loop when it is available.
try:
    import uvloop

    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

sem = asyncio.Semaphore(1000)  # hard cap on concurrent fetches
url = "https://xxx.xxx.com"


@retry(stop_max_attempt_number=5)
def get_proxy():
    """Fetch a fresh proxy address, retrying up to 5 times."""
    proxy = proxy_helper.get_proxy()
    host = proxy.get('ip')
    port = proxy.get('port')
    return f"http://{host}:{port}"


async def fetch(item, session, proxy, retry_index=0):
    # `data` holds the parsed response; it stays None on any failure,
    # which is what triggers the retry below.
    data = None
    name = item
    try:
        payload = {"kw": name, "signinfo": get_md5(name)}
        async with session.post(url, data=payload, proxy=proxy,
                                verify_ssl=False) as req:
            if req.status == 200:
                data = ujson.loads(await req.text())
                searchdata = data.get("searchResult")
                if searchdata:
                    await save_data(searchdata)
                else:
                    crawler.info(f'<search_name: {name}>, data: {data},')
    except IndexError as e:
        crawler.error(f"<data at failure: {name}>, <reason: {e}>")
    except Exception as e:
        crawler.error(f"<Error: {url} {str(e)}>")
    if not data:
        # Request failed: grab a new proxy and retry (no upper bound here;
        # retry_index only feeds the log message).
        crawler.info(f'<Retry url: {url}>, Retry times: {retry_index + 1}')
        proxy = get_proxy()
        return await fetch(item, session, proxy, retry_index + 1)


async def bound_fetch(item, session, proxy):
    # The semaphore limits how many fetches run at the same time.
    async with sem:
        await fetch(item, session, proxy)


async def print_when_done(tasks):
    # Drain the sliding window, awaiting each result as it completes.
    [await _ for _ in limited_as_completed(tasks, 2000)]


async def run(data):
    async with aiohttp.ClientSession() as session:
        proxy = get_proxy()
        coros = (bound_fetch(item, session, proxy) for item in data)
        await print_when_done(coros)


def limited_as_completed(coros, limit):
    """Yield awaitables that resolve in completion order, keeping at
    most `limit` coroutines scheduled at any one time."""
    futures = [asyncio.ensure_future(c) for c in islice(coros, 0, limit)]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0.01)  # poll for a finished task
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        # Top up the window with the next pending coroutine.
                        newf = next(coros)
                        futures.append(asyncio.ensure_future(newf))
                    except StopIteration:
                        pass
                    return f.result()

    while len(futures) > 0:
        yield first_to_finish()


def get_use_list():
    fname = pathlib.Path.cwd() / "namelist.txt"
    with open(fname, encoding='utf-8') as fs:
        data = [i.strip() for i in fs.readlines()]
    return data


def get_md5(key):
    """Sign a keyword: md5 of the keyword plus a fixed salt."""
    m = hashlib.md5()
    m.update(f'{key}0jjj890j0369dce05f9'.encode('utf-8'))
    return m.hexdigest()


if __name__ == '__main__':
    crawler.info("Start downloading")
    data = get_use_list()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(data))
    loop.close()
```
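The `first_to_finish` helper discovers completions by polling every 10 ms. asyncio can instead be told to wake only when a task actually finishes. Here is a minimal sketch of the same sliding-window idea rebuilt on `asyncio.wait(..., return_when=FIRST_COMPLETED)`; note it is an async generator (consumed with `async for`) rather than a sync generator of awaitables, and the name `limited_as_completed_wait` is mine, not part of the original script:

```python
import asyncio
from itertools import islice


async def limited_as_completed_wait(coros, limit):
    """Yield results in completion order, keeping at most `limit`
    tasks in flight. Same spirit as limited_as_completed above,
    but without the 10 ms polling loop."""
    coros = iter(coros)
    pending = {asyncio.ensure_future(c) for c in islice(coros, limit)}
    while pending:
        # Block until at least one task finishes, no busy-waiting.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for f in done:
            try:
                # Top up the window: one new task per finished one.
                pending.add(asyncio.ensure_future(next(coros)))
            except StopIteration:
                pass
            yield f.result()
```

The consumer then becomes `async for result in limited_as_completed_wait(coros, 2000): ...` in place of the list comprehension inside `print_when_done`.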
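For what it's worth, if holding the whole task list in memory is acceptable, the same throttling can be had from stock building blocks: `aiohttp.TCPConnector(limit=...)` caps concurrent connections at the session level, and `asyncio.run` replaces the manual loop bookkeeping. A hedged sketch reusing `fetch`, `get_proxy`, and `get_use_list` from the script above (it creates every task up front, so it suits keyword lists that comfortably fit in memory, unlike the sliding-window approach):

```python
import asyncio
import aiohttp


async def run_all(names, concurrency=1000):
    # The connector caps in-flight connections, playing the role of
    # both the Semaphore and the sliding window in the original script.
    connector = aiohttp.TCPConnector(limit=concurrency)
    async with aiohttp.ClientSession(connector=connector) as session:
        proxy = get_proxy()
        tasks = [fetch(item, session, proxy) for item in names]
        # return_exceptions=True keeps one bad request from
        # cancelling the rest of the batch.
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    asyncio.run(run_all(get_use_list()))
```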

Reposted from: https://www.cnblogs.com/c-x-a/p/10190558.html
