This script crawls App Store review data for a list of games through the Qimai (qimai.cn) comment API. For each game ID it first requests page 1 to learn the total number of comment pages, then splits the pages across a small thread pool; each thread writes every page it fetches to a temporary Excel file, and the per-page files are finally merged, de-duplicated, sorted by date, and saved as one Excel file per game. Games whose result files already exist are skipped, so the crawl can be interrupted and resumed.
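A few assumptions before the code: the game IDs are read from the first column of game_id.xlsx in the working directory; the PHPSESSID cookie below is only a sample value and presumably needs to be replaced with a session from your own Qimai account; and the hard-coded sdate/edate parameters restrict the crawl to comments posted in 2024. get_proxy() is a stub that returns None, so all requests go out over a direct connection unless you plug in your own proxy source.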
import concurrent.futures
import glob
import json
import math
import os
import random
import re
from time import sleep
import pandas as pd
import requests
from tqdm import tqdm
def get_proxy():
    return None
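# get_proxy() is a stub: returning None makes every request use a direct connection.
# A minimal sketch of wiring in a proxy pool is shown below; the local pool service,
# its URL and its response format are all assumptions, not part of the original code:
#
# def get_proxy():
#     try:
#         resp = requests.get("http://127.0.0.1:5010/get/", timeout=3)  # hypothetical proxy-pool endpoint
#         return resp.json().get("proxy")  # expected to look like "ip:port"
#     except Exception:
#         return None  # fall back to a direct connection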
def fetch_page_range_comments(game_id, start_page, end_page, url, headers, cookies, base_params, temp_dir):
    # Crawl the comments for the pages in the given range
    local_params = base_params.copy()
    range_comments = []
    # One progress bar per page range
    with tqdm(total=end_page - start_page + 1, desc=f"Thread pages {start_page}-{end_page}") as range_progress:
        for page in range(start_page, end_page + 1):
            local_params["page"] = str(page)
            # Retry logic
            retry_count = 0
            max_page_retries = 3
            page_comments = []
            while retry_count < max_page_retries:
                try:
                    # Get a proxy
                    proxy = get_proxy()
                    # Use the proxy if one is available, otherwise connect directly
                    if proxy:
                        proxies = {"http": proxy, "https": proxy}
                    else:
                        proxies = None
                    # Send the request (through the proxy, or directly)
                    page_response = requests.get(
                        url,
                        headers=headers,
                        cookies=cookies,
                        params=local_params,
                        proxies=proxies,
                        timeout=10
                    )
                    page_data = json.loads(page_response.text)
                    # Check that the response contains the expected data
                    if 'appComments' not in page_data:
                        print(f"Page {page} attempt {retry_count + 1}/{max_page_retries}: no appComments field in the response")
                        retry_count += 1
                        wait_time = 2 + random.randint(1, 3)  # short back-off
                        sleep(wait_time)
                        continue
                    # Data fetched successfully, process the comments
                    page_comments = []
                    for comment in page_data['appComments']:
                        comment_info = {
                            'name': comment['comment']['name'],
                            'rating': comment['rating'],
                            'date': comment['date'],
                            'title': comment['comment']['title'],
                            'body': comment['comment']['body'],
                            'delStatus': comment['comment']['delStatus'],
                            'page': page  # record which page the comment came from, for debugging
                        }
                        page_comments.append(comment_info)
                    # Append this page's comments to the range-level list
                    range_comments.extend(page_comments)
                    # Save this page's comments to a temporary file
                    temp_filename = os.path.join(temp_dir, f"temp_page_{page}.xlsx")
                    if page_comments:
                        temp_df = pd.DataFrame(page_comments)
                        temp_df.to_excel(temp_filename, index=False)
                    # Update the progress bar
                    range_progress.update(1)
                    # Short random delay to avoid sending requests too frequently
                    sleep_time = 0.2 + random.random() * 0.5
                    sleep(sleep_time)
                    # Data fetched successfully, leave the retry loop
                    break
                except Exception as e:
                    print(f"Page {page} attempt {retry_count + 1}/{max_page_retries}: error while fetching data: {e}")
                    retry_count += 1
                    if retry_count < max_page_retries:
                        wait_time = 2 + random.randint(1, 3)  # short back-off
                        sleep(wait_time)
                    else:
                        print(f"Page {page} reached the maximum number of retries, skipping it")
                        # Update the progress bar
                        range_progress.update(1)
    # Return all comments crawled in this range
    return range_comments
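# The parsing above assumes the API response contains (at least) the following shape.
# This is inferred from the fields the code reads, not from official API documentation:
#
# {
#     "maxPage": 12,
#     "appComments": [
#         {
#             "rating": 5,
#             "date": "2024-06-01 12:00:00",
#             "comment": {"name": "...", "title": "...", "body": "...", "delStatus": 0}
#         }
#     ]
# }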
def merge_excel_files(directory_path):
    """Merge all temp_page_xx.xlsx files in the given directory."""
    # Collect all matching file paths
    file_pattern = os.path.join(directory_path, "temp_page_*.xlsx")
    file_paths = glob.glob(file_pattern)
    if not file_paths:
        print(f"No files matching temp_page_*.xlsx found in directory {directory_path}")
        return pd.DataFrame()

    # Sort the files by page number
    def extract_page_number(file_path):
        match = re.search(r'temp_page_(\d+)\.xlsx', file_path)
        if match:
            return int(match.group(1))
        return 0

    file_paths.sort(key=extract_page_number)
    print(f"Found {len(file_paths)} temporary files, merging...")
    # Merge all Excel files
    all_data = []
    for file_path in file_paths:
        try:
            df = pd.read_excel(file_path)
            all_data.append(df)
        except Exception as e:
            print(f"Error while reading file {file_path}: {str(e)}")
    if not all_data:
        print("No valid data to merge")
        return pd.DataFrame()
    # Concatenate everything
    merged_df = pd.concat(all_data, ignore_index=True)
    # Drop duplicate rows
    original_len = len(merged_df)
    merged_df = merged_df.drop_duplicates().reset_index(drop=True)
    if original_len > len(merged_df):
        print(f"Dropped {original_len - len(merged_df)} duplicate rows")
    return merged_df
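# Usage note: the temporary directory created in get_game_info() is named
# temp_comments_<game_id>, so a manual re-merge of one game's pages would look like
# merge_excel_files(f"temp_comments_{game_id}").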
def get_game_info(game_id):
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36 Edg/135.0.0.0",
        "sec-ch-ua": "\"Microsoft Edge\";v=\"135\", \"Not-A.Brand\";v=\"8\", \"Chromium\";v=\"135\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\""
    }
    cookies = {
        "PHPSESSID": "jrfep0j1up12k8c806q200omqe",
    }
    url = "https://api.qimai.cn/app/comment"
    params = {
        "appid": f"{game_id}",
        "country": "cn",
        "sdate": "2024-01-01 00:00:00",
        "edate": "2025-01-01 23:59:59",
        "page": "1"
    }
    # Initial request: find out how many pages of comments there are
    max_retries = 3
    max_page = 0
    for attempt in range(max_retries):
        try:
            # Get a proxy
            proxy = get_proxy()
            if proxy:
                proxies = {"http": proxy, "https": proxy}
                print(f"Using proxy: {proxy}")
            else:
                proxies = None
                print("Using a direct connection")
            # Send the request (through the proxy if configured)
            response = requests.get(
                url,
                headers=headers,
                cookies=cookies,
                params=params,
                proxies=proxies,
                timeout=10
            )
            data = json.loads(response.text)
            # Check that the response contains the expected data
            if 'maxPage' not in data:
                print(f"Attempt {attempt + 1}/{max_retries}: no maxPage field in the response, waiting before retrying...")
                print(f"Response content: {data}")
                sleep(5 + random.randint(2, 5))  # random wait
                continue
            max_page = data['maxPage']
            break  # data fetched successfully, leave the retry loop
        except Exception as e:
            print(f"Attempt {attempt + 1}/{max_retries}: error while fetching the initial data: {e}")
            if attempt < max_retries - 1:  # not the last attempt yet
                wait_time = 5 + random.randint(2, 5)
                print(f"Waiting {wait_time} seconds before retrying...")
                sleep(wait_time)
            else:
                print("Reached the maximum number of retries, skipping this game")
                return 0
    if max_page == 0:
        print(f"Game ID {game_id} has no comment data")
        return 0
    # Create a temporary directory for the per-page batch files
    temp_dir = f"temp_comments_{game_id}"
    os.makedirs(temp_dir, exist_ok=True)
    # Number of threads; adjust to your machine
    num_threads = min(8, max_page)  # at most 8 threads, or one per page when there are fewer pages
    # Number of pages each thread is responsible for
    pages_per_thread = math.ceil(max_page / num_threads)
    # Build the task list: each task covers one page range
    tasks = []
    for i in range(num_threads):
        start_page = i * pages_per_thread + 1
        end_page = min((i + 1) * pages_per_thread, max_page)
        if start_page <= end_page:  # skip empty ranges when the pages do not divide evenly
            tasks.append((start_page, end_page))
    print(f"Crawling comments for game ID {game_id} with {len(tasks)} threads, {max_page} pages in total")
    print("Page assignment:")
    for i, (start, end) in enumerate(tasks):
        print(f"Thread {i + 1}: pages {start}-{end} ({end - start + 1} pages)")
    # Crawl concurrently with a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit the tasks
        futures = [
            executor.submit(
                fetch_page_range_comments,
                game_id,
                start_page,
                end_page,
                url,
                headers,
                cookies,
                params,
                temp_dir
            )
            for start_page, end_page in tasks
        ]
        # Wait for all tasks to finish
        concurrent.futures.wait(futures)
    # Merge all temporary files
    print("All threads finished, merging the temporary files...")
    merged_df = merge_excel_files(temp_dir)
    # Sort the comments by date
    if not merged_df.empty:
        merged_df = merged_df.sort_values(by='date', ascending=False).reset_index(drop=True)
        # Save the final result
        excel_filename = f"评论数据_{game_id}.xlsx"
        merged_df.to_excel(excel_filename, index=False)
        print(f"Saved {len(merged_df)} comments to {excel_filename}")
    else:
        print(f"No comments found for game ID {game_id}")
    # Return the number of comments
    return len(merged_df)
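# Note: the temp_comments_<game_id> directory is left on disk after merging, which makes it
# possible to inspect or re-merge individual pages later. If you prefer to clean it up once
# the final Excel file is written, something like the following could be added at the end of
# get_game_info() (a sketch, not part of the original script):
#
# import shutil
# shutil.rmtree(temp_dir, ignore_errors=True)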
def get_already_crawled_ids():
    # Find all comment data files in the data folder
    pattern = os.path.join('data', '评论数据_*.xlsx')
    files = glob.glob(pattern)
    # Extract the game IDs from the file names
    crawled_ids = []
    for file in files:
        match = re.search(r'评论数据_(\d+)\.xlsx', file)
        if match:
            crawled_ids.append(int(match.group(1)))
    print(f"Found {len(crawled_ids)} game IDs that have already been crawled")
    return crawled_ids
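# Note: this function looks for finished files under data/, while get_game_info() writes
# 评论数据_<game_id>.xlsx into the current working directory. As written, the skip logic
# therefore only covers files that have been moved into data/; if you want newly crawled
# games to be skipped automatically on the next run, either save the final Excel files
# into data/ or point the glob pattern here at the working directory.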
if __name__ == "__main__":
    try:
        index = pd.read_excel("game_id.xlsx")
        id_list = index.iloc[:, 0].tolist()
        count = 0
        total_comments = 0
        already_crawled = get_already_crawled_ids()
        print(f"The following already crawled game IDs will be skipped: {already_crawled}")
        for game_id in id_list:
            if game_id in already_crawled:
                continue
            count += 1
            print(f"\n===== Crawling comments for game {count}/{len(id_list)} (ID: {game_id}) =====")
            num_comments = get_game_info(game_id)
            total_comments += num_comments
            print(f"Finished {count}/{len(id_list)} games, {total_comments} comments so far")
            # Random delay between games
            if count < len(id_list):
                wait_time = 2 + random.randint(1, 3)
                print(f"Waiting {wait_time} seconds before the next game...")
                sleep(wait_time)
        print(f"\nAll done! Crawled {count} games, {total_comments} comments in total")
    except KeyboardInterrupt:
        print("\nInterrupted by the user, exiting...")
    except Exception as e:
        print(f"\nAn error occurred: {e}")

