叠甲:本文写作目的仅为本人学习参考、实现功能为课程实验要求、本文写作过程处于作者对于爬虫技术学习的过程,部分代码编写时水平还不够,出现的错误比较多、部分变量名字又臭又长,水平有限,请各位给我的作业汇报留条活路[拜托],请不要卷我,如果发现有什么好想法给我说一下那就太好了
<----------------------------------------------->
>>>> 项目源码(Github) <<<<
>>>> 项目源码(百度网盘) <<<<
<----------------------------------------------->
零、写在前面
2023年:开始做实验时,在网上找知乎自动登录的代码,使用cookie的都还可以用,使用账号密码的已经不能再用了,也就是知乎在2022-2023年期间升级了反爬功能,使用自动化工具控制浏览器直接登录知乎已经不可行。知乎貌似还在加强反爬,也不知道这些代码还能用到什么时候。
调用的库
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from time import sleep
import json
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor
import time,sys
一、自动登录:数据采集前,个人账号应通过程序自动登录,若遇到验证码可在程序中手动输入并继续登录
(注意,本文采用远程调试模式使用chrome_driver
,以绕过知乎的反爬机制,但是事实证明,绕过反爬机制,使用selenium
edge_driver
也可达成目的,推荐使用后者,但是由于本文一开始就使用了前者,已经不便于再修改了)
这一部分和下一部分在写代码时对于selenium及xpath的设计不太熟悉,因此很多xpath看上去用的很蠢(以后可能会进行优化,暂时没有修改的想法)
按照本实验要求:实现手动扫描二维码即可,移动端如果取消双重登录验证的选项,可不扫描二维码,微博两种登陆、QQ两种登陆方式,以及微信一种登陆方式,按序号排序0-4分别为微博账号密码登录(如果没有取消双重登录验证会需要扫码)、微博扫码登陆、QQ扫码登陆、QQ已在当前电脑登陆后免密免扫码登录和微信扫码登录
(main函数中实现通过微博第三方登录的“自动”登录)
if __name__ == '__main__':
    # Credentials for the Weibo third-party (OAuth) login flow.
    weibo_account = '3247842625@qq.com'
    weibo_password = 'irontys'

    # Log in to Zhihu through the Weibo pop-up, then persist the session
    # cookies so later runs can use cookie_login() instead.
    zhihu_session = Login_ZhiHu('https://zhihu.com', chrome_ports[0])
    zhihu_session.third_party_WeiBo_login(weibo_account, weibo_password)
    zhihu_session.sign_cookie()

    # Keep the browser alive for manual inspection; drop this in real runs.
    sleep(100000)
# Chrome remote-debugging ports used later to parallelise collection.
# Do not reuse ports 9243-9252: they are reserved further down for the
# per-ask worker browsers.
chrome_ports = [str(port) for port in range(9222, 9227)]

# Whether the scraping browser windows are visible. When hidden, windows
# are parked off-screen at x=4000; adjust that x for your own resolution.
visible = 1
window_position = {'x': '0', 'y': '0'} if visible else {'x': '4000', 'y': '0'}
class Login_ZhiHu():
    """Starts a debug-mode Chrome and attaches Selenium to it.

    Attaching to a real, user-launched Chrome (rather than a plain
    webdriver session) is what gets past Zhihu's automation checks;
    chrome.exe must be reachable on PATH.
    """

    def __init__(self, url, chrome_port):
        """Launch Chrome on *chrome_port* and connect a driver to it.

        url: site to log in to (passed to the login methods later).
        chrome_port: remote-debugging port, also used to pick a distinct
            user-data directory so parallel instances do not collide.
        """
        # Raw strings: the profile path contains backslashes that must not
        # be interpreted as escape sequences (the original relied on the
        # escapes happening to be invalid and surviving literally).
        user_data_dir = (r'E:\Iront\StudyItems\TC\Crouses\ContentSecurity'
                         r'\EX1_ZhiHu_Info_collention\project\chrome_user_data_'
                         + chrome_port)
        cmd = ('chrome.exe --remote-debugging-port=' + chrome_port
               + ' --window-position=' + window_position['x'] + ',' + window_position['y']
               + ' --user-data-dir="' + user_data_dir + '"')
        os.popen(cmd)
        options = webdriver.ChromeOptions()
        # Attach to the Chrome instance started above.
        options.add_experimental_option("debuggerAddress", "127.0.0.1:" + chrome_port)
        # NOTE(review): --headless has no effect when attaching through
        # debuggerAddress; kept only for parity with the original setup.
        options.add_argument("--headless")
        self.url = url
        self.driver = webdriver.Chrome(options=options)
        # Parsed cookie cache; filled lazily by cookie_login().
        self.login_cookie = None
        self.driver.set_window_position(int(window_position['x']), int(window_position['y']))
Login_ZhiHu类中的third_party_WeiBo_login函数
def third_party_WeiBo_login(self, usr, pwd):
    """Log in to Zhihu through the Weibo OAuth flow.

    Opens the Weibo login pop-up from the Zhihu landing page, submits the
    credentials, waits for the (possibly manual) QR-code verification,
    confirms authorization and switches back to the Zhihu window.

    usr / pwd: Weibo account name and password.
    """
    def _wait_for(xpath, timeout=100):
        # Best-effort wait: on timeout only log; the following
        # find_element will raise if the element truly never appeared.
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.XPATH, xpath)))
        except Exception:
            print("Login Failed! Check your network status")

    self.driver.get(self.url)
    self.driver.maximize_window()
    # Click "log in with Weibo" on the Zhihu login panel.
    self.driver.find_element(By.XPATH, '//*[@id="root"]/div/main/div/div/div/div/div[2]/div/div[3]/span/button[3]').click()
    # The OAuth flow opens a second window; remember both handles.
    all_handles = self.driver.window_handles
    zhihu_handle = all_handles[0]   # to switch back to once authorized
    weibo_handle = all_handles[1]
    self.driver.switch_to.window(weibo_handle)
    # Switch the Weibo page to its username/password form.
    self.driver.find_element(By.XPATH, '//*[@id="jump_login_url_a"]').click()
    _wait_for('//*[@id="username"]')
    # Fill in the Weibo credentials.
    self.driver.find_element(By.XPATH, '//*[@id="username"]').send_keys(usr)
    self.driver.find_element(By.XPATH, '//*[@id="password"]').send_keys(pwd)
    sleep(1)
    # Submit the login form.
    self.driver.find_element(By.XPATH, '//*[@id="vForm"]/div[2]/div/ul/li[7]/div[1]/input').click()
    _wait_for('//*[@id="message_sms_login"]')
    # Switch to QR-code verification (Weibo's two-factor check).
    self.driver.find_element(By.XPATH, '//*[@id="qrCodeCheck"]').click()
    # Locate the QR-code image (src is fetched but currently unused).
    img_src = self.driver.find_element(By.XPATH, '//*[@id="qrcode"]')
    src = img_src.get_attribute("src")
    _wait_for('//*[@id="outer"]/div/div[2]/div/div[2]/div[2]/p/a[1]')
    # Confirm authorization, then return to the Zhihu window.
    self.driver.find_element(By.XPATH, '//*[@id="outer"]/div/div[2]/div/div[2]/div[2]/p/a[1]').click()
    self.driver.switch_to.window(zhihu_handle)
Login_ZhiHu类中的其他函数:sign_cookie、cookie_login、prepared_drive
# Persist the current session cookies to ZhiHu_cookies.txt.
def sign_cookie(self):
    """Wait until login completes, then dump the session cookies as JSON."""
    try:
        # The avatar/search toggle only exists once login has completed;
        # the very long timeout leaves time for a manual QR scan.
        WebDriverWait(self.driver, 10000).until(
            EC.presence_of_element_located((By.XPATH, '//*[@id="Popover1-toggle"]')))
    except Exception:
        print("Waiting for you to scan for a long time -_-")
    dictCookies = self.driver.get_cookies()   # list of cookie dicts
    jsonCookies = json.dumps(dictCookies)     # serialize for storage
    # Explicit encoding matches cookie_login(), which reads with utf-8.
    with open('ZhiHu_cookies.txt', 'w', encoding='utf-8') as f:
        f.write(jsonCookies)
    print('cookies保存成功!')
# Cookie-based login: avoids re-scanning the QR code on every run and is
# what the multi-threaded workers use to get their own sessions.
def cookie_login(self):
    """Inject the saved cookies into the driver and refresh to log in."""
    self.driver.get(self.url)
    # Cache the parsed cookie list on the instance so repeated logins
    # (one per worker) do not re-read the file.
    if self.login_cookie is None:
        # The file is utf-8; the module's coding header covers the source.
        with open('ZhiHu_cookies.txt', 'r', encoding='utf-8') as f:
            self.login_cookie = listCookies = json.loads(f.read())
    else:
        listCookies = self.login_cookie
    # Re-inject each cookie scoped to .zhihu.com.
    for cookie in listCookies:
        cookie_dict = {
            'domain': '.zhihu.com',
            'name': cookie.get('name'),
            'value': cookie.get('value'),
            # NOTE(review): the keys below are not part of Selenium's
            # cookie schema ('expiry'/'secure'); they are passed through
            # unchanged and appear to be ignored by the driver.
            "expires": '',
            'path': '/',
            'httpOnly': False,
            'HostOnly': False,
            'Secure': False
        }
        self.driver.add_cookie(cookie_dict)
    # Refresh so the injected cookies take effect; no sleep is needed.
    self.driver.refresh()
# Hands the logged-in driver to User_ZhiHu for the collection phase.
def prepared_drive(self):
    """Return the WebDriver held by this login session."""
    shared_driver = self.driver
    return shared_driver
二、指定用户基本属性信息采集
基本属性信息:用户名、性别、一句话介绍、居住地、所在行业、职业经历、个人简介
if __name__ == '__main__':
    WeiBo_usr = '3247842625@qq.com'
    WeiBo_pwd = 'irontys'
    login_url = 'https://www.zhihu.com'

    # Reuse the previously saved cookies instead of logging in again.
    session = Login_ZhiHu(login_url, chrome_ports[0])
    session.cookie_login()
    driver_ZhiHu = session.prepared_drive()

    # Collector for the target user's profile data.
    collector = User_ZhiHu(driver_ZhiHu)
    # Randomly chosen public account; apologies to its owner.
    username_ZhiHu = '孟冬石榴'
    home_page_url = collector.goto_user_home_page(username_ZhiHu)
    collector.user_basic_information_collection('basic_information.txt')
