在微博搜索采集时,默认情况下只显示当前页数据。如果搜索的关键词是热词,当前页数据的时间范围可能只有三五分钟。所以,如果要把数据采集全,则必须登录。
在大批量采集时,必须使用账号构建cookie池,并根据cookie有效期实时更新已过期的cookie。下面主要实现基于Pyppeteer的微博登录,供大家参考。
新浪微博登录DEMO主类:
import asyncio, time
from com.fy.plugs.browser.pyppeteer.PyppeteerBrowser import PyppeteerBrowser
from com.fy.utils.date.DateUtils import Date_Utils
class WeiBoLogin:
    """Demo that logs in to the mobile Weibo sign-in page through pyppeteer.

    The browser wrapper is kept on ``self.pb`` so that callers can keep using
    the logged-in session (for example to read cookies) after ``login``.
    """

    # Mobile sign-in page that redirects to m.weibo.cn after login.
    LOGIN_URL = "https://passport.weibo.cn/signin/login?entry=mweibo&res=wel&wm=3349&r=https%3A%2F%2Fm.weibo.cn%2F"

    def __init__(self):
        self.pb = PyppeteerBrowser()
        self.du = Date_Utils()

    def login(self, username="用户名", password="密码", page_load_wait=10,
              step_wait=1, post_login_wait=100):
        """Open the sign-in page, type the credentials and click the login button.

        Args:
            username: Text typed into ``#loginName``. The default is the
                placeholder used by the original demo.
            password: Text typed into ``#loginPassword``. The default is the
                placeholder used by the original demo.
            page_load_wait: Seconds to wait after opening the page.
            step_wait: Seconds to wait after each typing step.
            post_login_wait: Seconds to keep waiting after the click (time
                for redirects or a manual verification step).

        Returns:
            None. The browser is left open on ``self.pb``.
        """
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # Run the whole flow as one coroutine so that the waits below are
        # asyncio sleeps: the loop keeps running (and can keep servicing the
        # browser connection) instead of being frozen by time.sleep().
        loop.run_until_complete(
            self._login(username, password, page_load_wait, step_wait, post_login_wait)
        )

    async def _login(self, username, password, page_load_wait, step_wait, post_login_wait):
        """Coroutine that performs the individual login steps in order."""
        userDataDir = "d://pyppeteer" + str(self.du.getCurrentTimeStr_Year())
        await self.pb.getbrowser(False, userDataDir)
        await self.pb.open(self.LOGIN_URL, 60)
        await asyncio.sleep(page_load_wait)

        await self.pb.inputKw(None, "#loginName", username)
        await asyncio.sleep(step_wait)
        await self.pb.inputKw(None, "#loginPassword", password)
        await asyncio.sleep(step_wait)

        eles = await self.pb.getElementsByXpaths(None, '//*[@id="loginAction"]')
        if not eles:
            # The lookup can come back empty (page not loaded, layout changed);
            # report it instead of failing on eles[0].
            print("未找到登录按钮【loginAction】,登录流程终止。")
            return
        await self.pb.clickByEle(eles[0])
        await asyncio.sleep(post_login_wait)
if __name__ == '__main__':
    # Script entry point: run the login demo once with its default settings.
    WeiBoLogin().login()
