Files
NeoBot/plugins/web_parser/parsers/bili.py
K2cr2O1 c05ac03af1 feat: 更新配置和功能,修复B站解析问题
- 将WebSocket地址改为本地127.0.0.1
- 修改命令前缀为"/"
- 延长B站视频解析时长限制至2小时
- 更新版本号至v1.0.1并生成变更日志
- 完全重写依赖项列表
- 新增HTML格式的变更日志页面
2026-03-07 17:39:01 +08:00

269 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import asyncio
import json
import re
from typing import Optional, Dict, Any, List, Union

import aiohttp
import orjson
from bs4 import BeautifulSoup
from cachetools import TTLCache

from core.utils.logger import logger
from models import MessageEvent, MessageSegment

from ..base import BaseParser
from ..utils import format_duration
class BiliParser(BaseParser):
    """
    Parser for Bilibili video links.

    Downloads a video page, extracts the metadata embedded in the page's
    ``window.__INITIAL_STATE__`` script, and renders it as a list of
    forward-message nodes. ``self.get_session()``, ``self.HEADERS`` and
    ``self.format_count()`` are not defined in this class; they are presumably
    provided by ``BaseParser`` (confirm against the base class).
    """

    # Start of the JavaScript assignment that carries the page state as JSON.
    _STATE_ASSIGNMENT = re.compile(r'window\.__INITIAL_STATE__\s*=\s*')
    # "BV" followed by ten word characters, as it appears in video URLs.
    _BVID_PATTERN = re.compile(r'(BV\w{10})')
    # HTTP statuses whose ``Location`` header points at the redirect target.
    _REDIRECT_STATUSES = (301, 302, 303, 307, 308)
    # Videos longer than this many seconds (2 hours) are not resolved.
    _MAX_DURATION_SECONDS = 7200

    def __init__(self):
        super().__init__()
        self.name = "B站解析器"
        self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
        self.nickname = "B站视频解析"
        # Message de-duplication cache: at most 100 entries, each kept 10 s.
        # NOTE(review): not read in this class; presumably keyed by message id
        # by the caller -- confirm.
        self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)

    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Fetch a Bilibili video page and extract its metadata.

        The embedded ``__INITIAL_STATE__`` JSON is preferred. When it is
        missing, unparsable, or has no ``videoData``, the page ``<title>`` is
        used to build a minimal record with zeroed statistics.

        Args:
            url (str): Bilibili video URL (full or b23.tv short link).

        Returns:
            Optional[Dict[str, Any]]: Video information dict, or None when the
            page could not be fetched or nothing usable was found.
        """
        try:
            # Drop the query string and any "#/" fragment before requesting.
            clean_url = url.split('?')[0]
            if '#/' in clean_url:
                clean_url = clean_url.split('#/')[0]

            session = self.get_session()
            async with session.get(
                clean_url,
                headers=self.HEADERS,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                response.raise_for_status()
                text = await response.text()
                # After redirects (e.g. from b23.tv) this is the URL that
                # actually contains the BV id.
                final_url = str(response.url)

            soup = BeautifulSoup(text, 'html.parser')
            state = self._extract_initial_state(soup)
            video_data = state.get('videoData') if state else None
            if isinstance(video_data, dict) and video_data:
                return self._build_video_info(state)
            return self._build_fallback_info(soup, final_url, clean_url)
        except (aiohttp.ClientError, KeyError, AttributeError, ValueError) as e:
            logger.error(f"[{self.name}] 解析视频信息失败: {e}")
            logger.debug(f"失败的URL: {url}")
        except Exception as e:
            logger.error(f"[{self.name}] 解析视频信息时发生未知错误: {e}")
            logger.debug(f"失败的URL: {url}")
        return None

    def _extract_initial_state(self, soup: BeautifulSoup) -> Optional[Dict[str, Any]]:
        """Return the decoded ``window.__INITIAL_STATE__`` object, or None."""
        script_tag = soup.find('script', string=self._STATE_ASSIGNMENT)
        if not script_tag or not script_tag.string:
            return None
        script_text = script_tag.string
        assignment = self._STATE_ASSIGNMENT.search(script_text)
        if not assignment:
            return None
        try:
            # raw_decode stops at the end of the first complete JSON value, so
            # nested braces and "};" inside string values are handled and the
            # JavaScript that follows the assignment is ignored.
            state, _ = json.JSONDecoder().raw_decode(script_text, assignment.end())
        except ValueError as e:
            logger.debug(f"[{self.name}] 解析 __INITIAL_STATE__ 失败: {e}")
            return None
        return state if isinstance(state, dict) else None

    @staticmethod
    def _normalize_image_url(raw: Any) -> str:
        """Strip the "@..." suffix and make a protocol-relative URL absolute."""
        if not raw or not isinstance(raw, str):
            return ''
        cleaned = raw.split('@')[0]
        if cleaned.startswith('//'):
            cleaned = 'https:' + cleaned
        return cleaned

    def _build_video_info(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Map the decoded page state onto the video information dict."""
        # "or {}" also covers keys that are present but JSON null.
        video_data = state.get('videoData') or {}
        up_data = state.get('upData') or {}
        stat = video_data.get('stat') or {}
        owner = video_data.get('owner') or {}
        return {
            "title": video_data.get('title', '未知标题'),
            "bvid": video_data.get('bvid', '未知BV号'),
            "duration": video_data.get('duration', 0),
            "cover_url": self._normalize_image_url(video_data.get('pic', '')),
            "play": stat.get('view', 0),
            "like": stat.get('like', 0),
            "coin": stat.get('coin', 0),
            "favorite": stat.get('favorite', 0),
            "share": stat.get('share', 0),
            "owner_name": owner.get('name', '未知UP主'),
            "owner_avatar": self._normalize_image_url(owner.get('face', '')),
            "followers": up_data.get('fans', 0),
        }

    def _build_fallback_info(self, soup: BeautifulSoup, *candidate_urls: str) -> Optional[Dict[str, Any]]:
        """Build a minimal record from the page title; None if there is no title."""
        title_tag = soup.find('title')
        if not title_tag:
            return None
        title = title_tag.get_text().strip()
        bvid = '未知BV号'
        # Take the BV id from the first candidate URL that contains one.
        for candidate in candidate_urls:
            found = self._BVID_PATTERN.search(candidate)
            if found:
                bvid = found.group(1)
                break
        return {
            "title": title.replace('_哔哩哔哩_bilibili', '').strip(),
            "bvid": bvid,
            "duration": 0,
            "cover_url": '',
            "play": 0,
            "like": 0,
            "coin": 0,
            "favorite": 0,
            "share": 0,
            "owner_name": '未知UP主',
            "owner_avatar": '',
            "followers": 0,
        }

    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a Bilibili short link to the URL it redirects to.

        Sends a HEAD request without following redirects and reads the
        ``Location`` header of the redirect response.

        Args:
            short_url (str): Bilibili short link.

        Returns:
            Optional[str]: The redirect target, or None when the response is
            not a redirect or the request failed.
        """
        try:
            session = self.get_session()
            async with session.head(
                short_url,
                headers=self.HEADERS,
                allow_redirects=False,
                timeout=aiohttp.ClientTimeout(total=5),
            ) as response:
                if response.status in self._REDIRECT_STATUSES:
                    return response.headers.get('Location')
        except Exception as e:
            logger.error(f"[{self.name}] 获取真实URL失败: {e}")
        return None

    async def get_direct_video_url(self, video_url: str) -> Optional[str]:
        """
        Ask a third-party API for the direct stream URL of a Bilibili video.

        Args:
            video_url (str): Full URL of the Bilibili video page.

        Returns:
            Optional[str]: Direct video URL, or None when the call failed or
            the response did not contain one.
        """
        api_url = "https://api.mir6.com/api/bzjiexi"
        # Passed via ``params`` so the video URL is escaped instead of being
        # spliced raw into the query string.
        query = {"url": video_url, "type": "json"}
        try:
            # A dedicated session is used for the third-party host, as in the
            # original code, rather than the shared one from get_session().
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    api_url,
                    params=query,
                    headers=self.HEADERS,
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as response:
                    response.raise_for_status()
                    # content_type=None skips the Content-Type check.
                    data = await response.json(content_type=None)
            if data.get("code") == 200 and data.get("data"):
                return data["data"][0].get("video_url")
        except (
            aiohttp.ClientError,
            asyncio.TimeoutError,
            ValueError,
            KeyError,
            IndexError,
            TypeError,
            AttributeError,
        ) as e:
            # Timeouts and unexpected payload shapes are expected failure
            # modes of an external service; report them and return None.
            logger.error(f"[{self.name}] 调用第三方API解析视频失败: {e}")
        return None

    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Render the video information as forward-message nodes.

        Besides building the nodes, this also replies to the event with the
        video itself when a direct URL was obtained.

        Args:
            event (MessageEvent): The message event being answered.
            data (Dict[str, Any]): Video information as returned by parse().

        Returns:
            List[Any]: Forward nodes in the order text summary, cover image
            (if any), uploader avatar (if any), video or status text.
        """
        video_message: Union[str, MessageSegment]
        direct_url: Optional[str] = None
        if data['duration'] > self._MAX_DURATION_SECONDS:
            video_message = "视频时长超过2小时不进行解析。"
        else:
            bvid = data.get('bvid') or ''
            # Skip the external call when only the placeholder id is known.
            if bvid.startswith('BV'):
                video_url = f"https://www.bilibili.com/video/{bvid}"
                direct_url = await self.get_direct_video_url(video_url)
            if direct_url:
                video_message = MessageSegment.video(direct_url)
            else:
                video_message = "视频解析失败,无法获取直链。"

        text_message = (
            f"BiliBili 视频解析\n"
            f"--------------------\n"
            f" UP主: {data['owner_name']}\n"
            f" 粉丝: {self.format_count(data['followers'])}\n"
            f"--------------------\n"
            f" 标题: {data['title']}\n"
            f" BV号: {data['bvid']}\n"
            f" 时长: {format_duration(data['duration'])}\n"
            f"--------------------\n"
            f" 数据:\n"
            f" 播放: {self.format_count(data['play'])}\n"
            f" 点赞: {self.format_count(data['like'])}\n"
            f" 投币: {self.format_count(data['coin'])}\n"
            f" 收藏: {self.format_count(data['favorite'])}\n"
            f" 转发: {self.format_count(data['share'])}\n"
        )

        def make_node(message: Any) -> Any:
            return event.bot.build_forward_node(
                user_id=event.self_id,
                nickname=self.nickname,
                message=message,
            )

        nodes = [make_node(text_message)]
        # Image nodes are only added when there is a URL to show; the title
        # fallback in parse() yields empty cover and avatar URLs.
        if data.get('cover_url'):
            nodes.append(make_node([
                MessageSegment.text("B站封面"),
                MessageSegment.image(data['cover_url']),
            ]))
        if data.get('owner_avatar'):
            nodes.append(make_node([
                MessageSegment.text("UP主头像"),
                MessageSegment.image(data['owner_avatar']),
            ]))
        nodes.append(make_node(video_message))

        # Also send the video directly when a direct URL was obtained.
        if direct_url:
            try:
                await event.reply(MessageSegment.video(direct_url))
            except Exception as e:
                logger.error(f"[{self.name}] 直接发送视频失败: {e}")
        return nodes

    def should_handle_url(self, url: str) -> bool:
        """
        Tell whether this parser is responsible for the given URL.

        Args:
            url (str): URL to test.

        Returns:
            bool: True when the URL contains a bilibili.com video link or a
            b23.tv short link.
        """
        return bool(self.url_pattern.search(url))