基于pyppeteer的新浪微博登录

十点数据 1年前 ⋅ 4023 阅读

新浪微博登录.jpg

在微博搜索采集时,默认情况下只显示当前页数据。如果搜索的关键词是热词,当前页数据的时间范围可能只有三五分钟。所以,如果要把数据采集全,则必须登录。

在大批量采集时,必须使用账号构建cookie池,并根据cookie有效期实时更新已过期的cookie,下面主要实现基于Pyppeteer的微博登录,供大家参考。

新浪微博登录DEMO主类:

import asyncio, time
from com.fy.plugs.browser.pyppeteer.PyppeteerBrowser import PyppeteerBrowser
from com.fy.utils.date.DateUtils import Date_Utils
class WeiBoLogin:
	def __init__(self):
    	self.pb = PyppeteerBrowser()
    	self.du = Date_Utils()

    def login(self):
	    url = "https://passport.weibo.cn/signin/login?entry=mweibo&res=wel&wm=3349&r=https%3A%2F%2Fm.weibo.cn%2F"
    	userDataDir = "d://pyppeteer" + str(self.du.getCurrentTimeStr_Year())
    	asyncio.get_event_loop() .run_until_complete(self.pb.getbrowser(False, userDataDir))
    	asyncio.get_event_loop() .run_until_complete(self.pb.open(url, 60))
    	time.sleep(10)
    	asyncio.get_event_loop() .run_until_complete(self.pb.inputKw(None, "#loginName", "用户名"))
    	time.sleep(1) 
    	asyncio.get_event_loop() .run_until_complete(self.pb.inputKw(None, "#loginPassword", "密码"))
    	time.sleep(1)
    	eles = asyncio.get_event_loop() .run_until_complete(self.pb.getElementsByXpaths(None, '//*[@id="loginAction"]'))
    	asyncio.get_event_loop() .run_until_complete(self.pb.clickByEle(eles[0]))
    	time.sleep(100)

if __name__ == '__main__':
	sbl = WeiBoLogin()
	sbl.login()

Pyppeteer公共类:

import asyncio, tkinter, traceback 
import time
from pyppeteer import launch
from com.fy.utils.http.UserAgentUtils import UserAgentUtils
from com.fy.utils.hash.HashUtils import Hash_Utils
from com.fy.utils.file.FileUtils import File_Utils 
class PyppeteerBrowser:
def __init__(self):
    self.hash = Hash_Utils()
    self.url = None
    self.ua = UserAgentUtils()

def screen_size(self):
    tk = tkinter.Tk()
    width = tk.winfo_screenwidth()
    height = tk.winfo_screenheight()
    tk.quit()
    return width, height

async def getbrowser(self, headless=False, userDataDir=None):
    args = [ "--start-maximized", '--no-sandbox', "--disable-infobars" , "--log-level=3"]
    parameters = {}
    if userDataDir == None:
        parameters = {'headless': headless,  
                                   'args': args,
                                    'dumpio': True  
                                    }
    else:
        parameters = {'headless': headless,  
                                   'args': args,
                                   "userDataDir": userDataDir,
                                    'dumpio': True  
                                    }
    self.browser = await launch(parameters)
    self.page = await self.browser.newPage() 
   
    width, height = self.screen_size()
    await self.page.setViewport({
        "width": width,
        "height": height
    })
    await self.page.setJavaScriptEnabled(enabled=True)
    await self.page.setUserAgent(self.ua.getheaders())
    await self.preventCheckWebdriver(self.page)
    
async def getPage(self): 
    return  self.page

async def getCurUrl(self, page):
    if page == None:
        page = self.page 
    return  await page.url

async def getnewpage(self): 
    return  await self.browser.newPage()

async def reload(self): 
    await self.page.reload()   

async def goBack(self): 
    await self.page.goBack() 

async def getPageUrl(self): 
    await self.page.url() 

async def open(self, url, timeout=60):
    try:
        if url == None:
            print("当前传入的【url】不能为空,参数错误!!")
        self.url = url
        self.res = await self.page.goto(url, options={'timeout':int(timeout * 1000)}) 
        await asyncio.sleep(1) 
        status = await self.res.status
        curUrl = await self.page.url
        await self.preventCheckWebdriver(self.page)
        return  status, curUrl
    except:return  404, None

async def preventCheckWebdriver(self, page):
    if page == None:
        page = self.page
    await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''')  
    await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }''')
    await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
    await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')

async def closeBrowser(self, browser):
    if browser == None:
        browser = self.browser
    try:
        await browser.close()
    except:pass

async def closePage(self, page):
    if page == None:
        page = self.page
    await page.close()
    
async def closeNumPage(self, number:"号码从0开始"):
    pages = await self.browser.pages()
    await pages[number].close()
    return True
    
async def retainLastPage(self):
    pages = await self.browser.pages()
    num = 0
    for  page in pages:
        if num != (len(pages) - 1):
            await page.close()
        else:
            self.page = page
        num += 1
    
async def gerReponseStatus(self):
    try:return await self.res.status  # 响应状态
    except:return 200

