背景
做rumor detection 用到了twitter15和twitter16数据集,里边只给了推文id和评论者的uid,想要爬取其他数据就只能自己动手。
我需要爬取推文评论用户在评论时间点前两个月的历史推文,然而这两个数据集都太老了,里边的数据都是13-14年的,所以用twitter API无法获取到(因为官方API只能爬用户最近3000条历史推文),因此只能用推特的搜索API来爬数据。
这篇文章给出了用推特搜索api的爬取过程,但是万恶的马斯克限制了搜索API的使用权限,现在只能登陆后才能调用搜索API了。
之前有过一段时间selenium使用经验,所以打算用selenium模拟登陆后来爬取数据。因为数据集中推文很多,大概两千条,然后底下的评论平均有五个用户吧,所以一共要爬取差不多一万个用户的历史推文,规模还是蛮大的。因此在这个过程中还用到了多进程来加快爬取的速度。
获取cookies
用了selenium来模拟登陆过程,自动输入twitter账号和密码。
import time
from selenium import webdriver
twi_username = "username"
twi_keyword = "password"
browser = webdriver.Chrome()
browser.get(r'https://twitter.com/i/flow/login')
#这里睡几秒,等待页面加载。
time.sleep(5)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div/div/div/div[5]/label/div/div[2]/div/input").send_keys(twi_username)
time.sleep(3)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div/div/div/div[6]/div").click()
time.sleep(2)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div[1]/div/div/div[3]/div/label/div/div[2]/div[1]/input").send_keys(twi_keyword)
browser.find_element("xpath","//*[@id=\"layers\"]/div/div/div/div/div/div/div[2]/div[2]/div/div/div[2]/div[2]/div[2]/div/div[1]/div/div/div/div/span/span").click()
time.sleep(2)
#使用这个函数可以导出browser的cookies
savedCookies = browser.get_cookies()
print(savedCookies)
savedCookies 是一个词典,将cookies打印出来后复制一下,之后会用到。
应用cookies
推特现在对搜索作出了一些限制,首先就是得登陆后才能用搜索,其次就是当搜索次数过多后,会禁用一段时间搜索功能。
为了加快爬取进度,可以用多个推特账号来登陆。
这里参考了这篇文章
这个函数返回了一个浏览器
def init_browser(i):
#因为用了多线程,i表示线程编号
savedCookies1=[{}]
savedCookies2=[{}]
#两个推特账号,对应俩cookies
savedCookies_list=[savedCookies1,savedCookies3]
#通过线程编号来分配cookies
savedCookies=savedCookies_list[i%2]
#d和co俩变量是为了之后获取浏览器日志
co = webdriver.chrome.options.Options()
co.add_experimental_option('w3c',False)
d = webdriver.common.desired_capabilities.DesiredCapabilities.CHROME
d['loggingPrefs']={'performance':'ALL'}
d["goog:loggingPrefs"] = {"performance": "ALL"}
browser = webdriver.Chrome(desired_capabilities=d,options=co)
browser.get(r'https://twitter.com/i/flow/login')
time.sleep(1)
browser.delete_all_cookies()
for cookie in savedCookies:
for k in {'name', 'value', 'domain', 'path', 'expiry'}:
# cookie.keys()属于'dict_keys'类,通过list将它转化为列表
if k not in list(cookie.keys()):
# saveCookies中的第一个元素,由于记录的是登录前的状态,所以它没有'expiry'的键名,我们给它增加
if k == 'expiry':
t = time.time()
cookie[k] = int(t) # 时间戳s
browser.add_cookie({k: cookie[k] for k in {'name', 'value', 'domain', 'path', 'expiry'}})
return browser
使用搜索API爬取推文
推特高级搜索里边有很多字段,选定字段后可以搜到相关推文。
这里参考了这篇文章的思路,通过adative.json可以得到推文信息,但是那位大佬用request库来搞的,我不是很了解orz。用selenium做的话,可以通过获取浏览器日志来实现,参考了这篇文章
在长时间爬数据时,可能会遇到一些问题:
- 网不好,所以可能还在加载,所以用了is_search_loading(driver)函数来检测网页是否还在加载
- 被推特限制搜索了,用到is_search_loading(driver)函数
这俩函数原理很简单,就是判断有没有出现相关的元素,如果出现了,就返回True。
如果
def is_search_limitted(driver):
#调用该函数时页面可能没有加载完毕,睡个2秒
time.sleep(2)
try:
driver.find_element("xpath",
"/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/div[3]/div/div/div[2]/div/span/span")
return True
except:
return False
def is_search_loading(driver):
try:
time.sleep(2)
driver.find_element("xpath","/html/body/div[1]/div/div/div[2]/main/div/div/div/div/div/div[1]/div[1]/div[1]/div/div/div/div/div[2]/div[2]/div/div/div/svg")
return True
except:
return False
return
def scrollUntilLoaded(driver):
last_height = driver.execute_script("return document.body.scrollHeight")
while is_search_loading(driver):
time.sleep(2)
while True:
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
new_height = driver.execute_script("return document.body.scrollHeight")
if new_height == last_height:
break
last_height = new_height
def scrapUserTweets(browser,username,since_time,until_time):
# username = ""
# since_time = "2019-08-22"
# until_time = "2019-09-22"
browser.get(
fr'https://twitter.com/search?q=from%3A{username}%20until%3A{until_time}%20since%3A{since_time}%20-filter%3Alinks%20-filter%3Areplies&src=typed_query')
time.sleep(2)
sleep_time = 60
#如果搜索限制了,就睡一分钟
while is_search_limitted(browser):
print("*****search limitted, sleep {} seconds".format(sleep_time))
time.sleep(sleep_time)
browser.get(
fr'https://twitter.com/search?q=from%3A{username}%20until%3A{until_time}%20since%3A{since_time}%20-filter%3Alinks%20-filter%3Areplies&src=typed_query')
scrollUntilLoaded(browser)
#获取浏览器日志
logs = browser.get_log("performance")
tweets = []
for log in logs:
logjson = json.loads(log["message"])["message"]
if logjson['method'] == 'Network.responseReceived':
params = logjson['params']
try:
# 找到那啥adaptive.json玩意
requestUrl = params['response']['url']
if "adaptive.json" in requestUrl:
requestId = params['requestId']
response_body = browser.execute_cdp_cmd('Network.getResponseBody', {'requestId': requestId})
_content = json.loads(response_body["body"])
tweets_set = _content['globalObjects']['tweets']
for w in tweets_set.keys():
tweets.append(tweets_set[w])
else:
continue
except:
requestUrl = "None"
continue
else:
continue
if len(tweets) != 0:
tweets_json=json.loads(json.dumps(tweets, ensure_ascii=False))
user_tweets=[]
for i in tweets_json:
tweet = {}
tweet["id"] = i["id"]
tweet["full_text"] = i["full_text"]
tweet["created_at"] = i["created_at"]
user_tweets.append(tweet)
return user_tweets
else:
return None
多进程
这里用了队列来载入需要爬取的推特推文id,通过这个队列来给不同的进程分配任务,每一个进程都从队列获取推文id,然后根据这个推文id来爬取相应的信息,在这个推文id的信息爬取完后,会把这个推文id写入finished.txt中,这样在整个爬取过程中如果程序中断,从中断点能恢复任务,继续爬虫。文章来源:https://www.toymoban.com/news/detail-820623.html
import multiprocessing
def multi_scrap_tree_func( queue, dataset_path,i):
browser = scrap2.init_browser(i)
while not queue.empty():
#从队列中获取推特id
file = queue.get()
file_path = os.path.join(dataset_path,file)
get_tree_user(browser,file_path)
logging.debug(file+" is scrapped")
with open("./data/twitter15/finished.txt",mode="a", encoding="utf-8") as f:
f.write(file+"\n")
print("process{}:{} / {} ,{} is finished".format(i,len(twi16_dir_ls)-queue.qsize(),len(twi16_dir_ls),file))
if __name__ == '__main__':
lock = multiprocessing.Lock()
queue = multiprocessing.Queue()
continue_file = ""
#读取已完成爬取的推文id
with open("./data/twitter15/finished.txt",mode="r",encoding="utf-8") as f:
finished_file_list = f.readlines()
for i in range(len(twi16_dir_ls)):
is_finished = False
#如果一个推文id已经完成爬取,就不加入队列中
for j in finished_file_list:
if j[:-1]==twi16_dir_ls[i]:
is_finished=True
if not is_finished:
queue.put(twi16_dir_ls[i])
print(queue.qsize())
process_list = []
for i in range(12):#创建12个进程
process = multiprocessing.Process(target=multi_scrap_tree_func,args=(queue,twi16_dir,i))
process.start()
process_list.append(process)
for p in process_list:
p.join()
结语
在有现成API的情况下,用selenium来爬数据确实不够优雅,开12个chrome的性能需求太大了。但是因为我爬的数据量大,以及推特的登陆和请求次数限制,用request和httpx来爬数据在时间上也不会特别快。不过以后还是可以学学用requests和httpx爬数据文章来源地址https://www.toymoban.com/news/detail-820623.html
到了这里,关于【爬虫】用selenium登陆推特并爬取用户历史推文的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!