Files
NeoBot/plugins/bili_parser.py
K2cr2O1 bd8726b768 refactor(scripts): 重构并优化脚本文件结构
feat(scripts): 添加Python环境检查脚本
feat(scripts): 增强依赖导出脚本功能
perf(plugins/bili_parser): 优化B站解析器性能和代码结构
style(plugins/bili_parser): 统一代码风格和常量命名
2026-01-18 21:01:15 +08:00

280 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from bs4 import BeautifulSoup
from typing import Optional, Dict, Any, Union
from cachetools import TTLCache
from core.utils.logger import logger
from core.managers.command_manager import matcher
from models import MessageEvent, MessageSegment
# 创建一个TTL缓存最大容量100缓存时间10秒
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
# 插件元数据
__plugin_meta__ = {
"name": "bili_parser",
"description": "自动解析B站分享卡片提取视频封面和播放量等信息。",
"usage": "自动触发当检测到B站小程序分享卡片时自动发送视频信息。",
}
# 常量定义
BILI_NICKNAME = "B站视频解析"
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# 全局共享的 ClientSession
_session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession(headers=HEADERS)
return _session
def format_count(num: int) -> str:
if not isinstance(num, int):
return str(num)
if num < 10000:
return str(num)
return f"{num / 10000:.1f}"
def format_duration(seconds: int) -> str:
"""将秒数格式化为 MM:SS 的形式"""
if not isinstance(seconds, int) or seconds < 0:
return "滚木"
minutes, seconds = divmod(seconds, 60)
return f"{minutes:02d}:{seconds:02d}"
async def get_real_url(short_url: str) -> Optional[str]:
try:
session = await get_session()
async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response:
if response.status == 302:
return response.headers.get('Location')
except Exception as e:
logger.error(f"获取真实URL失败: {e}")
return None
async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
try:
session = await get_session()
async with session.get(video_url, headers=HEADERS, timeout=5) as response:
response.raise_for_status()
text = await response.text()
soup = BeautifulSoup(text, 'html.parser')
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
if not script_tag or not script_tag.string:
return None
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^\}]*\});', script_tag.string)
if not match:
return None
json_str = match.group(1)
data = json.loads(json_str)
video_data = data.get('videoData', {})
up_data = data.get('upData', {})
stat = video_data.get('stat', {})
owner = video_data.get('owner', {})
cover_url = video_data.get('pic', '')
if cover_url:
cover_url = cover_url.split('@')[0]
if cover_url.startswith('//'):
cover_url = 'https:' + cover_url
owner_avatar = owner.get('face', '')
if owner_avatar:
if owner_avatar.startswith('//'):
owner_avatar = 'https:' + owner_avatar
owner_avatar = owner_avatar.split('@')[0]
return {
"title": video_data.get('title', '未知标题'),
"bvid": video_data.get('bvid', '未知BV号'),
"duration": video_data.get('duration', 0),
"cover_url": cover_url,
"play": stat.get('view', 0),
"like": stat.get('like', 0),
"coin": stat.get('coin', 0),
"favorite": stat.get('favorite', 0),
"share": stat.get('share', 0),
"owner_name": owner.get('name', '未知UP主'),
"owner_avatar": owner_avatar,
"followers": up_data.get('fans', 0),
}
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
logger.error(f"解析视频信息失败: {e}")
return None
async def get_direct_video_url(video_url: str) -> Optional[str]:
"""
调用第三方API解析B站视频直链
:param video_url: B站视频的完整URL
:return: 视频直链URL如果失败则返回None
"""
api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=HEADERS, timeout=10) as response:
response.raise_for_status()
# 使用 content_type=None 来忽略 Content-Type 检查
# 因为 API 返回 text/json 而不是标准的 application/json
data = await response.json(content_type=None)
if data.get("code") == 200 and data.get("data"):
return data["data"][0].get("video_url")
except (aiohttp.ClientError, json.JSONDecodeError, KeyError, IndexError) as e:
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
return None
BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
def extract_url_from_json_segments(segments):
"""
从消息的JSON段中提取B站链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "json":
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
try:
json_data = json.loads(segment.data.get("data", "{}"))
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if short_url and "b23.tv" in short_url:
extracted_url = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {extracted_url}")
return extracted_url
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
continue
return None
def extract_url_from_text_segments(segments):
"""
从消息的文本段中提取B站链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "text":
text_content = segment.data.get("text", "")
match = BILI_URL_PATTERN.search(text_content)
if match:
extracted_url = match.group(0)
logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {extracted_url}")
return extracted_url
return None
@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
"""
处理消息检测B站分享链接JSON卡片或文本链接并进行解析。
:param event: 消息事件对象
"""
# 消息去重
if event.message_id in processed_messages:
return
processed_messages[event.message_id] = True
# 忽略机器人自己发送的消息,防止无限循环
if event.user_id == event.self_id:
return
# 1. 优先解析JSON卡片中的短链接
url_to_process = extract_url_from_json_segments(event.message)
# 2. 如果未在JSON卡片中找到链接则在文本消息中查找
if not url_to_process:
url_to_process = extract_url_from_text_segments(event.message)
# 3. 如果找到了任何类型的B站链接则进行处理
if url_to_process:
await process_bili_link(event, url_to_process)
async def process_bili_link(event: MessageEvent, url: str):
"""
处理B站链接长链接或短链接获取信息并回复
:param event: 消息事件对象
:param url: 待处理的B站链接
"""
if "b23.tv" in url:
real_url = await get_real_url(url)
if not real_url:
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。")
await event.reply("无法解析B站短链接。")
return
else:
real_url = url.split('?')[0]
video_info = await parse_video_info(real_url)
if not video_info:
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
return
# 检查视频时长
video_message: Union[str, MessageSegment]
if video_info['duration'] > 300: # 5分钟 = 300秒
video_message = "视频时长超过5分钟不进行解析。"
else:
direct_url = await get_direct_video_url(real_url)
if direct_url:
video_message = MessageSegment.video(direct_url)
else:
video_message = "视频解析失败,无法获取直链。"
text_message = (
f"BiliBili 视频解析\n"
f"--------------------\n"
f" UP主: {video_info['owner_name']}\n"
f" 粉丝: {format_count(video_info['followers'])}\n"
f"--------------------\n"
f" 标题: {video_info['title']}\n"
f" BV号: {video_info['bvid']}\n"
f" 时长: {format_duration(video_info['duration'])}\n"
f"--------------------\n"
f" 数据:\n"
f" 播放: {format_count(video_info['play'])}\n"
f" 点赞: {format_count(video_info['like'])}\n"
f" 投币: {format_count(video_info['coin'])}\n"
f" 收藏: {format_count(video_info['favorite'])}\n"
f" 转发: {format_count(video_info['share'])}\n"
f" B站链接: {url}"
)
image_message_segment = [
MessageSegment.text("B站封面"),
MessageSegment.image(video_info['cover_url'])
]
up_info_segment = [
MessageSegment.text("UP主头像"),
MessageSegment.image(video_info['owner_avatar'])
]
nodes = [
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=text_message),
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=image_message_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=up_info_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=video_message)
]
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
# 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊
await event.bot.send_forwarded_messages(target=event, nodes=nodes)