feat(scripts): 添加Python环境检查脚本 feat(scripts): 增强依赖导出脚本功能 perf(plugins/bili_parser): 优化B站解析器性能和代码结构 style(plugins/bili_parser): 统一代码风格和常量命名
280 lines
10 KiB
Python
280 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
import re
|
||
import json
|
||
import aiohttp
|
||
from bs4 import BeautifulSoup
|
||
from typing import Optional, Dict, Any, Union
|
||
from cachetools import TTLCache
|
||
|
||
from core.utils.logger import logger
|
||
from core.managers.command_manager import matcher
|
||
from models import MessageEvent, MessageSegment
|
||
|
||
# 创建一个TTL缓存,最大容量100,缓存时间10秒
|
||
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
|
||
|
||
# 插件元数据
|
||
__plugin_meta__ = {
|
||
"name": "bili_parser",
|
||
"description": "自动解析B站分享卡片,提取视频封面和播放量等信息。",
|
||
"usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。",
|
||
}
|
||
|
||
# 常量定义
|
||
BILI_NICKNAME = "B站视频解析"
|
||
|
||
HEADERS = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||
}
|
||
|
||
# 全局共享的 ClientSession
|
||
_session: Optional[aiohttp.ClientSession] = None
|
||
|
||
async def get_session() -> aiohttp.ClientSession:
|
||
global _session
|
||
if _session is None or _session.closed:
|
||
_session = aiohttp.ClientSession(headers=HEADERS)
|
||
return _session
|
||
|
||
|
||
def format_count(num: int) -> str:
|
||
if not isinstance(num, int):
|
||
return str(num)
|
||
if num < 10000:
|
||
return str(num)
|
||
return f"{num / 10000:.1f}万"
|
||
|
||
|
||
def format_duration(seconds: int) -> str:
|
||
"""将秒数格式化为 MM:SS 的形式"""
|
||
if not isinstance(seconds, int) or seconds < 0:
|
||
return "滚木"
|
||
minutes, seconds = divmod(seconds, 60)
|
||
return f"{minutes:02d}:{seconds:02d}"
|
||
|
||
|
||
async def get_real_url(short_url: str) -> Optional[str]:
|
||
try:
|
||
session = await get_session()
|
||
async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response:
|
||
if response.status == 302:
|
||
return response.headers.get('Location')
|
||
except Exception as e:
|
||
logger.error(f"获取真实URL失败: {e}")
|
||
return None
|
||
|
||
async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
|
||
try:
|
||
session = await get_session()
|
||
async with session.get(video_url, headers=HEADERS, timeout=5) as response:
|
||
response.raise_for_status()
|
||
text = await response.text()
|
||
soup = BeautifulSoup(text, 'html.parser')
|
||
|
||
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
|
||
if not script_tag or not script_tag.string:
|
||
return None
|
||
|
||
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^\}]*\});', script_tag.string)
|
||
if not match:
|
||
return None
|
||
|
||
json_str = match.group(1)
|
||
data = json.loads(json_str)
|
||
|
||
video_data = data.get('videoData', {})
|
||
up_data = data.get('upData', {})
|
||
stat = video_data.get('stat', {})
|
||
owner = video_data.get('owner', {})
|
||
|
||
cover_url = video_data.get('pic', '')
|
||
if cover_url:
|
||
cover_url = cover_url.split('@')[0]
|
||
if cover_url.startswith('//'):
|
||
cover_url = 'https:' + cover_url
|
||
|
||
owner_avatar = owner.get('face', '')
|
||
if owner_avatar:
|
||
if owner_avatar.startswith('//'):
|
||
owner_avatar = 'https:' + owner_avatar
|
||
owner_avatar = owner_avatar.split('@')[0]
|
||
|
||
return {
|
||
"title": video_data.get('title', '未知标题'),
|
||
"bvid": video_data.get('bvid', '未知BV号'),
|
||
"duration": video_data.get('duration', 0),
|
||
"cover_url": cover_url,
|
||
"play": stat.get('view', 0),
|
||
"like": stat.get('like', 0),
|
||
"coin": stat.get('coin', 0),
|
||
"favorite": stat.get('favorite', 0),
|
||
"share": stat.get('share', 0),
|
||
"owner_name": owner.get('name', '未知UP主'),
|
||
"owner_avatar": owner_avatar,
|
||
"followers": up_data.get('fans', 0),
|
||
}
|
||
|
||
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
|
||
logger.error(f"解析视频信息失败: {e}")
|
||
|
||
return None
|
||
|
||
async def get_direct_video_url(video_url: str) -> Optional[str]:
|
||
"""
|
||
调用第三方API解析B站视频直链
|
||
:param video_url: B站视频的完整URL
|
||
:return: 视频直链URL,如果失败则返回None
|
||
"""
|
||
api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json"
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(api_url, headers=HEADERS, timeout=10) as response:
|
||
response.raise_for_status()
|
||
# 使用 content_type=None 来忽略 Content-Type 检查
|
||
# 因为 API 返回 text/json 而不是标准的 application/json
|
||
data = await response.json(content_type=None)
|
||
if data.get("code") == 200 and data.get("data"):
|
||
return data["data"][0].get("video_url")
|
||
except (aiohttp.ClientError, json.JSONDecodeError, KeyError, IndexError) as e:
|
||
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
|
||
return None
|
||
|
||
BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
|
||
|
||
|
||
def extract_url_from_json_segments(segments):
|
||
"""
|
||
从消息的JSON段中提取B站链接
|
||
:param segments: 消息段列表
|
||
:return: 提取到的URL或None
|
||
"""
|
||
for segment in segments:
|
||
if segment.type == "json":
|
||
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
|
||
try:
|
||
json_data = json.loads(segment.data.get("data", "{}"))
|
||
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
|
||
|
||
if short_url and "b23.tv" in short_url:
|
||
extracted_url = short_url.split('?')[0]
|
||
logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {extracted_url}")
|
||
return extracted_url
|
||
except (json.JSONDecodeError, KeyError) as e:
|
||
logger.error(f"[bili_parser] 解析JSON失败: {e}")
|
||
continue
|
||
return None
|
||
|
||
def extract_url_from_text_segments(segments):
|
||
"""
|
||
从消息的文本段中提取B站链接
|
||
:param segments: 消息段列表
|
||
:return: 提取到的URL或None
|
||
"""
|
||
for segment in segments:
|
||
if segment.type == "text":
|
||
text_content = segment.data.get("text", "")
|
||
match = BILI_URL_PATTERN.search(text_content)
|
||
if match:
|
||
extracted_url = match.group(0)
|
||
logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {extracted_url}")
|
||
return extracted_url
|
||
return None
|
||
|
||
@matcher.on_message()
|
||
async def handle_bili_share(event: MessageEvent):
|
||
"""
|
||
处理消息,检测B站分享链接(JSON卡片或文本链接)并进行解析。
|
||
:param event: 消息事件对象
|
||
"""
|
||
# 消息去重
|
||
if event.message_id in processed_messages:
|
||
return
|
||
processed_messages[event.message_id] = True
|
||
|
||
# 忽略机器人自己发送的消息,防止无限循环
|
||
if event.user_id == event.self_id:
|
||
return
|
||
|
||
# 1. 优先解析JSON卡片中的短链接
|
||
url_to_process = extract_url_from_json_segments(event.message)
|
||
|
||
# 2. 如果未在JSON卡片中找到链接,则在文本消息中查找
|
||
if not url_to_process:
|
||
url_to_process = extract_url_from_text_segments(event.message)
|
||
|
||
# 3. 如果找到了任何类型的B站链接,则进行处理
|
||
if url_to_process:
|
||
await process_bili_link(event, url_to_process)
|
||
|
||
async def process_bili_link(event: MessageEvent, url: str):
|
||
"""
|
||
处理B站链接(长链接或短链接),获取信息并回复
|
||
:param event: 消息事件对象
|
||
:param url: 待处理的B站链接
|
||
"""
|
||
if "b23.tv" in url:
|
||
real_url = await get_real_url(url)
|
||
if not real_url:
|
||
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。")
|
||
await event.reply("无法解析B站短链接。")
|
||
return
|
||
else:
|
||
real_url = url.split('?')[0]
|
||
|
||
video_info = await parse_video_info(real_url)
|
||
if not video_info:
|
||
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
|
||
await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。")
|
||
return
|
||
|
||
# 检查视频时长
|
||
video_message: Union[str, MessageSegment]
|
||
if video_info['duration'] > 300: # 5分钟 = 300秒
|
||
video_message = "视频时长超过5分钟,不进行解析。"
|
||
else:
|
||
direct_url = await get_direct_video_url(real_url)
|
||
if direct_url:
|
||
video_message = MessageSegment.video(direct_url)
|
||
else:
|
||
video_message = "视频解析失败,无法获取直链。"
|
||
|
||
text_message = (
|
||
f"BiliBili 视频解析\n"
|
||
f"--------------------\n"
|
||
f" UP主: {video_info['owner_name']}\n"
|
||
f" 粉丝: {format_count(video_info['followers'])}\n"
|
||
f"--------------------\n"
|
||
f" 标题: {video_info['title']}\n"
|
||
f" BV号: {video_info['bvid']}\n"
|
||
f" 时长: {format_duration(video_info['duration'])}\n"
|
||
f"--------------------\n"
|
||
f" 数据:\n"
|
||
f" 播放: {format_count(video_info['play'])}\n"
|
||
f" 点赞: {format_count(video_info['like'])}\n"
|
||
f" 投币: {format_count(video_info['coin'])}\n"
|
||
f" 收藏: {format_count(video_info['favorite'])}\n"
|
||
f" 转发: {format_count(video_info['share'])}\n"
|
||
f" B站链接: {url}"
|
||
)
|
||
|
||
image_message_segment = [
|
||
MessageSegment.text("B站封面:"),
|
||
MessageSegment.image(video_info['cover_url'])
|
||
]
|
||
|
||
up_info_segment = [
|
||
MessageSegment.text("UP主头像:"),
|
||
MessageSegment.image(video_info['owner_avatar'])
|
||
]
|
||
|
||
nodes = [
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=text_message),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=image_message_segment),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=up_info_segment),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=video_message)
|
||
]
|
||
|
||
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
|
||
# 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊
|
||
await event.bot.send_forwarded_messages(target=event, nodes=nodes)
|