用于用户信息搜集的类User_ZhiHu
class User_ZhiHu():
    """Collects profile, relationship and activity data for one Zhihu user."""

    def __init__(self, driver):
        # Reuse the already-logged-in driver handed over by Login_ZhiHu.
        self.driver = driver
        # State for the periodic update-detection feature: slot 0 holds the
        # user's total ask/answer count, remaining slots track edit status.
        self.answers_edit_statue = [''] * 10
        self.asks_edit_statue = [''] * 10
        self.valid_asks_count = 0
        self.valid_answers_count = 0
User_ZhiHu类中的goto_user_home_page函数,这部分xpath没有非常丑,后期有可能会修改
目的是将driver切换到“目标”用户(username)的主页,返回值获取url是为了后续多线程加速而设计的
def goto_user_home_page(self, username):
    """Search for *username* and open the first matching user's home page.

    Returns the home-page URL (later handed to the parallel workers).
    """
    def _wait_for(xpath, timeout=100):
        # Log-and-continue on timeout; the subsequent find_element will
        # raise if the element really never appeared.
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.XPATH, xpath)))
        except Exception:
            print("Search Failed! Check your network status")

    _wait_for('//*[@id="Popover1-toggle"]')
    # Type the user name into the search box.
    self.driver.find_element(By.XPATH, '//*[@id="Popover1-toggle"]').send_keys(username)
    # Click the search button.
    _wait_for('//*[@id="root"]/div/div[2]/header/div[2]/div[1]/div/form/div/div/label/button/span')
    self.driver.find_element(By.XPATH, '//*[@id="root"]/div/div[2]/header/div[2]/div[1]/div/form/div/div/label/button/span').click()
    # Restrict results to the "users" tab.
    _wait_for('//*[@id="root"]/div/main/div/div[1]/div/div/ul/li[2]/a')
    self.driver.find_element(By.XPATH, '//*[@id="root"]/div/main/div/div[1]/div/div/ul/li[2]/a').click()
    # Zhihu's anti-bot check blanks every tab except "general" after the
    # automated click; refreshing the page works around it.
    self.driver.refresh()
    # Wait for the result list, then treat the first hit as the target.
    _wait_for('//*[@id="SearchMain"]/div/div/div/div/div[2]/div/div/div/div[1]/div/span/div/a/img')
    self.driver.find_element(By.XPATH, '//*[@id="SearchMain"]/div/div/div/div/div[2]/div/div/div/div[1]/div/span/div/a/img').click()
    # The profile opens in a new window; keep at most two windows alive by
    # closing the search window before switching.
    all_handles = self.driver.window_handles
    user_home_handle = all_handles[1]
    self.driver.close()
    self.driver.switch_to.window(user_home_handle)
    return self.driver.current_url
User_ZhiHu类中的user_basic_information_collection函数,最终的信息存储在 user_information = {}这个字典中,界面没有显示的个人信息视为用户未设置,其值设为’Not Found’
def user_basic_information_collection(self, output_filename):
    """Scrape the basic profile attributes of the user currently shown.

    Collected into the local dict user_information: 用户名, 性别,
    一句话介绍, 居住地, 所在行业, 职业经历, 个人简介. Anything the page
    does not show is set to 'Not Found'.
    NOTE(review): output_filename is currently unused - nothing is written.
    """
    user_information = {}
    try:
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located(
            (By.XPATH, '//*[@id="ProfileHeader"]/div/div[2]/div/div[2]/div[1]/h1/span[1]')))
    except Exception:
        print("Search Failed! Check your network status")
    # User name comes from the element's .text (not a /text() xpath).
    username = self.driver.find_element(By.XPATH, '//*[@id="ProfileHeader"]/div/div[2]/div/div[2]/div[1]/h1/span[1]').text
    user_information['用户名'] = username
    # One-line self description.
    declaration = self.driver.find_element(By.XPATH, '//*[@id="ProfileHeader"]/div/div[2]/div/div[2]/div[1]/h1/span[2]').text
    if declaration == '':
        declaration = 'empty'
    user_information['一句话介绍'] = declaration
    # Gender is only exposed through a Male/Female icon; absence of both
    # means the user never set it.
    try:
        self.driver.find_element(By.CSS_SELECTOR, 'svg.Zi.Zi--Male').get_attribute("class")
        gender = 'Male'
    except Exception:
        try:
            self.driver.find_element(By.CSS_SELECTOR, 'svg.Zi.Zi--Female').get_attribute("class")
            gender = 'Female'
        except Exception:
            gender = 'Not Found'
    user_information['性别'] = gender
    # Expand the detailed profile section (居住地/所在行业/职业经历/个人简介).
    self.driver.find_element(By.XPATH, '//*[@id="ProfileHeader"]/div/div[2]/div/div[2]/div[3]/button').click()
    try:
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located(
            (By.XPATH, '//*[@id="ProfileHeader"]/div/div[2]/div/div[2]/div[2]/div/div')))
    except Exception:
        print("Search Failed! Check your network status")
    elements = self.driver.find_elements(By.CSS_SELECTOR, 'div.ProfileHeader-detailItem')
    # Each detail row is a label span plus a value div; strip HTML tags
    # from the value's innerHTML to get plain text.
    for element in elements:
        label = element.find_element(By.CSS_SELECTOR, 'span.ProfileHeader-detailLabel').text
        raw_value = element.find_element(By.CSS_SELECTOR, 'div.ProfileHeader-detailValue')
        value = re.sub("<[^>]+>", "", raw_value.get_attribute("innerHTML"))
        if label in ['居住地', '所在行业', '职业经历']:
            if value == '':
                value = 'empty'
            user_information[label] = value
        if label == '个人简介':
            # The bio lives in its own rich-text container.
            personal_profile = re.sub(r'<[^>]*>', '', self.driver.find_element(By.CSS_SELECTOR, 'div.ztext.ProfileHeader-detailValue').get_attribute("innerHTML"))
            user_information[label] = personal_profile
    # Fill in any attribute the page did not show.
    user_information_tag_list = ['用户名', '性别', '一句话介绍', '居住地', '所在行业', '职业经历', '个人简介']
    for user_information_tag in user_information_tag_list:
        if user_information_tag not in user_information:
            user_information[user_information_tag] = 'Not Found'
三、社交关系信息
所有关注人和粉丝(如果关注人数量或者粉丝数量超过10,则只采集前10个),每个人的信息包括用户昵称、链接地址、回答问题数、文章数、关注者人数。
main函数
if __name__ == '__main__':
    WeiBo_usr = '3247842625@qq.com'
    WeiBo_pwd = 'irontys'
    login_url = 'https://www.zhihu.com'

    # Cookie-based login (no QR scan needed once cookies are saved).
    session = Login_ZhiHu(login_url, chrome_ports[0])
    session.cookie_login()
    driver_ZhiHu = session.prepared_drive()

    collector = User_ZhiHu(driver_ZhiHu)
    # Randomly chosen public account; apologies to its owner.
    username_ZhiHu = '孟冬石榴'
    home_page_url = collector.goto_user_home_page(username_ZhiHu)
    collector.user_basic_information_collection('basic_information.txt')
    # Collect the first 10 accounts the user follows.
    following_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[1]'
    collector.user_relationship_information_collection(following_XPATH, 'followings', 'followings_information.txt')
User_ZhiHu类中的user_relationship_information_collection函数
# Followings and followers.
def user_relationship_information_collection(self, follower_XPATH, followings_or_followers, output_filename):
    """Scrape up to 10 followings or followers of the current user.

    follower_XPATH: the "following"/"followers" tab link to click.
    followings_or_followers: 'followings' or 'followers'; selects which
        NumberBoard counter holds the relevant total.
    Each collected entry gets 用户昵称, 链接地址, 回答数, 文章数, 关注者数;
    unused slots carry "isEmpty" = 1.
    NOTE(review): output_filename is currently unused - nothing is written.
    """
    # Open the "following" area first. Only the second click is strictly
    # needed for one of the two listings, but doing both keeps a single
    # code path for both call sites.
    self.driver.find_element(By.XPATH, '//*[@id="ProfileMain"]/div[1]/ul/li[9]/a').click()
    # Click the requested sub-tab (followings or followers).
    self.driver.find_element(By.XPATH, follower_XPATH).click()
    # The page renders incompletely after the automated click (anti-bot
    # check); refreshing works around it.
    self.driver.refresh()
    # NumberBoard order: [followings_total, followers_total].
    followings_or_followers_count = self.driver.find_elements(By.XPATH, '//strong[@class="NumberBoard-itemValue"]')
    if followings_or_followers == 'followings':
        followers_count = int(followings_or_followers_count[0].get_attribute("title"))
    else:
        followers_count = int(followings_or_followers_count[1].get_attribute("title"))
    # Collect at most 10 entries.
    valid_followers_count = min(followers_count, 10)
    followers_information = [{"isEmpty": 0} for i in range(10)]
    if valid_followers_count == 0:
        followers_information = [{"isEmpty": 1} for i in range(10)]
    else:
        for i in range(valid_followers_count, 10):
            followers_information[i]['isEmpty'] = 1
    follower_information_tag_list = ['用户昵称', '链接地址', '回答数', '文章数', '关注者数']
    try:
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
    except Exception:
        print("Search Failed! Check your network status")
    # One List-item per user in the listing.
    list_items = self.driver.find_elements(By.XPATH, '//div[@class="List-item"]')
    index = 0
    for list_item in list_items[0:valid_followers_count]:
        # Nickname and profile URL share one <a> element.
        follower_href_and_name = list_item.find_element(By.XPATH, './/span[@class="UserLink"]/div/a')
        followers_information[index]['用户昵称'] = follower_href_and_name.text
        followers_information[index]['链接地址'] = follower_href_and_name.get_attribute("href")
        # Status spans look like ['62 回答', '1 文章', '151 关注者'].
        followers_answers_articles_followers = list_item.find_elements(By.XPATH, './/div[@class="ContentItem-status"]/span')
        tag_list = ['回答', '文章', '关注者']
        for tag in tag_list:
            followers_information[index][tag + '数'] = '0'
            for follower_answers_articles_followers in followers_answers_articles_followers:
                if tag == follower_answers_articles_followers.text.split(' ')[1]:
                    followers_information[index][tag + '数'] = follower_answers_articles_followers.text.split(' ')[0]
        index += 1
