Appearance
Python 爬取朋友圈
注意
要使用 3.9.2.23 或以下版本的微信,否则爬取时拿不到发布时间,会报错。
使用到的模块
python
import psutil # 用于获取微信进程
import pywinauto # 用于控制微信
from pywinauto.application import Application
import time
import json # 用于读取json文件
import io # 用于读取json文件
使用到的工具方法
python
import json
import io
def write_json(obj: 'dict | list', time_str: str = ''):
    """Append ``obj`` to the JSON array stored in the crawl's record file.

    The file lives in the current working directory and is named
    ``pyq_records.json``, or ``pyq_records_<time_str>.json`` when ``time_str``
    is non-empty (every ``-``, ``:`` and space in ``time_str`` is replaced by
    ``_`` first). The existing array is read, extended, and the whole file is
    rewritten as UTF-8, pretty-printed JSON with non-ASCII text kept as-is.

    Args:
        obj: A dict is appended as a single record; a list is appended
            element by element. Any other type leaves the stored records
            unchanged (the file is still rewritten).
        time_str: Optional label that selects a per-run file name.

    Raises:
        json.JSONDecodeError: If the existing file does not contain valid JSON.
    """
    # Imported here so the function does not depend on a module-level
    # ``import re`` being present.
    import re

    suffix = re.sub(r'[-: ]', '_', time_str)
    json_file_name = f"pyq_records_{suffix}.json" if suffix else "pyq_records.json"

    # A missing file just means this is the first record of the run. Other
    # read errors are allowed to propagate instead of silently overwriting
    # data that could not be read.
    try:
        with io.open(json_file_name, encoding='utf-8', mode='r') as f:
            before_json_str = f.read()
    except FileNotFoundError:
        before_json_str = ''

    # Treat an empty or whitespace-only file as an empty array.
    before_json: list = json.loads(before_json_str) if before_json_str.strip() else []

    if isinstance(obj, list):
        before_json.extend(obj)
    elif isinstance(obj, dict):
        before_json.append(obj)

    # The ``with`` block flushes and closes the file on exit.
    with io.open(json_file_name, encoding='utf-8', mode='w') as f:
        f.write(json.dumps(before_json, indent=2, ensure_ascii=False))
获取微信进程
首先要获取到微信的进程,拿到进程之后,就可以通过 pywinauto 来控制微信了。
python
# Scan the running processes for the WeChat desktop client and remember its
# process id. PID stays 0 when no process is named 'WeChat.exe'; if several
# match, the last one yielded by psutil is kept.
PID = 0
for candidate in psutil.process_iter():
    try:
        details = candidate.as_dict(attrs=['pid', 'name'])
    except psutil.NoSuchProcess:
        # The process went away before it could be inspected; skip it.
        continue
    if details['name'] == 'WeChat.exe':
        PID = details['pid']
通过 pywinauto 控制微信
python
# Attach to the already-running WeChat process (found above as PID) using the
# UI Automation backend, then grab its main window by title.
app = Application(backend='uia').connect(process=PID)
win = app["微信"]

# Open Moments: find the button titled "朋友圈" and left-click 10 px inside
# the top-left corner of its bounding rectangle.
pyq_btn = win.child_window(title=u'朋友圈', control_type="Button")
cords = pyq_btn.rectangle()
click_x, click_y = cords.left + 10, cords.top + 10
pywinauto.mouse.click(button='left', coords=(click_x, click_y))
获取朋友圈数据
会自动打开朋友圈获取数据并滚动,直到获取到指定的时间为止。爬取的数据会保存在 pyq_records.json 文件中。
python
# Crawl the Moments window: read every visible post, record the ones that pass
# the nickname filter, scroll down, and repeat until the cut-off time label is
# seen, an empty item is met, the window goes away, or more than 50000 records
# have been collected.

all_pyq = []              # every record collected during this run
all_pyq_contents = set()  # raw text of posts already recorded (de-duplication)

