楼主: 孙宇鹏
137 0

[其他] 04-多线程爬取B站视频信息 [推广有奖]

  • 0关注
  • 0粉丝

等待验证会员

学前班

80%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
30 点
帖子
2
精华
0
在线时间
0 小时
注册时间
2018-6-25
最后登录
2018-6-25

楼主
孙宇鹏 发表于 2025-12-9 18:49:02 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

本案例旨在验证对多线程机制的理解,并结合 Redis 与 MongoDB 的基础应用。以下为具体实现代码:

[此处为图片1]

系统架构采用多线程方式进行数据爬取与处理,存储流程中利用 Redis 实现去重操作,随后将清洗后的数据写入 MongoDB。当前未引入协程机制,原因在于所使用的 Python 3.12 版本与 aioredis 存在兼容性问题。

import threading
import requests
from queue import Queue
import re
import hashlib
import redis
import pymongo

lst = []
    

用于数据抓取和预处理的核心类定义如下:

class GetData:
    def __init__(self, url):
        self.url = url
        self.cookies = {
            # 用户登录后获取的cookie信息
        }
        self.headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'cache-control': 'no-cache',
            'origin': 'https://search.bilibili.com',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': 'https://search.bilibili.com/all?keyword=%E8%81%8C%E5%9C%BA+Excel+%E6%8A%80%E8%83%BD%E9%9B%B6%E5%9F%BA%E7%A1%80%E5%85%A5%E9%97%A8&from_source=webtop_search&spm_id_from=333.1007&search_source=3&page=3&o=60',
            'sec-ch-ua': '"Chromium";v="142", "Microsoft Edge";v="142", "Not_A Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36 Edg/142.0.0.0'
        }
    

该类初始化时接收目标 URL,并配置请求所需的 headers 与 cookies,以模拟真实浏览器行为,提升请求成功率。整体逻辑围绕多线程任务分发、网络请求发送、数据提取、去重判断及持久化存储展开。

在进行数据抓取时,首先需要设置请求所需的参数与cookies信息。以下是用于发送HTTP请求的相关配置内容:

[此处为图片1]

其中包含多个关键字段,如 buvid3、_uuid、DedeUserID 等用户标识信息,以及 SESSDATA、bili_jct 等用于维持会话状态的凭证。此外,还设置了 CURRENT_QUALITY(当前画质)、CURRENT_FNVAL(功能标志值)等偏好参数,并通过 headers 和 cookies 模拟真实浏览器行为以确保请求正常响应。

程序中定义了两个队列:self.p_queue 与 self.c_queue,分别用于存放待处理的页码任务和已获取的响应数据。生产者函数 producer 负责从页码队列中取出参数并发起网络请求。

当 p_queue 不为空时,程序将提取当前页码构建请求参数 params,包括搜索关键词 keyword、分页信息 page 和 page_size、搜索类型 search_type 设定为 video 视频类,以及其他必要字段如 platform=pc、highlight=1 等。

随后使用 requests 库向指定 URL 发起 GET 请求,携带上述参数及登录态 cookies 和请求头 headers。服务器返回的 JSON 数据将被放入消费者队列 c_queue 中,供后续处理使用。

若 p_queue 队列为空,则输出“爬取结束”并终止循环。

另一个方法 consumer 则作为消费者,负责从 c_queue 中读取已抓取的数据结果,并进行解析或存储等后续操作,具体实现可根据实际需求扩展。

[此处为图片2]

# 初始化空列表用于存储数据
lst = []

# 持续从队列中获取数据并解析
while True:
    if not self.c_queue.empty():
        # 从队列中取出数据
        data = self.c_queue.get()['data']['result']
        
        # 遍历每一条数据进行处理
        for i in data:
            # 提取标题内容
            titles = i['title']
            # 使用正则表达式分离关键词高亮部分与普通文本
            title_elems = re.findall(
                r'<em class="keyword">(.*?)</em>|([^<]+)', titles, re.DOTALL)
            title_elem_lst = []
            
            # 整合提取出的标题片段
            for elems in title_elems:
                if elems[0]:
                    title_elem_lst.append(elems[0])
                elif elems[1]:
                    title_elem_lst.append(elems[1])
                elif elems[2]:
                    title_elem_lst.append(elems[2])
            title = ''.join(title_elem_lst)

            # 提取其他字段信息
            url = i['arcurl']
            description = i['description']
            author = i['author']
            cover_image = 'https:' + i['pic']

            # 将整理好的数据添加到列表中
            lst.append({
                'title': title,
                'url': url,
                'description': description,
                'author': author,
                'cover_image': cover_image
            })
    else:
        print('数据解析结束...')
        break

# 主程序执行函数
def main(self):
    # 获取用户输入的搜索关键词
    input_key = input('请输入想要爬取信息的标题:')
    print('爬虫开始...')
    
    # 向生产者队列中放入页码任务(共9页)
    for page in range(1, 10):
        self.p_queue.put(page)
    
    # 创建多个生产者线程
    producer_threading = [threading.Thread(target=self.producer, args=(input_key,)) for _ in range(8)]
    
    # 启动所有生产者线程
    for pt in producer_threading:
        pt.start()
    
    # 等待生产者线程完成
    for pt_join in producer_threading:
        pt_join.join()
    
    # 创建消费者线程
    consumer_threading = [threading.Thread(target=self.consumer) for _ in range(5)]
    
    # 启动所有消费者线程
    for ct in consumer_threading:
        ct.start()
    
    # 等待消费者线程完成
    for ct_join in consumer_threading:
        ct_join.join()
    
    return input_key

# 数据去重及持久化存储类
class SaveData:
    def __init__(self):
        self.client_syn = None          # Redis 客户端连接
        self.mongo_client = None        # MongoDB 客户端连接
        self.key_name = 'B_file'        # Redis 中用于去重的键名
        self.unique_lst = []            # 存储去重后的数据列表

    # 初始化 MongoDB 连接
    def mongo_init(self):
        self.mongo_client = pymongo.MongoClient()

    # 初始化 Redis 连接
    def redis_init(self):
        self.client_syn = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

    # 去除重复数据,基于 URL 的 MD5 值进行判重
    def del_same_data(self):
        print('开始去重...')
        for class_data in lst:
            md5_url = hashlib.md5(class_data['url'].encode('utf-8')).hexdigest()
            
            # 若该 URL 的哈希值未存在于 Redis 集合中,则视为新数据
            if not self.client_syn.sismember(self.key_name, md5_url):
                self.unique_lst.append(class_data)
                self.client_syn.sadd(self.key_name, md5_url)
                
        print('------------------去重结束----------------------')
        print(self.unique_lst)

    # 将去重后的数据保存至 MongoDB
    def save_data(self, input_class):
def main(self, input_key):
    # 初始化 Redis
    self.redis_init()
    # 去除重复数据
    self.del_same_data()
    # 将数据保存到数据库
    self.save_data(input_key)

# MongoDB 初始化与数据存储
self.mongo_init()
print('开始存入mongoDB...')
db_client = self.mongo_client['b_class'][f'{input_class}']
db_client.insert_many(self.unique_lst)
print('数据存储结束...')

[此处为图片1]

if __name__ == '__main__':
    # 启动多线程进行数据爬取和处理
    get_data = GetData('https://api.bilibili.com/x/web-interface/wbi/search/type')
    main_spider = get_data.main()
    save_data = SaveData()
    save_data.main(main_spider)
    print('程序结束...')

小结  
当前代码仍有优化空间,例如可将去重操作与数据存储过程改为异步协程方式实现,进一步提升执行效率。感兴趣的同学可以尝试改进,继续努力!
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:多线程 highlight Microsoft bilibili Platform

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2025-12-21 11:51