四、动态信息
(一)本文在多线程加速方面做了许多尝试:
但是最终确定的使用版本是最后一版的多线程加速,前面几条仅是在讲述尝试过程
1.所有的信息处理按顺序执行
这部分代码实现了顺序执行所有信息处理函数,其中:回答信息处理、提问信息处理对于每一个帖子,都将其点击开,在新打开的网页处理每一个回答或提问所包含的信息。
缺点:
回答信息处理、提问信息处理都比较耗时间,而提问信息处理必须等待回答信息处理完毕后,才能够开始进行,非常耗时间:代码运行时间135.39s
main函数:
if __name__ == '__main__':
    WeiBo_usr = '3247842625@qq.com'
    WeiBo_pwd = 'irontys'

    # Truncate every output file from a previous run.
    output_filenames = ['basic_information.txt', 'followings_information.txt',
                        'followers_information.txt', 'answers_information.txt',
                        'asks_information.txt']
    for output_filename in output_filenames:
        with open(output_filename, "w", encoding='utf-8') as handle:
            handle.write('')

    login_url = 'https://www.zhihu.com'
    session = Login_ZhiHu(login_url, chrome_ports[0])
    # Password login is flagged as automation, so reuse saved cookies.
    # (Run third_party_WeiBo_login + sign_cookie once to refresh them.)
    session.cookie_login()
    driver_ZhiHu = session.prepared_drive()
    User = User_ZhiHu(driver_ZhiHu)
    # Randomly chosen public account; apologies to its owner.
    username_ZhiHu = '孟冬石榴'
    home_page_url = User.goto_user_home_page(username_ZhiHu)

    # Sequential collection: basics, followings, followers, answers, asks.
    User.user_basic_information_collection('basic_information.txt')
    following_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[1]'
    User.user_relationship_information_collection(following_XPATH, 'followings', 'followings_information.txt')
    follower_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[2]'
    User.user_relationship_information_collection(follower_XPATH, 'followers', 'followers_information.txt')
    answers_href_list = User.user_answers_information_collection()
    answers_href_list = User.user_asks_information_collection()
    driver_ZhiHu.close()
User_ZhiHu中的user_answers_information_collection和answers_information_collection,此处对于每个回答对应的帖子,打开一个网页,处理信息后关闭该网页。
def answers_information_collection(self, answers_information, index):
    """Scrape one freshly opened answer page into answers_information[index].

    Fields: 发帖时间, 发帖内容, 评论次数, 点赞次数, 评论信息 (the latter is
    a list of up to 10 comment dicts with 评论人ID/昵称/时间/内容/点赞次数).
    """
    # Switch to the answer window that the preceding click just opened.
    all_handles = self.driver.window_handles
    user_home_Handle = all_handles[0]   # NOTE(review): assigned but unused here
    new_answer_page = all_handles[1]
    self.driver.switch_to.window(new_answer_page)
    self.driver.maximize_window()
    try:
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="ContentItem-time"]/a/span')))
    except Exception:
        print("Search Failed! Check your network status")
    # time_raw looks like ['发布于', 'Y-M-D', 'H:M'].
    time_raw = self.driver.find_element(By.XPATH, '//div[@class="ContentItem-time"]/a/span').text.split(' ')
    answers_information[index]['发帖时间'] = time_raw[1] + '-' + time_raw[2]
    # Post body: question title plus answer text; video answers have no
    # text node and fall back to the cover-image URL in the except branch.
    ask_title = self.driver.find_element(By.XPATH, '//h1[@class="QuestionHeader-title"]').text
    try:
        answers_information[index]['发帖内容'] = ask_title + '\n' + self.driver.find_element(By.XPATH, '//*[@id="root"]/div/main/div/div/div[3]/div[1]/div/div[2]/div/div/div/div/div[2]/span[1]/div/div/span').text
    except Exception:
        try:
            WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//img[@class="css-lawu0e"]')))
        except Exception:
            print("Search Failed! Check your network status")
        answers_information[index]['发帖内容'] = ask_title + '\n' + self.driver.find_element(By.XPATH, '//img[@class="css-lawu0e"]').get_attribute("src")
    # Comment button label: '添加评论' when there are none, else 'N 条评论'.
    comment_description = self.driver.find_element(By.XPATH, '//button[@class="Button ContentItem-action FEfUrdfMIKpQDJDqkjte Button--plain Button--withIcon Button--withLabel fEPKGkUK5jyc4fUuT0QP B46v1Ak6Gj5sL2JTS4PY RuuQ6TOh2cRzJr6WlyQp"]').text
    if comment_description == '添加评论':
        answers_information[index]['评论次数'] = '0'
    else:
        answers_information[index]['评论次数'] = comment_description.split(' ')[0]
    # Upvote count; the class-based xpath is more stable than positional ones.
    answers_information[index]['点赞次数'] = self.driver.find_element(By.XPATH, '//button[@class="Button VoteButton VoteButton--up FEfUrdfMIKpQDJDqkjte"]').get_attribute("aria-label").split(' ')[1]
    # Comments (up to 10). NOTE(review): very long articles pin the comment
    # box in a popup, which this flow does not handle specially.
    if comment_description == '添加评论':
        comments_information = [{"isEmpty": 1} for i in range(10)]
    else:
        comments_information = [{"isEmpty": 0} for i in range(10)]
    self.driver.find_element(By.XPATH, '//button[@class="Button ContentItem-action FEfUrdfMIKpQDJDqkjte Button--plain Button--withIcon Button--withLabel fEPKGkUK5jyc4fUuT0QP B46v1Ak6Gj5sL2JTS4PY RuuQ6TOh2cRzJr6WlyQp"]').click()
    # Expand folded comment threads with JS clicks (plain .click() can fail
    # on off-screen buttons).
    unfold_comment_buttons = self.driver.find_elements(By.CLASS_NAME, 'Button--secondary')
    for button in unfold_comment_buttons:
        self.driver.execute_script("arguments[0].click();", button)
    try:
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="CommentContent css-1ygdre8"]')))
    except Exception:
        print("Search Failed! Check your network status")
    comments_content = self.driver.find_elements(By.XPATH, '//div[@class="CommentContent css-1ygdre8"]')
    comments_times = self.driver.find_elements(By.XPATH, '//span[@class="css-12cl38p"]')
    comments_href_and_usernames = self.driver.find_elements(By.XPATH, '//a[@class="css-1rd0h6f"]')
    comments_likes = self.driver.find_elements(By.CSS_SELECTOR, '.Button--plain.Button--grey.Button--withIcon.Button--withLabel.css-h1yvwn')
    all_comments_count = len(comments_likes)
    valid_comment_count = 10
    if all_comments_count < 10:
        for i in range(all_comments_count, 10):
            comments_information[i]['isEmpty'] = 1
        valid_comment_count = all_comments_count
    for i in range(0, valid_comment_count):
        # Per comment: 评论人ID, 评论人昵称, 评论时间, 评论内容, 点赞次数.
        comments_information[i]['评论人昵称'] = comments_href_and_usernames[i].text
        if comments_href_and_usernames[i].text != '匿名用户':
            comments_information[i]['评论人ID'] = comments_href_and_usernames[i].get_attribute("href").split('/')[-1]
        else:
            comments_information[i]['评论人ID'] = 'Not Found because of anonymous'
        comments_information[i]['评论时间'] = comments_times[i].text
        comments_information[i]['评论内容'] = comments_content[i].text
        # Like button shows just '赞' when the count is zero.
        if comments_likes[i].text == '赞':
            comments_information[i]['点赞次数'] = '0'
        else:
            comments_information[i]['点赞次数'] = comments_likes[i].text
    answers_information[index]['评论信息'] = comments_information
def user_answers_information_collection(self):
    """Open the user's "answers" tab and scrape up to 10 answers.

    Per-answer scraping is delegated to answers_information_collection.
    """
    # Find the answers tab and its count. The tab list contains entries
    # whose count span is empty; those are skipped.
    answers_tag_clickable = None
    answers_count = 0
    answers_tag_in_this_list = self.driver.find_elements(By.XPATH, '//a[@class="Tabs-link"]')
    for answers_tag in answers_tag_in_this_list:
        if answers_tag.get_attribute("href").split('/')[-1] == 'answers' and answers_tag.find_element(By.XPATH, './/span').text != '':
            answers_tag_clickable = answers_tag
            answers_count = int(answers_tag.find_element(By.XPATH, './/span').text)
            break
    # JS click on the saved tab: element.click() fails when the tab is
    # scrolled out of view (same approach as the asks collector), and the
    # guard avoids a NameError when no matching tab was found.
    if answers_tag_clickable is not None:
        self.driver.execute_script("arguments[0].click();", answers_tag_clickable)
    self.driver.refresh()
    answers_information = [{"isEmpty": 0} for i in range(10)]
    valid_answers_count = min(answers_count, 10)
    if valid_answers_count == 0:
        print("No Answers!!")
    else:
        # Wait for the answer list to load.
        try:
            WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
        except Exception:
            print("Search Failed! Check your network status")
        # Title links of every answer currently on the page.
        answers_pages = self.driver.find_elements(By.XPATH, '//a[@data-za-detail-view-element_name="Title"]')
        answer_index = 0
        for answer_page in answers_pages[:valid_answers_count]:
            answer_page.click()   # opens the answer in a new window
            self.answers_information_collection(answers_information, answer_index)
            answer_index += 1
    # Mark the unused slots as empty.
    for i in range(valid_answers_count, 10):
        answers_information[i]['isEmpty'] = 1
