Files
NeoBot/plugins/bili_parser.py
K2cr2O1 24af862924 feat: 添加性能优化和架构文档,更新依赖和核心模块
refactor(browser_manager): 实现页面池机制以提升性能
refactor(image_manager): 添加模板缓存并集成页面池
refactor(bili_parser): 迁移到异步HTTP请求并实现会话复用
docs: 新增性能优化、架构设计和最佳实践文档
chore: 更新requirements.txt添加新依赖
2026-01-13 03:56:31 +08:00

258 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from bs4 import BeautifulSoup
from typing import Optional, Dict, Any, Union
from cachetools import TTLCache
from core.utils.logger import logger
from core.managers.command_manager import matcher
from models import MessageEvent, MessageSegment
# 创建一个TTL缓存最大容量100缓存时间10秒
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
__plugin_meta__ = {
"name": "bili_parser",
"description": "自动解析B站分享卡片提取视频封面和播放量等信息。",
"usage": "自动触发当检测到B站小程序分享卡片时自动发送视频信息。",
}
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# 全局共享的 ClientSession
_session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession()
return _session
def format_count(num: int) -> str:
if not isinstance(num, int):
return str(num)
if num < 10000:
return str(num)
return f"{num / 10000:.1f}"
def format_duration(seconds: int) -> str:
"""将秒数格式化为 MM:SS 的形式"""
if not isinstance(seconds, int) or seconds < 0:
return "滚木"
minutes, seconds = divmod(seconds, 60)
return f"{minutes:02d}:{seconds:02d}"
async def get_real_url(short_url: str) -> Optional[str]:
try:
session = await get_session()
async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response:
if response.status == 302:
return response.headers.get('Location')
except Exception as e:
logger.error(f"获取真实URL失败: {e}")
return None
async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
try:
session = await get_session()
async with session.get(video_url, headers=HEADERS, timeout=5) as response:
response.raise_for_status()
text = await response.text()
soup = BeautifulSoup(text, 'html.parser')
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
if not script_tag or not script_tag.string:
return None
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string)
if not match:
return None
json_str = match.group(1)
data = json.loads(json_str)
video_data = data.get('videoData', {})
up_data = data.get('upData', {})
stat = video_data.get('stat', {})
owner = video_data.get('owner', {})
cover_url = video_data.get('pic', '')
if cover_url:
cover_url = cover_url.split('@')[0]
if cover_url.startswith('//'):
cover_url = 'https:' + cover_url
owner_avatar = owner.get('face', '')
if owner_avatar:
if owner_avatar.startswith('//'):
owner_avatar = 'https:' + owner_avatar
owner_avatar = owner_avatar.split('@')[0]
return {
"title": video_data.get('title', '未知标题'),
"bvid": video_data.get('bvid', '未知BV号'),
"duration": video_data.get('duration', 0),
"cover_url": cover_url,
"play": stat.get('view', 0),
"like": stat.get('like', 0),
"coin": stat.get('coin', 0),
"favorite": stat.get('favorite', 0),
"share": stat.get('share', 0),
"owner_name": owner.get('name', '未知UP主'),
"owner_avatar": owner_avatar,
"followers": up_data.get('fans', 0),
}
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
logger.error(f"解析视频信息失败: {e}")
return None
async def get_direct_video_url(video_url: str) -> Optional[str]:
"""
调用第三方API解析B站视频直链
:param video_url: B站视频的完整URL
:return: 视频直链URL如果失败则返回None
"""
api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=HEADERS, timeout=10) as response:
response.raise_for_status()
data = await response.json()
if data.get("code") == 200 and data.get("data"):
return data["data"][0].get("video_url")
except (aiohttp.ClientError, json.JSONDecodeError, KeyError, IndexError) as e:
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
return None
BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/[a-zA-Z0-9_]+|b23\.tv/[a-zA-Z0-9]+)")
@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
"""
处理消息检测B站分享链接JSON卡片或文本链接并进行解析。
:param event: 消息事件对象
"""
# 消息去重
if event.message_id in processed_messages:
return
processed_messages[event.message_id] = True
# 忽略机器人自己发送的消息,防止无限循环
if event.user_id == event.self_id:
return
url_to_process = None
# 1. 优先解析JSON卡片中的短链接
for segment in event.message:
if segment.type == "json":
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
try:
json_data = json.loads(segment.data.get("data", "{}"))
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if short_url and "b23.tv" in short_url:
url_to_process = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {url_to_process}")
break # 找到后立即跳出循环
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
continue
# 2. 如果未在JSON卡片中找到链接则在文本消息中查找
if not url_to_process:
for segment in event.message:
if segment.type == "text":
text_content = segment.data.get("text", "")
match = BILI_URL_PATTERN.search(text_content)
if match:
url_to_process = match.group(0)
logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {url_to_process}")
break # 找到后立即跳出循环
# 3. 如果找到了任何类型的B站链接则进行处理
if url_to_process:
await process_bili_link(event, url_to_process)
async def process_bili_link(event: MessageEvent, url: str):
"""
处理B站链接长链接或短链接获取信息并回复
:param event: 消息事件对象
:param url: 待处理的B站链接
"""
if "b23.tv" in url:
real_url = await get_real_url(url)
if not real_url:
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。")
await event.reply("无法解析B站短链接。")
return
else:
real_url = url.split('?')[0]
video_info = await parse_video_info(real_url)
if not video_info:
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
return
# 检查视频时长
video_message: Union[str, MessageSegment]
if video_info['duration'] > 300: # 5分钟 = 300秒
video_message = "视频时长超过5分钟不进行解析。"
else:
direct_url = await get_direct_video_url(real_url)
if direct_url:
video_message = MessageSegment.video(direct_url)
else:
video_message = "视频解析失败,无法获取直链。"
text_message = (
f"BiliBili 视频解析\n"
f"--------------------\n"
f" UP主: {video_info['owner_name']}\n"
f" 粉丝: {format_count(video_info['followers'])}\n"
f"--------------------\n"
f" 标题: {video_info['title']}\n"
f" BV号: {video_info['bvid']}\n"
f" 时长: {format_duration(video_info['duration'])}\n"
f"--------------------\n"
f" 数据:\n"
f" 播放: {format_count(video_info['play'])}\n"
f" 点赞: {format_count(video_info['like'])}\n"
f" 投币: {format_count(video_info['coin'])}\n"
f" 收藏: {format_count(video_info['favorite'])}\n"
f" 转发: {format_count(video_info['share'])}\n"
f" B站链接: {url}"
)
image_message_segment = [
MessageSegment.text("B站封面"),
MessageSegment.image(video_info['cover_url'])
]
up_info_segment = [
MessageSegment.text("UP主头像"),
MessageSegment.image(video_info['owner_avatar'])
]
nodes = [
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=up_info_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=video_message)
]
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
# 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊
await event.bot.send_forwarded_messages(target=event, nodes=nodes)