比selenium更高效的爬虫界的新神器之Pyppeteer常用方法汇总

十点数据 1年前 ⋅ 2837 阅读

0OZBN4t0zP.jpg

最近写了几十篇文章,发现也就爬虫系列之Pyppeteer:比selenium更高效的爬虫界的新神器阅读数还可以,看来大家对Pyppeteer还是比较感兴趣啊,

今天就把先前使用中整理的常用方法发出来,供大家参考一下。

import asyncio, tkinter, traceback 
import time
from pyppeteer import launch
from com.fy.utils.http.UserAgentUtils import UserAgentUtils
from com.fy.utils.hash.HashUtils import Hash_Utils
from com.fy.utils.file.FileUtils import File_Utils 
class PyppeteerBrowser:
    def __init__(self):
        self.hash = Hash_Utils()
        self.url = None
        self.ua = UserAgentUtils()

    #"""使用tkinter获取屏幕大小""")
    def screen_size(self):
        tk = tkinter.Tk()
        width = tk.winfo_screenwidth()
        height = tk.winfo_screenheight()
        tk.quit()
        return width, height

    #构造一个浏览器对象;如果需要每次初始化新的浏览器对象,则userDataDir路径必须不同,否则,始终是在第一次初始化的浏览器对象上进行操作,且容易出异常;
    async def getbrowser(self, headless=False, userDataDir=None):
        '''
            参数:
            •ignoreHTTPSErrors(bool):是否忽略 HTTPS 错误。默认为 False
            •headless(bool):是否在无头模式下运行浏览器。默认为 True除非appMode或devtools选项True
            •executablePath (str):运行 Chromium 或 Chrome 可执行文件的路径,而不是默认捆绑的 Chromium。如果指定之后就不需要使用默认的 Chromium 了,可以指定为已有的 Chrome 或 Chromium。
            •slowMo (int | float):通过传入指定的时间,可以减缓 Pyppeteer 的一些模拟操作。 (按指定的毫秒数减慢 pyppeteer 操作。)
            •args (List [str]):传递给浏览器进程的附加参数(标志)。
            •dumpio(bool):是否管道浏览器进程 stdout 和 stderr 进入process.stdout和process.stderr。默认为False。为 True时,可以解决chromium浏览器多开页面卡死问题。
            •userDataDir (str):用户数据目录的路径。即用户数据文件夹,即可以保留一些个性化配置和操作记录。(比如登录信息等;可以在以后打开时自动登录;)
            •env(dict):指定浏览器可见的环境变量。默认与 python 进程相同。
            •devtools(bool):是否为每个选项卡自动打开 DevTools 面板。如果是此选项True,headless则将设置该选项 False。
            •logLevel(int | str):用于打印日志的日志级别。默认值与根记录器相同。
            •autoClose(bool):脚本完成时自动关闭浏览器进程。默认为True。
            •loop(asyncio.AbstractEventLoop):事件循环(实验)。
            •args:常用的有['--no-sandbox','--disable-gpu', '--disable-setuid-sandbox','--window-size=1440x900']
            •dumpio: 不知道为什么,如果不加 dumpio=True 有时会出现浏览器卡顿
            •autoClose:默认就好,不过如果你需要保持浏览器状态,可以不关闭,下次直接连接这个已存在的浏览器

            ignoreDefaultArgs (bool): 不使用 Pyppeteer 的默认参数,如果使用了这个参数,那么最好通过 args 参数来设定一些参数,否则可能会出现一些意想不到的问题。这个参数相对比较危险,慎用。
            handleSIGINT (bool): 是否响应 SIGINT 信号,也就是可以使用 Ctrl + C 来终止浏览器程序,默认是 True。
            handleSIGTERM (bool): 是否响应 SIGTERM 信号,一般是 kill 命令,默认是 True。
            handleSIGHUP (bool): 是否响应 SIGHUP 信号,即挂起信号,比如终端退出操作,默认是 True。

            launch_kwargs = {
            # 控制是否为无头模式
            "headless": False,
            # chrome启动命令行参数
            "args": [
                # 浏览器代理 配合某些中间人代理使用
                "--proxy-server=http://127.0.0.1:8008",
                # 最大化窗口
                "--start-maximized",
                # 取消沙盒模式 沙盒模式下权限太小
                "--no-sandbox",
                # 不显示信息栏  比如 chrome正在受到自动测试软件的控制 ...
                "--disable-infobars",
                # log等级设置 在某些不是那么完整的系统里 如果使用默认的日志等级 可能会出现一大堆的warning信息
                "--log-level=3",
                # 设置UA
                "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
            ],
            # 用户数据保存目录 这个最好也自己指定一个目录
            # 如果不指定的话,chrome会自动新建一个临时目录使用,在浏览器退出的时候会自动删除临时目录
            # 在删除的时候可能会删除失败(不知道为什么会出现权限问题,我用的windows) 导致浏览器退出失败
            # 然后chrome进程就会一直没有退出 CPU就会狂飙到99%
            "userDataDir": "",
            }
        '''
        print("构造浏览器对象开始...")
        args = [ "--start-maximized", '--no-sandbox', "--disable-infobars" , "--log-level=3"]
        parameters = {}
        if userDataDir == None:
            parameters = {'headless': headless, #是否打开浏览器;False:打开浏览器;True:进程中运行;
                                       'args': args,
                                        'dumpio': True  #'dumpio': True:解决chromium浏览器多开页面卡死问题。
                                        }
        else:
            parameters = {'headless': headless, #是否打开浏览器;False:打开浏览器;True:进程中运行;
                                       'args': args,
                                       "userDataDir": userDataDir,
                                        'dumpio': True   #'dumpio': True:解决chromium浏览器多开页面卡死问题。
                                        }
        #注意:同一个用户目录(userDataDir)不能被两个chrome进程使用,如果你要多开,记得分别指定用户目录。否则会报编码错误。
        self.browser = await launch(parameters)

        self.page = await self.browser.newPage()#在此浏览器上创建新页面并返回其对象。

        width, height = self.screen_size()
        # 设置网页可视区域大小
        await self.page.setViewport({
            "width": width,
            "height": height
        })

        # 是否启用JS,enabled设为False,则无渲染效果
        await self.page.setJavaScriptEnabled(enabled=True)

        #设置请求头userAgent
        await self.page.setUserAgent(self.ua.getheaders())

        await self.preventCheckWebdriver(self.page)
        print("构造浏览器对象完毕....", self.page)

    #获取当前操作的界面    --OK--
    async def getPage(self): 
        return  self.page

    #获取当前page对象的链接;
    async def getCurUrl(self, page):
        if page == None:
            page = self.page 
        return  await page.url

    #打开一个新的界面;)
    async def getnewpage(self): 
        return  await self.browser.newPage()

    #获取当前操作的界面重新加载    
    async def reload(self): 
        await self.page.reload()   

    #当前操作界面返回   
    async def goBack(self): 
        await self.page.goBack() 

    #获取当前操作的界面的URL   
    async def getPageUrl(self): 
        await self.page.url() 

    #打开连接;--OK--
    async def open(self, url, timeout=60):
        try:
            if url == None:
                print("当前传入的【url】不能为空,参数错误!!")
            self.url = url
            print("打开网页:" + (url))      
            self.res = await self.page.goto(url, options={'timeout':int(timeout * 1000)})#打开连接;
            await asyncio.sleep(1)#强行等待3秒
            status = await self.res.status
            curUrl = await self.page.url
            await self.preventCheckWebdriver(self.page)
            return  status, curUrl
        except:return  404, None
    #                            
    #防止 webdriver 检测,如淘宝登录。其实淘宝主要通过 window.navigator.webdriver 来对 webdriver 进行检测,所以我们只需要使用 JavaScript 将它设置为 false 即可
    async def preventCheckWebdriver(self, page):
        if page == None:
            page = self.page
        # 替换淘宝在检测浏览时采集的一些参数。
        # 就是在浏览器运行的时候,始终让window.navigator.webdriver=false
        # navigator是windiw对象的一个属性,同时修改plugins,languages,navigator
        await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')  # 以下为插入中间js,将淘宝会为了检测浏览器而调用的js修改其结果。
        await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }''')
        await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
        await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')

    #关闭当前打开的浏览器;
    async def closeBrowser(self, browser):
        if browser == None:
            browser = self.browser
        try:
            await browser.close()
        except:pass

    #关闭当前打开的浏览器中的一个界面;
    async def closePage(self, page):
        if page == None:
            page = self.page
        await page.close()

    #关闭当前打开的浏览器中的某一个界面;-
    async def closeNumPage(self, number:"号码从0开始"):
        pages = await self.browser.pages()
        await pages[number].close()
        return True


    #关闭除了最后一个所有的界面;
    async def retainLastPage(self):
        pages = await self.browser.pages()
        num = 0
        for  page in pages:
            if num != (len(pages) - 1):
                await page.close()
            else:
                self.page = page
            num += 1

    #获取当前打开页面的响应状态       
    async def gerReponseStatus(self):
        try:return await self.res.status  # 响应状态
        except:return 200

    #截个图                   
    async def screenshot(self, page):
        hashCode = self.hash.getMd5Hash(self.url)
        if page == None:
            page = self.page
        await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})

    # 得到响应头;           
    async def getHeader(self):
        return await self.res.headers  # 响应头;

    # 滚动到页面底部         
    async def scrollToButtom(self, page):
        if page == None:
            page = self.page
        await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
        print("滑动到当前界面底部【完毕】")

    # 获取当前页面的cookie   
    async def getCookies(self, page):
        if page == None:
            page = self.page
        return await page.cookies()

    # 获取登录后cookie尚未测试是否正常;
    async def getCookieStr(page):
        if page == None:
            page = self.page
        cookies_list = await page.cookies()
        cookies = ''
        for cookie in cookies_list:
            str_cookie = '{0}={1};'
            str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
            cookies += str_cookie
        try:print(cookies)
        except:pass
        # 将cookie 放入 cookie 池 以便多次请求 封账号 利用cookie 对搜索内容进行爬取
        return cookies

    # 获取当前页面的cookie    
    async def setCookies(self, page, cookies):
        if page == None:
            page = self.page
        return await page.setCookie(*cookies)

    # 获取所有 html源码      
    async def getHtml(self, page):
        if page == None:
            page = self.page
        return (await page.content())

    #当前页标题             
    async def getCurPageTitle(self, page):
        if page == None:
            page = self.page
        return (await page.title())

    #获取对象属性值;
    async def getElementFieldValue(self, page, element, field):
        if element == None:
            print("当前传入的【element】不能为空,参数错误!!")
            return None
        if field == None:
            print("当前传入的【field】不能为空,参数错误!!")
            return None
        if page == None:
            page = self.page
        if str(type(element)) == "<class 'list'>":
            print("当前传入的【element】不是单个对象,为list集合,参数错误!!")
            return None
        fieldValue = (await element.getProperty(field)).jsonValue()
        return  fieldValue

    #获取当前界面的宽、高、像素大小比率三个值    
    async def getPageWidthHight(self, page):
        if page == None:
            page = self.page
        return  await page.evaluate('''() => {
                    return {
                        width: document.documentElement.clientWidth,
                        height: document.documentElement.clientHeight,
                        deviceScaleFactor: window.devicePixelRatio,
                    }
                }''')

    #获取当前浏览器的所有界面集合;        
    async def getCurBrowserAllPages(self):
        return await self.browser.pages()

    #获取当前界面中某个元素的内容;       
    async def getElementsByXpaths(self, page, xpath:'如://div[@class="title-box"]/a'):
        if xpath == None:
            print("当前传入的【xpath】不能为空,参数错误!!")
            return None
        if page == None:
            page = self.page
        try:elemList = await page.xpath(xpath)
        except:
            print("获取xpath路径为【" + str(xpath) + "】的标签对象异常...")
        return elemList#返回类型为:list集合;

    #获取当前界面的所有内容(不带html标签的内容);(效果较差;)
    async def getPageText(self, page):
        if page == None:
            page = self.page
        '''Pyppeteer的evaluate()方法只使用JavaScript字符串,该字符串可以是函数也可以是表达式,
           Pyppeteer会进行自动判断。但有时会判断错误,如果字符串被判断成了函数,并且报错,
             可以添加选项force_expr=True,强制Pyppeteer作为表达式处理。'''
        return await page.evaluate('document.body.textContent', force_expr=True)

    #获取元素内容;                        
    async def getElementText(self, page, element):
        if element == None:
            print("当前传入的【element】不能为空,参数错误!!")
            return None
        if page == None:
            page = self.page
        if str(type(element)) == "<class 'list'>":
            print("当前传入的【element】不是单个对象,为list集合,参数错误!!")
            return None
        return  await page.evaluate('(element) => element.textContent', element)

    #通过selector获取元素内容;                      
    async def getElementBySelector(self, page , selector):
        if selector == None:
            print("当前传入的【selector】不能为空,参数错误!!")
            return None
        if page == None:
            page = self.page
        return  await page.querySelector(selector)

    #向输入框输入数据               
    async def inputKw(self, page, selector:"如:'input#kw.s_ipt':获取input标签中id='kw',class='s_ipt'的对象。不可用xpath路径", kw:'待输入的关键词'):
        if kw == None:
            print("当前传入的【kw】不能为空,参数错误!!")
            return None
        if selector == None:
            print("当前传入的【selector】不能为空,参数错误!!")
            return None
        if page == None:
            page = self.page
        try:print(selector, kw)
        except:pass
        await page.type(selector, kw)
        return None

    #鼠标单击某一个元素;       
    async def clickElement(self, page, selector:"如:'input#kw.s_ipt':获取input标签中id='kw',class='s_ipt'的对象。。不可用xpath路径"):
        if selector == None:
            print("当前传入的【selector】不能为空,参数错误!!")
        if page == None:
            page = self.page
        await page.click(selector)#如果selector获取的对象是list集合,则执行第一个元素的点击;

    #清空某个input的值
    async def removeInputValue(self, page, idValue):
        if idValue == None:
            print("当前传入的【idValue】不能为空,参数错误!!")
        if page == None:
            page = self.page
        await page.evaluate("document.querySelector('#" + str(idValue) + "').value=''")
        print("清空【" + str(idValue) + "】的内容")

    #
    async def clickByEle(self, ele):
        if ele == None:
            return
        return await ele.click()

    #获取当前浏览器打开的【最后一个】界面对象  
    async def getLastPage(self):
        pages = await self.browser.pages()
        return pages[-1]

    #获取当前浏览器打开的【最后一个】界面对象    
    async def getPageTotal(self):
        pages = await self.browser.pages()
        return len(pages)

    #获取当前浏览器打开的【最一个】界面对象 --OK--
    async def getFirstPage(self):
        pages = await self.browser.pages()
        return pages[0]

    #获取当前界面中所有的frame对象        --OK--
    async def getAllFrames(self, page):
        if page == None:
            page = self.page
        return  await page.frames

    #根据原始截取图片
    async def getScreenshotByEle(self, page, ele, screenshotFilePath:"目前测试只有.png图片可正常生成,jpg异常;"):
        picture = ''
        try:
            fu = File_Utils(None)
            fu = File_Utils(fu.getParentDir(screenshotFilePath))
            if not fu.exists(fu.getParentDir(screenshotFilePath)):fu.makeDirs()#如果图片的保存目录不存在,则创建;
            # 进行截图
            time.sleep(3)
            print("验证码路径:", screenshotFilePath)
            try:
                for _ in range(6):
                    clip = await ele.boundingBox()
                    picture = base64.b64encode(await page.screenshot({
                        'path': screenshotFilePath, # 图片路径, 不指定就不保存
                        'clip': clip, # 指定图片位置,大小
                        # 'encoding': 'base64',                           #  返回的图片格式, 默认二进制
                    }))
                    if picture != '':
                        break
            except Exception as e:
                print('截图获取失败')
                print(traceback.print_exc())
        except Exception as e:
            print('截图获取失败')
            print(traceback.print_exc())
        return picture

上面这段代码大家可以作为Pyppeteer的一个工具类加到自己的项目中,再根据自己的实际需求对其优化。

希望对大家有所帮助.

相关阅读:

一套价值十万的微信公众号采集解决方案(免费送)

数据采集采集架构中各模块详细分析

基于大数据平台的互联网数据采集平台基本架构

一套价值十万的微信公众号采集解决方案(免费送)

教你一种1分钟下载1万个网页的方法,你学吗?

uvloop:一个比gevent还要快两倍的 Python 异步网络框架

数据采集中,如何建立一套行之有效的监控体系?

3人团队,如何管理10万采集网站?(最全、最细解读)

爬虫系列之基于XPosed框架的微信公众号采集

全部评论: 0

    我有话说: