What to do when a crawler hits a site that uses a timestamp as the paging parameter

mac · 2024-04-16

While crawling news data from 蓝鲸财经 (Lanjinger) today, I ran into a puzzle with the following API:

https://app.lanjinger.com/news/waterfall?type=6&marked=0&last_time=1572056322000&refresh_type=1

The parameter that controls paging here is last_time. What is it? It looked a lot like a timestamp, so I ran it through a timestamp converter and found that dropping the last three zeros leaves a valid Unix timestamp, i.e. the value is in milliseconds. But knowing it is a timestamp did not immediately help: paging by timestamp seemed almost magical, I could not find anything about it online, and it had me stuck for a while.

The right question to ask is where this timestamp comes from and why it can drive paging. Along that line of thought, the natural guess is that it is the publish timestamp of a specific article: given that timestamp, the backend pulls a fixed number of the most recent articles published before it out of its database. I verified the guess and it turned out to be exactly right, so the problem was solved. I will just dump my full script below.
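
Before the full script, here is a minimal sketch of just that paging step, stripped of the threading and upload logic, so the trick is easier to see on its own. The crawl_waterfall name and the pages cap are mine; the response shape (data → list, each item carrying url, ctime and a millisecond last_time) is what I observed on my run, and type 6 plus the starting last_time come straight from the URL above.

import time

import requests

WATERFALL = "https://app.lanjinger.com/news/waterfall?type={}&marked=0&last_time={}&refresh_type=1"

def crawl_waterfall(type_number, last_time, pages=3):
    """Walk backwards in time by feeding each batch's last timestamp into the next request."""
    for _ in range(pages):
        data = requests.get(WATERFALL.format(type_number, last_time)).json()
        items = data['data']['list']
        if not items:
            break
        for item in items:
            print(item['ctime'], item['url'])
        # the cursor is in milliseconds; drop the last three digits to read it as a date
        last_time = items[-1]['last_time']
        print("next page starts before",
              time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(last_time) // 1000)))

crawl_waterfall(6, 1572056322000)

Each call returns the batch of articles published just before last_time, so feeding the last item's last_time back in walks through the whole history without any page number at all.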

# -*- coding: utf-8 -*-
"""
uonxhou
A crawler for 蓝鲸财经 (lanjinger.com) news.
"""
import re
import time
from queue import Queue
from threading import Thread as Task

import requests
from bs4 import BeautifulSoup
from newspaper import Article
from newspaper import Config

from common import api  # internal helper module providing IntellifeedAPICaller


class LanJinger:
    def __init__(self, raw_url):
        self.web_name = "蓝鲸财经"
        self.raw_url = raw_url
        # queue 1: waterfall API urls
        self.url_queue = Queue()
        # queue 2: individual news links plus timestamps
        self.news_link_queue = Queue()
        # queue 3: assembled payloads ready for upload
        self.data_queue = Queue()

    def get_url_list(self):
        """Produce waterfall API urls, paging with the last article's timestamp."""
        url1 = self.raw_url
        try:
            type_number = re.search(r'/(\d+)/', url1).group(1)
        except Exception as e:
            print(e)
            print(url1)
            return
        html = requests.get(url1).text
        soup = BeautifulSoup(html, 'lxml')
        a_li = soup.find('div', attrs={'class': 'telegraph_wrap'}).find_all('a')
        # put a dict with the article link and its timestamp into queue 2
        for item in a_li:
            try:
                print(item['href'])
                pattern = re.compile(r'ctime="(\d+)"')
                publish_time = pattern.search(str(item)).group(1)
                dict_url_timestamp = {'url': item['href'], 'time': publish_time}
                self.news_link_queue.put(dict_url_timestamp)
            except Exception:
                # ads in the list carry no ctime attribute; skip them
                pass
        # the last article's timestamp is the paging cursor
        last_a = str(a_li[-1])
        pattern = re.compile(r'data-last_time="(\d+)"')
        last_time = pattern.search(last_a).group(1)
        while True:
            api_url = ("https://app.lanjinger.com/news/waterfall"
                       "?type={}&marked=0&last_time={}&refresh_type=1").format(type_number, last_time)
            print(api_url)
            self.url_queue.put(api_url)
            json_dict = requests.get(api_url).json()
            print(json_dict['data']['list'][-1]['last_time'])
            last_time = json_dict['data']['list'][-1]['last_time']

    def get_news_link_list(self):
        """Consume queue 1 (API urls), produce queue 2 (news links)."""
        while True:
            url = self.url_queue.get()
            try:
                response = requests.get(url).json()
                li = response['data']['list']
                for item in li:
                    # put a dict with the article link and its timestamp into queue 2
                    dict_url_timestamp = {'url': item['url'], 'time': item['ctime']}
                    self.news_link_queue.put(dict_url_timestamp)
                self.url_queue.task_done()
                print("#### queue 1 unfinished tasks:", self.url_queue.unfinished_tasks)
            except Exception as e:
                self.url_queue.task_done()
                print("error while producing news links:", e)
                print("failing url:", url)
                print("#### queue 1 unfinished tasks:", self.url_queue.unfinished_tasks)
                continue

    def generate_upload_data(self):
        """Consume queue 2 (news urls), produce queue 3 (assembled upload payloads)."""
        while True:
            dict_url_timestamp = self.news_link_queue.get()
            url = dict_url_timestamp['url']
            try:
                # convert the article's timestamp into a formatted publish_time string
                timestamp = int(dict_url_timestamp['time'])
                if timestamp > 10 ** 12:  # millisecond value, convert to seconds
                    timestamp //= 1000
                publish_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
                # set up newspaper
                user_agent = ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 '
                              '(KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36')
                config = Config()
                config.browser_user_agent = user_agent
                article = Article(url, config=config, language='zh')
                article.download()
                article.parse()
                # assemble the payload from the newspaper article object
                dict_item = {"content_info": {}}
                dict_item["content_info"]["source_domain_url"] = "app.lanjinger.com"
                dict_item["content_info"]["publish_time"] = publish_time
                dict_item["content_info"]["source_website_name"] = "lanjinger"
                dict_item["content_info"]["source_website_name_cn"] = "蓝鲸财经"
                dict_item["content_info"]["title_en"] = article.title
                dict_item["content_info"]["types"] = "news"
                dict_item["content_info"]["source_full_destination_url"] = url.strip()
                dict_item["content_info"]["content_en"] = article.text
                # put the assembled payload into queue 3
                self.data_queue.put(dict_item)
                self.news_link_queue.task_done()
                print("######## queue 2 unfinished tasks:", self.news_link_queue.unfinished_tasks)
            except Exception as e:
                self.news_link_queue.task_done()
                print("error while assembling data:", e)
                print("######## queue 2 unfinished tasks:", self.news_link_queue.unfinished_tasks)
                continue

    def upload_data(self):
        """Consume queue 3 and upload each payload to the database."""
        while True:
            json_data = self.data_queue.get()
            # upload the assembled payload
            try:
                outside_api_caller = api.IntellifeedAPICaller()
                response = outside_api_caller.new_content(json_data)
                if response and response.status_code in [200, 201]:
                    print('{%s scrawl Successfully, status_code is %s }'
                          % (self.web_name, response.status_code))
                elif response.status_code == 400:
                    print(' 400 error pls, check the format ')
                    outside_api_caller.send_slack_notification(
                        '[ %s Scrawl Process Fail, the status code is %s ]'
                        % (self.web_name, response.status_code))
                elif response.status_code == 500:
                    error_info = '500 error message, website name is {}'.format(self.web_name)
                    outside_api_caller.send_slack_notification(error_info)
                else:
                    outside_api_caller.send_slack_notification(
                        '[ %s Scrawl Process Fail, the status code is %s ]'
                        % (self.web_name, response.status_code))
            except Exception as e:
                outside_api_caller.send_slack_notification(
                    '[ %s Scrawl Process Exception ], the error code is %s' % (self.web_name, e))
            else:
                outside_api_caller.send_slack_notification(
                    '[ %s Scrawl Process Success ENDING ]' % self.web_name)
            self.data_queue.task_done()
            print("############ queue 3 unfinished tasks:", self.data_queue.unfinished_tasks)

    def run(self):
        # collect all worker tasks
        tasks = []
        # one thread producing API urls
        get_url_list_task = Task(target=self.get_url_list)
        tasks.append(get_url_list_task)
        # two threads consuming API urls and producing news links
        for _ in range(2):
            get_news_link_list_task = Task(target=self.get_news_link_list)
            tasks.append(get_news_link_list_task)
        # eight threads consuming news links and producing assembled payloads
        for _ in range(8):
            generate_upload_data_task = Task(target=self.generate_upload_data)
            tasks.append(generate_upload_data_task)
        # eight threads consuming payloads and uploading them
        for _ in range(8):
            upload_data_task = Task(target=self.upload_data)
            tasks.append(upload_data_task)
        # start every worker as a daemon thread
        for task in tasks:
            task.daemon = True
            task.start()
        # give the workers a head start before joining the queues
        time.sleep(2)
        # the main thread exits once every queue has been fully processed
        for queue in [self.url_queue, self.news_link_queue, self.data_queue]:
            queue.join()


if __name__ == '__main__':
    url_list = [
        "https://app.lanjinger.com/news/21/",
        # "https://app.lanjinger.com/news/6/",
        # "https://app.lanjinger.com/news/8/",
        # "https://app.lanjinger.com/news/25/",
        # "https://app.lanjinger.com/news/22/",
        # "https://app.lanjinger.com/news/10/",
        # "https://app.lanjinger.com/news/13/",
        # "https://app.lanjinger.com/news/12/",
        # "https://app.lanjinger.com/news/16/",
        # "https://app.lanjinger.com/news/19/",
        # "https://app.lanjinger.com/news/15/",
        # "https://app.lanjinger.com/news/20/",
    ]
    for url in url_list:
        spider = LanJinger(url)
        spider.run()