User_ZhiHu中的user_asks_information_collection和asks_information_collection,此处对于每个回答对应的帖子,打开一个网页,处理信息后关闭该网页。
def user_asks_information_collection(self):
    """Open the user's "asks" tab and scrape up to 10 questions.

    List-level fields (提问时间/回答数/关注人数) are read here; the opened
    question page itself is handled by asks_information_collection.
    """
    asks_tag_clickable = None
    asks_count = 0
    asks_tag_in_this_list = self.driver.find_elements(By.XPATH, '//a[@class="Tabs-link"]')
    # Some tab entries carry an empty count span - skip those.
    for asks_tag in asks_tag_in_this_list:
        if asks_tag.get_attribute("href").split('/')[-1] == 'asks' and asks_tag.find_element(By.XPATH, './/span').text != '':
            asks_tag_clickable = asks_tag
            asks_count = int(asks_tag.find_element(By.XPATH, './/span').text)
            break
    # JS click avoids "element not interactable" for off-screen tabs; the
    # guard avoids a NameError when no matching tab was found.
    if asks_tag_clickable is not None:
        self.driver.execute_script("arguments[0].click();", asks_tag_clickable)
    self.driver.refresh()
    asks_information = [{"isEmpty": 1} for i in range(10)]
    valid_asks_count = min(asks_count, 10)
    if valid_asks_count == 0:
        print("No Asks!!")
    else:
        # Wait for the question list to load.
        try:
            WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
        except Exception:
            print("Search Failed! Check your network status")
        asks_list_items = self.driver.find_elements(By.XPATH, '//div[@class="List-item"]')
        ask_index = 0
        for asks_list_item in asks_list_items[0:valid_asks_count]:
            asks_information[ask_index]['isEmpty'] = 0
            # Status spans: [time, 'N 个回答', 'M 人关注'].
            asks_time_answers_followers = asks_list_item.find_elements(By.XPATH, './/span[@class="ContentItem-statusItem"]')
            asks_information[ask_index]['提问时间'] = asks_time_answers_followers[0].text
            answers_count = asks_time_answers_followers[1].text.split(' ')[0]
            asks_information[ask_index]['回答数'] = answers_count
            asks_information[ask_index]['关注人数'] = asks_time_answers_followers[2].text.split(' ')[0]
            # Open the question page in a new window and scrape it.
            asks_list_item.find_element(By.XPATH, './/a[@data-za-detail-view-name="Title"]').click()
            self.asks_information_collection(asks_information, ask_index, answers_count)
            ask_index += 1
def asks_information_collection(self, asks_information, index, answers_count):
    """Scrape one opened question page: question body plus up to 10 answers.

    Appends each answer (回答人ID/昵称/时间/内容/点赞次数) to
    asks_information.txt, then closes the window and switches back to the
    user's home page.
    """
    all_handles = self.driver.window_handles
    user_home_Handle = all_handles[0]
    new_answer_page = all_handles[1]
    self.driver.switch_to.window(new_answer_page)
    # Maximize so lazily-rendered content becomes visible.
    self.driver.maximize_window()
    try:
        # The question title doubles as the "page loaded" signal
        # (admittedly imperfect).
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//h1[@class="QuestionHeader-title"]')))
    except Exception:
        print("Search Failed! Check your network status")
    # Expand the folded question description, if present.
    try:
        self.driver.find_element(By.XPATH, '//Button[@class="Button QuestionRichText-more FEfUrdfMIKpQDJDqkjte Button--plain fEPKGkUK5jyc4fUuT0QP"]').click()
    except Exception:
        pass  # no "show more" button on this question
    # Question body text; video questions have none.
    try:
        ask_content = self.driver.find_element(By.XPATH, '//span[@class="RichText ztext css-1g0fqss"]').text
    except Exception:
        ask_content = ''
    ask_title = [title.text for title in self.driver.find_elements(By.XPATH, '//h1[@class="QuestionHeader-title"]') if title.text != ''][0]
    asks_information[index]['提问内容'] = ask_title + '\n' + ask_content
    asks_tags = ['提问时间', '回答数', '关注人数', '提问内容']
    ask_answers_information = [{"isEmpty": 0} for i in range(10)]
    answers_count = int(answers_count)
    valid_answers_count = min(answers_count, 10)
    try:
        WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
    except Exception:
        print("Search Failed! Check your network status")
    # Long answer lists are lazy-loaded: scroll until at least
    # valid_answers_count List-items exist.
    # NOTE(review): this loops forever if the page never yields enough items.
    ask_answers = self.driver.find_elements(By.XPATH, '//div[@class="List-item"]')
    ask_answers_count = len(ask_answers)
    while ask_answers_count < valid_answers_count:
        self.driver.execute_script("window.scrollBy(0, 100);")
        ask_answers = self.driver.find_elements(By.XPATH, '//div[@class="List-item"]')
        ask_answers_count = len(ask_answers)
    if valid_answers_count == 0:
        print("No Ask_answers!!")
    ask_answer_tags = ['回答人ID', '回答人昵称', '回答时间', '回答内容', '点赞次数']
    ask_answers_index = 0
    # With zero answers the slice is empty and the loop body never runs.
    for ask_answer in ask_answers[0:valid_answers_count]:
        ask_answers_information[ask_answers_index]['回答人ID'] = ask_answer.find_element(By.XPATH, './/div[@class="css-1gomreu"]/a').get_attribute("href").split('/')[-1]
        ask_answers_information[ask_answers_index]['回答人昵称'] = ask_answer.find_elements(By.XPATH, './/div[@class="css-1gomreu"]/a')[1].text
        # Some List-items are not real answers and lack a timestamp; skip
        # them (the parallel version handles this more cleanly).
        try:
            WebDriverWait(self.driver, 50).until(EC.presence_of_element_located((By.XPATH, '//div[@class="ContentItem-time"]/a')))
        except Exception:
            print("Search Failed! Check your network status")
        try:
            raw_time = ask_answer.find_element(By.XPATH, './/div[@class="ContentItem-time"]/a').text.split(' ')
        except Exception:
            continue
        ask_answers_information[ask_answers_index]['回答时间'] = raw_time[1] + '-' + raw_time[2]
        ask_answers_information[ask_answers_index]['回答内容'] = ask_answer.find_element(By.XPATH, './/span[@class="RichText ztext CopyrightRichText-richText css-1g0fqss"]').text
        # Vote button text is '赞同 N', or just '赞同' when the count is 0.
        try:
            ask_answers_information[ask_answers_index]['点赞次数'] = ask_answer.find_element(By.XPATH, './/Button[@class="Button VoteButton VoteButton--up FEfUrdfMIKpQDJDqkjte"]').text.split(' ')[1]
        except Exception:
            ask_answers_information[ask_answers_index]['点赞次数'] = '0'
        # Append this answer's fields to the output file.
        with open('asks_information.txt', 'a', encoding='utf-8') as f:
            f.write('>>第' + str(ask_answers_index) + "条回答:" + '\n')
            for tag in ask_answer_tags:
                f.write(tag + ":" + ask_answers_information[ask_answers_index][tag] + '\n')
        ask_answers_index += 1
    # Close the question window and return to the user's home page.
    self.driver.close()
    self.driver.switch_to.window(user_home_Handle)
