Merge pull request #67 from Fairy-Oracle-Sanctuary/dev

Dev
This commit is contained in:
镀铬酸钾
2026-03-01 16:05:28 +08:00
committed by GitHub
20 changed files with 2071 additions and 317 deletions

View File

@@ -1,20 +1,31 @@
# -*- coding: utf-8 -*-
import re
import orjson
import aiohttp
from typing import Optional, Dict, Any, List, Union
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from ..base import BaseParser
from ..utils import format_duration
from cachetools import TTLCache
from bilibili_api import video, select_client, Credential
from bilibili_api.exceptions import ResponseCodeException
from core.config_loader import global_config
from core.services.local_file_server import download_to_local
# bilibili_api-python 可用性标志
BILI_API_AVAILABLE = True
# 显式指定使用 aiohttp避免与其他库冲突
try:
select_client("aiohttp")
except Exception as e:
logger.warning(f"设置 bilibili_api 客户端失败: {e}")
class BiliParser(BaseParser):
"""
B站视频解析器
B站视频解析器(使用 bilibili-api-python 库)
"""
def __init__(self):
@@ -22,9 +33,24 @@ class BiliParser(BaseParser):
self.name = "B站解析器"
self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
self.nickname = "B站视频解析"
# 消息去重缓存
self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
def _get_credential(self) -> Optional[Credential]:
"""获取 B 站登录凭证"""
try:
bili_config = global_config.bilibili
if bili_config.sessdata and bili_config.bili_jct and bili_config.buvid3:
return Credential(
sessdata=bili_config.sessdata,
bili_jct=bili_config.bili_jct,
buvid3=bili_config.buvid3,
dedeuserid=bili_config.dedeuserid
)
except Exception:
pass
return None
async def parse(self, url: str) -> Optional[Dict[str, Any]]:
"""
解析B站视频信息
@@ -35,111 +61,172 @@ class BiliParser(BaseParser):
Returns:
Optional[Dict[str, Any]]: 视频信息字典如果失败则返回None
"""
# 提取 BV 号
bvid = self.extract_bvid(url)
if not bvid:
logger.error(f"[{self.name}] 无法从 URL 提取 BV 号: {url}")
return None
try:
# 清理URL
if BILI_API_AVAILABLE:
# 使用 bilibili-api-python 库
credential = self._get_credential()
v = video.Video(bvid=bvid, credential=credential)
info = await v.get_info()
# 处理封面 URL
cover_url = info.get('pic', '')
if cover_url:
cover_url = cover_url.split('@')[0]
if cover_url.startswith('//'):
cover_url = 'https:' + cover_url
# 处理 UP 主头像
owner = info.get('owner', {})
owner_name = owner.get('name', '未知UP主')
owner_face = owner.get('face', '')
if owner_face:
if owner_face.startswith('//'):
owner_face = 'https:' + owner_face
owner_face = owner_face.split('@')[0]
# 处理统计信息
stat = info.get('stat', {})
return {
"title": info.get('title', '未知标题'),
"bvid": bvid,
"aid": info.get('aid', 0),
"duration": info.get('duration', 0),
"cover_url": cover_url,
"play": stat.get('view', 0),
"like": stat.get('like', 0),
"coin": stat.get('coin', 0),
"favorite": stat.get('favorite', 0),
"share": stat.get('share', 0),
"danmaku": stat.get('danmaku', 0),
"owner_name": owner_name,
"owner_avatar": owner_face,
"followers": info.get('owner', {}).get('fans', 0),
"description": info.get('desc', ''),
"pubdate": info.get('pubdate', 0),
}
else:
# 备用方案:直接解析页面
return await self._parse_fallback(url, bvid)
except ResponseCodeException as e:
logger.error(f"[{self.name}] API 返回错误: {e.code} - {e.msg}")
except Exception as e:
logger.error(f"[{self.name}] 解析视频信息失败: {e}")
if BILI_API_AVAILABLE:
logger.info(f"[{self.name}] 尝试备用解析方案")
return await self._parse_fallback(url, bvid)
return None
async def _parse_fallback(self, url: str, bvid: str) -> Optional[Dict[str, Any]]:
"""
备用解析方案(不使用 bilibili-api-python
Args:
url (str): B站视频URL
bvid (str): BV号
Returns:
Optional[Dict[str, Any]]: 视频信息字典
"""
try:
session = self.get_session()
clean_url = url.split('?')[0]
if '#/' in clean_url:
clean_url = clean_url.split('#/')[0]
session = self.get_session()
async with session.get(clean_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=5)) as response:
async with session.get(clean_url, headers=self.HEADERS, timeout=5) as response:
response.raise_for_status()
text = await response.text()
soup = BeautifulSoup(text, 'html.parser')
# 尝试多种方式获取视频数据
# 方式1: 尝试获取 __INITIAL_STATE__
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
if not script_tag or not script_tag.string:
# 方式2: 尝试获取 __PLAYINFO__
script_tag = soup.find('script', text=re.compile('window.__PLAYINFO__'))
if not script_tag or not script_tag.string:
# 方式3: 尝试获取页面标题和其他信息
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text().strip()
# 提取BV号
bv_match = re.search(r'(BV\w{10})', clean_url)
bvid = bv_match.group(1) if bv_match else '未知BV号'
return {
"title": title.replace('_哔哩哔哩_bilibili', '').strip(),
"bvid": bvid,
"duration": 0,
"cover_url": '',
"play": 0,
"like": 0,
"coin": 0,
"favorite": 0,
"share": 0,
"owner_name": '未知UP主',
"owner_avatar": '',
"followers": 0,
}
return None
# 原始解析逻辑
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string)
if not match:
# 尝试另一种正则表达式
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL)
if not match:
return None
# 提取标题
import re
title_match = re.search(r'<h1[^>]*>([^<]+)</h1>', text)
title = title_match.group(1).strip() if title_match else '未知标题'
# 提取播放量等信息
play_match = re.search(r'"view":(\d+)', text)
play = int(play_match.group(1)) if play_match else 0
like_match = re.search(r'"like":(\d+)', text)
like = int(like_match.group(1)) if like_match else 0
coin_match = re.search(r'"coin":(\d+)', text)
coin = int(coin_match.group(1)) if coin_match else 0
favorite_match = re.search(r'"favorite":(\d+)', text)
favorite = int(favorite_match.group(1)) if favorite_match else 0
share_match = re.search(r'"share":(\d+)', text)
share = int(share_match.group(1)) if share_match else 0
# 提取 UP 主信息
owner_match = re.search(r'"name":"([^"]+)"', text)
owner_name = owner_match.group(1) if owner_match else '未知UP主'
face_match = re.search(r'"face":"([^"]+)"', text)
owner_face = face_match.group(1) if face_match else ''
if owner_face:
if owner_face.startswith('//'):
owner_face = 'https:' + owner_face
owner_face = owner_face.split('@')[0]
return {
"title": title,
"bvid": bvid,
"aid": 0,
"duration": 0,
"cover_url": '',
"play": play,
"like": like,
"coin": coin,
"favorite": favorite,
"share": share,
"danmaku": 0,
"owner_name": owner_name,
"owner_avatar": owner_face,
"followers": 0,
"description": '',
"pubdate": 0,
}
json_str = match.group(1)
# 清理JSON字符串中的潜在问题字符
json_str = json_str.strip().rstrip(';')
try:
data = orjson.loads(json_str)
except ValueError:
# 如果直接解析失败尝试清理JSON字符串
# 移除可能的注释或无效字符
cleaned_json = re.sub(r',\s*[}]', '}', json_str) # 移除末尾多余的逗号
cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json) # 移除注释
cleaned_json = re.sub(r'//.*', '', cleaned_json) # 移除行注释
data = orjson.loads(cleaned_json)
video_data = data.get('videoData', {})
up_data = data.get('upData', {})
stat = video_data.get('stat', {})
owner = video_data.get('owner', {})
cover_url = video_data.get('pic', '')
if cover_url:
cover_url = cover_url.split('@')[0]
if cover_url.startswith('//'):
cover_url = 'https:' + cover_url
owner_avatar = owner.get('face', '')
if owner_avatar:
if owner_avatar.startswith('//'):
owner_avatar = 'https:' + owner_avatar
owner_avatar = owner_avatar.split('@')[0]
return {
"title": video_data.get('title', '未知标题'),
"bvid": video_data.get('bvid', '未知BV号'),
"duration": video_data.get('duration', 0),
"cover_url": cover_url,
"play": stat.get('view', 0),
"like": stat.get('like', 0),
"coin": stat.get('coin', 0),
"favorite": stat.get('favorite', 0),
"share": stat.get('share', 0),
"owner_name": owner.get('name', '未知UP主'),
"owner_avatar": owner_avatar,
"followers": up_data.get('fans', 0),
}
except (aiohttp.ClientError, KeyError, AttributeError, ValueError) as e:
logger.error(f"[{self.name}] 解析视频信息失败: {e}")
logger.debug(f"失败的URL: {url}")
except Exception as e:
logger.error(f"[{self.name}] 解析视频信息时发生未知错误: {e}")
logger.debug(f"失败的URL: {url}")
logger.error(f"[{self.name}] 备用解析方案失败: {e}")
return None
def extract_bvid(self, url: str) -> Optional[str]:
"""
从 URL 中提取 BV 号
Args:
url (str): B站视频URL
Returns:
Optional[str]: BV号如果失败则返回None
"""
# 方式1: 直接从 URL 中提取
bvid_match = re.search(r'/video/(BV\w+)', url)
if bvid_match:
return bvid_match.group(1)
# 方式2: 从短链接跳转后提取
if 'b23.tv' in url:
try:
session = self.get_session()
# 简单处理,不实际跳转,直接尝试提取
bvid_match = re.search(r'BV\w{10}', url)
if bvid_match:
return bvid_match.group(0)
except Exception:
pass
return None
@@ -155,34 +242,62 @@ class BiliParser(BaseParser):
"""
try:
session = self.get_session()
async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=aiohttp.ClientTimeout(total=5)) as response:
async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response:
if response.status == 302:
return response.headers.get('Location')
except Exception as e:
logger.error(f"[{self.name}] 获取真实URL失败: {e}")
return None
async def get_direct_video_url(self, video_url: str) -> Optional[str]:
async def get_direct_video_url(self, video_url: str, bvid: str) -> Optional[str]:
"""
调用第三方API解析B站视频直链
获取B站视频直链通过本地文件服务器下载
Args:
video_url (str): B站视频的完整URL
bvid (str): BV号
Returns:
Optional[str]: 视频直链URL如果失败则返回None
Optional[str]: 本地视频 URL如果失败则返回None
"""
api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json"
if not BILI_API_AVAILABLE:
return None
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
response.raise_for_status()
# 使用 content_type=None 来忽略 Content-Type 检查
data = await response.json(content_type=None)
if data.get("code") == 200 and data.get("data"):
return data["data"][0].get("video_url")
except (aiohttp.ClientError, ValueError, KeyError, IndexError) as e:
logger.error(f"[{self.name}] 调用第三方API解析视频失败: {e}")
credential = self._get_credential()
v = video.Video(bvid=bvid, credential=credential)
# 先获取视频信息以获取 cid
info = await v.get_info()
cid = info.get('cid', 0)
if not cid:
return None
# 获取下载链接数据
download_url_data = await v.get_download_url(cid=cid)
# 使用 VideoDownloadURLDataDetecter 解析数据
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
streams = detecter.detect_best_streams()
if streams:
# 获取视频直链
video_direct_url = streams[0].url
logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")
# 使用本地文件服务器下载
local_url = await download_to_local(video_direct_url, timeout=120)
if local_url:
logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
return local_url
else:
logger.error(f"[{self.name}] 下载到本地失败")
return None
except Exception as e:
logger.error(f"[{self.name}] 获取视频直链失败: {e}")
return None
async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
@@ -204,7 +319,8 @@ class BiliParser(BaseParser):
else:
# 构建完整的B站视频URL
video_url = f"https://www.bilibili.com/video/{data.get('bvid', '')}"
direct_url = await self.get_direct_video_url(video_url)
bvid = data.get('bvid', '')
direct_url = await self.get_direct_video_url(video_url, bvid)
if direct_url:
video_message = MessageSegment.video(direct_url)
else:
@@ -226,6 +342,7 @@ class BiliParser(BaseParser):
f" 投币: {self.format_count(data['coin'])}\n"
f" 收藏: {self.format_count(data['favorite'])}\n"
f" 转发: {self.format_count(data['share'])}\n"
f" 弹幕: {self.format_count(data.get('danmaku', 0))}\n"
)
image_message_segment = [
@@ -264,5 +381,4 @@ class BiliParser(BaseParser):
Returns:
bool: 是否应该处理
"""
# 检查是否是B站相关域名包括短链接
return bool(self.url_pattern.search(url))