feat(bili): 支持合并B站分离的音视频流并添加请求头支持

添加对B站分离音视频流的合并功能,使用ffmpeg合并m4s格式的视频和音频流
扩展download_file接口支持自定义请求头,用于B站视频下载的Referer校验
This commit is contained in:
2026-03-15 01:34:00 +08:00
parent 958c1df1fc
commit 2a6e9b8f89
2 changed files with 254 additions and 8 deletions

View File

@@ -72,13 +72,14 @@ class LocalFileServer:
url_hash = hashlib.md5(url.encode()).hexdigest()[:16] url_hash = hashlib.md5(url.encode()).hexdigest()[:16]
return f"file_{url_hash}" return f"file_{url_hash}"
async def download_file(self, url: str, timeout: int = 60) -> Optional[str]: async def download_file(self, url: str, timeout: int = 60, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
""" """
下载远程文件到本地 下载远程文件到本地
Args: Args:
url (str): 远程文件 URL url (str): 远程文件 URL
timeout (int): 下载超时时间(秒) timeout (int): 下载超时时间(秒)
headers (Optional[Dict[str, str]]): 请求头
Returns: Returns:
Optional[str]: 本地文件 ID如果失败则返回 None Optional[str]: 本地文件 ID如果失败则返回 None
@@ -96,7 +97,7 @@ class LocalFileServer:
# 使用 aiohttp 下载文件 # 使用 aiohttp 下载文件
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as response: async with session.get(url, timeout=timeout, headers=headers) as response:
if response.status != 200: if response.status != 200:
logger.error(f"[LocalFileServer] 下载失败: HTTP {response.status}") logger.error(f"[LocalFileServer] 下载失败: HTTP {response.status}")
return None return None
@@ -195,13 +196,14 @@ async def stop_local_file_server():
_local_file_server = None _local_file_server = None
async def download_to_local(url: str, timeout: int = 60) -> Optional[str]: async def download_to_local(url: str, timeout: int = 60, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
""" """
下载远程文件到本地并返回本地访问 URL 下载远程文件到本地并返回本地访问 URL
Args: Args:
url (str): 远程文件 URL url (str): 远程文件 URL
timeout (int): 下载超时时间(秒) timeout (int): 下载超时时间(秒)
headers (Optional[Dict[str, str]]): 请求头
Returns: Returns:
Optional[str]: 本地访问 URL如果失败则返回 None Optional[str]: 本地访问 URL如果失败则返回 None
@@ -210,7 +212,7 @@ async def download_to_local(url: str, timeout: int = 60) -> Optional[str]:
if not server: if not server:
return None return None
file_id = await server.download_file(url, timeout) file_id = await server.download_file(url, timeout, headers)
if not file_id: if not file_id:
return None return None

View File

@@ -1,5 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re import re
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Union from typing import Optional, Dict, Any, List, Union
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
@@ -13,9 +17,25 @@ from bilibili_api.exceptions import ResponseCodeException
from core.config_loader import global_config from core.config_loader import global_config
from core.services.local_file_server import download_to_local from core.services.local_file_server import download_to_local
# Optional-dependency guard: try to import aiohttp and record the outcome in
# AIOHTTP_AVAILABLE so the m4s audio/video merge path can be skipped when the
# package is missing instead of failing at import time.
try:
    import aiohttp
    AIOHTTP_AVAILABLE = True
except ImportError:
    AIOHTTP_AVAILABLE = False
    logger.warning("[B站解析器] aiohttp 未安装,音视频合并功能将不可用")
# bilibili_api-python 可用性标志 # bilibili_api-python 可用性标志
BILI_API_AVAILABLE = True BILI_API_AVAILABLE = True
# ffmpeg availability flag, probed once at import time by running
# `ffmpeg -version`. It stays False unless the probe exits successfully.
FFMPEG_AVAILABLE = False
try:
    # The timeout keeps a hung or interactive binary from stalling module import.
    subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True, timeout=10)
    FFMPEG_AVAILABLE = True
    logger.success("[B站解析器] ffmpeg 已安装,支持合并音视频")
except (subprocess.SubprocessError, OSError):
    # SubprocessError covers a non-zero exit (CalledProcessError) and the
    # timeout (TimeoutExpired); OSError covers a missing binary
    # (FileNotFoundError) as well as e.g. PermissionError on a non-executable
    # file, none of which should abort importing this module.
    logger.warning("[B站解析器] ffmpeg 未安装,视频可能没有声音。建议安装 ffmpeg 以获得完整音视频体验")
# 显式指定使用 aiohttp避免与其他库冲突 # 显式指定使用 aiohttp避免与其他库冲突
try: try:
select_client("aiohttp") select_client("aiohttp")
@@ -273,20 +293,51 @@ class BiliParser(BaseParser):
if not cid: if not cid:
return None return None
# 获取下载链接数据 # 获取下载链接数据,使用 html5=True 获取网页格式(通常包含合并的音视频)
download_url_data = await v.get_download_url(cid=cid) download_url_data = await v.get_download_url(cid=cid, html5=True)
# 使用 VideoDownloadURLDataDetecter 解析数据 # 使用 VideoDownloadURLDataDetecter 解析数据
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data) detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
# 尝试获取 MP4 格式的合并流(包含音视频)
streams = detecter.detect_best_streams()
# 如果没有获取到流,尝试其他格式
if not streams:
logger.warning(f"[{self.name}] 无法获取 html5 格式,尝试获取其他格式...")
download_url_data = await v.get_download_url(cid=cid, html5=False)
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
streams = detecter.detect_best_streams() streams = detecter.detect_best_streams()
if streams: if streams:
# 获取视频直链 # 获取视频直链
video_direct_url = streams[0].url video_direct_url = streams[0].url
# 检查是否是分离的 m4s 流(可能没有声音)
is_m4s_stream = '.m4s' in video_direct_url
if is_m4s_stream:
logger.warning(f"[{self.name}] 检测到分离的 m4s 流B站 API 返回的 m4s 流通常是分离的视频和音频,需要客户端合并才能有声音")
logger.info(f"[{self.name}] 建议: 使用支持合并 m4s 流的下载工具(如 ffmpeg合并视频和音频")
logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...") logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")
# B站下载需要 Referer 和 User-Agent
headers = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
# 调试:打印 download_url_data 结构
logger.debug(f"[{self.name}] download_url_data 类型: {type(download_url_data)}")
if isinstance(download_url_data, dict):
logger.debug(f"[{self.name}] download_url_data keys: {list(download_url_data.keys())}")
# 如果是 m4s 流且 ffmpeg 可用,先保存 download_url_data 供合并使用
if is_m4s_stream and FFMPEG_AVAILABLE and AIOHTTP_AVAILABLE:
local_url = await self._download_and_merge_m4s(video_direct_url, headers, bvid, download_url_data)
else:
# 使用本地文件服务器下载 # 使用本地文件服务器下载
local_url = await download_to_local(video_direct_url, timeout=120) local_url = await download_to_local(video_direct_url, timeout=120, headers=headers)
if local_url: if local_url:
logger.success(f"[{self.name}] 视频已下载到本地: {local_url}") logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
@@ -300,6 +351,199 @@ class BiliParser(BaseParser):
return None return None
async def _download_and_merge_m4s(self, video_url: str, headers: Dict[str, str], bvid: str, download_url_data: Dict) -> Optional[str]:
    """
    Download the separate m4s video and audio streams and merge them with ffmpeg.

    The audio URL is read from ``download_url_data['dash']['audio']``. Both
    streams are downloaded to temporary files, muxed into a single MP4 by
    ffmpeg, and the result is copied into the local file server's download
    directory and registered in its ``file_map``. All temporary files are
    removed on every exit path.

    Args:
        video_url (str): URL of the video-only stream.
        headers (Dict[str, str]): Request headers sent with both downloads.
        bvid (str): BV id of the video (currently unused; kept for the caller).
        download_url_data (Dict): Raw download-URL payload; the audio URL is
            extracted from it.

    Returns:
        Optional[str]: Local URL of the merged video, or None if the audio URL
        cannot be found, a download fails, ffmpeg fails, or the local file
        server is unavailable.
    """
    # Function-scope imports: these names are not guaranteed to be imported at
    # module level in this file.
    import asyncio
    import shutil

    if not FFMPEG_AVAILABLE:
        logger.warning("[B站解析器] ffmpeg 不可用,无法合并音视频")
        return None
    if not AIOHTTP_AVAILABLE:
        logger.warning("[B站解析器] aiohttp 不可用,无法合并音视频")
        return None

    def _pick_url(entry: Any) -> Optional[str]:
        """Return the first usable URL string found in one stream entry."""
        if not isinstance(entry, dict):
            return None
        for key in ('baseUrl', 'url', 'backupUrl'):
            value = entry.get(key)
            # A list-valued field (presumably mirror URLs under backupUrl --
            # TODO confirm against the API payload) must be reduced to one URL
            # before it can be requested.
            if isinstance(value, (list, tuple)):
                value = next((item for item in value if item), None)
            if isinstance(value, str) and value:
                return value
        return None

    def _find_audio_url(payload: Any) -> Optional[str]:
        """Extract the first audio stream URL from the payload's dash section."""
        if not isinstance(payload, dict):
            return None
        dash = payload.get('dash')
        if not isinstance(dash, dict):
            return None
        audio = dash.get('audio')
        if isinstance(audio, list):
            return _pick_url(audio[0]) if audio else None
        return _pick_url(audio)

    # Every temp file created below is recorded here so the `finally` clause
    # can remove it regardless of how the function exits.
    temp_paths: List[str] = []

    def _new_temp_path(suffix: str) -> str:
        """Create an empty, closed temp file and return its path."""
        handle = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        handle.close()
        temp_paths.append(handle.name)
        return handle.name

    async def _fetch(session: Any, url: str, dest: str, label: str) -> bool:
        """Stream `url` into `dest`; return False on a non-200 response."""
        async with session.get(url, headers=headers, timeout=request_timeout) as response:
            if response.status != 200:
                logger.error(f"[{self.name}] 下载{label}失败: HTTP {response.status}")
                return False
            with open(dest, 'wb') as out:
                while True:
                    chunk = await response.content.read(8192)
                    if not chunk:
                        break
                    out.write(chunk)
        logger.info(f"[{self.name}] {label}下载完成: {dest}")
        return True

    # Resolve the audio URL before downloading anything: without it there is
    # nothing to merge, and requesting a missing URL would only raise.
    audio_url = _find_audio_url(download_url_data)
    logger.debug(f"[{self.name}] 从 dash.audio 提取音频 URL: {audio_url is not None}")
    if not audio_url:
        logger.warning(f"[{self.name}] 无法从 download_url_data 中提取音频 URL")
        logger.debug(f"[{self.name}] download_url_data 结构: {download_url_data}")
        return None

    try:
        logger.info(f"[{self.name}] 开始下载并合并 m4s 音视频...")

        video_path = _new_temp_path('.m4s')
        audio_path = _new_temp_path('.m4s')
        merged_path = _new_temp_path('.mp4')

        # Same 60 s budget as before, expressed as a ClientTimeout rather than
        # a bare number.
        request_timeout = aiohttp.ClientTimeout(total=60)

        # One shared session for both downloads.
        async with aiohttp.ClientSession() as session:
            if not await _fetch(session, video_url, video_path, "视频流"):
                return None
            if not await _fetch(session, audio_url, audio_path, "音频流"):
                return None

        # Mux the two inputs into one MP4.
        # NOTE(review): this re-encodes video with libx264; stream copy would
        # be faster, but the re-encode may be intentional for player
        # compatibility -- confirm before changing.
        ffmpeg_cmd = [
            'ffmpeg', '-y', '-i', video_path, '-i', audio_path,
            '-c:v', 'libx264', '-c:a', 'aac',
            '-shortest', merged_path
        ]
        logger.debug(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")

        # Run ffmpeg in a worker thread so the event loop is not blocked for
        # the duration of the encode. errors='replace' keeps undecodable bytes
        # in ffmpeg's output from raising while the output is captured.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            lambda: subprocess.run(ffmpeg_cmd, capture_output=True, text=True, errors='replace'),
        )

        if result.stdout:
            logger.debug(f"[{self.name}] ffmpeg stdout: {result.stdout}")
        if result.stderr:
            logger.debug(f"[{self.name}] ffmpeg stderr: {result.stderr}")

        if result.returncode != 0:
            logger.error(f"[{self.name}] ffmpeg 合并失败: {result.stderr}")
            return None

        # Validate the output file.
        merged_size = os.path.getsize(merged_path)
        logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
        if merged_size == 0:
            logger.error(f"[{self.name}] ffmpeg生成了空文件命令可能有问题")
            logger.error(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
            if result.stderr:
                logger.error(f"[{self.name}] ffmpeg错误输出: {result.stderr}")
            return None

        logger.info(f"[{self.name}] 音视频合并成功: {merged_path} ({merged_size} bytes)")

        # Publish the merged file through the local file server.
        from core.services.local_file_server import get_local_file_server
        server = get_local_file_server()
        if not server:
            logger.warning(f"[{self.name}] 本地文件服务器不可用,无法提供合并后的视频")
            return None

        try:
            file_id = server._generate_file_id(f'file://{merged_path}')
            dest_path = server.download_dir / file_id
            shutil.copy2(merged_path, dest_path)

            if not dest_path.exists():
                logger.error(f"[{self.name}] 文件复制失败: {dest_path} 不存在")
                return None

            dest_size = dest_path.stat().st_size
            logger.debug(f"[{self.name}] 复制后文件大小: {dest_size} bytes")
            if dest_size != merged_size:
                logger.error(f"[{self.name}] 文件大小不匹配: 原始 {merged_size} vs 复制 {dest_size}")
                return None

            # Register only after the copy has been verified, so the server
            # never maps an id to a truncated file.
            server.file_map[file_id] = dest_path
        except Exception as e:
            logger.error(f"[{self.name}] 上传合并文件失败: {e}")
            return None

        merged_url = f"http://127.0.0.1:{server.port}/download?id={file_id}"
        logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
        return merged_url

    except Exception as e:
        logger.error(f"[{self.name}] 合并音视频失败: {e}")
        return None
    finally:
        # Best-effort cleanup of every temp file, on success and failure alike.
        for path in temp_paths:
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass
            except OSError as e:
                logger.warning(f"[{self.name}] 清理临时文件失败: {e}")
async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
""" """
格式化B站视频响应消息 格式化B站视频响应消息