2.将所有的信息处理统统并行(X)
并行处理关注的人和粉丝信息处理、回答信息处理、提问信息处理,其中后两个对于每个帖子都打开一个新网页,对二十个新打开的网页再做并行处理,即意欲开启24个线程。
(1)缺点
24个线程同时进行会导致信息的处理特别的慢,而且像基础信息处理、关注的人和粉丝信息处理都处理得比较快,后20个新打开的回答信息处理、提问信息处理的子页面又在回答信息处理、提问信息处理两个线程结束后才会启动,导致信息处理的时间甚至不如所有处理函数顺序执行,但事实上尝试了一次,貌似要快一些,运行时间:116.09秒
提速仅10%左右
main函数
if __name__ == '__main__':
    start_time = time.time()
    WeiBo_usr = '3247842625@qq.com'
    WeiBo_pwd = 'irontys'
    login_url = 'https://www.zhihu.com'

    login_session = Login_ZhiHu(login_url, chrome_ports[0])
    # Password login is flagged as automation; reuse saved cookies instead.
    # (third_party_WeiBo_login + sign_cookie would refresh the cookie file.)
    login_session.cookie_login()
    driver_ZhiHu = login_session.prepared_drive()
    User = User_ZhiHu(driver_ZhiHu)
    # Randomly chosen public account; apologies to its owner.
    username_ZhiHu = '孟冬石榴'
    home_page_url = User.goto_user_home_page(username_ZhiHu)
    # Basic profile info is collected before fanning out the workers.
    User.user_basic_information_collection('basic_information.txt')
    driver_ZhiHu.close()

    # Fan out follower/following/answers/asks collection over four threads.
    workers = [
        threading.Thread(target=job)
        for job in (do_parallel.follower, do_parallel.following,
                    do_parallel.answers, do_parallel.asks)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    end_time = time.time()
    print("代码执行时间:{:.2f}秒".format(end_time - start_time))
执行并行操作的paralle类
class parallel():
    """Drives the multi-window, multi-threaded collection run.

    Each worker attaches its own debug-port Chrome (via Login_ZhiHu's
    cookie login) and parks the window in one quadrant of the screen.
    """

    def __init__(self, home_page_url):
        # Home page of the target user; every worker navigates from it.
        self.home_page_url = home_page_url
        # Quadrant size, hard-coded for a 1936x1056 usable desktop.
        self.half_window_width = 1936 //2
        self.half_window_height = 1056//2

    # Build a logged-in driver on the given debug port, then position and
    # size its window (offset by the global off-screen x when invisible).
    def get_driver(self,page_url,chrome_port,position_x,position_y,width,height):
        Login_user_home_page = Login_ZhiHu(page_url,chrome_port)
        Login_user_home_page.cookie_login()
        Login_user_home_page_driver = Login_user_home_page.prepared_drive()
        Login_user_home_page_driver.set_window_position(int(window_position['x'])+position_x, position_y)
        Login_user_home_page_driver.set_window_size(width, height)
        return Login_user_home_page_driver

    # NOTE(review): despite its name, follower() collects the people the
    # user is FOLLOWING (tab a[1] -> followings_information.txt), and
    # following() collects the FOLLOWERS (tab a[2]). The names are swapped
    # but consistent with how they are used.
    def follower(self):
        follower_driver = self.get_driver(self.home_page_url,chrome_ports[1],0,0,self.half_window_width,self.half_window_height)
        following_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[1]'
        User_ZhiHu(follower_driver).user_relationship_information_collection(following_XPATH,'followings','followings_information.txt')
        follower_driver.close()

    def following(self):
        driver = self.get_driver(self.home_page_url,chrome_ports[2],self.half_window_width,0,self.half_window_width,self.half_window_height)
        follower_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[2]'
        User_ZhiHu(driver).user_relationship_information_collection(follower_XPATH,'followers','followers_information.txt')
        driver.close()

    # Collect the answer list with one driver, then fan out one thread
    # (and one Chrome on ports 9233-9242) per answer page.
    def answers(self):
        driver = self.get_driver(self.home_page_url,chrome_ports[3],0,self.half_window_height,self.half_window_width,self.half_window_height)
        answers_href_list = User_ZhiHu(driver).parallel_user_answers_information_collection()
        driver.close()
        answers_chrome_port = [str(i) for i in range(9233,9243)]
        if len(answers_href_list) > 0 :
            # Ten narrow windows side by side across the full width.
            width = (self.half_window_width*2)/10
            args_list = [( answers_href_list[i],answers_chrome_port[i],width*i,0,width,self.half_window_height,i,'answers_information.txt') for i in range(len(answers_href_list))]
            with ThreadPoolExecutor(max_workers=len(answers_href_list)) as executor:
                # map over the transposed argument tuples (one per worker).
                executor.map(self.para_answers, *zip(*args_list))

    # Worker body: open one answer page in its own browser and scrape it.
    def para_answers(self,answers_href,answers_chrome,pos_x,pos_y,width,height,answer_index,output_filename):
        driver = self.get_driver(answers_href,answers_chrome,pos_x,pos_y,width,height)
        User_ZhiHu(driver).parallel_answers_information_collection(answer_index,output_filename)

    # Same fan-out pattern for the user's questions (ports 9243-9252).
    def asks(self):
        print("start")
        driver = self.get_driver(self.home_page_url,chrome_ports[4],self.half_window_width,self.half_window_height,self.half_window_width,self.half_window_height)
        asks_chrome_port = [str(i) for i in range(9243,9253)]
        asks_href_list,asks_information = User_ZhiHu(driver).parallel_user_asks_information_collection()
        driver.close()
        if len(asks_href_list) > 0 :
            width = (self.half_window_width*2)/10
            args_list = [( asks_href_list[i],asks_chrome_port[i],width*i,self.half_window_height,width,self.half_window_height,asks_information,i,asks_information[i]['回答数'],'asks_information.txt') for i in range(len(asks_href_list))]
            with ThreadPoolExecutor(max_workers=len(asks_href_list)) as executor:
                executor.map(self.para_asks, *zip(*args_list))

    # Worker body: open one question page in its own browser and scrape it.
    def para_asks(self,asks_href,asks_chrome,pos_x,pos_y,width,height,asks_information,ask_index,answers_count,output_filename):
        driver = self.get_driver(asks_href,asks_chrome,pos_x,pos_y,width,height)
        User_ZhiHu(driver).parallel_asks_information_collection(asks_information,ask_index,answers_count,output_filename)
并行类的操作需要用到的相关函数:parallel_answers_information_collection、parallel_user_answers_information_collection、parallel_asks_information_collection、parallel_user_asks_information_collection
与上述顺序执行的代码差别不是很大,有些地方有不同,基本信息处理函数、关注的人和粉丝信息的处理函数都与上一个顺序执行的代码相同。
def parallel_answers_information_collection(self, index, output_filename):
    """Collect one answer page's post data plus up to 10 of its comments.

    Fills slot ``index`` of a 10-entry result list with post time,
    content, comment count, like count and comment details, then closes
    this worker's browser window.
    """
    answers_information = [{"isEmpty": 0} for i in range(10)]
    try:
        # Wait for the post-time element. Anchoring on the class name is
        # more stable than an absolute XPath: some pages lay out differently.
        WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="ContentItem-time"]/a/span')))
    except:
        print("Search Failed! Check u network statue")
    # time_raw looks like ['发布于', '<year-month-day>', '<hour:minute>'].
    time_raw = self.driver.find_element(By.XPATH, '//div[@class="ContentItem-time"]/a/span').text.split(' ')
    answers_information[index]['发帖时间'] = time_raw[1] + '-' + time_raw[2]
    ask_title = self.driver.find_element(By.XPATH, '//h1[@class="QuestionHeader-title"]').text
    try:
        # Normal text answer: raw HTML/text body (may contain images; it is
        # rendered later via Flask).
        answers_information[index]['发帖内容'] = ask_title + '\n' + self.driver.find_element(By.XPATH, '//*[@id="root"]/div/main/div/div/div[3]/div[1]/div/div[2]/div/div/div/div/div[2]/span[1]/div/div/span').text
    except:
        # Video answer: fall back to the video thumbnail URL.
        try:
            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//img[@class="css-lawu0e"]')))
        except:
            print("Search Failed! Check u network statue")
        answers_information[index]['发帖内容'] = ask_title + '\n' + self.driver.find_element(By.XPATH, '//img[@class="css-lawu0e"]').get_attribute("src")
    # The comment toggle button reads "添加评论" when there are no comments,
    # otherwise "<n> 条评论".
    comment_description = self.driver.find_element(By.XPATH, '//button[@class="Button ContentItem-action FEfUrdfMIKpQDJDqkjte Button--plain Button--withIcon Button--withLabel fEPKGkUK5jyc4fUuT0QP B46v1Ak6Gj5sL2JTS4PY RuuQ6TOh2cRzJr6WlyQp"]').text
    if comment_description == '添加评论':
        answers_information[index]['评论次数'] = '0'
    else:
        answers_information[index]['评论次数'] = comment_description.split(' ')[0]
    # Class-anchored XPath: the absolute path variant differs across pages.
    answers_information[index]['点赞次数'] = self.driver.find_element(By.XPATH, '//button[@class="Button VoteButton VoteButton--up FEfUrdfMIKpQDJDqkjte"]').get_attribute("aria-label").split(' ')[1]
    if comment_description == '添加评论':
        comments_information = [{"isEmpty": 1} for i in range(10)]
    else:
        comments_information = [{"isEmpty": 0} for i in range(10)]
        # Open the comment section. On very long posts this may instead open
        # a floating comment dialog pinned to the bottom of the page.
        self.driver.find_element(By.XPATH, '//button[@class="Button ContentItem-action FEfUrdfMIKpQDJDqkjte Button--plain Button--withIcon Button--withLabel fEPKGkUK5jyc4fUuT0QP B46v1Ak6Gj5sL2JTS4PY RuuQ6TOh2cRzJr6WlyQp"]').click()
    # Expand any collapsed comment threads via JS clicks (a plain .click()
    # can miss overlapped elements).
        unfold_comment_buttons = self.driver.find_elements(By.CLASS_NAME, 'Button--secondary')
        for button in unfold_comment_buttons:
            self.driver.execute_script("arguments[0].click();", button)
        # Wait until the first comment body is rendered.
        try:
            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="CommentContent css-1ygdre8"]')))
        except:
            print("Search Failed! Check u network statue")
        # Parallel lists: content / timestamp / author link / like button.
        comments_content = self.driver.find_elements(By.XPATH, '//div[@class="CommentContent css-1ygdre8"]')
        comments_times = self.driver.find_elements(By.XPATH, '//span[@class="css-12cl38p"]')
        comments_href_and_usernames = self.driver.find_elements(By.XPATH, '//a[@class="css-1rd0h6f"]')
        comments_likes = self.driver.find_elements(By.CSS_SELECTOR, '.Button--plain.Button--grey.Button--withIcon.Button--withLabel.css-h1yvwn')
        all_comments_count = len(comments_likes)
        valid_comment_count = 10
        if all_comments_count < 10:
            # Mark the unused trailing slots as empty.
            for i in range(all_comments_count, 10):
                comments_information[i]['isEmpty'] = 1
            valid_comment_count = all_comments_count
        for i in range(0, valid_comment_count):
            # Per comment: author ID, author nickname, time, content, likes.
            comments_information[i]['评论人昵称'] = comments_href_and_usernames[i].text
            if comments_href_and_usernames[i].text != '匿名用户':
                comments_information[i]['评论人ID'] = comments_href_and_usernames[i].get_attribute("href").split('/')[-1]
            else:
                # Anonymous commenters expose no profile URL.
                comments_information[i]['评论人ID'] = 'Not Found because of anonymous'
            comments_information[i]['评论时间'] = comments_times[i].text
            comments_information[i]['评论内容'] = comments_content[i].text
            if comments_likes[i].text == '赞':
                # Button shows just "赞" when the like count is zero.
                comments_information[i]['点赞次数'] = '0'
            else:
                comments_information[i]['点赞次数'] = comments_likes[i].text
    answers_information[index]['评论信息'] = comments_information
    self.driver.close()
def parallel_user_answers_information_collection(self):
    """From the user's profile page, open the "answers" tab and return
    the URLs of the user's most recent answers (at most 10).

    Returns an empty list when the user has no answers or no matching
    tab is found.
    """
    # Guard against the tab not being found: without these defaults the
    # original code raised NameError (answers_count unbound) and clicked
    # the stale loop variable instead of the matched tab.
    answers_tag_clickable = None
    answers_count = 0
    answers_tag_in_this_list = self.driver.find_elements(By.XPATH, '//a[@class="Tabs-link"]')
    for answers_tag in answers_tag_in_this_list:
        # The real tab link ends in ".../answers" and carries a non-empty
        # <span> holding the answer count.
        if answers_tag.get_attribute("href").split('/')[-1] == 'answers' and answers_tag.find_element(By.XPATH, './/span').text != '':
            answers_tag_clickable = answers_tag
            answers_count = int(answers_tag.find_element(By.XPATH, './/span').text)
            break
    if answers_tag_clickable is not None:
        # JS click: a plain .click() sometimes misses the tab element
        # (same workaround as the "asks" counterpart).
        self.driver.execute_script("arguments[0].click();", answers_tag_clickable)
        self.driver.refresh()
    valid_answers_count = min(answers_count, 10)
    if valid_answers_count == 0:
        print("No Answers!!")
        answers_href_list = []
    else:
        # Wait until at least one answer list item is rendered.
        try:
            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
        except Exception:
            print("Search Failed! Check u network statue")
        answers_href_list = [element.get_attribute("href") for element in self.driver.find_elements(By.XPATH, '//a[@data-za-detail-view-element_name="Title"]')[0:valid_answers_count]]
    return answers_href_list[0:10]
