一款比Selenium更高效的利器 Pyppeteer

mac2024-03-15  27

GitHub地址是:https://miyakogi.github.io/pyppeteer 参考链接:https://www.jianshu.com/p/611ed6b75d47 报错信息参考:https://segmentfault.com/a/1190000018873537

使用pip install pyppeteer命令就能完成pyppeteer库的安装 至于chromium浏览器,只需要一条pyppeteer-install命令就会自动下载对应的最新版本chromium浏览器到pyppeteer的默认位置

注意:pyppeteer是基于asyncio构建的,所以在使用的时候需要用到async/await结构。

pyppeteer使用代理

import asyncio from pyppeteer import launch async def register(): browser = await launch({ 'headless': False, # 代理ip 'args': ['--proxy-server=47.105.111.124:15525', ] }) page = await browser.newPage() # 设置代理ip验证 await page.authenticate({ 'username': '17475066', 'password': '1icmm7xk' }); await page.setJavaScriptEnabled(enabled=True) await page.goto('http://www.ip138.com/') time.sleep(300) asyncio.get_event_loop().run_until_complete(register())

用今日头条练习一下

import asyncio from pyppeteer import launch async def main(): # headless参数设为False,则变成有头模式 browser = await launch( # headless=False ) page = await browser.newPage() # 设置页面视图大小 await page.setViewport(viewport={'width':1280, 'height':800}) # 是否启用JS,enabled设为False,则无渲染效果 await page.setJavaScriptEnabled(enabled=True) await page.goto('https://www.toutiao.com/') # 打印页面cookies print(await page.cookies()) # 打印页面文本 print(await page.content()) # 打印当前页标题 print(await page.title()) # 抓取新闻标题 title_elements = await page.xpath('//div[@class="title-box"]/a') for item in title_elements: # 获取文本 title_str = await (await item.getProperty('textContent')).jsonValue() print(await item.getProperty('textContent')) # 获取链接 title_link = await (await item.getProperty('href')).jsonValue() print(title_str) print(title_link) # 关闭浏览器 await browser.close() asyncio.get_event_loop().run_until_complete(main())

与百度首页交互

import time import asyncio from pyppeteer import launch async def main(): browser = await launch(headless=False) page = await browser.newPage() await page.setViewport({'width': 1200, 'height': 800}) await page.goto('https://www.baidu.com') # 在搜索框中输入python await page.type('input#kw.s_ipt','python') # 点击搜索按钮 await page.click('input#su') # 等待元素加载,第一种方法,强行等待5秒 # await asyncio.sleep(5) # 第二种方法,在while循环里强行查询某元素进行等待 while not await page.querySelector('.t'): pass # 滚动到页面底部 await page.evaluate('window.scrollBy(0, window.innerHeight)') # 这些等待方法都不好用 # await page.waitForXPath('h3', timeout=300) # await page.waitForNavigation(waitUntil="networkidle0") # await page.waitForFunction('document.getElementByTag("h3")') # await page.waitForSelector('.t') # await page.waitFor('document.querySelector("#t")') # await page.waitForNavigation(waitUntil='networkidle0') # await page.waitForFunction('document.querySelector("").inner‌​Text.length == 7') title_elements = await page.xpath('//h3[contains(@class,"t")]/a') for item in title_elements: title_str = await (await item.getProperty('textContent')).jsonValue() print(title_str) await browser.close() asyncio.get_event_loop().run_until_complete(main())

官方两个示例

# 1 打开一个网页并做截图 # 首次运行示例时,pyppeteer会自动下载对应操作系统的chromium import asyncio from pyppeteer import launch async def main(): browser = await launch() page = await browser.newPage() await page.goto('http://example.com') await page.screenshot({'path': 'example.png'}) await browser.close() asyncio.get_event_loop().run_until_complete(main()) # 2 在网页上执行一段脚本 import asyncio from pyppeteer import launch async def main(): browser = await launch() page = await browser.newPage() await page.goto('http://example.com') await page.screenshot({'path': 'example.png'}) dimensions = await page.evaluate('''() => { return { width: document.documentElement.clientWidth, height: document.documentElement.clientHeight, deviceScaleFactor: window.devicePixelRatio, } }''') print(dimensions) # >>> {'width': 800, 'height': 600, 'deviceScaleFactor': 1} await browser.close() asyncio.get_event_loop().run_until_complete(main())

Pyppeteer和Puppeteer的不同点

1.Pyppeteer支持字典和关键字传参,Puppeteer只支持字典传参

# Puppeteer只支持字典传参 browser = await launch({'headless': True}) # Pyppeteer支持字典和关键字传参 browser = await launch({'headless': True}) browser = await launch(headless=True)

2.元素选择器方法名 $变为querySelector

# Puppeteer使用$符 Page.$()/Page.$$()/Page.$x()`` # Pyppeteer使用Python风格的函数名 Page.querySelector()/Page.querySelectorAll()/Page.xpath() # 简写方式为: Page.J(), Page.JJ(), and Page.Jx()

3.Page.evaluate() 和 Page.querySelectorEval()的参数 Puppeteer的evaluate()方法使用JavaScript原生函数或JavaScript表达式字符串。Pyppeteer的evaluate()方法只使用JavaScript字符串,该字符串可以是函数也可以是表达式,Pyppeteer会进行自动判断。但有时会判断错误,如果字符串被判断成了函数,并且报错,可以添加选项force_expr=True,强制Pyppeteer作为表达式处理。

获取页面内容: content = await page.evaluate('document.body.textContent', force_expr=True) 获取元素的内部文字: element = await page.querySelector('h1') title = await page.evaluate('(element) => element.textContent', element)

实战写法

async def get_res(self,url,sql_data): proxies = self.proxies while True: print(proxies) browser = await launch({ 'headless':True, 'args': ['--proxy-server={}'.format(proxies), '--no-sandbox'] }) page = await browser.newPage() await page.setViewport(viewport={'width': 1280, 'height': 800}) # 是否启用JS,enabled设为False,则无渲染效果 await page.setJavaScriptEnabled(enabled=False) try: await page.goto(url) # 标题包含404 Not found - Kompass title = await page.title() if 'Not' in title: print('404 Not found - Kompass') self.get_sql_res_que.put({'res': False, 'sql_data': sql_data}) self.proxies = proxies await browser.close() return False page = await page.content() self.get_sql_res_que.put({'res':page,'sql_data':sql_data}) self.proxies = proxies # print(page) await browser.close() return True except: # 关闭浏览器 try: await browser.close() except: print('浏览器已关闭') print('再次请求') proxies = get_proxy_ip().get('https')
最新回复(0)