添加对B站分离音视频流的合并功能,使用ffmpeg合并m4s格式的视频和音频流 扩展download_file接口支持自定义请求头,用于B站视频下载的Referer校验
629 lines
27 KiB
Python
629 lines
27 KiB
Python
# -*- coding: utf-8 -*-
|
||
import re
|
||
import os
|
||
import subprocess
|
||
import tempfile
|
||
from pathlib import Path
|
||
from typing import Optional, Dict, Any, List, Union
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
from core.utils.logger import logger
|
||
from models import MessageEvent, MessageSegment
|
||
from ..base import BaseParser
|
||
from ..utils import format_duration
|
||
|
||
from bilibili_api import video, select_client, Credential
|
||
from bilibili_api.exceptions import ResponseCodeException
|
||
from core.config_loader import global_config
|
||
from core.services.local_file_server import download_to_local
|
||
|
||
try:
|
||
import aiohttp
|
||
AIOHTTP_AVAILABLE = True
|
||
except ImportError:
|
||
AIOHTTP_AVAILABLE = False
|
||
logger.warning("[B站解析器] aiohttp 未安装,音视频合并功能将不可用")
|
||
|
||
# bilibili_api-python 可用性标志
|
||
BILI_API_AVAILABLE = True
|
||
|
||
# ffmpeg 可用性标志
|
||
FFMPEG_AVAILABLE = False
|
||
try:
|
||
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
|
||
FFMPEG_AVAILABLE = True
|
||
logger.success("[B站解析器] ffmpeg 已安装,支持合并音视频")
|
||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||
logger.warning("[B站解析器] ffmpeg 未安装,视频可能没有声音。建议安装 ffmpeg 以获得完整音视频体验")
|
||
|
||
# 显式指定使用 aiohttp,避免与其他库冲突
|
||
try:
|
||
select_client("aiohttp")
|
||
except Exception as e:
|
||
logger.warning(f"设置 bilibili_api 客户端失败: {e}")
|
||
|
||
|
||
class BiliParser(BaseParser):
|
||
"""
|
||
B站视频解析器(使用 bilibili-api-python 库)
|
||
"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.name = "B站解析器"
|
||
self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
|
||
self.nickname = "B站视频解析"
|
||
|
||
|
||
|
||
def _get_credential(self) -> Optional[Credential]:
|
||
"""获取 B 站登录凭证"""
|
||
try:
|
||
bili_config = global_config.bilibili
|
||
if bili_config.sessdata and bili_config.bili_jct and bili_config.buvid3:
|
||
return Credential(
|
||
sessdata=bili_config.sessdata,
|
||
bili_jct=bili_config.bili_jct,
|
||
buvid3=bili_config.buvid3,
|
||
dedeuserid=bili_config.dedeuserid
|
||
)
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
async def parse(self, url: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
解析B站视频信息
|
||
|
||
Args:
|
||
url (str): B站视频URL
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 视频信息字典,如果失败则返回None
|
||
"""
|
||
# 提取 BV 号
|
||
bvid = self.extract_bvid(url)
|
||
if not bvid:
|
||
logger.error(f"[{self.name}] 无法从 URL 提取 BV 号: {url}")
|
||
return None
|
||
|
||
try:
|
||
if BILI_API_AVAILABLE:
|
||
# 使用 bilibili-api-python 库
|
||
credential = self._get_credential()
|
||
v = video.Video(bvid=bvid, credential=credential)
|
||
info = await v.get_info()
|
||
|
||
# 处理封面 URL
|
||
cover_url = info.get('pic', '')
|
||
if cover_url:
|
||
cover_url = cover_url.split('@')[0]
|
||
if cover_url.startswith('//'):
|
||
cover_url = 'https:' + cover_url
|
||
|
||
# 处理 UP 主头像
|
||
owner = info.get('owner', {})
|
||
owner_name = owner.get('name', '未知UP主')
|
||
owner_face = owner.get('face', '')
|
||
if owner_face:
|
||
if owner_face.startswith('//'):
|
||
owner_face = 'https:' + owner_face
|
||
owner_face = owner_face.split('@')[0]
|
||
|
||
# 处理统计信息
|
||
stat = info.get('stat', {})
|
||
|
||
return {
|
||
"title": info.get('title', '未知标题'),
|
||
"bvid": bvid,
|
||
"aid": info.get('aid', 0),
|
||
"duration": info.get('duration', 0),
|
||
"cover_url": cover_url,
|
||
"play": stat.get('view', 0),
|
||
"like": stat.get('like', 0),
|
||
"coin": stat.get('coin', 0),
|
||
"favorite": stat.get('favorite', 0),
|
||
"share": stat.get('share', 0),
|
||
"danmaku": stat.get('danmaku', 0),
|
||
"owner_name": owner_name,
|
||
"owner_avatar": owner_face,
|
||
"followers": info.get('owner', {}).get('fans', 0),
|
||
"description": info.get('desc', ''),
|
||
"pubdate": info.get('pubdate', 0),
|
||
}
|
||
else:
|
||
# 备用方案:直接解析页面
|
||
return await self._parse_fallback(url, bvid)
|
||
|
||
except ResponseCodeException as e:
|
||
logger.error(f"[{self.name}] API 返回错误: {e.code} - {e.msg}")
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 解析视频信息失败: {e}")
|
||
if BILI_API_AVAILABLE:
|
||
logger.info(f"[{self.name}] 尝试备用解析方案")
|
||
return await self._parse_fallback(url, bvid)
|
||
|
||
return None
|
||
|
||
async def _parse_fallback(self, url: str, bvid: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
备用解析方案(不使用 bilibili-api-python)
|
||
|
||
Args:
|
||
url (str): B站视频URL
|
||
bvid (str): BV号
|
||
|
||
Returns:
|
||
Optional[Dict[str, Any]]: 视频信息字典
|
||
"""
|
||
try:
|
||
session = self.get_session()
|
||
clean_url = url.split('?')[0]
|
||
if '#/' in clean_url:
|
||
clean_url = clean_url.split('#/')[0]
|
||
|
||
async with session.get(clean_url, headers=self.HEADERS, timeout=5) as response:
|
||
response.raise_for_status()
|
||
text = await response.text()
|
||
|
||
# 提取标题
|
||
import re
|
||
title_match = re.search(r'<h1[^>]*>([^<]+)</h1>', text)
|
||
title = title_match.group(1).strip() if title_match else '未知标题'
|
||
|
||
# 提取播放量等信息
|
||
play_match = re.search(r'"view":(\d+)', text)
|
||
play = int(play_match.group(1)) if play_match else 0
|
||
|
||
like_match = re.search(r'"like":(\d+)', text)
|
||
like = int(like_match.group(1)) if like_match else 0
|
||
|
||
coin_match = re.search(r'"coin":(\d+)', text)
|
||
coin = int(coin_match.group(1)) if coin_match else 0
|
||
|
||
favorite_match = re.search(r'"favorite":(\d+)', text)
|
||
favorite = int(favorite_match.group(1)) if favorite_match else 0
|
||
|
||
share_match = re.search(r'"share":(\d+)', text)
|
||
share = int(share_match.group(1)) if share_match else 0
|
||
|
||
# 提取 UP 主信息
|
||
owner_match = re.search(r'"name":"([^"]+)"', text)
|
||
owner_name = owner_match.group(1) if owner_match else '未知UP主'
|
||
|
||
face_match = re.search(r'"face":"([^"]+)"', text)
|
||
owner_face = face_match.group(1) if face_match else ''
|
||
if owner_face:
|
||
if owner_face.startswith('//'):
|
||
owner_face = 'https:' + owner_face
|
||
owner_face = owner_face.split('@')[0]
|
||
|
||
return {
|
||
"title": title,
|
||
"bvid": bvid,
|
||
"aid": 0,
|
||
"duration": 0,
|
||
"cover_url": '',
|
||
"play": play,
|
||
"like": like,
|
||
"coin": coin,
|
||
"favorite": favorite,
|
||
"share": share,
|
||
"danmaku": 0,
|
||
"owner_name": owner_name,
|
||
"owner_avatar": owner_face,
|
||
"followers": 0,
|
||
"description": '',
|
||
"pubdate": 0,
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 备用解析方案失败: {e}")
|
||
|
||
return None
|
||
|
||
def extract_bvid(self, url: str) -> Optional[str]:
|
||
"""
|
||
从 URL 中提取 BV 号
|
||
|
||
Args:
|
||
url (str): B站视频URL
|
||
|
||
Returns:
|
||
Optional[str]: BV号,如果失败则返回None
|
||
"""
|
||
# 方式1: 直接从 URL 中提取
|
||
bvid_match = re.search(r'/video/(BV\w+)', url)
|
||
if bvid_match:
|
||
return bvid_match.group(1)
|
||
|
||
# 方式2: 从短链接跳转后提取
|
||
if 'b23.tv' in url:
|
||
try:
|
||
session = self.get_session()
|
||
# 简单处理,不实际跳转,直接尝试提取
|
||
bvid_match = re.search(r'BV\w{10}', url)
|
||
if bvid_match:
|
||
return bvid_match.group(0)
|
||
except Exception:
|
||
pass
|
||
|
||
return None
|
||
|
||
async def get_real_url(self, short_url: str) -> Optional[str]:
|
||
"""
|
||
获取B站短链接的真实URL
|
||
|
||
Args:
|
||
short_url (str): B站短链接
|
||
|
||
Returns:
|
||
Optional[str]: 真实URL,如果失败则返回None
|
||
"""
|
||
try:
|
||
session = self.get_session()
|
||
async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response:
|
||
if response.status == 302:
|
||
return response.headers.get('Location')
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 获取真实URL失败: {e}")
|
||
return None
|
||
|
||
async def get_direct_video_url(self, video_url: str, bvid: str) -> Optional[str]:
|
||
"""
|
||
获取B站视频直链(通过本地文件服务器下载)
|
||
|
||
Args:
|
||
video_url (str): B站视频的完整URL
|
||
bvid (str): BV号
|
||
|
||
Returns:
|
||
Optional[str]: 本地视频 URL,如果失败则返回None
|
||
"""
|
||
if not BILI_API_AVAILABLE:
|
||
return None
|
||
|
||
try:
|
||
credential = self._get_credential()
|
||
v = video.Video(bvid=bvid, credential=credential)
|
||
# 先获取视频信息以获取 cid
|
||
info = await v.get_info()
|
||
cid = info.get('cid', 0)
|
||
|
||
if not cid:
|
||
return None
|
||
|
||
# 获取下载链接数据,使用 html5=True 获取网页格式(通常包含合并的音视频)
|
||
download_url_data = await v.get_download_url(cid=cid, html5=True)
|
||
|
||
# 使用 VideoDownloadURLDataDetecter 解析数据
|
||
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
|
||
|
||
# 尝试获取 MP4 格式的合并流(包含音视频)
|
||
streams = detecter.detect_best_streams()
|
||
|
||
# 如果没有获取到流,尝试其他格式
|
||
if not streams:
|
||
logger.warning(f"[{self.name}] 无法获取 html5 格式,尝试获取其他格式...")
|
||
download_url_data = await v.get_download_url(cid=cid, html5=False)
|
||
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
|
||
streams = detecter.detect_best_streams()
|
||
|
||
if streams:
|
||
# 获取视频直链
|
||
video_direct_url = streams[0].url
|
||
|
||
# 检查是否是分离的 m4s 流(可能没有声音)
|
||
is_m4s_stream = '.m4s' in video_direct_url
|
||
if is_m4s_stream:
|
||
logger.warning(f"[{self.name}] 检测到分离的 m4s 流,B站 API 返回的 m4s 流通常是分离的视频和音频,需要客户端合并才能有声音")
|
||
logger.info(f"[{self.name}] 建议: 使用支持合并 m4s 流的下载工具(如 ffmpeg)合并视频和音频")
|
||
|
||
logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")
|
||
|
||
# B站下载需要 Referer 和 User-Agent
|
||
headers = {
|
||
"Referer": "https://www.bilibili.com",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||
}
|
||
|
||
# 调试:打印 download_url_data 结构
|
||
logger.debug(f"[{self.name}] download_url_data 类型: {type(download_url_data)}")
|
||
if isinstance(download_url_data, dict):
|
||
logger.debug(f"[{self.name}] download_url_data keys: {list(download_url_data.keys())}")
|
||
|
||
# 如果是 m4s 流且 ffmpeg 可用,先保存 download_url_data 供合并使用
|
||
if is_m4s_stream and FFMPEG_AVAILABLE and AIOHTTP_AVAILABLE:
|
||
local_url = await self._download_and_merge_m4s(video_direct_url, headers, bvid, download_url_data)
|
||
else:
|
||
# 使用本地文件服务器下载
|
||
local_url = await download_to_local(video_direct_url, timeout=120, headers=headers)
|
||
|
||
if local_url:
|
||
logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
|
||
return local_url
|
||
else:
|
||
logger.error(f"[{self.name}] 下载到本地失败")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 获取视频直链失败: {e}")
|
||
|
||
return None
|
||
|
||
async def _download_and_merge_m4s(self, video_url: str, headers: Dict[str, str], bvid: str, download_url_data: Dict) -> Optional[str]:
|
||
"""
|
||
下载并合并 m4s 视频和音频流
|
||
|
||
Args:
|
||
video_url (str): 视频流 URL
|
||
headers (Dict[str, str]): 请求头
|
||
bvid (str): BV号
|
||
download_url_data (Dict): 下载 URL 数据
|
||
|
||
Returns:
|
||
Optional[str]: 合并后的本地视频 URL,如果失败则返回None
|
||
"""
|
||
if not FFMPEG_AVAILABLE:
|
||
logger.warning("[B站解析器] ffmpeg 不可用,无法合并音视频")
|
||
return None
|
||
|
||
if not AIOHTTP_AVAILABLE:
|
||
logger.warning("[B站解析器] aiohttp 不可用,无法合并音视频")
|
||
return None
|
||
|
||
try:
|
||
logger.info(f"[{self.name}] 开始下载并合并 m4s 音视频...")
|
||
|
||
# 创建共享的 ClientSession 用于下载
|
||
async with aiohttp.ClientSession() as session:
|
||
# 下载视频流
|
||
video_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False)
|
||
video_file.close()
|
||
|
||
async with session.get(video_url, headers=headers, timeout=60) as response:
|
||
if response.status != 200:
|
||
logger.error(f"[{self.name}] 下载视频流失败: HTTP {response.status}")
|
||
return None
|
||
|
||
with open(video_file.name, 'wb') as f:
|
||
while True:
|
||
chunk = await response.content.read(8192)
|
||
if not chunk:
|
||
break
|
||
f.write(chunk)
|
||
|
||
logger.info(f"[{self.name}] 视频流下载完成: {video_file.name}")
|
||
|
||
# 从 download_url_data 中提取音频 URL
|
||
# B站的 dash 格式包含视频和音频流
|
||
audio_url = None
|
||
if isinstance(download_url_data, dict):
|
||
# 尝试 dash 格式(推荐)
|
||
if 'dash' in download_url_data and isinstance(download_url_data['dash'], dict):
|
||
dash = download_url_data['dash']
|
||
if 'audio' in dash and isinstance(dash['audio'], list) and len(dash['audio']) > 0:
|
||
# 获取第一个音频流
|
||
audio_item = dash['audio'][0]
|
||
audio_url = audio_item.get('baseUrl') or audio_item.get('url') or audio_item.get('backupUrl')
|
||
logger.debug(f"[{self.name}] 从 dash.audio 提取音频 URL: {audio_url is not None}")
|
||
elif 'audio' in dash and isinstance(dash['audio'], dict):
|
||
audio_url = dash['audio'].get('baseUrl') or dash['audio'].get('url')
|
||
logger.debug(f"[{self.name}] 从 dash.audio (dict) 提取音频 URL: {audio_url is not None}")
|
||
|
||
# 尝试 durl 格式(非分段流)
|
||
elif 'durl' in download_url_data:
|
||
if isinstance(download_url_data['durl'], list) and len(download_url_data['durl']) > 0:
|
||
main_url = download_url_data['durl'][0].get('url') or download_url_data['durl'][0].get('baseUrl')
|
||
if main_url:
|
||
video_url = main_url
|
||
logger.debug(f"[{self.name}] 使用 durl 主 URL: {video_url}")
|
||
|
||
if not audio_url and not video_url.startswith('http'):
|
||
logger.warning(f"[{self.name}] 无法从 download_url_data 中提取音频 URL")
|
||
logger.debug(f"[{self.name}] download_url_data 结构: {download_url_data}")
|
||
os.unlink(video_file.name)
|
||
return None
|
||
|
||
# 下载音频流
|
||
audio_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False)
|
||
audio_file.close()
|
||
|
||
async with session.get(audio_url, headers=headers, timeout=60) as response:
|
||
if response.status != 200:
|
||
logger.error(f"[{self.name}] 下载音频流失败: HTTP {response.status}")
|
||
os.unlink(video_file.name)
|
||
return None
|
||
|
||
with open(audio_file.name, 'wb') as f:
|
||
while True:
|
||
chunk = await response.content.read(8192)
|
||
if not chunk:
|
||
break
|
||
f.write(chunk)
|
||
|
||
logger.info(f"[{self.name}] 音频流下载完成: {audio_file.name}")
|
||
|
||
# 使用 ffmpeg 合并视频和音频
|
||
merged_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
|
||
merged_file.close()
|
||
|
||
# ffmpeg命令:使用ffmpeg -i多次输入,然后合并
|
||
# 先转换视频流(移除音频),然后添加音频流
|
||
ffmpeg_cmd = [
|
||
'ffmpeg', '-y', '-i', video_file.name, '-i', audio_file.name,
|
||
'-c:v', 'libx264', '-c:a', 'aac',
|
||
'-shortest', merged_file.name
|
||
]
|
||
|
||
logger.debug(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
|
||
|
||
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
|
||
|
||
# 详细记录ffmpeg输出
|
||
if result.stdout:
|
||
logger.debug(f"[{self.name}] ffmpeg stdout: {result.stdout}")
|
||
if result.stderr:
|
||
logger.debug(f"[{self.name}] ffmpeg stderr: {result.stderr}")
|
||
|
||
if result.returncode != 0:
|
||
logger.error(f"[{self.name}] ffmpeg 合并失败: {result.stderr}")
|
||
os.unlink(video_file.name)
|
||
os.unlink(audio_file.name)
|
||
return None
|
||
|
||
# 验证输出文件
|
||
merged_size = os.path.getsize(merged_file.name)
|
||
logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
|
||
|
||
if merged_size == 0:
|
||
logger.error(f"[{self.name}] ffmpeg生成了空文件,命令可能有问题")
|
||
logger.error(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
|
||
if result.stderr:
|
||
logger.error(f"[{self.name}] ffmpeg错误输出: {result.stderr}")
|
||
os.unlink(video_file.name)
|
||
os.unlink(audio_file.name)
|
||
return None
|
||
|
||
logger.info(f"[{self.name}] 音视频合并成功: {merged_file.name} ({merged_size} bytes)")
|
||
|
||
# 上传合并后的文件到本地文件服务器
|
||
from core.services.local_file_server import get_local_file_server
|
||
server = get_local_file_server()
|
||
if server:
|
||
try:
|
||
file_id = server._generate_file_id(f'file://{merged_file.name}')
|
||
dest_path = server.download_dir / file_id
|
||
|
||
# 获取合并文件大小
|
||
merged_size = os.path.getsize(merged_file.name)
|
||
logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
|
||
|
||
if merged_size == 0:
|
||
logger.error(f"[{self.name}] 合并文件为空,ffmpeg可能失败了")
|
||
merged_url = None
|
||
else:
|
||
# 复制本地文件到服务器目录
|
||
import shutil
|
||
shutil.copy2(merged_file.name, dest_path)
|
||
server.file_map[file_id] = dest_path
|
||
|
||
# 验证复制后的文件
|
||
if dest_path.exists():
|
||
dest_size = dest_path.stat().st_size
|
||
logger.debug(f"[{self.name}] 复制后文件大小: {dest_size} bytes")
|
||
if dest_size == merged_size:
|
||
merged_url = f"http://127.0.0.1:{server.port}/download?id={file_id}"
|
||
logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
|
||
else:
|
||
logger.error(f"[{self.name}] 文件大小不匹配: 原始 {merged_size} vs 复制 {dest_size}")
|
||
merged_url = None
|
||
else:
|
||
logger.error(f"[{self.name}] 文件复制失败: {dest_path} 不存在")
|
||
merged_url = None
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 上传合并文件失败: {e}")
|
||
merged_url = None
|
||
else:
|
||
merged_url = None
|
||
|
||
# 清理临时文件
|
||
try:
|
||
os.unlink(video_file.name)
|
||
os.unlink(audio_file.name)
|
||
os.unlink(merged_file.name)
|
||
except Exception as e:
|
||
logger.warning(f"[{self.name}] 清理临时文件失败: {e}")
|
||
|
||
if merged_url:
|
||
logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
|
||
return merged_url
|
||
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 合并音视频失败: {e}")
|
||
|
||
return None
|
||
|
||
async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
|
||
"""
|
||
格式化B站视频响应消息
|
||
|
||
Args:
|
||
event (MessageEvent): 消息事件对象
|
||
data (Dict[str, Any]): 视频信息
|
||
|
||
Returns:
|
||
List[Any]: 消息段列表
|
||
"""
|
||
# 检查视频时长
|
||
video_message: Union[str, MessageSegment]
|
||
direct_url = None
|
||
if data['duration'] > 7200: # 2小时 = 7200秒
|
||
video_message = "视频时长超过2小时,不进行解析。"
|
||
else:
|
||
# 构建完整的B站视频URL
|
||
video_url = f"https://www.bilibili.com/video/{data.get('bvid', '')}"
|
||
bvid = data.get('bvid', '')
|
||
direct_url = await self.get_direct_video_url(video_url, bvid)
|
||
if direct_url:
|
||
video_message = MessageSegment.video(direct_url)
|
||
else:
|
||
video_message = "视频解析失败,无法获取直链。"
|
||
|
||
text_message = (
|
||
f"BiliBili 视频解析\n"
|
||
f"--------------------\n"
|
||
f" UP主: {data['owner_name']}\n"
|
||
f" 粉丝: {self.format_count(data['followers'])}\n"
|
||
f"--------------------\n"
|
||
f" 标题: {data['title']}\n"
|
||
f" BV号: {data['bvid']}\n"
|
||
f" 时长: {format_duration(data['duration'])}\n"
|
||
f"--------------------\n"
|
||
f" 数据:\n"
|
||
f" 播放: {self.format_count(data['play'])}\n"
|
||
f" 点赞: {self.format_count(data['like'])}\n"
|
||
f" 投币: {self.format_count(data['coin'])}\n"
|
||
f" 收藏: {self.format_count(data['favorite'])}\n"
|
||
f" 转发: {self.format_count(data['share'])}\n"
|
||
f" 弹幕: {self.format_count(data.get('danmaku', 0))}\n"
|
||
)
|
||
|
||
image_message_segment = [
|
||
MessageSegment.text("B站封面:"),
|
||
MessageSegment.image(data['cover_url'])
|
||
]
|
||
|
||
up_info_segment = [
|
||
MessageSegment.text("UP主头像:"),
|
||
MessageSegment.image(data['owner_avatar'])
|
||
]
|
||
|
||
nodes = [
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=text_message),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=image_message_segment),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=up_info_segment),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=video_message)
|
||
]
|
||
|
||
# 同时直接发送视频(如果获取到直链)
|
||
if direct_url:
|
||
try:
|
||
await event.reply(MessageSegment.video(direct_url))
|
||
except Exception as e:
|
||
logger.error(f"[{self.name}] 直接发送视频失败: {e}")
|
||
|
||
return nodes
|
||
|
||
def should_handle_url(self, url: str) -> bool:
|
||
"""
|
||
判断是否应该处理该URL
|
||
|
||
Args:
|
||
url (str): URL
|
||
|
||
Returns:
|
||
bool: 是否应该处理
|
||
"""
|
||
return bool(self.url_pattern.search(url))
|