def parallel_asks_information_collection(self, asks_information, index, answers_count, output_filename):
    """Collect one question page's content plus up to 10 of its answers.

    Writes the question text into ``asks_information[index]['提问内容']``
    and fills a local ``ask_answers_information`` list with per-answer
    details, then closes this worker's browser window.
    """
    try:
        # Page is considered loaded once the question title is present.
        WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//h1[@class="QuestionHeader-title"]')))
    except Exception:
        print("Search Failed! Check u network statue")
    # Some questions hide their body behind a "read more" button.
    try:
        self.driver.find_element(By.XPATH, '//Button[@class="Button QuestionRichText-more FEfUrdfMIKpQDJDqkjte Button--plain fEPKGkUK5jyc4fUuT0QP"]').click()
    except Exception:
        pass
    # Video questions have no rich-text body.
    try:
        ask_content = self.driver.find_element(By.XPATH, '//span[@class="RichText ztext css-1g0fqss"]').text
    except Exception:
        ask_content = ''
    ask_title = [title.text for title in self.driver.find_elements(By.XPATH, '//h1[@class="QuestionHeader-title"]') if title.text != ''][0]
    asks_information[index]['提问内容'] = ask_title + '\n' + ask_content
    asks_tags = ['提问时间', '回答数', '关注人数', '提问内容']
    ask_answers_information = [{"isEmpty": 0} for i in range(10)]
    answers_count = int(answers_count)
    valid_answers_count = min(answers_count, 10)
    if valid_answers_count > 0:
        try:
            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item" and @tabindex="0"]')))
        except Exception:
            print("Search Failed! Check u network statue")
        ask_answers = self.driver.find_elements(By.XPATH, '//div[@class="List-item" and @tabindex="0"]')
        ask_answers_count = len(ask_answers)
        # Answers are lazy-loaded: nudge the scroll position until enough
        # of them are rendered (or the question has no more answers).
        while ask_answers_count < valid_answers_count:
            self.driver.execute_script("window.scrollBy(0, -30);")
            self.driver.execute_script("window.scrollBy(0, 100);")
            ask_answers = self.driver.find_elements(By.XPATH, '//div[@class="List-item" and @tabindex="0"]')
            ask_answers_count = len(ask_answers)
            if (ask_answers_count == answers_count):
                break
        ask_answer_tags = ['回答人ID', '回答人昵称', '回答时间', '回答内容', '点赞次数']
        ask_answers_index = 0
        for ask_answer in ask_answers[0:valid_answers_count]:
            ask_answers_information[ask_answers_index]['回答人ID'] = ask_answer.find_element(By.XPATH, './/div[@class="css-1gomreu"]/a').get_attribute("href").split('/')[-1]
            ask_answers_information[ask_answers_index]['回答人昵称'] = ask_answer.find_elements(By.XPATH, './/div[@class="css-1gomreu"]/a')[1].text
            try:
                WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="ContentItem-time"]/a')))
            except Exception:
                print("Search Failed! Check u network statue")
            try:
                raw_time = ask_answer.find_element(By.XPATH, './/div[@class="ContentItem-time"]/a').text.split(' ')
            except Exception:
                # Timestamp missing: skip this answer and reuse the slot.
                continue
            ask_answers_information[ask_answers_index]['回答时间'] = raw_time[1] + '-' + raw_time[2]
            ask_answers_information[ask_answers_index]['回答内容'] = ask_answer.find_element(By.XPATH, './/span[@class="RichText ztext CopyrightRichText-richText css-1g0fqss"]').text
            try:
                ask_answers_information[ask_answers_index]['点赞次数'] = ask_answer.find_element(By.XPATH, './/Button[@class="Button VoteButton VoteButton--up FEfUrdfMIKpQDJDqkjte"]').text.split(' ')[1]
            except Exception:
                ask_answers_information[ask_answers_index]['点赞次数'] = '0'
            # BUG FIX: the index was never advanced, so every answer
            # overwrote slot 0 (the sibling collector increments its index);
            # advance only after a fully-collected answer.
            ask_answers_index += 1
    self.driver.close()
def parallel_user_asks_information_collection(self):
    """From the user's profile page, open the "asks" tab and return
    ``(asks_href_list, asks_information)``: question URLs (at most 10)
    and per-question metadata dicts (time, answer count, follower count).
    """
    # Guard against the tab not being found: without these defaults the
    # original code raised NameError (asks_tag_clickable/asks_count unbound)
    # when no tab matched.
    asks_tag_clickable = None
    asks_count = 0
    asks_tag_in_this_list = self.driver.find_elements(By.XPATH, '//a[@class="Tabs-link"]')
    # The tab row also yields links whose <span> text is empty; only the
    # link ending in ".../asks" with a visible count is the real tab.
    for asks_tag in asks_tag_in_this_list:
        if asks_tag.get_attribute("href").split('/')[-1] == 'asks' and asks_tag.find_element(By.XPATH, './/span').text != '':
            asks_tag_clickable = asks_tag
            asks_count = int(asks_tag.find_element(By.XPATH, './/span').text)
            break
    if asks_tag_clickable is not None:
        # JS click is more reliable than .click() here (the element is
        # sometimes overlapped and misses the click).
        self.driver.execute_script("arguments[0].click();", asks_tag_clickable)
        self.driver.refresh()
    asks_information = [{"isEmpty": 1} for i in range(10)]
    valid_asks_count = min(asks_count, 10)
    if valid_asks_count == 0:
        asks_href_list = []
    else:
        # Wait for the question list to render.
        try:
            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
        except Exception:
            print("Search Failed! Check u network statue")
        asks_list_items = self.driver.find_elements(By.XPATH, '//div[@class="List-item"]')
        asks_href_list = [ask_list_item.find_element(By.XPATH, './/div[@class="QuestionItem-title"]/a').get_attribute("href") for ask_list_item in asks_list_items[0:valid_asks_count]]
        for ask_index, asks_list_item in enumerate(asks_list_items[0:valid_asks_count]):
            asks_information[ask_index]['isEmpty'] = 0
            # statusItem spans: [0] ask time, [1] "<n> 回答", [2] "<n> 关注".
            asks_time_answers_followers = asks_list_item.find_elements(By.XPATH, './/span[@class="ContentItem-statusItem"]')
            asks_information[ask_index]['提问时间'] = asks_time_answers_followers[0].text
            answers_count = asks_time_answers_followers[1].text.split(' ')[0]
            asks_information[ask_index]['回答数'] = answers_count
            asks_information[ask_index]['关注人数'] = asks_time_answers_followers[2].text.split(' ')[0]
    return asks_href_list, asks_information
3.提问信息和其他信息搜集两个线程并行
以下代码首先是优化了用户回答信息的采集方法,不再采用每个回答帖子打开一个网页采集信息的方式,而是在当前用户的回答主页采集所有的回答信息。
在程序找到了指定用户的用户主页后,创建一个子线程,单独搜集用户的提问信息。之所以这样做,是因为用户的提问信息需要打开新页面才能获取一些必要的信息,而打开页面的过程比较慢;同时,搜集提问信息所需的时间与其余所有信息搜集所用的时间大致相当。因此设置两个线程:主线程负责其他信息的搜集,第二个子线程并行处理用户的提问信息。
代码执行时间:51.84秒,信息搜索效率提高了超过60%。(测试时明显感到网络延时相对前两次测试较大,在相同条件下,效率可能会更高)
本文最终实现的信息搜集是经过多线程加速的,先看main函数
if __name__ == '__main__':
    WeiBo_usr = '3247842625@qq.com'
    WeiBo_pwd = 'irontys'
    login_url = 'https://www.zhihu.com'
    # Log in once via stored cookies, then reuse the prepared browser session.
    Login = Login_ZhiHu(login_url, chrome_ports[0])
    Login.cookie_login()
    driver_ZhiHu = Login.prepared_drive()
    # Collector for the target user's information.
    User = User_ZhiHu(driver_ZhiHu)
    # Target user chosen at random for the assignment demo.
    username_ZhiHu = '孟冬石榴'
    home_page_url = User.goto_user_home_page(username_ZhiHu)
    # Collect the user's questions in a second thread. Each question needs
    # its own (slow) page load, and the time that takes is roughly equal to
    # the time needed for all the other collection below — so the two run
    # in parallel.
    thread_asks_information = threading.Thread(target=parallel(home_page_url).asks)
    thread_asks_information.start()
    # Basic profile information.
    User.user_basic_information_collection('basic_information.txt')
    # People the user follows.
    following_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[1]'
    User.user_relationship_information_collection(following_XPATH, 'followings', 'followings_information.txt')
    # The user's followers.
    follower_XPATH = '//*[@id="Profile-following"]/div[1]/h4/div/a[2]'
    User.user_relationship_information_collection(follower_XPATH, 'followers', 'followers_information.txt')
    # The user's answers (collected in-page, no per-answer windows).
    answers_href_list = User.parallel2_user_answers_information_collection()
    driver_ZhiHu.close()
    thread_asks_information.join()
处理用户的提问信息(子线程),User_ZhiHu类中的parallel2_user_answers_information_collection函数
def parallel2_user_answers_information_collection(self):
    """Collect the user's answers (at most 10) directly from the answers
    tab of the profile page, instead of opening one window per answer.

    For each answer: comment count, up to 10 comments (including those in
    the pop-up comment dialog), answer content, time and like count —
    accumulated in ``answers_information``.
    """
    answers_information = [{"isEmpty": 1} for i in range(10)]
    answers_tag_in_this_list = self.driver.find_elements(By.XPATH, '//a[@class="Tabs-link"]')
    for answers_tag in answers_tag_in_this_list:
        # The real tab ends in ".../answers" and carries a non-empty count.
        if answers_tag.get_attribute("href").split('/')[-1] == 'answers' and answers_tag.find_element(By.XPATH, './/span').text != '':
            answers_tag_clickable = answers_tag
            answers_count = int(answers_tag.find_element(By.XPATH, './/span').text)
            break
    # JS click: a plain .click() sometimes misses the tab element.
    self.driver.execute_script("arguments[0].click();", answers_tag_clickable)
    self.driver.refresh()
    valid_answers_count = min(answers_count, 10)
    if valid_answers_count == 0:
        print("No Answers!!")
    else:
        # Wait until the answer list renders.
        try:
            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="List-item"]')))
        except:
            print("Search Failed! Check u network statue")
        answers_list = self.driver.find_elements(By.XPATH, '//div[@class="List-item" and @tabindex="0"]')[0:valid_answers_count]
        for i, answer in enumerate(answers_list):
            answers_information[i]['isEmpty'] = 0
            # buttons[0] is the comment toggle: "添加评论" or "<n> 条评论".
            buttons = answer.find_elements(By.XPATH, './/Button[@class="Button ContentItem-action FEfUrdfMIKpQDJDqkjte Button--plain Button--withIcon Button--withLabel fEPKGkUK5jyc4fUuT0QP B46v1Ak6Gj5sL2JTS4PY RuuQ6TOh2cRzJr6WlyQp"]')
            if buttons[0].text != '添加评论':
                answers_information[i]['评论次数'] = buttons[0].text.split(' ')[0]
                # Open the inline comment section for this answer.
                self.driver.execute_script("arguments[0].click();", buttons[0])
                remarks_list = answer.find_elements(By.CLASS_NAME, 'css-1frn93x')
                try:
                    WebDriverWait(answer, 100).until(EC.presence_of_element_located((By.XPATH, './/div[@class="css-14nvvry"]')))
                except:
                    print("Search Failed! Check u network statue")
                remarks_information = []
                remarks_list = answer.find_elements(By.XPATH, './/div[@class="css-14nvvry"]')
                for remark in remarks_list:
                    try:
                        # Per comment: author ID/nickname, time, content, likes.
                        remarker_id = remark.find_element(By.XPATH, './/a[@class="css-1rd0h6f"]').get_attribute("href").split('/')[-1]
                        remarker_username = remark.find_element(By.XPATH, './/a[@class="css-1rd0h6f"]').text
                        # NOTE(review): '//*' is an absolute XPath — it searches
                        # the whole document, not this remark, so it likely always
                        # matches the first timestamp on the page; confirm.
                        remarker_time = remark.find_element(By.XPATH, '//*[@class="css-12cl38p"]').text
                        remarker_content = remark.find_element(By.XPATH, './/div[@class="CommentContent css-1ygdre8"]').text
                        if remark.find_element(By.XPATH, './/Button[@class="Button Button--plain Button--grey Button--withIcon Button--withLabel css-h1yvwn"]').text == '赞':
                            # Button shows just "赞" when the like count is zero.
                            remarker_likes = '0'
                        else:
                            remarker_likes = remark.find_element(By.XPATH, './/Button[@class="Button Button--plain Button--grey Button--withIcon Button--withLabel css-h1yvwn"]').text
                        remarker_info = {"isEmpty": 0, "评论人ID": remarker_id, "评论者昵称": remarker_username, "评论时间": remarker_time, "评论内容": remarker_content, "点赞次数": remarker_likes}
                        # De-duplicate: inline list and pop-up overlap.
                        if remarker_info not in remarks_information:
                            remarks_information.append(remarker_info)
                    except:
                        do_nothing = 1
                # Handle the pop-up comment dialog opened by the "more" button.
                try:
                    more_remarks_buttons = answer.find_elements(By.XPATH, './/Button[@class="Button Button--secondary Button--grey css-1p04wnp"]')
                    for more_remarks_button in more_remarks_buttons:
                        self.driver.execute_script("arguments[0].click();", more_remarks_button)
                        try:
                            WebDriverWait(self.driver, 100).until(EC.presence_of_element_located((By.XPATH, '//div[@class="css-weau4n"]/div/div/div[2]')))
                        except:
                            print("Search Failed! Check u network statue")
                        remarks_list = self.driver.find_elements(By.XPATH, '//div[@class="css-weau4n"]/div/div/div[2]')
                        for remark in remarks_list:
                            remarker_id = remark.find_element(By.XPATH, './/a[@class="css-1rd0h6f"]').get_attribute("href").split('/')[-1]
                            remarker_username = remark.find_element(By.XPATH, './/a[@class="css-1rd0h6f"]').text
                            remarker_time = remark.find_element(By.XPATH, './/span[@class="css-12cl38p"]').text
                            remarker_content = remark.find_element(By.XPATH, './/div[@class="CommentContent css-1ygdre8"]').text
                            if remark.find_element(By.XPATH, './/Button[@class="Button Button--plain Button--grey Button--withIcon Button--withLabel css-h1yvwn"]').text == '赞':
                                remarker_likes = '0'
                            else:
                                remarker_likes = remark.find_element(By.XPATH, './/Button[@class="Button Button--plain Button--grey Button--withIcon Button--withLabel css-h1yvwn"]').text
                            remarker_info = {"isEmpty": 0, "评论人ID": remarker_id, "评论者昵称": remarker_username, "评论时间": remarker_time, "评论内容": remarker_content, "点赞次数": remarker_likes}
                            # Pop-up entries can repeat inline ones with a
                            # different date format; treat those as distinct.
                            if remarker_info not in remarks_information:
                                remarks_information.append(remarker_info)
                        # Close the pop-up before continuing.
                        self.driver.execute_script("arguments[0].click();", self.driver.find_element(By.XPATH, './/Button[@aria-label="关闭"]'))
                except:
                    do_noting = 1
                # Collapse the comment section again if it is open.
                fold_remarks = answer.find_element(By.XPATH, './/Button[@class="Button ContentItem-action FEfUrdfMIKpQDJDqkjte Button--plain Button--withIcon Button--withLabel fEPKGkUK5jyc4fUuT0QP B46v1Ak6Gj5sL2JTS4PY RuuQ6TOh2cRzJr6WlyQp"]').text
                if fold_remarks == '收起评论':
                    self.driver.execute_script("arguments[0].click();", buttons[0])
            else:
                answers_information[i]['评论次数'] = '0'
                print("no remarks")
                # NOTE(review): remarks_information is not (re)initialized on
                # this branch — the read at the bottom of the loop reuses the
                # previous iteration's list (NameError on the first iteration
                # if that answer has no comments); confirm intended behavior.
            # Expand and read the answer body itself.
            try:
                button = answer.find_element(By.XPATH, './/Button[@class="Button ContentItem-more FEfUrdfMIKpQDJDqkjte Button--plain fEPKGkUK5jyc4fUuT0QP"]')
                self.driver.execute_script("arguments[0].click();", button)
            except:
                do_nothing = 1
            # The implicit wait alone proved unreliable here.
            WebDriverWait(answer, 100).until(EC.presence_of_element_located((By.XPATH, './/span[@class="RichText ztext CopyrightRichText-richText css-1g0fqss"]')))
            title = answer.find_element(By.XPATH, './/a[@data-za-detail-view-element_name="Title"]').text
            # Brief fixed wait: the rich text needs a moment after expanding;
            # increase if this spot errors out.
            sleep(0.1)
            answer_content = title + answer.find_element(By.XPATH, './/span[@class="RichText ztext CopyrightRichText-richText css-1g0fqss"]').text
            answer_time = answer.find_element(By.XPATH, './/div[@class="ContentItem-time"]/a/span').text.split(' ')[1] + "-" + answer.find_element(By.XPATH, './/div[@class="ContentItem-time"]/a/span').text.split(' ')[-1]
            answer_likes = answer.find_element(By.XPATH, './/Button[@class="Button VoteButton VoteButton--up FEfUrdfMIKpQDJDqkjte"]').get_attribute("aria-label").split(' ')[-1]
            answers_information[i]['回答内容'] = answer_content
            answers_information[i]['回答时间'] = answer_time
            answers_information[i]["点赞次数"] = answer_likes
            answers_information[i]["评论信息"] = remarks_information[0:10]
其他函数的使用同上,不再赘述
五、监控信息变化
在启动(前端)程序的同时,后端的flask Python脚本使用subprocess模块并行启动另一个检查用户信息更新的脚本;该脚本每隔五分钟使用GET请求,向启动前端的flask Python脚本发送需要更新数据的请求,此时程序开始更新数据。
更新数据的python脚本:
import requests
import time

# Poll the Flask front end and ask it to refresh the cached user data via
# its "update" GET parameter, once every five minutes.
url = 'http://127.0.0.1:5002?update=1'

while True:
    try:
        # A timeout keeps the poller from hanging forever while the front
        # end is busy driving the browser.
        response = requests.get(url, timeout=60)
        if response.status_code == 200:
            print('请求成功')
    except requests.RequestException as exc:
        # The front end may be temporarily down or unreachable; log and
        # keep polling instead of letting the loop die.
        print(exc)
    time.sleep(5 * 60)  # 5 minutes
