一个python爬虫工具类

mac2022-06-30  31

写了一个爬虫工具类。

# -*- coding: utf-8 -*-
# @Time : 2018/8/7 16:29
# @Author : cxa
# @File : utils.py
# @Software: PyCharm
"""Crawler helper utilities: proxy management, request wrappers and parsers."""
from retrying import retry
from decorators.decorators import decorator, parse_decorator
from glom import glom
from config import headers
import datetime
import hashlib
from tomorrow import threads
from requests_html import HTMLSession

# Prefer the faster simplejson when installed; fall back to the stdlib.
try:
    import simplejson as json
except ImportError:
    import json


class MetaSingleton(type):
    """Metaclass caching one instance per class (singleton pattern)."""
    _inst = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._inst:
            cls._inst[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
        return cls._inst[cls]


class Get_Proxies(metaclass=MetaSingleton):
    """Singleton holding the current proxy; fetches a fresh IP on demand."""
    ip = None

    def getproxy(self, change_proxy):
        """Return a requests-style proxies mapping.

        :param change_proxy: when truthy, force-fetch a new proxy IP.
        :return: dict with ``'http'`` and ``'https'`` keys.
        """
        if self.ip is None:
            self.ip = self.get_ip(HTMLSession())
            self.proxies = {'http': self.ip, 'https': self.ip}
        if change_proxy:
            self.ip = self.get_ip(HTMLSession())
            self.proxies = {'http': self.ip, 'https': self.ip}
        return self.proxies

    def get_ip(self, session):
        """Ask the proxy vendor for a single authenticated proxy URL.

        :param session: session used to call the vendor endpoint.
        :return: ``"http://user:pass@host"`` on success, ``None`` otherwise.
        """
        url = 'ip'  # NOTE(review): placeholder — real vendor URL was redacted in the post.
        req = session.get(url)
        if req.status_code == 200:
            jsonstr = req.json()
            isok = glom(jsonstr, "resCode")
            if isok == "0000":
                key = glom(jsonstr, ('reData', ['key']))[0]
                uname = glom(jsonstr, ('reData', ['username']))[0]
                passwd = glom(jsonstr, ('reData', ['password']))[0]
                return f"http://{uname}:{passwd}@{key}"
        # Explicit (was implicit): bad status or resCode means "no proxy".
        return None


@retry(stop_max_attempt_number=5, wait_random_min=3000, wait_random_max=7000)
@decorator
def post_html(session, post_url: str, post_data: dict, headers=headers, timeout=30):
    """POST *post_data* to *post_url* and return the response on HTTP 200.

    :param session: session object to send the request with.
    :param post_url: target URL (fixed: was mis-annotated as ``int``).
    :param post_data: form payload, dict.
    :param headers: request headers; config module supplies the default.
    :param timeout: seconds before giving up.
    :return: response object (encoding normalised) or ``None`` on non-200.
    """
    post_req = session.post(url=post_url, headers=headers, data=post_data,
                            timeout=timeout, proxies=get_proxies())
    if post_req.status_code == 200:
        post_req.encoding = post_req.apparent_encoding
        return post_req


@retry(stop_max_attempt_number=5, wait_random_min=3000, wait_random_max=7000)
@decorator
def get_response(session, url: str, params=None, headers=headers, timeout=10):
    """GET *url*; on any request failure retry once with a fresh proxy.

    :param url: link to fetch.
    :return: response object on HTTP 200, ``None`` otherwise.
    """
    try:
        req = session.get(url=url, headers=headers, params=params,
                          timeout=timeout, proxies=get_proxies())
    except Exception:  # narrowed from bare except; keep the best-effort fallback
        req = session.get(url=url, headers=headers, params=params,
                          timeout=timeout, proxies=get_proxies(True))
    if req.status_code == 200:
        req.encoding = req.apparent_encoding
        return req


@decorator
def get_html(req):
    """Return the response body as text (HTML pages).

    :param req: response object.
    """
    return req.text


@decorator
def get_json(req):
    """Parse the response body as JSON.

    Falls back to stripping *trailing* semicolons (JSONP-ish payloads).
    Fixes the original bugs: ``jsonstr`` was unbound (NameError) when the
    body had no trailing ``;``, and ``replace(';', '')`` also corrupted
    semicolons inside string values.

    :param req: response object.
    :return: decoded JSON structure.
    :raises ValueError: when the body is not JSON at all.
    """
    try:
        jsonstr = req.json()
    except Exception:
        source = get_html(req)
        jsonstr = json.loads(source.rstrip(';'))
    return jsonstr


@parse_decorator(None)
def get_xpath(req, xpathstr: str):
    """Select nodes from the response with an XPath expression.

    :param req: response object (requests_html).
    :param xpathstr: XPath expression.
    :return: matching node(s).
    """
    return req.html.xpath(xpathstr)


@decorator
def get_link(node):
    """Return the first absolute link found on *node*."""
    return list(node.absolute_links)[0]


@parse_decorator(None)
def get_text(node):
    """Return the text directly under *node*."""
    return node.text


@parse_decorator(None)
def get_all_text(node):
    """Return all text under *node*, including its children.

    Accepts either a node or a (non-empty) list of nodes.
    """
    if isinstance(node, list):
        return node[0].full_text
    return node.full_text


@decorator
def get_json_data(jsonstr: str, pat: str):
    """Extract data from *jsonstr* with a glom spec.

    :param jsonstr: decoded JSON structure.
    :param pat: glom template/spec.
    """
    return glom(jsonstr, pat)


@decorator
def get_hash_code(key):
    """Return the MD5 hex digest of *key* (UTF-8 encoded)."""
    return hashlib.md5(key.encode('utf-8')).hexdigest()


@parse_decorator(None)
def get_next_node(node, xpathstr):
    """Run an XPath expression relative to the first node in *node*.

    :param node: node list from a previous XPath call.
    :param xpathstr: XPath expression.
    :return: matching node(s), or ``None`` when nothing matches.
    """
    next_node = node[0].xpath(xpathstr)
    if next_node:
        return next_node


@decorator
def get_datetime_from_unix(unix_time):
    """Convert a unix timestamp (int or numeric string) to ``datetime``."""
    unix_time_value = unix_time
    if not isinstance(unix_time_value, int):
        unix_time_value = int(unix_time)
    return datetime.datetime.fromtimestamp(unix_time_value)


def get_proxies(change_proxy=False):
    """Module-level convenience wrapper around the Get_Proxies singleton."""
    return Get_Proxies().getproxy(change_proxy)


@decorator
@threads(20)
@retry(stop_max_attempt_number=5)
def async_get_response(session, url: str, headers=headers, timeout=10):
    """Threaded GET (tomorrow pool of 20); retries with a fresh proxy on failure.

    :param url: link to fetch.
    :return: response object (status not checked here, by design).
    """
    try:
        req = session.get(url=url, headers=headers, timeout=timeout,
                          proxies=get_proxies())
    except Exception:
        req = session.get(url=url, headers=headers, timeout=timeout,
                          proxies=get_proxies(True))
    return req


if __name__ == '__main__':
    print(get_proxies())

以下是headers文件的内容

import random first_num = random.randint(55, 62) third_num = random.randint(0, 3200) fourth_num = random.randint(0, 140) class FakeChromeUA: os_type = [ '(Windows NT 6.1; WOW64)', '(Windows NT 10.0; WOW64)', '(X11; Linux x86_64)', '(Macintosh; Intel Mac OS X 10_12_6)' ] chrome_version = 'Chrome/{}.0.{}.{}'.format(first_num, third_num, fourth_num) @classmethod def get_ua(cls): return ' '.join(['Mozilla/5.0', random.choice(cls.os_type), 'AppleWebKit/537.36', '(KHTML, like Gecko)', cls.chrome_version, 'Safari/537.36'] ) headers = { 'User-Agent': FakeChromeUA.get_ua(), 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Connection': 'keep-alive' }

以下是logger文件的内容

# -*- coding: utf-8 -*- import os import time import logging import sys log_dir1=os.path.join(os.path.dirname(os.path.dirname(__file__)),"logs") today = time.strftime('%Y%m%d', time.localtime(time.time())) full_path=os.path.join(log_dir1,today) if not os.path.exists(full_path): os.makedirs(full_path) log_path=os.path.join(full_path,"t.log") def get_logger(): # 获取logger实例,如果参数为空则返回root logger logger = logging.getLogger("t") if not logger.handlers: # 指定logger输出格式 formatter = logging.Formatter('%(asctime)s %(levelname)-8s: %(message)s') # 文件日志 file_handler = logging.FileHandler(log_path,encoding="utf8") file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式 # 控制台日志 console_handler = logging.StreamHandler(sys.stdout) console_handler.formatter = formatter # 也可以直接给formatter赋值 # 为logger添加的日志处理器 logger.addHandler(file_handler) logger.addHandler(console_handler) # 指定日志的最低输出级别,默认为WARN级别 logger.setLevel(logging.INFO) # 添加下面一句,在记录日志之后移除句柄 return logger

转载于:https://www.cnblogs.com/c-x-a/p/9438587.html

最新回复(0)