本案例旨在验证对多线程机制的理解,并结合 Redis 与 MongoDB 的基础应用。以下为具体实现代码:
[此处为图片1]
系统架构采用多线程方式进行数据爬取与处理,存储流程中利用 Redis 实现去重操作,随后将清洗后的数据写入 MongoDB。当前未引入协程机制,原因在于所使用的 Python 3.12 版本与 aioredis 存在兼容性问题。
import threading
import requests
from queue import Queue
import re
import hashlib
import redis
import pymongo
lst = []
用于数据抓取和预处理的核心类定义如下:
class GetData:
def __init__(self, url):
self.url = url
self.cookies = {
# 用户登录后获取的cookie信息
}
self.headers = {
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'cache-control': 'no-cache',
'origin': 'https://search.bilibili.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://search.bilibili.com/all?keyword=%E8%81%8C%E5%9C%BA+Excel+%E6%8A%80%E8%83%BD%E9%9B%B6%E5%9F%BA%E7%A1%80%E5%85%A5%E9%97%A8&from_source=webtop_search&spm_id_from=333.1007&search_source=3&page=3&o=60',
'sec-ch-ua': '"Chromium";v="142", "Microsoft Edge";v="142", "Not_A Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0'
}
该类初始化时接收目标 URL,并配置请求所需的 headers 与 cookies,以模拟真实浏览器行为,提升请求成功率。整体逻辑围绕多线程任务分发、网络请求发送、数据提取、去重判断及持久化存储展开。
在进行数据抓取时,首先需要设置请求所需的参数与cookies信息。以下是用于发送HTTP请求的相关配置内容:
[此处为图片1]
其中包含多个关键字段,如 buvid3、_uuid、DedeUserID 等用户标识信息,以及 SESSDATA、bili_jct 等用于维持会话状态的凭证。此外,还设置了 CURRENT_QUALITY(当前画质)、CURRENT_FNVAL(功能标志值)等偏好参数,并通过 headers 和 cookies 模拟真实浏览器行为以确保请求正常响应。
程序中定义了两个队列:self.p_queue 与 self.c_queue,分别用于存放待处理的页码任务和已获取的响应数据。生产者函数 producer 负责从页码队列中取出参数并发起网络请求。
当 p_queue 不为空时,程序将提取当前页码构建请求参数 params,包括搜索关键词 keyword、分页信息 page 和 page_size、搜索类型 search_type 设定为 video 视频类,以及其他必要字段如 platform=pc、highlight=1 等。
随后使用 requests 库向指定 URL 发起 GET 请求,携带上述参数及登录态 cookies 和请求头 headers。服务器返回的 JSON 数据将被放入消费者队列 c_queue 中,供后续处理使用。
若 p_queue 队列为空,则输出“爬取结束”并终止循环。
另一个方法 consumer 则作为消费者,负责从 c_queue 中读取已抓取的数据结果,并进行解析或存储等后续操作,具体实现可根据实际需求扩展。
[此处为图片2]
# 初始化空列表用于存储数据
lst = []
# 持续从队列中获取数据并解析
while True:
if not self.c_queue.empty():
# 从队列中取出数据
data = self.c_queue.get()['data']['result']
# 遍历每一条数据进行处理
for i in data:
# 提取标题内容
titles = i['title']
# 使用正则表达式分离关键词高亮部分与普通文本
title_elems = re.findall(
r'<em class="keyword">(.*?)</em>|([^<]+)', titles, re.DOTALL)
title_elem_lst = []
# 整合提取出的标题片段
for elems in title_elems:
if elems[0]:
title_elem_lst.append(elems[0])
elif elems[1]:
title_elem_lst.append(elems[1])
elif elems[2]:
title_elem_lst.append(elems[2])
title = ''.join(title_elem_lst)
# 提取其他字段信息
url = i['arcurl']
description = i['description']
author = i['author']
cover_image = 'https:' + i['pic']
# 将整理好的数据添加到列表中
lst.append({
'title': title,
'url': url,
'description': description,
'author': author,
'cover_image': cover_image
})
else:
print('数据解析结束...')
break
# 主程序执行函数
def main(self):
# 获取用户输入的搜索关键词
input_key = input('请输入想要爬取信息的标题:')
print('爬虫开始...')
# 向生产者队列中放入页码任务(共9页)
for page in range(1, 10):
self.p_queue.put(page)
# 创建多个生产者线程
producer_threading = [threading.Thread(target=self.producer, args=(input_key,)) for _ in range(8)]
# 启动所有生产者线程
for pt in producer_threading:
pt.start()
# 等待生产者线程完成
for pt_join in producer_threading:
pt_join.join()
# 创建消费者线程
consumer_threading = [threading.Thread(target=self.consumer) for _ in range(5)]
# 启动所有消费者线程
for ct in consumer_threading:
ct.start()
# 等待消费者线程完成
for ct_join in consumer_threading:
ct_join.join()
return input_key
# 数据去重及持久化存储类
class SaveData:
def __init__(self):
self.client_syn = None # Redis 客户端连接
self.mongo_client = None # MongoDB 客户端连接
self.key_name = 'B_file' # Redis 中用于去重的键名
self.unique_lst = [] # 存储去重后的数据列表
# 初始化 MongoDB 连接
def mongo_init(self):
self.mongo_client = pymongo.MongoClient()
# 初始化 Redis 连接
def redis_init(self):
self.client_syn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
# 去除重复数据,基于 URL 的 MD5 值进行判重
def del_same_data(self):
print('开始去重...')
for class_data in lst:
md5_url = hashlib.md5(class_data['url'].encode('utf-8')).hexdigest()
# 若该 URL 的哈希值未存在于 Redis 集合中,则视为新数据
if not self.client_syn.sismember(self.key_name, md5_url):
self.unique_lst.append(class_data)
self.client_syn.sadd(self.key_name, md5_url)
print('------------------去重结束----------------------')
print(self.unique_lst)
# 将去重后的数据保存至 MongoDB
def save_data(self, input_class):
def main(self, input_key):
# 初始化 Redis
self.redis_init()
# 去除重复数据
self.del_same_data()
# 将数据保存到数据库
self.save_data(input_key)
# MongoDB 初始化与数据存储
self.mongo_init()
print('开始存入mongoDB...')
db_client = self.mongo_client['b_class'][f'{input_class}']
db_client.insert_many(self.unique_lst)
print('数据存储结束...')
[此处为图片1]
if __name__ == '__main__':
# 启动多线程进行数据爬取和处理
get_data = GetData('https://api.bilibili.com/x/web-interface/wbi/search/type')
main_spider = get_data.main()
save_data = SaveData()
save_data.main(main_spider)
print('程序结束...')
小结
当前代码仍有优化空间,例如可将去重操作与数据存储过程改为异步协程方式实现,进一步提升执行效率。感兴趣的同学可以尝试改进,继续努力!


雷达卡


京公网安备 11010802022788号







