Python: crawling jokes with a thread pool, BeautifulSoup for parsing, MongoDB for storage


Use a thread pool to crawl jokes and store them in the database. The program uses two classes, one that sends requests and one that extracts the data, with a randomly generated User-Agent. The code:

```python
import random
import re
import time
from multiprocessing.dummy import Pool  # thread pool (threads, not processes)
from queue import Queue  # thread-safe queue for handing out URLs

import requests
from pymongo import MongoClient
from bs4 import BeautifulSoup

client = MongoClient(host="localhost", port=27017)
col = client["artical"]["joke"]


class ParseUrl(object):
    """Send a request and return the response."""

    def parse(self, url, header=None):
        if not header:
            header = {"User-Agent": self.random_user_agent()}
        header["Referer"] = url
        response = requests.get(url, headers=header)
        bs = BeautifulSoup(response.text, features="lxml")
        return response, bs

    def random_user_agent(self):
        # Build a random Chrome User-Agent string
        first_num = random.randint(55, 62)
        third_num = random.randint(0, 3200)
        fourth_num = random.randint(0, 140)
        os_type = [
            '(Windows NT 6.1; WOW64)',
            '(Windows NT 10.0; WOW64)',
            '(X11; Linux x86_64)',
            '(Macintosh; Intel Mac OS X 10_12_6)',
        ]
        chrome_version = 'Chrome/{}.0.{}.{}'.format(first_num, third_num, fourth_num)
        user_agent = ' '.join(['Mozilla/5.0', random.choice(os_type),
                               'AppleWebKit/537.36', '(KHTML, like Gecko)',
                               chrome_version, 'Safari/537.36'])
        return user_agent


class JokeSpider(ParseUrl):
    """Fetch the data and save it."""

    def __init__(self):
        super().__init__()
        self.url = "http://duanziwang.com/"
        self.base_url = "http://duanziwang.com/page/{}/"
        self.pool = Pool(5)
        self.is_running = True
        # Note: these counters are shared across threads without a lock
        self.total_requests_num = 0
        self.total_response_num = 0
        self.queue = Queue()

    def get_parse(self, url):
        # Parse one listing page and save every article on it
        _, bs = self.parse(url)
        article_list = bs.find_all("article", {"class": "post"})
        for art in article_list:
            item = dict()
            item["title"] = art.find("h1", class_="post-title").get_text()
            item["time"] = art.find("time", datetime=re.compile(r"T")).get_text()
            item["hot"] = art.find("time", text=re.compile(r"°C")).get_text()
            item["praise"] = art.find("a", class_="post-like").get_text()
            item["content"] = (art.find("div", class_="post-content").p.get_text()
                               if art.find("div", class_="post-content").p else None)
            self.save_content(item)
        self.total_response_num += 1  # one listing page fully handled

    def get_next(self):
        # Put the listing-page URLs into the queue
        for i in range(1, 16):
            self.queue.put(self.base_url.format(i))
            self.total_requests_num += 1

    def save_content(self, data):
        """Save one item to MongoDB."""
        col.insert_one(data)

    def execute_save(self):
        url = self.queue.get()  # blocks if the queue is empty
        self.get_parse(url)

    def _callback(self, temp):
        # Each finished task schedules the next one, keeping ~5 in flight
        if self.is_running:
            self.pool.apply_async(self.execute_save, callback=self._callback)

    def main(self):
        self.get_next()
        for i in range(5):  # limit concurrency
            self.pool.apply_async(self.execute_save, callback=self._callback)  # async requests
        while True:
            time.sleep(0.0001)
            if self.total_response_num >= self.total_requests_num:
                self.is_running = False
                break
        self.pool.close()  # stop accepting new tasks; daemon workers die with the process


if __name__ == '__main__':
    jkp = JokeSpider()
    jkp.main()
```
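The concurrency trick above is worth isolating: every `apply_async` call registers `_callback`, and the callback immediately submits the next task, so roughly five tasks stay in flight until the main loop flips `is_running` to False. A minimal, self-contained sketch of the same pattern (the `work` function and the numbers are made up for illustration):

```python
import time
from multiprocessing.dummy import Pool  # thread pool backed by threads
from queue import Queue

queue = Queue()
for n in range(15):          # 15 fake "pages" to process
    queue.put(n)

done = 0
is_running = True

def work():
    global done
    n = queue.get()          # blocks if the queue is empty
    time.sleep(0.1)          # stand-in for the HTTP request + parsing
    done += 1
    return n

def reschedule(result):
    # Runs after each task finishes; submitting the next task here
    # keeps a constant number of tasks in flight.
    if is_running:
        pool.apply_async(work, callback=reschedule)

pool = Pool(5)
for _ in range(5):           # prime the pool with 5 concurrent tasks
    pool.apply_async(work, callback=reschedule)

while done < 15:             # wait until every queued item is processed
    time.sleep(0.01)
is_running = False
pool.close()                 # daemon worker threads exit with the process
```

The self-rescheduling callback is what caps concurrency at five without ever queueing all fifteen jobs into the pool at once; workers left blocked on `queue.get()` at the end are daemon threads, so they disappear when the process exits.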
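For comparison, the standard-library `concurrent.futures.ThreadPoolExecutor` expresses the same "five workers over fifteen URLs" idea with far less bookkeeping. This is not the author's code, just a sketch of the alternative, reusing the `JokeSpider.get_parse` method defined above:

```python
from concurrent.futures import ThreadPoolExecutor

urls = ["http://duanziwang.com/page/{}/".format(i) for i in range(1, 16)]
spider = JokeSpider()                  # reuse the parsing/saving methods above

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(spider.get_parse, urls)  # blocks until every page is parsed
```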
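Once the crawl finishes, a quick way to confirm the inserts landed is to query the collection back. A minimal check, assuming a local mongod and the `artical`/`joke` database and collection names used above:

```python
from pymongo import MongoClient

client = MongoClient(host="localhost", port=27017)
col = client["artical"]["joke"]

print(col.count_documents({}))         # how many jokes were saved
for doc in col.find().limit(3):        # peek at a few records
    print(doc["title"], doc["praise"])
```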