refactor(WS): 使用TYPE_CHECKING优化导入并延迟导入Bot类 refactor(image_manager): 使用系统临时目录替代自定义临时目录 feat(bili/douyin): 添加直接发送视频/图片功能 chore: 删除forward_test插件并添加furry插件 refactor(main): 移除JIT检查代码并优化插件重载逻辑
269 lines
11 KiB
Python
269 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
import asyncio
import json
import re
from typing import Optional, Dict, Any, List, Union

import aiohttp
import orjson
from bs4 import BeautifulSoup
from cachetools import TTLCache

from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from ..base import BaseParser
from ..utils import format_duration
|
||
|
||
class BiliParser(BaseParser):
    """
    Parser for Bilibili video links (``bilibili.com/video/...`` and ``b23.tv``).

    The parser downloads the video page, reads the metadata embedded in the
    page's ``window.__INITIAL_STATE__`` script, and turns it into a list of
    forward-message nodes. ``get_session``, ``HEADERS`` and ``format_count``
    are not defined in this class; they are expected to come from
    ``BaseParser``.
    """

    # Locates the assignment so the JSON value can be decoded from its end.
    _INITIAL_STATE_RE = re.compile(r"window\.__INITIAL_STATE__\s*=\s*")
    # Same pattern the title fallback has always used to spot a BV id.
    _BVID_RE = re.compile(r"(BV\w{10})")
    # Videos longer than this many seconds are not resolved to a direct link.
    _MAX_VIDEO_SECONDS = 1200
    # Status codes that carry a ``Location`` header worth following.
    _REDIRECT_STATUSES = (301, 302, 303, 307, 308)

    def __init__(self):
        """Set the parser's display names, URL pattern and de-duplication cache."""
        super().__init__()
        self.name = "B站解析器"
        self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
        self.nickname = "B站视频解析"
        # Message de-duplication cache: at most 100 entries, each kept 10 seconds.
        self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)

    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Fetch a Bilibili video page and extract its metadata.

        The query string and any ``#/`` fragment are stripped before the
        request. Metadata is read from ``window.__INITIAL_STATE__``; when that
        script is missing or cannot be decoded, a minimal record is built from
        the page ``<title>`` instead.

        Args:
            url (str): Bilibili video URL.

        Returns:
            Optional[Dict[str, Any]]: Video information, or None when the page
            could not be fetched or contains nothing usable. Errors are logged
            rather than raised.
        """
        try:
            clean_url = url.split('?')[0]
            if '#/' in clean_url:
                clean_url = clean_url.split('#/')[0]

            session = self.get_session()
            async with session.get(
                clean_url,
                headers=self.HEADERS,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                response.raise_for_status()
                text = await response.text()
                # After redirects (e.g. from a b23.tv short link) this is the
                # URL that actually contains the BV id.
                final_url = str(response.url)

            soup = BeautifulSoup(text, 'html.parser')
            state = self._extract_initial_state(soup)
            if state is None:
                return self._fallback_from_title(soup, final_url, clean_url)
            return self._build_video_info(state)

        except (aiohttp.ClientError, asyncio.TimeoutError, KeyError, AttributeError, ValueError) as e:
            logger.error(f"[{self.name}] 解析视频信息失败: {e}")
            logger.debug(f"失败的URL: {url}")
        except Exception as e:
            # Top-level boundary: a parser failure must not take the caller down.
            logger.error(f"[{self.name}] 解析视频信息时发生未知错误: {e}")
            logger.debug(f"失败的URL: {url}")

        return None

    def _extract_initial_state(self, soup: BeautifulSoup) -> Optional[Dict[str, Any]]:
        """Return the decoded ``window.__INITIAL_STATE__`` object, or None if unavailable."""
        script_tag = soup.find('script', string=self._INITIAL_STATE_RE)
        if script_tag is None or not script_tag.string:
            return None

        script_text = script_tag.string
        marker = self._INITIAL_STATE_RE.search(script_text)
        if marker is None:
            return None

        try:
            # raw_decode parses exactly one JSON value and ignores whatever
            # JavaScript follows it, so a "};" inside a string value cannot
            # truncate the document the way a regex capture would.
            state, _ = json.JSONDecoder().raw_decode(script_text, marker.end())
        except ValueError as e:
            logger.debug(f"[{self.name}] __INITIAL_STATE__ 解析失败,回退到标题解析: {e}")
            return None

        return state if isinstance(state, dict) else None

    def _fallback_from_title(self, soup: BeautifulSoup, *candidate_urls: str) -> Optional[Dict[str, Any]]:
        """Build a minimal record from the page title; the BV id is taken from the first URL containing one."""
        title_tag = soup.find('title')
        if title_tag is None:
            return None

        title = title_tag.get_text().strip()
        bvid = '未知BV号'
        for candidate in candidate_urls:
            bv_match = self._BVID_RE.search(candidate)
            if bv_match:
                bvid = bv_match.group(1)
                break

        return {
            "title": title.replace('_哔哩哔哩_bilibili', '').strip(),
            "bvid": bvid,
            "duration": 0,
            "cover_url": '',
            "play": 0,
            "like": 0,
            "coin": 0,
            "favorite": 0,
            "share": 0,
            "owner_name": '未知UP主',
            "owner_avatar": '',
            "followers": 0,
        }

    @staticmethod
    def _normalize_image_url(raw: Optional[str]) -> str:
        """Drop the ``@...`` suffix and add ``https:`` to protocol-relative URLs."""
        if not raw:
            return ''
        image_url = raw.split('@')[0]
        if image_url.startswith('//'):
            image_url = 'https:' + image_url
        return image_url

    def _build_video_info(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Map the decoded page state onto the flat record used by ``format_response``."""
        # "or {}" also covers keys that are present but null.
        video_data = state.get('videoData') or {}
        up_data = state.get('upData') or {}
        stat = video_data.get('stat') or {}
        owner = video_data.get('owner') or {}

        return {
            "title": video_data.get('title', '未知标题'),
            "bvid": video_data.get('bvid', '未知BV号'),
            "duration": video_data.get('duration', 0),
            "cover_url": self._normalize_image_url(video_data.get('pic', '')),
            "play": stat.get('view', 0),
            "like": stat.get('like', 0),
            "coin": stat.get('coin', 0),
            "favorite": stat.get('favorite', 0),
            "share": stat.get('share', 0),
            "owner_name": owner.get('name', '未知UP主'),
            "owner_avatar": self._normalize_image_url(owner.get('face', '')),
            "followers": up_data.get('fans', 0),
        }

    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a Bilibili short link to its redirect target.

        A HEAD request is sent without following redirects and the
        ``Location`` header of the redirect response is returned.

        Args:
            short_url (str): Bilibili short link.

        Returns:
            Optional[str]: The redirect target, or None when the response is
            not a redirect or the request fails (the failure is logged).
        """
        try:
            session = self.get_session()
            async with session.head(
                short_url,
                headers=self.HEADERS,
                allow_redirects=False,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                # Accept every redirect status, not only 302.
                if response.status in self._REDIRECT_STATUSES:
                    return response.headers.get('Location')
        except Exception as e:
            logger.error(f"[{self.name}] 获取真实URL失败: {e}")
        return None

    async def get_direct_video_url(self, video_url: str) -> Optional[str]:
        """
        Ask the third-party API for a direct stream URL of a Bilibili video.

        Args:
            video_url (str): Full URL of the Bilibili video page.

        Returns:
            Optional[str]: Direct video URL, or None when the API call fails,
            times out, or returns an unexpected payload (failures are logged).
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://api.mir6.com/api/bzjiexi",
                    # Passed as params so the video URL is query-encoded properly.
                    params={"url": video_url, "type": "json"},
                    headers=self.HEADERS,
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as response:
                    response.raise_for_status()
                    # content_type=None skips the Content-Type check.
                    data = await response.json(content_type=None)
                    if data.get("code") == 200 and data.get("data"):
                        return data["data"][0].get("video_url")
        except (
            aiohttp.ClientError,
            asyncio.TimeoutError,  # total timeout; not always a ClientError
            ValueError,
            KeyError,
            IndexError,
            AttributeError,  # payload is not shaped like {"data": [{...}]}
            TypeError,
        ) as e:
            logger.error(f"[{self.name}] 调用第三方API解析视频失败: {e}")
        return None

    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Build the forward-message nodes for a parsed video.

        Produces a text summary node, a cover node and an uploader-avatar node
        (each only when its image URL is non-empty), and a final node holding
        either the video or a message explaining why it is missing. When a
        direct link was obtained, the video is also sent as a direct reply.

        Args:
            event (MessageEvent): The message event being answered.
            data (Dict[str, Any]): Video information as returned by ``parse``.

        Returns:
            List[Any]: Forward nodes built with ``event.bot.build_forward_node``.
        """
        video_message: Union[str, MessageSegment]
        direct_url: Optional[str] = None

        if data['duration'] > self._MAX_VIDEO_SECONDS:  # 20 minutes
            video_message = "视频时长超过20分钟,不进行解析。"
        else:
            bvid = str(data.get('bvid') or '')
            # The title fallback yields a placeholder id; do not spend an API
            # call (up to 10 seconds) on a URL that cannot exist.
            if bvid.startswith('BV'):
                direct_url = await self.get_direct_video_url(
                    f"https://www.bilibili.com/video/{bvid}"
                )
            if direct_url:
                video_message = MessageSegment.video(direct_url)
            else:
                video_message = "视频解析失败,无法获取直链。"

        text_message = (
            f"BiliBili 视频解析\n"
            f"--------------------\n"
            f" UP主: {data['owner_name']}\n"
            f" 粉丝: {self.format_count(data['followers'])}\n"
            f"--------------------\n"
            f" 标题: {data['title']}\n"
            f" BV号: {data['bvid']}\n"
            f" 时长: {format_duration(data['duration'])}\n"
            f"--------------------\n"
            f" 数据:\n"
            f" 播放: {self.format_count(data['play'])}\n"
            f" 点赞: {self.format_count(data['like'])}\n"
            f" 投币: {self.format_count(data['coin'])}\n"
            f" 收藏: {self.format_count(data['favorite'])}\n"
            f" 转发: {self.format_count(data['share'])}\n"
        )

        def make_node(message: Any) -> Any:
            return event.bot.build_forward_node(
                user_id=event.self_id, nickname=self.nickname, message=message
            )

        nodes = [make_node(text_message)]

        # Image URLs are empty when only the page title could be parsed; an
        # image segment without a source would be invalid, so skip those nodes.
        if data['cover_url']:
            nodes.append(make_node([
                MessageSegment.text("B站封面:"),
                MessageSegment.image(data['cover_url']),
            ]))
        if data['owner_avatar']:
            nodes.append(make_node([
                MessageSegment.text("UP主头像:"),
                MessageSegment.image(data['owner_avatar']),
            ]))

        nodes.append(make_node(video_message))

        # Also send the video directly (best effort) when a direct link exists.
        if direct_url:
            try:
                await event.reply(MessageSegment.video(direct_url))
            except Exception as e:
                logger.error(f"[{self.name}] 直接发送视频失败: {e}")

        return nodes

    def should_handle_url(self, url: str) -> bool:
        """
        Tell whether this parser is responsible for the given URL.

        Args:
            url (str): URL to test.

        Returns:
            bool: True when ``url`` contains a Bilibili video link or a
            ``b23.tv`` short link, as defined by ``self.url_pattern``.
        """
        return bool(self.url_pattern.search(url))
|