# Stop marker matched against a post's time label (shown at the bottom-left of
# the post), e.g. "昨天". Crawling stops at the first post whose label contains
# it. An empty string disables the cut-off.
crawl_cut_off_time = ''
# Only posts whose nickname contains this text are recorded. An empty string
# matches every nickname.
crawl_cut_off_text = ''

format_time_str = "%Y-%m-%d %H:%M:%S"
# Start time of this run; write_json receives it to pick the output file name.
current_crawl_time_str = time.strftime(format_time_str, time.localtime())

crawling = True
while crawling:
    # If the Moments window is closed (e.g. with Esc), the lookups below fail
    # and the crawl ends.
    try:
        pyq_win = app['朋友圈']
    except Exception:
        break

    # ``except Exception`` (not a bare ``except``) is used throughout so that
    # Ctrl+C / SystemExit can still stop the crawl.
    try:
        pyqs = pyq_win.wrapper_object().descendants(depth=4)
        for item in pyqs:
            try:
                if item.friendly_class_name() != "ListItem":
                    continue

                # Each ListItem is one Moments post.
                pyq_contents = str(item.window_text())
                if pyq_contents in all_pyq_contents:
                    # Already recorded on an earlier pass.
                    continue

                contents = pyq_contents.splitlines()
                if not contents:
                    crawling = False
                    break

                nickname = contents[0]

                # The time label is taken from the last line; when that line
                # cannot be converted, retry with the line before it.
                msg_send_time_index = -1
                msg_send_time: str = contents[msg_send_time_index]
                try:
                    convert_msg_send_time = convert_relative_time_to_datetime(
                        msg_send_time)
                except Exception:
                    print(f'获取时间错误 -> {msg_send_time}')
                    msg_send_time_index -= 1
                    msg_send_time = contents[msg_send_time_index]
                    convert_msg_send_time = convert_relative_time_to_datetime(
                        msg_send_time)
                    # NOTE(review): the source's indentation was lost; this
                    # print is assumed to belong to the fallback branch.
                    print(msg_send_time)

                if crawl_cut_off_time and crawl_cut_off_time in msg_send_time:
                    print('到底了', pyq_contents)
                    crawling = False
                    break

                # Restrict collection to the requested nickname.
                if crawl_cut_off_text not in nickname:
                    continue

                all_pyq_contents.add(pyq_contents)
                pyq_info = {
                    # Everything between the nickname and the time label.
                    "content": '\n'.join(contents[1:msg_send_time_index]),
                    "nickname": nickname,
                    "msg_send_time_str": msg_send_time,
                    "msg_send_timestamp": round(
                        convert_msg_send_time.timestamp() * 1000),
                    "msg_send_time": convert_msg_send_time.strftime(
                        format_time_str),
                    "crawled_timestamp": int(round(time.time() * 1000)),
                    "crawled_time": time.strftime(
                        format_time_str, time.localtime()),
                }

                # Likes and comments are best-effort: a failure here must not
                # prevent the post itself from being saved.
                # NOTE(review): DFS is defined elsewhere; presumably it walks
                # the item's descendants down to depth 6 -- confirm.
                try:
                    for e in DFS(item, 6):
                        kind = e.friendly_class_name()
                        if kind == "Edit":
                            pyq_info["likes"] = e.window_text()
                        if kind == "ListBox":
                            pyq_info["comments"] = [
                                com.window_text()
                                for com in e.children()
                                if com.friendly_class_name() == "ListItem"
                            ]
                except Exception as err:
                    print("passed exception: ", err)

                write_json(pyq_info, current_crawl_time_str)
                all_pyq.append(pyq_info)
            except Exception as err:
                print("passed exception: ", err)
    except Exception as err:
        print("passed exception: ", err)

    # Scroll down from a point just inside the bottom-left corner of the
    # Moments window to load further posts.
    cords = pyq_win.rectangle()
    pywinauto.mouse.scroll(
        wheel_dist=-5, coords=(cords.left + 10, cords.bottom - 10))

    # Hard upper bound on the number of collected records.
    if len(all_pyq) > 50000:
        break