Skip to content
On this page

Python 爬取朋友圈

注意

要使用 3.9.2.23 或以下版本微信,否则爬取时拿不到发布时间会报错

使用到的模块

python
import psutil # 用于获取微信进程
import pywinauto # 用于控制微信
from pywinauto.application import Application
import time
import json # 用于读取json文件
import io # 用于读取json文件

使用到的工具方法

python
import json
import io
def write_json(obj: dict or list, time_str: str = ''):
    time_str = re.sub(r'[-: ]', '_', time_str)
    JSON_FILE_NAME = f"pyq_records{f'_{time_str}' if time_str else ''}.json"
    # json_data = json.loads(json.dumps(obj, indent=2, ensure_ascii=False))
    try:
        before_json_str = io.open(
            JSON_FILE_NAME, encoding='utf-8', mode='r').read()
    except:
        before_json_str = ''
        pass
    before_json: list = json.loads(
        before_json_str if len(before_json_str) > 0 else "[]")
    if isinstance(obj, list):
        before_json.extend(obj)
    elif isinstance(obj, dict):
        before_json.append(obj)
    with io.open(JSON_FILE_NAME, encoding='utf-8', mode='w') as f:
        f.write(json.dumps(before_json, indent=2, ensure_ascii=False))
        f.flush()
        f.close()

获取微信进程

首先要获取到微信的进程,拿到进程之后,就可以通过 pywinauto 来控制微信了。

python
PID = 0
for proc in psutil.process_iter():
    try:
        pinfo = proc.as_dict(attrs=['pid', 'name'])
    except psutil.NoSuchProcess:
        pass
    else:
        if 'WeChat.exe' == pinfo['name']:
            PID = pinfo['pid']

通过 pywinauto 控制微信

python
app = Application(backend='uia').connect(process=PID)
win = app["微信"]
# 点击朋友圈
pyq_btn = win.child_window(title=u'朋友圈', control_type="Button")
cords = pyq_btn.rectangle()
pywinauto.mouse.click(button='left', coords=(cords.left + 10, cords.top + 10))

获取朋友圈数据

会自动打开朋友圈获取数据并滚动,直到获取到指定的时间为止。爬取的数据会保存在 pyq_records.json 文件中。

python
all_pyq = []
all_pyq_contents = set()

crawl_cut_off_time = '' # 朋友圈左下角的时间限制,如:昨天,一天前等,爬取到这样的文本会停止
crawl_cut_off_text = '' # 要爬取的朋友昵称
format_time_str = "%Y-%m-%d %H:%M:%S"
current_crawl_time_str = time.strftime(format_time_str, time.localtime())

crawling = True

while crawling:
    # 如果按Esc关闭朋友圈页面,这里就会崩掉然后结束
    try:
        pyq_win = app['朋友圈']
    except:
        break
    try:
        pyqs = pyq_win.wrapper_object().descendants(depth=4)
        for x in pyqs:
            try:
                pyq_info = {}

                classname = x.friendly_class_name()
                if (classname == "ListItem"):
                    # 这是一条朋友圈
                    pyq_contents = str(x.window_text())

                    if (pyq_contents in all_pyq_contents):
                        # 已经爬过这一条了
                        continue

                    contents = pyq_contents.splitlines()

                    if len(contents) == 0:
                        crawling = False
                        break

                    nickname = contents[0]
                    msg_send_time_index = -1
                    try:
                        msg_send_time: str = contents[msg_send_time_index]
                        convert_msg_send_time = convert_relative_time_to_datetime(
                            msg_send_time)
                    except:
                        print(f'获取时间错误 -> {msg_send_time}')
                        msg_send_time_index = msg_send_time_index - 1
                        msg_send_time: str = contents[msg_send_time_index]
                        convert_msg_send_time = convert_relative_time_to_datetime(
                            msg_send_time)
                    print(msg_send_time)
                    if crawl_cut_off_time and msg_send_time.find(crawl_cut_off_time) != -1:
                        print('到底了', pyq_contents)
                        crawling = False
                        break

                    # 限定采集的数据
                    if nickname.find(crawl_cut_off_text) != -1:
                        all_pyq_contents.add(pyq_contents)
                        pyq_info["content"] = '\n'.join(
                            list(map(lambda x: str(x), contents[1:msg_send_time_index])))
                        pyq_info["nickname"] = nickname
                        pyq_info["msg_send_time_str"] = msg_send_time
                        pyq_info["msg_send_timestamp"] = round(
                            convert_msg_send_time.timestamp() * 1000)
                        pyq_info["msg_send_time"] = convert_msg_send_time.strftime(
                            format_time_str)
                        pyq_info["crawled_timestamp"] = int(
                            round(time.time() * 1000))
                        pyq_info["crawled_time"] = time.strftime(
                            format_time_str, time.localtime())
                        try:
                            edits = DFS(x, 6)
                            for e in edits:
                                if (e.friendly_class_name() == "Edit"):
                                    likes = e.window_text()
                                    pyq_info["likes"] = likes
                                if (e.friendly_class_name() == "ListBox"):
                                    pinglun = []
                                    comments = e.children()
                                    for com in comments:
                                        if (com.friendly_class_name() == "ListItem"):
                                            pinglun.append(com.window_text())
                                    # 所有信息采集完毕
                                    pyq_info["comments"] = pinglun
                        except:
                            pass
                        write_json(pyq_info, current_crawl_time_str)
                        all_pyq.append(pyq_info)
            except Exception as e:
                print("passed exception: ", e)
                pass
    except:
        pass
    # 向下滚动
    cords = pyq_win.rectangle()
    pywinauto.mouse.scroll(
        wheel_dist=-5, coords=(cords.left+10, cords.bottom-10))
    if (len(all_pyq) > 50000):
        break