Pyppeteer公共类:
import asyncio, tkinter, traceback
import time
from pyppeteer import launch
from com.fy.utils.http.UserAgentUtils import UserAgentUtils
from com.fy.utils.hash.HashUtils import Hash_Utils
from com.fy.utils.file.FileUtils import File_Utils
class PyppeteerBrowser:
    """Convenience wrapper around one pyppeteer browser and its active page.

    Most helpers take an optional ``page`` argument; passing ``None`` means
    "use the page created by :meth:`getbrowser`" (``self.page``).
    """

    def __init__(self):
        self.hash = Hash_Utils()
        self.url = None
        self.ua = UserAgentUtils()
        # Filled in by getbrowser() / open(); declared here so that helpers can
        # test for "not yet available" instead of hitting AttributeError.
        self.browser = None
        self.page = None
        self.res = None

    def screen_size(self):
        """Return the screen size as ``(width, height)`` in pixels via tkinter."""
        tk = tkinter.Tk()
        try:
            return tk.winfo_screenwidth(), tk.winfo_screenheight()
        finally:
            # destroy() releases the temporary root window; quit() would only
            # stop a main loop and leave the window alive.
            tk.destroy()

    async def getbrowser(self, headless=False, userDataDir=None):
        """Launch the browser, open a page and prepare it for use.

        Args:
            headless: Passed to pyppeteer's ``launch`` as ``headless``.
            userDataDir: Optional profile directory; only passed to ``launch``
                when it is not ``None``.
        """
        args = ["--start-maximized", '--no-sandbox', "--disable-infobars", "--log-level=3"]
        parameters = {'headless': headless, 'args': args, 'dumpio': True}
        if userDataDir is not None:
            parameters["userDataDir"] = userDataDir
        self.browser = await launch(parameters)
        self.page = await self.browser.newPage()
        width, height = self.screen_size()
        await self.page.setViewport({"width": width, "height": height})
        await self.page.setJavaScriptEnabled(enabled=True)
        await self.page.setUserAgent(self.ua.getheaders())
        await self.preventCheckWebdriver(self.page)

    async def getPage(self):
        """Return the active page."""
        return self.page

    async def getCurUrl(self, page):
        """Return the URL of ``page`` (or of the active page when ``None``)."""
        if page is None:
            page = self.page
        # Page.url is a plain property, not an awaitable.
        return page.url

    async def getnewpage(self):
        """Open and return a new page in the current browser."""
        return await self.browser.newPage()

    async def reload(self):
        """Reload the active page."""
        await self.page.reload()

    async def goBack(self):
        """Navigate the active page one step back in its history."""
        await self.page.goBack()

    async def getPageUrl(self):
        """Return the URL of the active page."""
        return self.page.url

    async def open(self, url, timeout=60):
        """Navigate the active page to ``url``.

        Args:
            url: Address to open; must not be ``None``.
            timeout: Navigation timeout in seconds.

        Returns:
            ``(status, current_url)`` on success, ``(404, None)`` when the
            argument is invalid or the navigation fails.
        """
        self.url = url
        if url is None:
            print("当前传入的【url】不能为空,参数错误!!")
            return 404, None
        try:
            self.res = await self.page.goto(url, options={'timeout': int(timeout * 1000)})
            await asyncio.sleep(1)
            # Response.status and Page.url are properties; awaiting them would
            # raise TypeError and make every call fall into the except branch.
            status = self.res.status
            curUrl = self.page.url
            await self.preventCheckWebdriver(self.page)
            return status, curUrl
        except Exception:
            traceback.print_exc()
            return 404, None

    async def preventCheckWebdriver(self, page):
        """Override navigator properties commonly used to detect automation.

        NOTE(review): ``page.evaluate`` patches only the document that is
        currently loaded, which is why ``open`` re-applies it after each
        navigation — confirm whether an earlier hook is needed.
        """
        if page is None:
            page = self.page
        scripts = (
            '''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''',
            '''() =>{ window.navigator.chrome = { runtime: {}, }; }''',
            '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''',
            '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''',
        )
        for script in scripts:
            await page.evaluate(script)

    async def closeBrowser(self, browser):
        """Close ``browser`` (or the wrapped browser when ``None``), best effort."""
        if browser is None:
            browser = self.browser
        try:
            await browser.close()
        except Exception:
            # Deliberately best effort: failures while closing are ignored.
            pass

    async def closePage(self, page):
        """Close ``page`` (or the active page when ``None``)."""
        if page is None:
            page = self.page
        await page.close()

    async def closeNumPage(self, number: int):
        """Close the page at zero-based index ``number`` and return ``True``."""
        pages = await self.browser.pages()
        await pages[number].close()
        return True

    async def retainLastPage(self):
        """Close every page except the last one and make that the active page."""
        pages = await self.browser.pages()
        if not pages:
            return
        *stale, last = pages
        for page in stale:
            await page.close()
        self.page = last

    async def gerReponseStatus(self):
        """Return the status of the last response, or 200 when none is recorded."""
        res = getattr(self, "res", None)
        if res is None:
            return 200
        # Response.status is a property, not an awaitable.
        return res.status

    async def screenshot(self, page):
        """Save a screenshot to ``./screenshots/<md5 of the last opened url>.png``."""
        hashCode = self.hash.getMd5Hash(self.url)
        if page is None:
            page = self.page
        await page.screenshot({'path': './screenshots/' + str(hashCode) + '.png'})

    async def getHeader(self):
        """Return the headers of the last response."""
        return self.res.headers

    async def scrollToButtom(self, page):
        """Scroll the page down by the height of its body."""
        if page is None:
            page = self.page
        await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')

    async def getCookies(self, page):
        """Return the cookies of the page as returned by ``page.cookies()``."""
        if page is None:
            page = self.page
        return await page.cookies()

    async def getCookieStr(self, page=None):
        """Return the page cookies as one ``name=value;`` string (also printed)."""
        if page is None:
            page = self.page
        cookies_list = await page.cookies()
        cookies = ''.join(
            '{0}={1};'.format(cookie.get('name'), cookie.get('value'))
            for cookie in cookies_list
        )
        print(cookies)
        return cookies

    async def setCookies(self, page, cookies):
        """Set every cookie of the iterable ``cookies`` on the page."""
        if page is None:
            page = self.page
        return await page.setCookie(*cookies)

    async def getHtml(self, page):
        """Return the HTML content of the page."""
        if page is None:
            page = self.page
        return await page.content()

    async def getCurPageTitle(self, page):
        """Return the title of the page."""
        if page is None:
            page = self.page
        return await page.title()

    async def getElementFieldValue(self, page, element, field):
        """Return the JSON value of property ``field`` of a single element handle.

        Returns ``None`` when ``element`` or ``field`` is ``None`` or when a
        list is passed instead of a single handle.
        """
        if element is None or field is None:
            return None
        if isinstance(element, list):
            return None
        handle = await element.getProperty(field)
        # jsonValue() is a coroutine; without await a coroutine object was returned.
        return await handle.jsonValue()

    async def getPageWidthHight(self, page):
        """Return the client width, height and device scale factor of the page."""
        if page is None:
            page = self.page
        return await page.evaluate('''() => {
            return {
                width: document.documentElement.clientWidth,
                height: document.documentElement.clientHeight,
                deviceScaleFactor: window.devicePixelRatio,
            }
        }''')

    async def getCurBrowserAllPages(self):
        """Return all pages of the current browser."""
        return await self.browser.pages()

    async def getElementsByXpaths(self, page, xpath):
        """Return the list of element handles matching ``xpath``.

        Returns ``None`` when ``xpath`` is ``None`` and an empty list when the
        lookup raises.
        """
        if xpath is None:
            return None
        if page is None:
            page = self.page
        try:
            return await page.xpath(xpath)
        except Exception:
            print("获取xpath路径为【" + str(xpath) + "】的标签对象异常...")
            return []

    async def getPageText(self, page):
        """Return ``document.body.textContent`` of the page."""
        if page is None:
            page = self.page
        return await page.evaluate('document.body.textContent', force_expr=True)

    async def getElementText(self, page, element):
        """Return ``textContent`` of a single element handle, or ``None``."""
        if element is None:
            return None
        if page is None:
            page = self.page
        if isinstance(element, list):
            return None
        return await page.evaluate('(element) => element.textContent', element)

    async def getElementBySelector(self, page, selector):
        """Return the first element matching the CSS ``selector``."""
        if selector is None:
            return None
        if page is None:
            page = self.page
        return await page.querySelector(selector)

    async def inputKw(self, page, selector, kw):
        """Type ``kw`` into the element matching ``selector``.

        Does nothing when ``kw`` or ``selector`` is ``None``. Always returns
        ``None``.
        """
        if kw is None or selector is None:
            return None
        if page is None:
            page = self.page
        # Only the selector is echoed: the typed text may be a password.
        print(selector)
        await page.type(selector, kw)
        return None

    async def clickElement(self, page, selector):
        """Click the element matching the CSS ``selector``."""
        if selector is None:
            print("当前传入的【selector】不能为空,参数错误!!")
            return
        if page is None:
            page = self.page
        await page.click(selector)

    async def removeInputValue(self, page, idValue):
        """Clear the value of the input element whose id is ``idValue``."""
        if idValue is None:
            print("当前传入的【idValue】不能为空,参数错误!!")
            return
        if page is None:
            page = self.page
        # The id is passed as an argument instead of being spliced into the
        # script text, so quotes or selector characters in it cannot break it.
        await page.evaluate(
            "(id) => { document.getElementById(id).value = ''; }", str(idValue)
        )

    async def clickByEle(self, ele):
        """Click the given element handle; does nothing when it is ``None``."""
        if ele is None:
            return
        print(ele)
        return await ele.click()

    async def getLastPage(self):
        """Return the last page of the current browser."""
        pages = await self.browser.pages()
        return pages[-1]

    async def getPageTotal(self):
        """Return the number of pages of the current browser."""
        pages = await self.browser.pages()
        return len(pages)

    async def getFirstPage(self):
        """Return the first page of the current browser."""
        pages = await self.browser.pages()
        return pages[0]

    async def getAllFrames(self, page):
        """Return the frames of the page."""
        if page is None:
            page = self.page
        # Page.frames is a property, not an awaitable.
        return page.frames

    async def getScreenshotByEle(self, page, ele, screenshotFilePath):
        """Save a screenshot clipped to ``ele`` and return it base64-encoded.

        Args:
            page: Page to capture (the active page when ``None``).
            ele: Element handle whose bounding box is used as the clip.
            screenshotFilePath: Destination file; its parent directory is
                created through ``File_Utils`` when missing.

        Returns:
            The base64-encoded image bytes, or ``''`` when no screenshot could
            be taken.
        """
        import base64  # imported here because the file's import header lacks it

        if page is None:
            page = self.page
        picture = ''
        try:
            fu = File_Utils(None)
            fu = File_Utils(fu.getParentDir(screenshotFilePath))
            if not fu.exists(fu.getParentDir(screenshotFilePath)):
                fu.makeDirs()
            # asyncio.sleep keeps the event loop running while waiting.
            await asyncio.sleep(3)
            for _ in range(6):
                clip = await ele.boundingBox()
                if clip is None:
                    # No bounding box yet (element not visible); try again shortly.
                    await asyncio.sleep(0.5)
                    continue
                raw = await page.screenshot({
                    'path': screenshotFilePath,
                    'clip': clip,
                })
                picture = base64.b64encode(raw)
                break
        except Exception:
            traceback.print_exc()
        return picture
注意事项:
测试过程中发现,基于 http://www.weibo.com 的登录界面,在Pyppeteer浏览器中登录按钮无法使用;但是手机端登录界面可以正常登录。