async def screenshot(self, page):
    hashCode = self.hash.getMd5Hash(self.url)
    if page == None:
        page = self.page
    await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})

async def getHeader(self):
    return await self.res.headers  # 响应头;

async def scrollToButtom(self, page):
    if page == None:
        page = self.page
    await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')

async def getCookies(self, page):
    if page == None:
        page = self.page
    return await page.cookies()

async def getCookieStr(page):
    if page == None:
        page = self.page
    cookies_list = await page.cookies()
    cookies = ''
    for cookie in cookies_list:
        str_cookie = '{0}={1};'
        str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
        cookies += str_cookie
    print(cookies)
    return cookies

async def setCookies(self, page, cookies):
    if page == None:
        page = self.page
    return await page.setCookie(*cookies)

async def getHtml(self, page):
    if page == None:
        page = self.page
    return (await page.content())

async def getCurPageTitle(self, page):
    if page == None:
        page = self.page
    return (await page.title())

async def getElementFieldValue(self, page, element, field):
    if element == None:
        return None
    if field == None:
        return None
    if page == None:
        page = self.page
    if str(type(element)) == "<class 'list'>":
        return None
    fieldValue = (await element.getProperty(field)).jsonValue()
    return  fieldValue

async def getPageWidthHight(self, page):
    if page == None:
        page = self.page
    return  await page.evaluate('''() => {
                return {
                    width: document.documentElement.clientWidth,
                    height: document.documentElement.clientHeight,
                    deviceScaleFactor: window.devicePixelRatio,
                }
            }''')

async def getCurBrowserAllPages(self):
    return await self.browser.pages()

async def getElementsByXpaths(self, page, xpath ):
    if xpath == None:
        return None
    if page == None:
        page = self.page
    try:elemList = await page.xpath(xpath)
    except:
        print("获取xpath路径为【" + str(xpath) + "】的标签对象异常...")
    return elemList#返回类型为:list集合;

async def getPageText(self, page):
    if page == None:
        page = self.page
    return await page.evaluate('document.body.textContent', force_expr=True)
    
async def getElementText(self, page, element):
    if element == None:
        return None
    if page == None:
        page = self.page
    if str(type(element)) == "<class 'list'>":
        return None
    return  await page.evaluate('(element) => element.textContent', element)

async def getElementBySelector(self, page , selector):
    if selector == None:
        return None
    if page == None:
        page = self.page
    return  await page.querySelector(selector)

async def inputKw(self, page, selector , kw ):
    if kw == None:
        return None
    if selector == None:
        return None
    if page == None:
        page = self.page
    print(selector, kw)
    await page.type(selector, kw)
    return None

async def clickElement(self, page, selector ):
    if selector == None:
        print("当前传入的【selector】不能为空,参数错误!!")
    if page == None:
        page = self.page
    await page.click(selector)

async def removeInputValue(self, page, idValue):
    if idValue == None:
        print("当前传入的【idValue】不能为空,参数错误!!")
    if page == None:
        page = self.page
    await page.evaluate("document.querySelector('#" + str(idValue) + "').value=''")

async def clickByEle(self, ele):
    if ele == None:
        return
    print(ele)
    return await ele.click()
        
async def getLastPage(self):
    pages = await self.browser.pages()
    return pages[-1]
        
async def getPageTotal(self):
    pages = await self.browser.pages()
    return len(pages)
        
async def getFirstPage(self):
    pages = await self.browser.pages()
    return pages[0]
        
async def getAllFrames(self, page):
    if page == None:
        page = self.page
    return  await page.frames

async def getScreenshotByEle(self, page, ele, screenshotFilePath):
    picture = ''
    try:
        fu = File_Utils(None)
        fu = File_Utils(fu.getParentDir(screenshotFilePath))
        if not fu.exists(fu.getParentDir(screenshotFilePath)):fu.makeDirs()
        time.sleep(3)
        try:
            for _ in range(6):
                clip = await ele.boundingBox()
                picture = base64.b64encode(await page.screenshot({
                    'path': screenshotFilePath, 
                    'clip': clip, 
                    # 'encoding': 'base64',    
                }))
                if picture != '':
                    break
        except Exception as e:
            print(traceback.print_exc())
    except Exception as e:
        print(traceback.print_exc())
    return picture

注意事项:

测试过程中发现,基于http://www.weibo.com 的登录界面,在Pyppeteer浏览器中,登录按钮无法使用。但是手机端登录界面可以正常登录

相关阅读:

一套价值十万的微信公众号采集解决方案(免费送)

数据采集采集架构中各模块详细分析

基于大数据平台的互联网数据采集平台基本架构

一套价值十万的微信公众号采集解决方案(免费送)

教你一种1分钟下载1万个网页的方法,你学吗?

uvloop:一个比gevent还要快两倍的 Python 异步网络框架

数据采集中,如何建立一套行之有效的监控体系?

3人团队,如何管理10万采集网站?(最全、最细解读)

爬虫系列之基于XPosed框架的微信公众号采集

全部评论: 0

    我有话说: