From 2a6e9b8f89f28d7673f39839e2e734a33e4e76c8 Mon Sep 17 00:00:00 2001
From: K2Cr2O1 <2221577113@qq.com>
Date: Sun, 15 Mar 2026 01:34:00 +0800
Subject: [PATCH] =?UTF-8?q?feat(bili):=20=E6=94=AF=E6=8C=81=E5=90=88?=
 =?UTF-8?q?=E5=B9=B6B=E7=AB=99=E5=88=86=E7=A6=BB=E7=9A=84=E9=9F=B3?=
 =?UTF-8?q?=E8=A7=86=E9=A2=91=E6=B5=81=E5=B9=B6=E6=B7=BB=E5=8A=A0=E8=AF=B7?=
 =?UTF-8?q?=E6=B1=82=E5=A4=B4=E6=94=AF=E6=8C=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

添加对B站分离音视频流的合并功能,使用ffmpeg合并m4s格式的视频和音频流
扩展download_file接口支持自定义请求头,用于B站视频下载的Referer校验
---
 core/services/local_file_server.py |  10 +-
 plugins/web_parser/parsers/bili.py | 251 ++++++++++++++++++++++++++++-
 2 files changed, 253 insertions(+), 8 deletions(-)

diff --git a/core/services/local_file_server.py b/core/services/local_file_server.py
index aeb9418..3df4f36 100644
--- a/core/services/local_file_server.py
+++ b/core/services/local_file_server.py
@@ -72,13 +72,14 @@ class LocalFileServer:
         url_hash = hashlib.md5(url.encode()).hexdigest()[:16]
         return f"file_{url_hash}"

