# -*- coding: utf-8 -*-
"""Bilibili video parser.

Resolves a Bilibili video URL to its metadata through ``bilibili-api-python``
(with an HTML-scraping fallback), downloads the video to the local file
server and, when the stream is a split DASH ``.m4s`` pair, merges video and
audio with ffmpeg.
"""
import asyncio
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Union
from urllib.parse import urlparse, parse_qs

from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from ..base import BaseParser
from ..utils import format_duration
from bilibili_api import video, select_client, Credential
from bilibili_api.exceptions import ResponseCodeException
from core.config_loader import global_config
from core.services.local_file_server import download_to_local

try:
    import aiohttp
    AIOHTTP_AVAILABLE = True
except ImportError:
    AIOHTTP_AVAILABLE = False
    logger.warning("[B站解析器] aiohttp 未安装,音视频合并功能将不可用")

# Availability flag for bilibili_api-python (imported unconditionally above).
BILI_API_AVAILABLE = True

# Availability flag for ffmpeg, probed once at import time.
FFMPEG_AVAILABLE = False
try:
    subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    FFMPEG_AVAILABLE = True
    logger.success("[B站解析器] ffmpeg 已安装,支持合并音视频")
except (subprocess.CalledProcessError, OSError):
    # OSError covers FileNotFoundError (not installed) as well as e.g.
    # PermissionError, so a broken ffmpeg binary cannot break the import.
    logger.warning("[B站解析器] ffmpeg 未安装,视频可能没有声音。建议安装 ffmpeg 以获得完整音视频体验")

# Explicitly select aiohttp as the bilibili_api HTTP client to avoid
# conflicts with other installed client libraries.
try:
    select_client("aiohttp")
except Exception as e:
    logger.warning(f"设置 bilibili_api 客户端失败: {e}")


