chore: 整理配置与功能,优化B站解析流程

1. 从.gitignore移除config.example.toml并新增该示例配置文件
2. 新增项目规则文档说明开发环境要求
3. 修复config加载时的编码缺失问题
4. 重写bili_login.py,优化扫码登录流程与凭证输出
5. 重构B站解析器:简化下载逻辑,改用bilibili_api内置下载,优化音视频合并流程
This commit is contained in:
2026-05-12 12:38:34 +08:00
parent dcfb5d4892
commit 2cb55992f9
6 changed files with 254 additions and 235 deletions

1
.gitignore vendored
View File

@@ -150,7 +150,6 @@ scratch_files/
# Sensitive files (should never be committed) # Sensitive files (should never be committed)
config.toml config.toml
config.example.toml
ca/* ca/*
*.pem *.pem
*.key *.key

4
.trae/rules/1.md Normal file
View File

@@ -0,0 +1,4 @@
1. 所有代码都必须符合 PEP 8 规范
2. 项目根目录运行.venv\Scripts\activate 激活虚拟环境
3. 我是Windows11
4. 我的Python版本是3.15

View File

@@ -1,17 +1,39 @@
import asyncio import asyncio
from bilibili_api import login_v2 from bilibili_api import login_v2
async def main(): async def main():
print("请使用 Bilibili 手机 App 扫描二维码登录") print("请使用 Bilibili 手机 App 扫描二维码登录")
qr = login_v2.QrCodeLogin() print("=" * 40)
demo = await qr.generate_qrcode()
await print( qr.get_qrcode_terminal())
qr = login_v2.QrCodeLogin()
await qr.generate_qrcode()
print(qr.get_qrcode_terminal())
print("=" * 40)
print("等待扫码...")
while True:
state = await qr.check_state()
if state == login_v2.QrCodeLoginEvents.DONE:
print("登录成功!") print("登录成功!")
print(f"sessdata = \"{credential.sessdata}\"") break
print(f"bili_jct = \"{credential.bili_jct}\"") elif state == login_v2.QrCodeLoginEvents.SCAN:
print(f"buvid3 = \"{credential.buvid3}\"") print("已扫描,请确认登录...")
print(f"dedeuserid = \"{credential.dedeuserid}\"") elif state == login_v2.QrCodeLoginEvents.TIMEOUT:
print("二维码已过期,请重新运行")
return
await asyncio.sleep(1)
credential = qr.get_credential()
print()
print("请将以下凭证添加到 config.toml 的 [bilibili] 配置块中:")
print(f'sessdata = "{credential.sessdata}"')
print(f'bili_jct = "{credential.bili_jct}"')
print(f'buvid3 = "{credential.buvid3 if credential.buvid3 else ""}"')
print(f'dedeuserid = "{credential.dedeuserid}"')
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(main()) asyncio.run(main())

126
config.example.toml Normal file
View File

@@ -0,0 +1,126 @@
# =============================================================================
# NeoBot 配置文件示例
# =============================================================================
# 将此文件复制为 config.toml 并根据你的环境修改配置
# 敏感配置项如密码、Token可通过环境变量覆盖
# =============================================================================
# NapCat WebSocket 连接配置
# =============================================================================
[napcat_ws]
uri = "ws://localhost:8080" # NapCat WebSocket 地址
token = "" # NapCat WebSocket Token如无需鉴权则留空
reconnect_interval = 5 # 断线重连间隔(秒)
# =============================================================================
# Bot 基础配置
# =============================================================================
[bot]
command = ["/"] # 指令前缀列表
ignore_self_message = true # 是否忽略机器人自身消息
permission_denied_message = "权限不足,需要 {permission_name} 权限"
# =============================================================================
# 反向 WebSocket 服务端配置(可选)
# =============================================================================
[reverse_ws]
enabled = false # 是否启用
host = "0.0.0.0" # 监听地址
port = 3002 # 监听端口
token = "" # 鉴权 Token留空则不鉴权
# =============================================================================
# Redis 配置
# =============================================================================
[redis]
host = "localhost" # Redis 地址
port = 6379 # Redis 端口
db = 0 # Redis 数据库编号
password = "" # Redis 密码
# =============================================================================
# MySQL 配置
# =============================================================================
[mysql]
host = "localhost" # MySQL 地址
port = 3306 # MySQL 端口
user = "root" # MySQL 用户名
password = "" # MySQL 密码
db = "neobot" # 数据库名
charset = "utf8mb4" # 字符集
# =============================================================================
# Docker 沙箱执行配置
# =============================================================================
[docker]
base_url = "" # Docker 守护进程地址(留空使用默认)
sandbox_image = "python-sandbox:latest" # 沙箱镜像名
timeout = 10 # 执行超时(秒)
concurrency_limit = 5 # 最大并发数
tls_verify = false # 是否验证 TLS
ca_cert_path = "" # CA 证书路径(可选)
client_cert_path = "" # 客户端证书路径(可选)
client_key_path = "" # 客户端密钥路径(可选)
# =============================================================================
# 图片生成管理器配置
# =============================================================================
[image_manager]
image_height = 1920 # 图片高度
image_width = 1080 # 图片宽度
# =============================================================================
# 线程管理配置
# =============================================================================
[threading]
max_workers = 10 # 全局最大工作线程数
client_max_workers = 5 # 每个客户端最大工作线程数
thread_name_prefix = "NeoBot-Thread" # 线程名称前缀
# =============================================================================
# Bilibili 登录凭证配置(可选)
# =============================================================================
# 用于获取高清晰度视频等需要登录的功能
# 推荐通过环境变量 BILIBILI_SESSDATA / BILIBILI_BILI_JCT / BILIBILI_BUVID3 / BILIBILI_DEDEUSERID 设置
[bilibili]
sessdata = ""
bili_jct = ""
buvid3 = ""
dedeuserid = ""
# =============================================================================
# 本地文件服务器配置
# =============================================================================
[local_file_server]
enabled = true # 是否启用
host = "0.0.0.0" # 监听地址
port = 3003 # 监听端口
# =============================================================================
# Discord 适配器配置(可选)
# =============================================================================
[discord]
enabled = false # 是否启用
token = "" # Discord Bot Token
proxy = "" # 代理地址(可选)
proxy_type = "http" # 代理类型http / socks5
# =============================================================================
# 跨平台消息同步配置(可选)
# =============================================================================
[cross_platform]
enabled = false # 是否启用
# 平台映射表,键为平台代码(留空则不配置映射)
# [cross_platform.mappings]
# [cross_platform.mappings.10001]
# qq_group_id = 123456789
# name = "示例群组"
# =============================================================================
# 日志配置
# =============================================================================
[logging]
level = "DEBUG" # 全局日志级别
file_level = "DEBUG" # 文件日志级别
console_level = "INFO" # 控制台日志级别

View File

@@ -216,8 +216,8 @@ class Config:
self.logger.error(f"示例配置文件 {example_path} 不存在,无法生成配置") self.logger.error(f"示例配置文件 {example_path} 不存在,无法生成配置")
raise ConfigNotFoundError(message=f"示例配置文件 {example_path} 不存在") raise ConfigNotFoundError(message=f"示例配置文件 {example_path} 不存在")
content = example_path.read_text() content = example_path.read_text(encoding='utf-8')
self.path.write_text(content) self.path.write_text(content, encoding='utf-8')
# 通过属性访问配置 # 通过属性访问配置
@property @property

View File

@@ -2,6 +2,7 @@
import asyncio import asyncio
import re import re
import os import os
import shutil
import subprocess import subprocess
import tempfile import tempfile
from typing import Optional, Dict, Any, List, Union from typing import Optional, Dict, Any, List, Union
@@ -11,17 +12,17 @@ from neobot.models import MessageEvent, MessageSegment
from ..base import BaseParser from ..base import BaseParser
from ..utils import format_duration from ..utils import format_duration
from bilibili_api import video, select_client, Credential from bilibili_api import video, select_client, Credential, get_client, HEADERS
from bilibili_api.exceptions import ResponseCodeException from bilibili_api.exceptions import ResponseCodeException
from neobot.core.config_loader import global_config from neobot.core.config_loader import global_config
from neobot.core.services.local_file_server import download_to_local from neobot.core.services.local_file_server import download_to_local, get_local_file_server
try: try:
import aiohttp import aiohttp
AIOHTTP_AVAILABLE = True AIOHTTP_AVAILABLE = True
except ImportError: except ImportError:
AIOHTTP_AVAILABLE = False AIOHTTP_AVAILABLE = False
logger.warning("[B站解析器] aiohttp 未安装,音视频合并功能将不可用") logger.warning("[B站解析器] aiohttp 未安装,备用解析功能将不可用")
# bilibili_api-python 可用性标志 # bilibili_api-python 可用性标志
BILI_API_AVAILABLE = True BILI_API_AVAILABLE = True
@@ -284,264 +285,131 @@ class BiliParser(BaseParser):
try: try:
credential = self._get_credential() credential = self._get_credential()
v = video.Video(bvid=bvid, credential=credential) v = video.Video(bvid=bvid, credential=credential)
# 先获取视频信息以获取 cid
info = await v.get_info() info = await v.get_info()
cid = info.get('cid', 0) cid = info.get('cid', 0)
if not cid: if not cid:
return None return None
# 获取下载链接数据,使用 html5=True 获取网页格式(通常包含合并的音视频) download_url_data = await v.get_download_url(cid=cid)
download_url_data = await v.get_download_url(cid=cid, html5=True)
# 使用 VideoDownloadURLDataDetecter 解析数据
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data) detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
# 尝试获取 MP4 格式的合并流(包含音视频) if detecter.check_flv_mp4_stream():
streams = detecter.detect_best_streams() streams = detecter.detect_best_streams()
# 如果没有获取到流,尝试其他格式
if not streams: if not streams:
logger.warning(f"[{self.name}] 无法获取 html5 格式,尝试获取其他格式...")
download_url_data = await v.get_download_url(cid=cid, html5=False)
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
streams = detecter.detect_best_streams()
if streams:
# 获取视频直链
video_direct_url = streams[0].url
# 检查是否是分离的 m4s 流(可能没有声音)
is_m4s_stream = '.m4s' in video_direct_url
if is_m4s_stream:
logger.warning(f"[{self.name}] 检测到分离的 m4s 流B站 API 返回的 m4s 流通常是分离的视频和音频,需要客户端合并才能有声音")
logger.info(f"[{self.name}] 建议: 使用支持合并 m4s 流的下载工具(如 ffmpeg合并视频和音频")
logger.info(f"[{self.name}] 获取到视频直链,开始下载到本地...")
# B站下载需要 Referer 和 User-Agent
headers = {
"Referer": "https://www.bilibili.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
}
# 调试:打印 download_url_data 结构
logger.debug(f"[{self.name}] download_url_data 类型: {type(download_url_data)}")
if isinstance(download_url_data, dict):
logger.debug(f"[{self.name}] download_url_data keys: {list(download_url_data.keys())}")
# 如果是 m4s 流且 ffmpeg 可用,先保存 download_url_data 供合并使用
if is_m4s_stream and FFMPEG_AVAILABLE and AIOHTTP_AVAILABLE:
local_url = await self._download_and_merge_m4s(video_direct_url, headers, bvid, download_url_data)
else:
# 使用本地文件服务器下载
local_url = await download_to_local(video_direct_url, timeout=120, headers=headers)
if local_url:
logger.success(f"[{self.name}] 视频已下载到本地: {local_url}")
return local_url
else:
logger.error(f"[{self.name}] 下载到本地失败")
return None return None
logger.info(f"[{self.name}] 检测到合并音视频流,直接下载...")
return await download_to_local(streams[0].url, timeout=120, headers=HEADERS)
if not FFMPEG_AVAILABLE:
logger.warning(f"[{self.name}] ffmpeg 不可用,无法合并音视频,仅下载视频流(无声音)")
streams = detecter.detect_best_streams()
if streams and streams[0]:
return await download_to_local(streams[0].url, timeout=120, headers=HEADERS)
return None
return await self._download_and_merge_m4s(detecter, bvid)
except (aiohttp.ClientError, asyncio.TimeoutError, ValueError, ResponseCodeException) as e: except (aiohttp.ClientError, asyncio.TimeoutError, ValueError, ResponseCodeException) as e:
logger.error(f"[{self.name}] 获取视频直链失败: {e}") logger.error(f"[{self.name}] 获取视频直链失败: {e}")
return None return None
async def _download_and_merge_m4s(self, video_url: str, headers: Dict[str, str], bvid: str, download_url_data: Dict) -> Optional[str]: async def _download_and_merge_m4s(self, detecter: video.VideoDownloadURLDataDetecter, bvid: str) -> Optional[str]:
""" """
下载并合并 m4s 视频和音频流 下载并合并 m4s 视频和音频流
Args: Args:
video_url (str): 视频流 URL detecter (VideoDownloadURLDataDetecter): 视频流检测器
headers (Dict[str, str]): 请求头
bvid (str): BV号 bvid (str): BV号
download_url_data (Dict): 下载 URL 数据
Returns: Returns:
Optional[str]: 合并后的本地视频 URL如果失败则返回None Optional[str]: 合并后的本地视频 URL如果失败则返回None
""" """
if not FFMPEG_AVAILABLE: if not FFMPEG_AVAILABLE:
logger.warning("[B站解析器] ffmpeg 不可用,无法合并音视频")
return None return None
if not AIOHTTP_AVAILABLE: streams = detecter.detect_best_streams()
logger.warning("[B站解析器] aiohttp 不可用,无法合并音视频") if not streams or not streams[0]:
logger.error(f"[{self.name}] 未检测到可用的视频流")
return None return None
video_stream = streams[0]
audio_stream = streams[1] if len(streams) > 1 else None
video_file = None
audio_file = None
merged_file = None
try: try:
logger.info(f"[{self.name}] 开始下载并合并 m4s 音视频...")
# 创建共享的 ClientSession 用于下载
async with aiohttp.ClientSession() as session:
# 下载视频流
video_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False) video_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False)
video_file.close() video_file.close()
async with session.get(video_url, headers=headers, timeout=60) as response: dwn_id = await get_client().download_create(video_stream.url, HEADERS)
if response.status != 200: tot = get_client().download_content_length(dwn_id)
logger.error(f"[{self.name}] 下载视频流失败: HTTP {response.status}")
return None
with open(video_file.name, 'wb') as f: with open(video_file.name, 'wb') as f:
while True: while True:
chunk = await response.content.read(8192) chunk = await get_client().download_chunk(dwn_id)
if not chunk:
break
f.write(chunk) f.write(chunk)
if f.tell() >= tot:
break
await get_client().download_close(cnt=dwn_id)
logger.info(f"[{self.name}] 视频流下载完成: {video_file.name}") if not audio_stream:
logger.warning(f"[{self.name}] 未检测到音频流,仅返回视频")
return await download_to_local(video_stream.url, timeout=120, headers=HEADERS)
# 从 download_url_data 中提取音频 URL
# B站的 dash 格式包含视频和音频流
audio_url = None
if isinstance(download_url_data, dict):
# 尝试 dash 格式(推荐)
if 'dash' in download_url_data and isinstance(download_url_data['dash'], dict):
dash = download_url_data['dash']
if 'audio' in dash and isinstance(dash['audio'], list) and len(dash['audio']) > 0:
# 获取第一个音频流
audio_item = dash['audio'][0]
audio_url = audio_item.get('baseUrl') or audio_item.get('url') or audio_item.get('backupUrl')
logger.debug(f"[{self.name}] 从 dash.audio 提取音频 URL: {audio_url is not None}")
elif 'audio' in dash and isinstance(dash['audio'], dict):
audio_url = dash['audio'].get('baseUrl') or dash['audio'].get('url')
logger.debug(f"[{self.name}] 从 dash.audio (dict) 提取音频 URL: {audio_url is not None}")
# 尝试 durl 格式(非分段流)
elif 'durl' in download_url_data:
if isinstance(download_url_data['durl'], list) and len(download_url_data['durl']) > 0:
main_url = download_url_data['durl'][0].get('url') or download_url_data['durl'][0].get('baseUrl')
if main_url:
video_url = main_url
logger.debug(f"[{self.name}] 使用 durl 主 URL: {video_url}")
if not audio_url and not video_url.startswith('http'):
logger.warning(f"[{self.name}] 无法从 download_url_data 中提取音频 URL")
logger.debug(f"[{self.name}] download_url_data 结构: {download_url_data}")
os.unlink(video_file.name)
return None
# 下载音频流
audio_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False) audio_file = tempfile.NamedTemporaryFile(suffix='.m4s', delete=False)
audio_file.close() audio_file.close()
async with session.get(audio_url, headers=headers, timeout=60) as response: dwn_id = await get_client().download_create(audio_stream.url, HEADERS)
if response.status != 200: tot = get_client().download_content_length(dwn_id)
logger.error(f"[{self.name}] 下载音频流失败: HTTP {response.status}")
os.unlink(video_file.name)
return None
with open(audio_file.name, 'wb') as f: with open(audio_file.name, 'wb') as f:
while True: while True:
chunk = await response.content.read(8192) chunk = await get_client().download_chunk(dwn_id)
if not chunk:
break
f.write(chunk) f.write(chunk)
if f.tell() >= tot:
break
await get_client().download_close(cnt=dwn_id)
logger.info(f"[{self.name}] 音频流下载完成: {audio_file.name}")
# 使用 ffmpeg 合并视频和音频
merged_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) merged_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
merged_file.close() merged_file.close()
# ffmpeg命令使用ffmpeg -i多次输入然后合并
# 先转换视频流(移除音频),然后添加音频流
ffmpeg_cmd = [ ffmpeg_cmd = [
'ffmpeg', '-y', '-i', video_file.name, '-i', audio_file.name, 'ffmpeg', '-y',
'-c:v', 'libx264', '-c:a', 'aac', '-i', video_file.name,
'-shortest', merged_file.name '-i', audio_file.name,
'-c:v', 'copy',
'-c:a', 'copy',
merged_file.name
] ]
logger.debug(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}") subprocess.run(ffmpeg_cmd, capture_output=True, check=True)
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
# 详细记录ffmpeg输出
if result.stdout:
logger.debug(f"[{self.name}] ffmpeg stdout: {result.stdout}")
if result.stderr:
logger.debug(f"[{self.name}] ffmpeg stderr: {result.stderr}")
if result.returncode != 0:
logger.error(f"[{self.name}] ffmpeg 合并失败: {result.stderr}")
os.unlink(video_file.name)
os.unlink(audio_file.name)
return None
# 验证输出文件
merged_size = os.path.getsize(merged_file.name)
logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
if merged_size == 0:
logger.error(f"[{self.name}] ffmpeg生成了空文件命令可能有问题")
logger.error(f"[{self.name}] ffmpeg命令: {' '.join(ffmpeg_cmd)}")
if result.stderr:
logger.error(f"[{self.name}] ffmpeg错误输出: {result.stderr}")
os.unlink(video_file.name)
os.unlink(audio_file.name)
return None
logger.info(f"[{self.name}] 音视频合并成功: {merged_file.name} ({merged_size} bytes)")
# 上传合并后的文件到本地文件服务器
from neobot.core.services.local_file_server import get_local_file_server
server = get_local_file_server() server = get_local_file_server()
if server: if server:
try: file_id = f"bili_{bvid}"
file_id = server._generate_file_id(f'file://{merged_file.name}')
dest_path = server.download_dir / file_id dest_path = server.download_dir / file_id
shutil.copy2(merged_file.name, str(dest_path))
# 获取合并文件大小
merged_size = os.path.getsize(merged_file.name)
logger.debug(f"[{self.name}] 合并文件大小: {merged_size} bytes")
if merged_size == 0:
logger.error(f"[{self.name}] 合并文件为空ffmpeg可能失败了")
merged_url = None
else:
# 复制本地文件到服务器目录
import shutil
shutil.copy2(merged_file.name, dest_path)
server.file_map[file_id] = dest_path server.file_map[file_id] = dest_path
logger.success(f"[{self.name}] 合并后的视频已注册到本地文件服务器")
return f"http://{server.host}:{server.port}/download?id={file_id}"
# 验证复制后的文件 logger.warning(f"[{self.name}] 本地文件服务器不可用")
if dest_path.exists():
dest_size = dest_path.stat().st_size
logger.debug(f"[{self.name}] 复制后文件大小: {dest_size} bytes")
if dest_size == merged_size:
merged_url = f"http://127.0.0.1:{server.port}/download?id={file_id}"
logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
else:
logger.error(f"[{self.name}] 文件大小不匹配: 原始 {merged_size} vs 复制 {dest_size}")
merged_url = None
else:
logger.error(f"[{self.name}] 文件复制失败: {dest_path} 不存在")
merged_url = None
except Exception as e:
logger.error(f"[{self.name}] 上传合并文件失败: {e}")
merged_url = None
else:
merged_url = None
# 清理临时文件
try:
os.unlink(video_file.name)
os.unlink(audio_file.name)
os.unlink(merged_file.name)
except (OSError, PermissionError) as e:
logger.warning(f"[{self.name}] 清理临时文件失败: {e}")
if merged_url:
logger.success(f"[{self.name}] 合并后的视频已上传到本地服务器: {merged_url}")
return merged_url
except (aiohttp.ClientError, asyncio.TimeoutError, ValueError, OSError, subprocess.CalledProcessError) as e:
logger.error(f"[{self.name}] 合并音视频失败: {e}")
return None return None
except Exception as e:
logger.error(f"[{self.name}] 合并音视频失败: {e}")
return None
finally:
for f in [video_file, audio_file, merged_file]:
if f and os.path.exists(f.name):
try:
os.unlink(f.name)
except OSError:
pass
async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
""" """
格式化B站视频响应消息 格式化B站视频响应消息