-    async def download_file(self, url: str, timeout: int = 60) -> Optional[str]:
+    async def download_file(self, url: str, timeout: int = 60, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
         """
         下载远程文件到本地

         Args:
             url (str): 远程文件 URL
             timeout (int): 下载超时时间(秒)
+            headers (Optional[Dict[str, str]]): Extra HTTP request headers sent with the download

         Returns:
             Optional[str]: 本地文件 ID,如果失败则返回 None
@@ -96,7 +97,7 @@ class LocalFileServer:

             # 使用 aiohttp 下载文件
             async with aiohttp.ClientSession() as session:
-                async with session.get(url, timeout=timeout) as response:
+                async with session.get(url, timeout=timeout, headers=headers) as response:
                     if response.status != 200:
                         logger.error(f"[LocalFileServer] 下载失败: HTTP {response.status}")
                         return None
@@ -195,13 +196,14 @@ async def stop_local_file_server():
     _local_file_server = None


-async def download_to_local(url: str, timeout: int = 60) -> Optional[str]:
+async def download_to_local(url: str, timeout: int = 60, headers: Optional[Dict[str, str]] = None) -> Optional[str]:
     """
     下载远程文件到本地并返回本地访问 URL

     Args:
         url (str): 远程文件 URL
         timeout (int): 下载超时时间(秒)
+        headers (Optional[Dict[str, str]]): Extra HTTP request headers, forwarded to download_file

     Returns:
         Optional[str]: 本地访问 URL,如果失败则返回 None
@@ -210,7 +212,7 @@ async def download_to_local(url: str, timeout: int = 60) -> Optional[str]:
     if not server:
         return None

-    file_id = await server.download_file(url, timeout)
+    file_id = await server.download_file(url, timeout, headers)
     if not file_id:
         return None

diff --git a/plugins/web_parser/parsers/bili.py b/plugins/web_parser/parsers/bili.py
index 6aa2dd1..16be50f 100644
--- a/plugins/web_parser/parsers/bili.py
+++ b/plugins/web_parser/parsers/bili.py
@@ -1,5 +1,9 @@
 # -*- coding: utf-8 -*-
 import re
+import os
+import subprocess
+import tempfile
+from pathlib import Path
 from typing import Optional, Dict, Any, List, Union
 from urllib.parse import urlparse, parse_qs

@@ -13,9 +17,25 @@ from bilibili_api.exceptions import ResponseCodeException
 from core.config_loader import global_config
 from core.services.local_file_server import download_to_local

+try:
+    import aiohttp
+    AIOHTTP_AVAILABLE = True
+except ImportError:
+    AIOHTTP_AVAILABLE = False
+    logger.warning("[B站解析器] aiohttp 未安装,音视频合并功能将不可用")
+
 # bilibili_api-python 可用性标志
 BILI_API_AVAILABLE = True

+# ffmpeg availability flag
+FFMPEG_AVAILABLE = False
+try:
+    subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
+    FFMPEG_AVAILABLE = True
+    logger.success("[B站解析器] ffmpeg 已安装,支持合并音视频")
+except (subprocess.CalledProcessError, FileNotFoundError):
+    logger.warning("[B站解析器] ffmpeg 未安装,视频可能没有声音。建议安装 ffmpeg 以获得完整音视频体验")
+
 # 显式指定使用 aiohttp,避免与其他库冲突
 try:
     select_client("aiohttp")
@@ -273,20 +293,51 @@ class BiliParser(BaseParser):
             if not cid:
                 return None

-            # 获取下载链接数据
-            download_url_data = await v.get_download_url(cid=cid)
+            # Fetch the download data with html5=True (web format, which usually carries a merged audio/video stream)
+            download_url_data = await v.get_download_url(cid=cid, html5=True)

             # 使用 VideoDownloadURLDataDetecter 解析数据
             detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
+
+            # Try to get the merged MP4 stream (audio and video together)
             streams = detecter.detect_best_streams()

+            # No stream found: retry with the non-html5 format
+            if not streams:
+                logger.warning(f"[{self.name}] 无法获取 html5 格式,尝试获取其他格式...")
+                download_url_data = await v.get_download_url(cid=cid, html5=False)
+                detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
+                streams = detecter.detect_best_streams()
+
             if streams:
                 # 获取视频直链
                 video_direct_url = streams[0].url
+
+                # Detect a separate .m4s stream (which may carry no audio)
+                is_m4s_stream = '.m4s' in video_direct_url
+                if is_m4s_stream:
+                    logger.warning(f"[{self.name}] 检测到分离的 m4s 流,B站 API 返回的 m4s 流通常是分离的视频和音频,需要客户端合并才能有声音")
+                    logger.info(f"[{self.name}] 建议: 使用支持合并 m4s 流的下载工具(如 ffmpeg)合并视频和音频")
+
                 logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")

-                # 使用本地文件服务器下载
-                local_url = await download_to_local(video_direct_url, timeout=120)
+                # Bilibili downloads require Referer and User-Agent headers
+                headers = {
+                    "Referer": "https://www.bilibili.com",
+                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+                }
+
+                # Debug: log the structure of download_url_data
+                logger.debug(f"[{self.name}] download_url_data 类型: {type(download_url_data)}")
+                if isinstance(download_url_data, dict):
+                    logger.debug(f"[{self.name}] download_url_data keys: {list(download_url_data.keys())}")
+
+                # For an m4s stream with ffmpeg and aiohttp available, hand download_url_data to the merge step
+                if is_m4s_stream and FFMPEG_AVAILABLE and AIOHTTP_AVAILABLE:
+                    local_url = await self._download_and_merge_m4s(video_direct_url, headers, bvid, download_url_data)
+                else:
+                    # Download through the local file server
+                    local_url = await download_to_local(video_direct_url, timeout=120, headers=headers)

                 if local_url:
                     logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
@@ -300,6 +351,198 @@ class BiliParser(BaseParser):

         return None

+    async def _download_and_merge_m4s(self, video_url: str, headers: Dict[str, str], bvid: str, download_url_data: Dict) -> Optional[str]:
+        """
+        Download the separate DASH video and audio (.m4s) streams and mux them into one MP4.
+
+        Args:
+            video_url (str): URL of the video-only stream.
+            headers (Dict[str, str]): Request headers sent with both downloads.
+            bvid (str): BV id of the video; not used by the merge itself.
+            download_url_data (Dict): Raw play-URL payload; the audio URL is read from its ``dash.audio`` entry.
+
+        Returns:
+            Optional[str]: Local file server URL of the merged MP4, or None on any failure.
+        """
+        if not FFMPEG_AVAILABLE:
+            logger.warning("[B站解析器] ffmpeg 不可用,无法合并音视频")
+            return None
+
+        if not AIOHTTP_AVAILABLE:
+            logger.warning("[B站解析器] aiohttp 不可用,无法合并音视频")
+            return None
+
+        # Resolve the audio URL before downloading anything: a payload without a
+        # DASH audio track then costs no bandwidth and creates no temp files.
+        audio_url = self._extract_dash_audio_url(download_url_data)
+        if not audio_url:
+            logger.warning(f"[{self.name}] 无法从 download_url_data 中提取音频 URL")
+            logger.debug(f"[{self.name}] download_url_data 结构: {download_url_data}")
+            return None
+
+        # Local imports: only this method needs asyncio and shutil.
+        import asyncio
+        import shutil
+
+        # Every temp file is recorded here the moment it is created and removed in
+        # the ``finally`` block below, whichever way this method exits.
+        temp_paths: List[str] = []
+
+        def _new_temp_path(suffix: str) -> str:
+            handle = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
+            handle.close()
+            temp_paths.append(handle.name)
+            return handle.name
+
+        try:
+            logger.info(f"[{self.name}] 开始下载并合并 m4s 音视频...")
+
+            video_path = _new_temp_path('.m4s')
+            audio_path = _new_temp_path('.m4s')
+            merged_path = _new_temp_path('.mp4')
+
+            # One ClientSession is shared by both downloads.
+            async with aiohttp.ClientSession() as session:
+                if not await self._fetch_stream_to_file(session, video_url, headers, video_path, "视频流"):
+                    return None
+                logger.info(f"[{self.name}] 视频流下载完成: {video_path}")
+
+                if not await self._fetch_stream_to_file(session, audio_url, headers, audio_path, "音频流"):
+                    return None
+                logger.info(f"[{self.name}] 音频流下载完成: {audio_path}")
+
+            # Two inputs (video, audio); ffmpeg re-encodes to H.264/AAC and stops at the shorter one.
+            ffmpeg_cmd = [
+                'ffmpeg', '-y', '-i', video_path, '-i', audio_path,
+                '-c:v', 'libx264', '-c:a', 'aac',
+                '-shortest', merged_path
+            ]
+            logger.debug(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
+
+            # subprocess.run blocks until ffmpeg exits, so it runs in the default
+            # executor to keep the event loop free for other coroutines.
+            loop = asyncio.get_running_loop()
+            result = await loop.run_in_executor(
+                None,
+                lambda: subprocess.run(ffmpeg_cmd, capture_output=True, text=True, errors='replace'),
+            )
+
+            if result.stdout:
+                logger.debug(f"[{self.name}] ffmpeg stdout: {result.stdout}")
+            if result.stderr:
+                logger.debug(f"[{self.name}] ffmpeg stderr: {result.stderr}")
+
+            if result.returncode != 0:
+                logger.error(f"[{self.name}] ffmpeg 合并失败: {result.stderr}")
+                return None
+
+            merged_size = os.path.getsize(merged_path)
+            logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
+            if merged_size == 0:
+                logger.error(f"[{self.name}] ffmpeg生成了空文件,命令可能有问题")
+                logger.error(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
+                if result.stderr:
+                    logger.error(f"[{self.name}] ffmpeg错误输出: {result.stderr}")
+                return None
+
+            logger.info(f"[{self.name}] 音视频合并成功: {merged_path} ({merged_size} bytes)")
+
+            # Publish the merged file through the local file server.
+            from core.services.local_file_server import get_local_file_server
+            server = get_local_file_server()
+            if not server:
+                logger.warning(f"[{self.name}] 本地文件服务器不可用,无法提供合并后的视频")
+                return None
+
+            file_id = server._generate_file_id(f'file://{merged_path}')
+            dest_path = server.download_dir / file_id
+            try:
+                await loop.run_in_executor(None, shutil.copy2, merged_path, dest_path)
+                dest_size = dest_path.stat().st_size
+            except OSError as e:
+                logger.error(f"[{self.name}] 上传合并文件失败: {e}")
+                return None
+
+            if dest_size != merged_size:
+                logger.error(f"[{self.name}] 文件大小不匹配: 原始 {merged_size} vs 复制 {dest_size}")
+                return None
+
+            # Register the file only after the copy is verified, so the server
+            # never hands out a truncated file.
+            server.file_map[file_id] = dest_path
+            merged_url = f"http://127.0.0.1:{server.port}/download?id={file_id}"
+            logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
+            return merged_url
+
+        except Exception as e:
+            # repr() keeps the exception type visible; e.g. timeouts stringify to "".
+            logger.error(f"[{self.name}] 合并音视频失败: {e!r}")
+            return None
+
+        finally:
+            for path in temp_paths:
+                try:
+                    os.unlink(path)
+                except OSError as e:
+                    logger.warning(f"[{self.name}] 清理临时文件失败: {e}")
+
+    @staticmethod
+    def _extract_dash_audio_url(download_url_data: Any) -> Optional[str]:
+        """
+        Return the URL of the first audio track in a DASH play-URL payload.
+
+        Args:
+            download_url_data (Any): Payload returned by ``get_download_url``.
+
+        Returns:
+            Optional[str]: The audio URL, or None when the payload has no usable ``dash.audio`` entry.
+        """
+        if not isinstance(download_url_data, dict):
+            return None
+        dash = download_url_data.get('dash')
+        if not isinstance(dash, dict):
+            return None
+
+        audio = dash.get('audio')
+        if isinstance(audio, list):
+            audio = audio[0] if audio else None
+        if not isinstance(audio, dict):
+            return None
+
+        # NOTE(review): the snake_case keys and list-valued backup entries are
+        # assumed from the Bilibili play-URL format - confirm against a real payload.
+        for key in ('baseUrl', 'base_url', 'url', 'backupUrl', 'backup_url'):
+            value = audio.get(key)
+            if isinstance(value, (list, tuple)):
+                value = value[0] if value else None
+            if isinstance(value, str) and value:
+                return value
+        return None
+
+    async def _fetch_stream_to_file(self, session: "aiohttp.ClientSession", url: str, headers: Dict[str, str], dest: str, label: str) -> bool:
+        """
+        Stream one HTTP resource into a local file.
+
+        Args:
+            session (aiohttp.ClientSession): Open session used for the request.
+            url (str): Resource URL.
+            headers (Dict[str, str]): Request headers.
+            dest (str): Path of the file to (over)write.
+            label (str): Stream name inserted into the failure log message.
+
+        Returns:
+            bool: True when the response status was 200 and the body was written, else False.
+        """
+        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as response:
+            if response.status != 200:
+                logger.error(f"[{self.name}] 下载{label}失败: HTTP {response.status}")
+                return False
+
+            with open(dest, 'wb') as f:
+                async for chunk in response.content.iter_chunked(8192):
+                    f.write(chunk)
+        return True
+
     async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
         """
         格式化B站视频响应消息