class BiliParser(BaseParser):
    """Bilibili video parser built on the ``bilibili-api-python`` library."""

    # HTTP status codes treated as a redirect when resolving short links.
    _REDIRECT_STATUSES = (301, 302, 303, 307, 308)

    # Videos longer than this many seconds (2 hours) are not downloaded.
    _MAX_DURATION_SECONDS = 7200

    def __init__(self):
        super().__init__()
        self.name = "B站解析器"
        self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
        self.nickname = "B站视频解析"

    def _get_credential(self) -> Optional[Credential]:
        """Build the Bilibili login credential from the global config.

        Returns:
            Optional[Credential]: A credential when ``sessdata``, ``bili_jct``
            and ``buvid3`` are all configured, otherwise ``None``.
        """
        try:
            bili_config = global_config.bilibili
            if bili_config.sessdata and bili_config.bili_jct and bili_config.buvid3:
                return Credential(
                    sessdata=bili_config.sessdata,
                    bili_jct=bili_config.bili_jct,
                    buvid3=bili_config.buvid3,
                    dedeuserid=bili_config.dedeuserid
                )
        except Exception as e:
            # Best effort: anonymous access still works, but leave a trace so
            # a misconfiguration is diagnosable.
            logger.debug(f"[{self.name}] 读取 B 站登录凭证失败: {e}")
        return None

    @staticmethod
    def _normalize_image_url(image_url: str) -> str:
        """Strip the ``@...`` resize suffix and make ``//host`` URLs absolute."""
        if not image_url:
            return image_url
        image_url = image_url.split('@')[0]
        if image_url.startswith('//'):
            image_url = 'https:' + image_url
        return image_url

    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """Fetch metadata for a Bilibili video.

        Args:
            url (str): Bilibili video URL (full link or ``b23.tv`` short link).

        Returns:
            Optional[Dict[str, Any]]: Video information dict, or ``None`` on
            failure.
        """
        bvid = self.extract_bvid(url)
        if not bvid and 'b23.tv' in url:
            # A short link carries no BV id itself; follow its redirect once.
            real_url = await self.get_real_url(url)
            if real_url:
                url = real_url
                bvid = self.extract_bvid(real_url)
        if not bvid:
            logger.error(f"[{self.name}] 无法从 URL 提取 BV 号: {url}")
            return None

        try:
            if not BILI_API_AVAILABLE:
                # Fallback: scrape the video page directly.
                return await self._parse_fallback(url, bvid)

            credential = self._get_credential()
            v = video.Video(bvid=bvid, credential=credential)
            info = await v.get_info()

            owner = info.get('owner') or {}
            stat = info.get('stat') or {}

            return {
                "title": info.get('title', '未知标题'),
                "bvid": bvid,
                "aid": info.get('aid', 0),
                "duration": info.get('duration', 0),
                "cover_url": self._normalize_image_url(info.get('pic', '')),
                "play": stat.get('view', 0),
                "like": stat.get('like', 0),
                "coin": stat.get('coin', 0),
                "favorite": stat.get('favorite', 0),
                "share": stat.get('share', 0),
                "danmaku": stat.get('danmaku', 0),
                "owner_name": owner.get('name', '未知UP主'),
                "owner_avatar": self._normalize_image_url(owner.get('face', '')),
                "followers": owner.get('fans', 0),
                "description": info.get('desc', ''),
                "pubdate": info.get('pubdate', 0),
            }
        except ResponseCodeException as e:
            # The API answered with an error code; the fallback is not tried
            # for this case.
            logger.error(f"[{self.name}] API 返回错误: {e.code} - {e.msg}")
        except Exception as e:
            logger.error(f"[{self.name}] 解析视频信息失败: {e}")
            if BILI_API_AVAILABLE:
                logger.info(f"[{self.name}] 尝试备用解析方案")
                return await self._parse_fallback(url, bvid)
        return None

    async def _parse_fallback(self, url: str, bvid: str) -> Optional[Dict[str, Any]]:
        """Fallback parser that scrapes the video page with regexes.

        Only a subset of the fields can be recovered this way; the remaining
        ones are filled with neutral defaults (``0`` / ``''``).

        Args:
            url (str): Bilibili video URL.
            bvid (str): BV id of the video.

        Returns:
            Optional[Dict[str, Any]]: Video information dict, or ``None`` on
            failure.
        """
        try:
            session = self.get_session()
            # Drop the query string and any '#/' fragment before fetching.
            clean_url = url.split('?')[0]
            if '#/' in clean_url:
                clean_url = clean_url.split('#/')[0]

            async with session.get(clean_url, headers=self.HEADERS, timeout=5) as response:
                response.raise_for_status()
                text = await response.text()

            def first_int(pattern: str) -> int:
                """Return the first captured integer for *pattern*, or 0."""
                match = re.search(pattern, text)
                return int(match.group(1)) if match else 0

            title_match = re.search(r'<title[^>]*>([^<]+)', text)
            title = title_match.group(1).strip() if title_match else '未知标题'

            owner_match = re.search(r'"name":"([^"]+)"', text)
            owner_name = owner_match.group(1) if owner_match else '未知UP主'

            face_match = re.search(r'"face":"([^"]+)"', text)
            owner_face = self._normalize_image_url(face_match.group(1) if face_match else '')

            return {
                "title": title,
                "bvid": bvid,
                "aid": 0,
                "duration": 0,
                "cover_url": '',
                "play": first_int(r'"view":(\d+)'),
                "like": first_int(r'"like":(\d+)'),
                "coin": first_int(r'"coin":(\d+)'),
                "favorite": first_int(r'"favorite":(\d+)'),
                "share": first_int(r'"share":(\d+)'),
                "danmaku": 0,
                "owner_name": owner_name,
                "owner_avatar": owner_face,
                "followers": 0,
                "description": '',
                "pubdate": 0,
            }
        except Exception as e:
            logger.error(f"[{self.name}] 备用解析方案失败: {e}")
            return None

    def extract_bvid(self, url: str) -> Optional[str]:
        """Extract the BV id from a URL without any network access.

        Args:
            url (str): Bilibili video URL.

        Returns:
            Optional[str]: The BV id, or ``None`` when the URL does not
            contain one (e.g. an unresolved ``b23.tv`` short link).
        """
        # Case 1: a regular /video/BVxxxx link.
        bvid_match = re.search(r'/video/(BV\w+)', url)
        if bvid_match:
            return bvid_match.group(1)

        # Case 2: a short link that happens to embed the BV id. Redirects are
        # not followed here; parse() resolves them through get_real_url().
        if 'b23.tv' in url:
            bvid_match = re.search(r'BV\w{10}', url)
            if bvid_match:
                return bvid_match.group(0)

        return None

    async def get_real_url(self, short_url: str) -> Optional[str]:
        """Resolve a Bilibili short link to its redirect target.

        Args:
            short_url (str): Bilibili short link.

        Returns:
            Optional[str]: The ``Location`` header of the redirect response,
            or ``None`` when the request fails or is not a redirect.
        """
        try:
            session = self.get_session()
            async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response:
                if response.status in self._REDIRECT_STATUSES:
                    return response.headers.get('Location')
        except Exception as e:
            logger.error(f"[{self.name}] 获取真实URL失败: {e}")
        return None

    async def get_direct_video_url(self, video_url: str, bvid: str) -> Optional[str]:
        """Download the video and return a URL served by the local file server.

        Args:
            video_url (str): Full Bilibili video URL (currently unused; kept
                for interface compatibility).
            bvid (str): BV id of the video.

        Returns:
            Optional[str]: Local video URL, or ``None`` on failure.
        """
        if not BILI_API_AVAILABLE:
            return None

        try:
            credential = self._get_credential()
            v = video.Video(bvid=bvid, credential=credential)

            # The cid is required to request the download URLs.
            info = await v.get_info()
            cid = info.get('cid', 0)
            if not cid:
                return None

            # Ask for the html5 variant first, then retry without it when no
            # stream is detected.
            download_url_data = await v.get_download_url(cid=cid, html5=True)
            detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
            streams = detecter.detect_best_streams()

            if not streams:
                logger.warning(f"[{self.name}] 无法获取 html5 格式,尝试获取其他格式...")
                download_url_data = await v.get_download_url(cid=cid, html5=False)
                detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
                streams = detecter.detect_best_streams()

            if not streams or streams[0] is None:
                logger.error(f"[{self.name}] 获取视频直链失败: 未找到可用的视频流")
                return None

            video_direct_url = streams[0].url

            # A '.m4s' URL is treated as a split stream that needs merging
            # with its audio track.
            is_m4s_stream = '.m4s' in video_direct_url
            if is_m4s_stream:
                logger.warning(f"[{self.name}] 检测到分离的 m4s 流,B站 API 返回的 m4s 流通常是分离的视频和音频,需要客户端合并才能有声音")
                logger.info(f"[{self.name}] 建议: 使用支持合并 m4s 流的下载工具(如 ffmpeg)合并视频和音频")

            logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")

            # Referer and User-Agent headers sent with every download request.
            headers = {
                "Referer": "https://www.bilibili.com",
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            }

            logger.debug(f"[{self.name}] download_url_data 类型: {type(download_url_data)}")
            if isinstance(download_url_data, dict):
                logger.debug(f"[{self.name}] download_url_data keys: {list(download_url_data.keys())}")

            if is_m4s_stream and FFMPEG_AVAILABLE and AIOHTTP_AVAILABLE:
                local_url = await self._download_and_merge_m4s(video_direct_url, headers, bvid, download_url_data)
            else:
                # Plain download through the local file server.
                local_url = await download_to_local(video_direct_url, timeout=120, headers=headers)

            if local_url:
                logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
                return local_url

            logger.error(f"[{self.name}] 下载到本地失败")
            return None

        except Exception as e:
            logger.error(f"[{self.name}] 获取视频直链失败: {e}")
            return None

    @staticmethod
    def _extract_audio_url(download_url_data: Any) -> Optional[str]:
        """Return the first usable audio URL from ``download_url_data['dash']``."""
        if not isinstance(download_url_data, dict):
            return None
        dash = download_url_data.get('dash')
        if not isinstance(dash, dict):
            return None

        audio = dash.get('audio')
        if isinstance(audio, list) and audio:
            audio_item = audio[0]
        elif isinstance(audio, dict):
            audio_item = audio
        else:
            return None
        if not isinstance(audio_item, dict):
            return None

        candidates: List[Any] = [
            audio_item.get('baseUrl'),
            audio_item.get('base_url'),
            audio_item.get('url'),
        ]
        # The backup entry may be a single string or a list of mirror URLs.
        backup = audio_item.get('backupUrl') or audio_item.get('backup_url')
        if isinstance(backup, str):
            candidates.append(backup)
        elif isinstance(backup, (list, tuple)):
            candidates.extend(backup)

        for candidate in candidates:
            if isinstance(candidate, str) and candidate:
                return candidate
        return None

    @staticmethod
    def _new_temp_path(suffix: str) -> str:
        """Create an empty temporary file and return its path."""
        handle = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        handle.close()
        return handle.name

    async def _download_stream(self, session: Any, url: str, headers: Dict[str, str],
                               dest_path: str, label: str) -> bool:
        """Stream *url* into *dest_path*; return ``True`` on HTTP 200."""
        async with session.get(url, headers=headers, timeout=60) as response:
            if response.status != 200:
                logger.error(f"[{self.name}] 下载{label}失败: HTTP {response.status}")
                return False
            with open(dest_path, 'wb') as f:
                while True:
                    chunk = await response.content.read(8192)
                    if not chunk:
                        break
                    f.write(chunk)
        logger.info(f"[{self.name}] {label}下载完成: {dest_path}")
        return True

    def _publish_merged_file(self, merged_path: str, merged_size: int) -> Optional[str]:
        """Copy the merged file into the local file server and return its URL."""
        try:
            from core.services.local_file_server import get_local_file_server
            server = get_local_file_server()
            if not server:
                return None

            file_id = server._generate_file_id(f'file://{merged_path}')
            dest_path = server.download_dir / file_id
            shutil.copy2(merged_path, dest_path)

            if not dest_path.exists():
                logger.error(f"[{self.name}] 文件复制失败: {dest_path} 不存在")
                return None

            dest_size = dest_path.stat().st_size
            logger.debug(f"[{self.name}] 复制后文件大小: {dest_size} bytes")
            if dest_size != merged_size:
                logger.error(f"[{self.name}] 文件大小不匹配: 原始 {merged_size} vs 复制 {dest_size}")
                return None

            # Register the file only once the copy has been verified.
            server.file_map[file_id] = dest_path
            merged_url = f"http://127.0.0.1:{server.port}/download?id={file_id}"
            logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
            return merged_url
        except Exception as e:
            logger.error(f"[{self.name}] 上传合并文件失败: {e}")
            return None

    async def _download_and_merge_m4s(self, video_url: str, headers: Dict[str, str], bvid: str, download_url_data: Dict) -> Optional[str]:
        """Download the split m4s video and audio streams and merge them.

        Args:
            video_url (str): URL of the video stream.
            headers (Dict[str, str]): Request headers for the downloads.
            bvid (str): BV id (currently unused; kept for interface
                compatibility).
            download_url_data (Dict): Raw download-URL payload; the audio URL
                is read from its ``dash.audio`` entry.

        Returns:
            Optional[str]: Local URL of the merged video, or ``None`` on
            failure.
        """
        if not FFMPEG_AVAILABLE:
            logger.warning("[B站解析器] ffmpeg 不可用,无法合并音视频")
            return None

        if not AIOHTTP_AVAILABLE:
            logger.warning("[B站解析器] aiohttp 不可用,无法合并音视频")
            return None

        # Resolve the audio URL first so nothing is downloaded when there is
        # no audio track to merge with.
        audio_url = self._extract_audio_url(download_url_data)
        logger.debug(f"[{self.name}] 从 dash.audio 提取音频 URL: {audio_url is not None}")
        if not audio_url:
            logger.warning(f"[{self.name}] 无法从 download_url_data 中提取音频 URL")
            logger.debug(f"[{self.name}] download_url_data 结构: {download_url_data}")
            return None

        temp_paths: List[str] = []
        try:
            logger.info(f"[{self.name}] 开始下载并合并 m4s 音视频...")

            video_path = self._new_temp_path('.m4s')
            temp_paths.append(video_path)
            audio_path = self._new_temp_path('.m4s')
            temp_paths.append(audio_path)
            merged_path = self._new_temp_path('.mp4')
            temp_paths.append(merged_path)

            # One shared ClientSession for both downloads.
            async with aiohttp.ClientSession() as session:
                if not await self._download_stream(session, video_url, headers, video_path, "视频流"):
                    return None
                if not await self._download_stream(session, audio_url, headers, audio_path, "音频流"):
                    return None

            # Two inputs (video, audio) re-encoded into a single mp4;
            # '-shortest' stops at the end of the shorter input.
            ffmpeg_cmd = [
                'ffmpeg', '-y',
                '-i', video_path,
                '-i', audio_path,
                '-c:v', 'libx264',
                '-c:a', 'aac',
                '-shortest',
                merged_path
            ]
            logger.debug(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")

            # ffmpeg is a blocking call; run it in the default executor so the
            # event loop keeps running during the re-encode.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(ffmpeg_cmd, capture_output=True, text=True, errors='replace'),
            )

            if result.stdout:
                logger.debug(f"[{self.name}] ffmpeg stdout: {result.stdout}")
            if result.stderr:
                logger.debug(f"[{self.name}] ffmpeg stderr: {result.stderr}")

            if result.returncode != 0:
                logger.error(f"[{self.name}] ffmpeg 合并失败: {result.stderr}")
                return None

            merged_size = os.path.getsize(merged_path)
            logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
            if merged_size == 0:
                logger.error(f"[{self.name}] ffmpeg生成了空文件,命令可能有问题")
                logger.error(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
                if result.stderr:
                    logger.error(f"[{self.name}] ffmpeg错误输出: {result.stderr}")
                return None

            logger.info(f"[{self.name}] 音视频合并成功: {merged_path} ({merged_size} bytes)")

            return self._publish_merged_file(merged_path, merged_size)

        except Exception as e:
            logger.error(f"[{self.name}] 合并音视频失败: {e}")
            return None
        finally:
            # Always remove the temporary files, whatever path led here.
            for path in temp_paths:
                try:
                    os.unlink(path)
                except FileNotFoundError:
                    pass
                except OSError as e:
                    logger.warning(f"[{self.name}] 清理临时文件失败: {e}")

    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """Build the forward-message nodes for a parsed video.

        Also replies with the video directly when a local URL was obtained.

        Args:
            event (MessageEvent): The triggering message event.
            data (Dict[str, Any]): Video information as returned by
                :meth:`parse`.

        Returns:
            List[Any]: Forward nodes (text summary, optional cover, optional
            uploader avatar, video or failure notice).
        """
        video_message: Union[str, MessageSegment]
        direct_url = None

        if data['duration'] > self._MAX_DURATION_SECONDS:
            video_message = "视频时长超过2小时,不进行解析。"
        else:
            bvid = data.get('bvid', '')
            video_url = f"https://www.bilibili.com/video/{bvid}"
            direct_url = await self.get_direct_video_url(video_url, bvid)
            if direct_url:
                video_message = MessageSegment.video(direct_url)
            else:
                video_message = "视频解析失败,无法获取直链。"

        text_message = (
            f"BiliBili 视频解析\n"
            f"--------------------\n"
            f" UP主: {data['owner_name']}\n"
            f" 粉丝: {self.format_count(data['followers'])}\n"
            f"--------------------\n"
            f" 标题: {data['title']}\n"
            f" BV号: {data['bvid']}\n"
            f" 时长: {format_duration(data['duration'])}\n"
            f"--------------------\n"
            f" 数据:\n"
            f" 播放: {self.format_count(data['play'])}\n"
            f" 点赞: {self.format_count(data['like'])}\n"
            f" 投币: {self.format_count(data['coin'])}\n"
            f" 收藏: {self.format_count(data['favorite'])}\n"
            f" 转发: {self.format_count(data['share'])}\n"
            f" 弹幕: {self.format_count(data.get('danmaku', 0))}\n"
        )

        messages: List[Any] = [text_message]

        # The fallback parser yields empty image URLs; skip those nodes
        # rather than building an image segment with no source.
        cover_url = data.get('cover_url')
        if cover_url:
            messages.append([
                MessageSegment.text("B站封面:"),
                MessageSegment.image(cover_url)
            ])

        owner_avatar = data.get('owner_avatar')
        if owner_avatar:
            messages.append([
                MessageSegment.text("UP主头像:"),
                MessageSegment.image(owner_avatar)
            ])

        messages.append(video_message)

        nodes = [
            event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=message)
            for message in messages
        ]

        # Additionally send the video as a direct reply when available.
        if direct_url:
            try:
                await event.reply(MessageSegment.video(direct_url))
            except Exception as e:
                logger.error(f"[{self.name}] 直接发送视频失败: {e}")

        return nodes

    def should_handle_url(self, url: str) -> bool:
        """Tell whether this parser handles *url*.

        Args:
            url (str): Candidate URL.

        Returns:
            bool: ``True`` when the URL matches ``self.url_pattern``.
        """
        return bool(self.url_pattern.search(url))