六、可视化:以Web形式较美观的展示采集到的数据
如何入手使用flask设计前端可视化界面----参考文章
# -*- coding: UTF-8 -*-
from flask import Flask, render_template, request, session, redirect, url_for
from flask_bootstrap import Bootstrap
from tool_parallel_zhihu_specific_ueser_information_collection import get_specific_user_info,Login_ZhiHu
import time
import json, os
import threading,requests
import subprocess
class Display():
    """Flask front end for browsing the collected Zhihu user data.

    One instance wires every route to its own bound methods and keeps
    the currently-searched user's data as instance state.
    """
    def __init__(self):
        self.app = Flask(__name__, template_folder="templates")
        # NOTE(review): view_func captures the bound METHODS here; later in
        # __init__, data attributes with the same names (basic_info, answers,
        # asks, ...) are assigned, shadowing those methods on the instance.
        # Routing keeps working only because the methods were captured first.
        self.app.add_url_rule("/", "/index/", methods=["GET", "POST"], view_func=self.index)
        self.app.add_url_rule("/index/", methods=["GET", "POST"], view_func=self.index)
        self.app.add_url_rule("/login/", methods=["GET", "POST"], view_func=self.login)
        self.app.add_url_rule("/successfully_login/", methods=["GET", "POST"], view_func=self.successfully_login)
        # self.app.add_url_rule("/logout/", methods=["GET","POST"],view_func = self.logout)
        self.app.add_url_rule("/basic_info/", methods=["GET", "POST"], view_func=self.basic_info)
        self.app.add_url_rule("/followings/", methods=["GET", "POST"], view_func=self.followings)
        self.app.add_url_rule("/followers/", methods=["GET", "POST"], view_func=self.followers)
        self.app.add_url_rule("/answers/", methods=["GET", "POST"], view_func=self.answers)
        self.app.add_url_rule("/asks/", methods=["GET", "POST"], view_func=self.asks)
        self.app.config['SECRET_KEY'] = 'secret_key'
        # Weibo credentials used by the third-party login flow.
        self.WeiBo_username = '3247842625@qq.com'
        self.WeiBo_password = 'irontys'
        # Zhihu user currently displayed; None until a search happens.
        self.specific_username = None
        self.initial_login = 0  # set to 1 once a login has been performed
        self.img_src = None
        self.cond = None
        self.finish_initial_login = None
        # Keys expected in a collected-user record.
        self.specific_user_info_tag = {'basic_info', 'followers_info', 'followings_info', 'answers_info', 'asks_info'}
        self.specific_user_info_list = None
        self.basic_info_tag_list = ['用户名', '性别', '一句话介绍', '居住地', '所在行业', '职业经历', '个人简介']
        self.basic_info = None
        self.followings_info_tag_list = ['头像', '用户昵称', '链接地址', '回答数', '文章数', '关注者数']
        self.followings_info = None
        self.followers_info_tag_list = ['头像', '用户昵称', '链接地址', '回答数', '文章数', '关注者数']
        self.followers_info = None
        self.current_ask_index = 0      # ask currently shown on /asks/
        self.asks_info = None
        self.current_answer_index = 0   # answer currently shown on /answers/
        self.answers_info = None
        # Modes 0-4: Weibo pwd / Weibo QR / QQ QR / QQ local / WeChat QR.
        self.login_mode = 1
        self.login_cookie = None
        self.is_search_done = 0  # 1 once a user search has completed
        self.accessed_answers_page = 0
        self.accessed_asks_page = 0
    def successfully_login(self):
        """Landing page shown after a successful third-party login."""
        # Without a prior login/search, rendering would fail on missing
        # state — redirect back to the index instead.
        if self.specific_username == None and not self.initial_login:
            return redirect(url_for('index'))
        login_mode_tag = ['微博', '微博', 'QQ', 'QQ', '微信']
        return render_template('successfully_login.html'
                               , login_via=login_mode_tag[self.login_mode]
                               , is_search_done=self.is_search_done
                               )
    def login(self):
        """Show the login page (GET) or perform the chosen login (POST)."""
        if request.method == 'POST':
            self.login_mode = int(request.form.get('login_mode'))
            if self.login_mode == 0:
                # Only mode 0 (Weibo account + password) submits credentials.
                self.login_username = request.form["login_username"]
                self.login_password = request.form["login_password"]
            self.sign_cookie(login_mode=self.login_mode)
            # initial_login marks that a login happened; other pages
            # redirect to index while it is still 0.
            self.initial_login = 1
            return render_template('successfully_login.html')
        # is_search_done tells the template whether a search already ran
        # (avoids errors when specific_username is still None).
        return render_template('login.html'
                               , is_search_done=self.is_search_done
                               )
    def get_specific_user_info(self):
        """Load the searched user's data from the local cache file, or run
        the scraper and cache the result, then unpack it into attributes.

        NOTE(review): this method shadows the imported module-level function
        of the same name; the bare calls below resolve to the imported
        function (a global), not to this method.
        """
        self.specific_user_info_list = {}
        self.specific_user_info_list_filename = 'specific_user_info_list.txt'
        if os.path.isfile(self.specific_user_info_list_filename):
            # Cache file exists: load previously collected users.
            with open(self.specific_user_info_list_filename, 'r') as f:
                self.specific_user_info_list = json.load(f)
            if self.specific_username in self.specific_user_info_list:
                current_user_info = self.specific_user_info_list[self.specific_username]
        if self.specific_username not in self.specific_user_info_list:
            # Cache miss: scrape now (imported function — see NOTE above).
            current_user_info = get_specific_user_info(1, self.WeiBo_username, self.WeiBo_password, self.specific_username, 1, 0)
            # Re-read the cache in case it was written since the scrape began.
            if os.path.isfile(self.specific_user_info_list_filename):
                with open(self.specific_user_info_list_filename, 'r') as f:
                    self.specific_user_info_list = json.load(f)
            self.specific_user_info_list[self.specific_username] = current_user_info
            with open(self.specific_user_info_list_filename, 'w') as f:
                json.dump(self.specific_user_info_list, f)
        # Unpack the record into the attributes the templates read.
        self.basic_info = current_user_info['basic_info']
        self.followings_info = current_user_info['followings_info']
        self.followers_info = current_user_info['followers_info']
        self.answers_info = current_user_info['answers_info']
        self.asks_info = current_user_info['asks_info']
        self.is_search_done = 1
    def index(self):
        """Home page: handles the search form and the periodic update ping."""
        if request.method == 'POST':
            # Search box submitted: fetch (or load cached) data for the user.
            self.specific_username = request.form.get('specific_username')
            if self.specific_username != None:
                self.get_specific_user_info()
        # Handle the data-refresh request sent by check_update.py.
        if self.specific_username != None:
            if request.method == 'GET':
                if 'update' in request.args:
                    # Re-scrape and overwrite both the cache file and the
                    # in-memory attributes.
                    current_user_info = get_specific_user_info(1, self.WeiBo_username, self.WeiBo_password, self.specific_username, 1, 0)
                    self.specific_user_info_list[self.specific_username] = current_user_info
                    with open(self.specific_user_info_list_filename, 'w') as f:
                        json.dump(self.specific_user_info_list, f)
                    self.basic_info = current_user_info['basic_info']
                    self.followings_info = current_user_info['followings_info']
                    self.followers_info = current_user_info['followers_info']
                    self.answers_info = current_user_info['answers_info']
                    self.asks_info = current_user_info['asks_info']
        # Default page.
        return render_template('index.html'
                               , is_search_done=self.is_search_done
                               )
    def sign_cookie(self, login_mode=1):
        """Run the selected third-party login and store the session cookie."""
        login_url = 'https://www.zhihu.com'
        Login = Login_ZhiHu(login_url, '9301', 0)
        Login.third_party_login(mode=login_mode, username=self.WeiBo_username, password=self.WeiBo_password)
        self.login_cookie = Login.sign_cookie()
        Login.close_current_drive()
    def basic_info(self):
        """Render the searched user's basic profile information."""
        if self.specific_username == None:
            return redirect(url_for('index'))
        # basic_info fields: 用户名/性别/一句话介绍/居住地/所在行业/职业经历/个人简介.
        return render_template('basic_info.html'
                               , basic_info=self.basic_info
                               , is_search_done=self.is_search_done
                               )
    def followings(self):
        """Render the list of people the searched user follows."""
        if self.specific_username == None:
            return redirect(url_for('index'))
        # Per entry: 头像/用户昵称/链接地址/回答数/文章数/关注者数.
        return render_template('followings.html'
                               , followings_info=self.followings_info
                               , basic_info=self.basic_info
                               , is_search_done=self.is_search_done
                               )
    def followers(self):
        """Render the searched user's followers."""
        if self.specific_username == None:
            return redirect(url_for('index'))
        # Per entry: 头像/用户昵称/链接地址/回答数/文章数/关注者数.
        return render_template('followers.html'
                               , followers_info=self.followers_info
                               , basic_info=self.basic_info
                               , is_search_done=self.is_search_done
                               )
    def answers(self):
        """Render one of the searched user's answers; POST selects which."""
        if self.specific_username == None:
            return redirect(url_for('index'))
        if request.method == 'POST':
            self.current_answer_index = int(request.form.get('current_answer_index'))
        return render_template('answers.html'
                               , answers_info=self.answers_info
                               , basic_info=self.basic_info
                               , current_answer_index=self.current_answer_index
                               , is_search_done=self.is_search_done
                               , answers_count=len(self.answers_info)
                               )
    def asks(self):
        """Render one of the searched user's questions; POST selects which."""
        if self.specific_username == None:
            return redirect(url_for('index'))
        if request.method == 'POST':
            self.current_ask_index = int(request.form.get('current_ask_index'))
        return render_template('asks.html'
                               , asks_info=self.asks_info
                               , basic_info=self.basic_info
                               , current_ask_index=self.current_ask_index
                               , is_search_done=self.is_search_done
                               , asks_count=len(self.asks_info)
                               )
    def run(self):
        """Start the development server (threaded so update pings can be
        served while a search is in progress)."""
        self.app.run(host='127.0.0.1', port=5002, debug=True, threaded=True)
if __name__ == '__main__':
    app = Display()
    # Launch the background poller that triggers periodic data refreshes.
    # Passing an argument list with the default shell=False avoids the
    # extra shell layer and the command-injection risk of a command string.
    subprocess.Popen(["python", "check_update.py"])
    app.run()
附录
(一)使用xpath查找网页中WebElement的方法和技巧
参考文章
七、遇到的问题
在本次实验中,使用的自动化工具搜集信息时用的是chrome driver
,但是这个东西在登录知乎甚至在访问知乎的服务器时能够被检测出来,于是本实验中采取了使用远程调试的方式:先启动一个chrome浏览器,然后使用chrome driver远程连接这个在指定端口上启动的浏览器的方法,绕过检测。对于使用这种方法,稍微有些遗憾吧,因为后来听说其他同学用了edge浏览器,知乎没有对edge driver
进行检测。而使用远程调试的方式搜集信息的过程中有很多困难:一是浏览器不能设置为headless模式,只能通过设置其窗口位置的方式来达到“伪”不可见的目的;二是浏览器默认会保存cookie,如果浏览器存储了旧的cookie,会导致一些非预期的错误;三是如果程序异常出错,浏览器不会主动关闭,需要手动关闭。
实验过程中,登录系统使用了5种登录方式,其中微博登录的方式失效了一种。去年的时候,chrome driver 还没有被限制得像现在这样严格,甚至可以直接登录。在知乎不断升级反爬的过程中,本文的程序很有可能会很快失效。文章来源:https://www.toymoban.com/news/detail-409855.html
八、总结
这次的实验是一个从无到有的学习过程,是一次在实践中学习的经历,不仅学习了对于自动化工具搜集信息的方法,对于web前端的设计也有了更深的理解,本次还深入地学习了前端界面的设计方法。
在本次实验中,使用的自动化工具搜集信息时用的是chrome driver
,但是这个东西在登录知乎甚至在访问知乎的服务器时能够被检测出来,于是本实验中采取了使用远程调试的方式:先启动一个chrome浏览器,然后使用chrome driver远程连接这个在指定端口上启动的浏览器的方法,绕过检测。对于使用这种方法,稍微有些遗憾吧,因为后来听说其他同学用了edge浏览器,知乎没有对edge driver进行检测,而使用远程调试的方式搜集信息过程中有很多困难:一是浏览器不能够设置为headless模式,是能够通过设置其窗口位置的方式来达到“伪”不可见的目的;二是浏览器没有默认不保存cookie,如果浏览器存储了cookie,会导致非预期的一些错误…文章来源地址https://www.toymoban.com/news/detail-409855.html
到了这里,关于(待完善)2023-selenium 实现知乎自动登录(第三方登录/使用cookie自动登录)+指定用户的信息相关搜集(2023.3)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!