From 12d1eb3438d8529ae0b58d1591e5f4e44ad5b41e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=95=80=E9=93=AC=E9=85=B8=E9=92=BE?= <148796996+K2cr2O1@users.noreply.github.com> Date: Thu, 22 Jan 2026 16:25:13 +0800 Subject: [PATCH] Dev (#45) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 滚木 * feat: 重构核心架构,增强类型安全与插件管理 本次提交对核心模块进行了深度重构,引入 Pydantic 增强配置管理的类型安全性,并全面优化了插件管理系统。 主要变更详情: 1. 核心架构与配置 - 重构配置加载模块:引入 Pydantic 模型 (`core/config_models.py`),提供严格的配置项类型检查、验证及默认值管理。 - 统一模块结构:规范化模块导入路径,移除冗余的 `__init__.py` 文件,提升项目结构的清晰度。 - 性能优化:集成 Redis 缓存支持 (`RedisManager`),有效降低高频 API 调用开销,提升响应速度。 2. 插件系统升级 - 实现热重载机制:新增插件文件变更监听功能,支持开发过程中自动重载插件,提升开发效率。 - 优化生命周期管理:改进插件加载与卸载逻辑,支持精确卸载指定插件及其关联的命令、事件处理器和定时任务。 3. 功能特性增强 - 新增媒体 API:引入 `MediaAPI` 模块,封装图片、语音等富媒体资源的获取与处理接口。 - 完善权限体系:重构权限管理系统,实现管理员与操作员的分级控制,支持更细粒度的命令权限校验。 4. 代码质量与稳定性 - 全面类型修复:解决 `mypy` 静态类型检查发现的大量类型错误(包括 `CommandManager`、`EventFactory` 及 `Bot` API 签名不匹配问题)。 - 增强错误处理:优化消息处理管道的异常捕获机制,完善关键路径的日志记录,提升系统运行稳定性。 * feat: 添加测试用例并优化代码结构 refactor(permission_manager): 调整初始化顺序和逻辑 fix(admin_manager): 修复初始化逻辑和目录创建问题 feat(ws): 优化Bot实例初始化条件 feat(message): 增强MessageSegment功能并添加测试 feat(events): 支持字符串格式的消息解析 test: 添加核心功能测试用例 refactor(plugin_manager): 改进插件路径处理 style: 清理无用导入和代码 chore: 更新依赖项 * refactor(handler): 移除TYPE_CHECKING并直接导入Bot类 简化类型注解,直接导入Bot类而非使用TYPE_CHECKING条件导入,提高代码可读性和维护性 * fix(command_manager): 修复插件卸载时元信息移除不精确的问题 修复 CommandManager 中 unload_plugin 方法移除插件元信息时使用 startswith 导致可能误删其他插件的问题,改为精确匹配 同时调整相关测试用例验证精确匹配行为 * refactor: 清理未使用的导入和更新文档结构 docs: 添加config_models.py到项目结构文档 docs: 调整数据目录位置到core/data下 docs: 更新权限管理器文档描述 * 文档更新 * 更新thpic插件 支持一次返回多张图 * feat: 添加测试覆盖率并修复相关问题 refactor(redis_manager): 移除冗余的ConnectionError处理 refactor(event_handler): 优化Bot类型注解 refactor(factory): 移除未使用的GroupCardNoticeEvent test: 添加全面的单元测试覆盖 - 添加test_import.py测试模块导入 - 添加test_debug.py测试插件加载调试 - 添加test_plugin_error.py测试错误处理 - 添加test_config_loader.py测试配置加载 - 添加test_redis_manager.py测试Redis管理 - 添加test_bot.py测试Bot功能 - 扩展test_models.py测试消息模型 - 添加test_plugin_manager_coverage.py测试插件管理 - 添加test_executor.py测试代码执行器 - 添加test_ws.py测试WebSocket - 添加test_api.py测试API接口 - 添加test_core_managers.py测试核心管理模块 fix(plugin_manager): 修复插件加载日志变量问题 覆盖率已到达86%(忽略插件) * 更新/help指令,现在会发送图片 * feat(help): 重构帮助系统为图片渲染模式 添加浏览器管理器和图片管理器,用于通过 Playwright 渲染帮助菜单为图片 重构命令管理器以支持图片缓存和同步功能 添加 HTML 模板用于帮助菜单渲染 * build: 更新依赖文件 requirements.txt * build: 更新依赖文件 * feat: 添加性能优化和架构文档,更新依赖和核心模块 refactor(browser_manager): 实现页面池机制以提升性能 refactor(image_manager): 添加模板缓存并集成页面池 refactor(bili_parser): 迁移到异步HTTP请求并实现会话复用 docs: 新增性能优化、架构设计和最佳实践文档 chore: 更新requirements.txt添加新依赖 * docs: 更新文档内容并优化语言风格 重构所有文档内容,使用更简洁直接的语言风格 更新架构、插件开发、部署等核心文档 优化代码示例和图表说明 统一术语和格式规范 * docs: 更新文档内容,简化语言并修正格式 - 简化插件开发指南中的描述,移除冗余内容 - 调整部署文档中的Python版本说明 - 优化最佳实践文档的措辞和格式 - 更新性能优化文档,删除不准确的数据 - 重构核心概念文档,使用更简洁的语言 - 修正README中的项目描述和技术栈说明 - 更新快速上手文档,简化安装步骤 - 调整事件流转文档的描述方式 - 简化架构文档内容 - 更新指令处理文档,添加参数注入示例 - 优化单例管理器文档的表述 * refactor(core): 优化权限管理和事件模型 - 重构 AdminManager 和 PermissionManager 以 Redis 为主要数据源 - 为所有事件模型添加 slots=True 提升性能 - 更新文档说明 Mypyc 编译注意事项 - 清理测试和调试文件 - 移动静态资源到 web_static 目录 * feat: 添加模块编译脚本和导出依赖功能 refactor(events): 移除数据类的slots参数以提升兼容性 build: 更新requirements.txt依赖列表 * docs: 更新性能优化文档并修复命令管理器帮助输出 更新性能优化相关文档,详细说明 Python 3.14 JIT 编译器的使用方法和原理,补充与 Mypyc 的互补策略。同时修复命令管理器中帮助信息的输出方式,移除图片发送仅保留文本输出。 调整部署文档结构,明确两种性能优化方案(AOT 和 JIT)的配置方法和适用场景。完善架构文档中关于 JIT 的原理和启用方式说明。 * feat(help): 重构帮助菜单界面并优化样式 refactor(bili_parser): 修复 API 响应 content-type 问题 fix(command_manager): 添加帮助图片获取的错误处理 docs(deployment): 简化部署文档并移除 JIT 相关内容 * feat: 新增自动同意请求插件和API文档 docs: 更新文档结构和内容 * refactor(scripts): 重构并优化脚本文件结构 feat(scripts): 添加Python环境检查脚本 feat(scripts): 增强依赖导出脚本功能 perf(plugins/bili_parser): 优化B站解析器性能和代码结构 style(plugins/bili_parser): 统一代码风格和常量命名 * fix(scripts): 修复编码问题并添加错误追踪 在compile_machine_code.py中添加utf-8编码设置以避免潜在编码问题 添加traceback.print_exc()以在编译失败时打印完整错误堆栈 更新.gitignore以忽略config.toml文件 * feat(性能分析): 实现性能分析工具模块并添加相关测试 添加性能分析工具模块,包括时间测量、内存分析和性能统计功能 添加测试文件和示例配置,完善性能分析工具的使用场景 在工具模块中实现单例装饰器并导出到__init__.py * feat(douyin_parser): 新增抖音视频解析插件 refactor(performance): 移除未使用的asyncio导入并优化性能测试 style(compile_modules): 修正字符串引号格式 chore: 删除废弃的编译脚本和临时文件 fix(bili_parser): 增强B站链接解析的健壮性 refactor(singleton): 重构单例模式实现 docs: 更新配置文件和事件模型注释 * feat: 添加抖音视频解析插件并优化代码结构 添加抖音视频解析插件,支持自动解析抖音分享链接并提取视频信息。优化现有代码结构,包括: - 重构单例模式实现 - 移除未使用的导入和文件 - 修复性能测试脚本中的异步调用 - 优化消息事件模型中的权限常量定义 - 改进编译脚本的错误处理 - 增强B站解析插件的稳定性 同时清理了多个废弃脚本和临时文件,提升代码可维护性。 * 1 * Delete core/data/temp/help_menu.png * fix(权限管理): 增强权限检查的类型安全并修复权限引用 修复权限检查中可能传入非Permission类型导致的错误,将echo插件的权限引用从MessageEvent.ADMIN迁移到Permission.ADMIN * redis取消tls * feat(github_parser): 添加GitHub仓库信息查询功能 - 新增github_parser插件,支持通过命令或自动解析链接查询GitHub仓库信息 - 添加github_repo.html模板用于渲染仓库信息图片 - 优化图片管理器支持高质量截图和CSS缩放 - 重构消息事件类权限常量定义方式 - 更新帮助页面样式为三列布局并优化响应式设计 * feat(web_parser): 新增通用web链接解析插件框架 refactor: 重构B站、抖音、GitHub解析器为模块化结构 fix(executor): 增强docker容器错误处理和回调稳定性 style(templates): 优化帮助页面和代码执行结果的样式 perf(web_parser): 添加API缓存和消息去重机制 docs: 更新插件元信息和注释 chore: 移除旧的独立解析器插件文件 * refactor(managers): 重构单例管理器实现并优化代码结构 feat(ws_pool): 新增 WebSocket 连接池实现 perf(json): 使用 orjson 替代标准 json 库提升性能 style: 清理未使用的导入和冗余代码 docs: 更新架构文档和开发规范 test: 添加 WebSocket 连接池测试用例 fix(plugins): 修复自动审批插件 API 调用参数格式 --------- Co-authored-by: baby20162016 <2185823427@qq.com> Co-authored-by: web vscode --- core/WS.py | 283 ++++++++++++++------ core/api/account.py | 16 +- core/api/friend.py | 10 +- core/api/group.py | 10 +- core/config_loader.py | 3 +- core/managers/admin_manager.py | 8 +- core/managers/browser_manager.py | 14 +- core/managers/command_manager.py | 3 - core/managers/image_manager.py | 16 +- core/managers/permission_manager.py | 8 +- core/managers/plugin_manager.py | 35 ++- core/managers/redis_manager.py | 14 +- core/utils/__init__.py | 1 - core/utils/error_codes.py | 5 +- core/utils/json_utils.py | 34 --- core/utils/performance.py | 6 +- core/utils/singleton.py | 8 +- core/ws_pool.py | 231 +++++++++++++++++ docs/core-concepts/architecture.md | 115 +++++++++ docs/development-standards.md | 357 ++++++++++++++++++++++++++ main.py | 3 +- models/events/message.py | 13 +- performance_config_example.py | 1 - plugins/admin.py | 8 +- plugins/auto_approve.py | 14 +- plugins/web_parser/base.py | 10 +- plugins/web_parser/parsers/bili.py | 23 +- plugins/web_parser/parsers/douyin.py | 7 +- plugins/web_parser/parsers/github.py | 7 +- plugins/web_parser/utils.py | 6 +- profile_main.py | 1 - scripts/compile_machine_code.py | 8 +- tests/test_api.py | 3 +- tests/test_command_manager.py | 1 - tests/test_config_loader.py | 2 - tests/test_executor.py | 14 +- tests/test_models.py | 1 - tests/test_performance.py | 4 +- tests/test_plugin_manager_coverage.py | 1 - tests/test_redis_manager.py | 2 +- tests/test_ws.py | 6 +- tests/test_ws_pool.py | 234 +++++++++++++++++ 42 files changed, 1285 insertions(+), 261 deletions(-) delete mode 100644 core/utils/json_utils.py create mode 100644 core/ws_pool.py create mode 100644 docs/development-standards.md create mode 100644 tests/test_ws_pool.py diff --git a/core/WS.py b/core/WS.py index 6bd1ce9..e3f2de5 100644 --- a/core/WS.py +++ b/core/WS.py @@ -12,7 +12,7 @@ WebSocket 连接。它是整个机器人框架的底层通信基础。 - 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。 """ import asyncio -import json +import orjson from typing import Any, Dict, Optional, cast import uuid @@ -25,11 +25,12 @@ from .bot import Bot from .config_loader import global_config from .managers.command_manager import matcher from .utils.executor import CodeExecutor -from .utils.logger import logger, ModuleLogger +from .utils.logger import ModuleLogger from .utils.exceptions import ( - WebSocketError, WebSocketConnectionError, WebSocketAuthenticationError + WebSocketError, WebSocketConnectionError ) from .utils.error_codes import ErrorCode, create_error_response +from .ws_pool import WSConnectionPool class WS: @@ -37,11 +38,14 @@ class WS: WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。 """ - def __init__(self, code_executor: Optional[CodeExecutor] = None) -> None: + def __init__(self, code_executor: Optional[CodeExecutor] = None, use_pool: bool = True) -> None: """ 初始化 WebSocket 客户端。 从全局配置中读取 WebSocket URI、访问令牌(Token)和重连间隔。 + + :param code_executor: 代码执行器实例 + :param use_pool: 是否使用连接池 """ # 读取参数 cfg = global_config.napcat_ws @@ -55,6 +59,8 @@ class WS: self.bot: Bot | None = None self.self_id: int | None = None self.code_executor = code_executor + self.use_pool = use_pool + self.pool: Optional[WSConnectionPool] = None # 创建模块专用日志记录器 self.logger = ModuleLogger("WebSocket") @@ -68,46 +74,112 @@ class WS: """ headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} + if self.use_pool: + # 使用连接池模式 + self.pool = WSConnectionPool(pool_size=3) + await self.pool.initialize() + self.logger.success("WebSocket 连接池初始化完成") + + # 启动连接池监听循环 + await self._pool_listen_loop() + else: + # 单连接模式 + while True: + try: + self.logger.info(f"正在尝试连接至 NapCat: {self.url}") + async with websockets.connect( + self.url, additional_headers=headers + ) as websocket_raw: + websocket = cast(WebSocketClientProtocol, websocket_raw) + self.ws = websocket + self.logger.success("连接成功!") + await self._listen_loop(websocket) + + except ( + websockets.exceptions.ConnectionClosed, + ConnectionRefusedError, + ) as e: + conn_error = WebSocketConnectionError( + message=f"WebSocket连接失败: {str(e)}", + code=ErrorCode.WS_CONNECTION_FAILED, + original_error=e + ) + self.logger.error(f"连接失败: {conn_error.message}") + self.logger.log_custom_exception(conn_error) + except Exception as e: + error = WebSocketError( + message=f"WebSocket运行异常: {str(e)}", + code=ErrorCode.WS_MESSAGE_ERROR, + original_error=e + ) + self.logger.exception(f"运行异常: {error.message}") + self.logger.log_custom_exception(error) + + self.logger.info(f"{self.reconnect_interval}秒后尝试重连...") + await asyncio.sleep(self.reconnect_interval) + + async def _pool_listen_loop(self): + """ + 连接池模式下的监听循环 + """ while True: try: - self.logger.info(f"正在尝试连接至 NapCat: {self.url}") - async with websockets.connect( - self.url, additional_headers=headers - ) as websocket_raw: - websocket = cast(WebSocketClientProtocol, websocket_raw) - self.ws = websocket - self.logger.success("连接成功!") - await self._listen_loop(websocket) - - except websockets.exceptions.AuthenticationError as e: - error = WebSocketAuthenticationError( - message=f"WebSocket认证失败: {str(e)}", - code=ErrorCode.WS_AUTH_FAILED, - original_error=e - ) - self.logger.error(f"连接失败: {error.message}") - self.logger.log_custom_exception(error) - except ( - websockets.exceptions.ConnectionClosed, - ConnectionRefusedError, - ) as e: - error = WebSocketConnectionError( - message=f"连接断开或服务器拒绝访问: {str(e)}", - code=ErrorCode.WS_CONNECTION_FAILED, - original_error=e - ) - self.logger.warning(f"连接失败: {error.message}") + # 从连接池获取一个连接 + conn = await self.pool.get_connection() + + try: + # 监听连接上的消息 + async for message in conn.conn: + await self._handle_message(message, conn) + except Exception as e: + self.logger.error(f"连接 {conn.conn_id} 监听异常: {e}") + finally: + # 释放连接回连接池 + await self.pool.release_connection(conn) except Exception as e: - error = WebSocketError( - message=f"WebSocket运行异常: {str(e)}", - code=ErrorCode.WS_MESSAGE_ERROR, - original_error=e - ) - self.logger.exception(f"运行异常: {error.message}") - self.logger.log_custom_exception(error) + self.logger.error(f"连接池监听循环异常: {e}") + await asyncio.sleep(self.reconnect_interval) + + async def _handle_message(self, message: str, conn): + """ + 处理从连接池获取的消息 + """ + try: + data = orjson.loads(message) - self.logger.info(f"{self.reconnect_interval}秒后尝试重连...") - await asyncio.sleep(self.reconnect_interval) + # 1. 处理 API 响应 + # 如果消息中包含 echo 字段,说明是 API 调用的响应 + echo_id = data.get("echo") + if echo_id and echo_id in self._pending_requests: + future = self._pending_requests.pop(echo_id) + if not future.done(): + future.set_result(data) + return + + # 2. 处理上报事件 + # 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件 + if "post_type" in data: + # 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环 + asyncio.create_task(self.on_event(data)) + + except orjson.JSONDecodeError as e: + error = WebSocketError( + message=f"JSON解析失败: {str(e)}", + code=ErrorCode.WS_MESSAGE_ERROR, + original_error=e + ) + self.logger.error(f"解析消息异常: {error.message}") + # 如果message是bytes类型,需要先解码 + decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message + self.logger.debug(f"原始消息: {decoded_message}") + except Exception as e: + error = WebSocketError( + message=f"处理消息异常: {str(e)}", + code=ErrorCode.WS_MESSAGE_ERROR, + original_error=e + ) + self.logger.exception(f"解析消息异常: {error.message}") + self.logger.log_custom_exception(error) async def _listen_loop(self, websocket_connection: WebSocketClientProtocol) -> None: """ @@ -121,7 +193,7 @@ class WS: """ async for message in websocket_connection: try: - data = json.loads(message) + data = orjson.loads(message) # 1. 处理 API 响应 # 如果消息中包含 echo 字段,说明是 API 调用的响应 @@ -138,14 +210,16 @@ class WS: # 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环 asyncio.create_task(self.on_event(data)) - except json.JSONDecodeError as e: + except orjson.JSONDecodeError as e: error = WebSocketError( message=f"JSON解析失败: {str(e)}", code=ErrorCode.WS_MESSAGE_ERROR, original_error=e ) self.logger.error(f"解析消息异常: {error.message}") - self.logger.debug(f"原始消息: {message}") + # 如果message是bytes类型,需要先解码 + decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message + self.logger.debug(f"原始消息: {decoded_message}") except Exception as e: error = WebSocketError( message=f"处理消息异常: {str(e)}", @@ -236,48 +310,93 @@ class WS: dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个 表示失败的字典。 """ - if not self.ws: - self.logger.error("调用 API 失败: WebSocket 未初始化") - return create_error_response( - code=ErrorCode.WS_DISCONNECTED, - message="WebSocket未初始化", - data={"action": action, "params": params} - ) + if self.use_pool: + # 使用连接池模式 + if not self.pool: + self.logger.error("调用 API 失败: WebSocket 连接池未初始化") + return create_error_response( + code=ErrorCode.WS_DISCONNECTED, + message="WebSocket连接池未初始化", + data={"action": action, "params": params} + ) + + # 从连接池获取一个连接 + conn = await self.pool.get_connection() + try: + echo_id = str(uuid.uuid4()) + payload = {"action": action, "params": params or {}, "echo": echo_id} - from websockets.protocol import State + loop = asyncio.get_running_loop() + future = loop.create_future() + self._pending_requests[echo_id] = future - if getattr(self.ws, "state", None) is not State.OPEN: - self.logger.error("调用 API 失败: WebSocket 连接未打开") - return create_error_response( - code=ErrorCode.WS_DISCONNECTED, - message="WebSocket连接未打开", - data={"action": action, "params": params} - ) + try: + await conn.send(orjson.dumps(payload)) + result = await asyncio.wait_for(future, timeout=30.0) + return result + except asyncio.TimeoutError: + self._pending_requests.pop(echo_id, None) + self.logger.warning(f"API 调用超时: action={action}, params={params}") + return create_error_response( + code=ErrorCode.TIMEOUT_ERROR, + message="API调用超时", + data={"action": action, "params": params} + ) + except Exception as e: + self._pending_requests.pop(echo_id, None) + self.logger.exception(f"API 调用异常: action={action}, error={str(e)}") + return create_error_response( + code=ErrorCode.WS_MESSAGE_ERROR, + message=f"API调用异常: {str(e)}", + data={"action": action, "params": params} + ) + finally: + # 释放连接回连接池 + await self.pool.release_connection(conn) + else: + # 单连接模式 + if not self.ws: + self.logger.error("调用 API 失败: WebSocket 未初始化") + return create_error_response( + code=ErrorCode.WS_DISCONNECTED, + message="WebSocket未初始化", + data={"action": action, "params": params} + ) - echo_id = str(uuid.uuid4()) - payload = {"action": action, "params": params or {}, "echo": echo_id} + from websockets.protocol import State - loop = asyncio.get_running_loop() - future = loop.create_future() - self._pending_requests[echo_id] = future + if getattr(self.ws, "state", None) is not State.OPEN: + self.logger.error("调用 API 失败: WebSocket 连接未打开") + return create_error_response( + code=ErrorCode.WS_DISCONNECTED, + message="WebSocket连接未打开", + data={"action": action, "params": params} + ) - try: - await self.ws.send(json.dumps(payload)) - return await asyncio.wait_for(future, timeout=30.0) - except asyncio.TimeoutError: - self._pending_requests.pop(echo_id, None) - self.logger.warning(f"API 调用超时: action={action}, params={params}") - return create_error_response( - code=ErrorCode.TIMEOUT_ERROR, - message="API调用超时", - data={"action": action, "params": params} - ) - except Exception as e: - self._pending_requests.pop(echo_id, None) - self.logger.exception(f"API 调用异常: action={action}, error={str(e)}") - return create_error_response( - code=ErrorCode.WS_MESSAGE_ERROR, - message=f"API调用异常: {str(e)}", - data={"action": action, "params": params} - ) + echo_id = str(uuid.uuid4()) + payload = {"action": action, "params": params or {}, "echo": echo_id} + + loop = asyncio.get_running_loop() + future = loop.create_future() + self._pending_requests[echo_id] = future + + try: + await self.ws.send(orjson.dumps(payload)) + return await asyncio.wait_for(future, timeout=30.0) + except asyncio.TimeoutError: + self._pending_requests.pop(echo_id, None) + self.logger.warning(f"API 调用超时: action={action}, params={params}") + return create_error_response( + code=ErrorCode.TIMEOUT_ERROR, + message="API调用超时", + data={"action": action, "params": params} + ) + except Exception as e: + self._pending_requests.pop(echo_id, None) + self.logger.exception(f"API 调用异常: action={action}, error={str(e)}") + return create_error_response( + code=ErrorCode.WS_MESSAGE_ERROR, + message=f"API调用异常: {str(e)}", + data={"action": action, "params": params} + ) diff --git a/core/api/account.py b/core/api/account.py index 3d8b80e..3d895c9 100644 --- a/core/api/account.py +++ b/core/api/account.py @@ -4,7 +4,7 @@ 该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、 状态设置等相关的 OneBot v11 API 封装。 """ -import json +import orjson from typing import Dict, Any from .base import BaseAPI from models.objects import LoginInfo, VersionInfo, Status @@ -30,10 +30,10 @@ class AccountAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.get(cache_key) if cached_data: - return LoginInfo(**json.loads(cached_data)) + return LoginInfo(**orjson.loads(cached_data)) res = await self.call_api("get_login_info") - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return LoginInfo(**res) async def get_version_info(self) -> VersionInfo: @@ -43,7 +43,7 @@ class AccountAPI(BaseAPI): Returns: VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。 """ - res = await self.call_api("get_version_info") + res = await self.call_api("get_friend_list") return VersionInfo(**res) async def get_status(self) -> Status: @@ -189,10 +189,10 @@ class AccountAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.get(cache_key) if cached_data: - return json.loads(cached_data) + return orjson.loads(cached_data) res = await self.call_api("get_friend_list") - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return res async def get_group_list(self, no_cache: bool = False) -> list: @@ -209,9 +209,9 @@ class AccountAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.get(cache_key) if cached_data: - return json.loads(cached_data) + return orjson.loads(cached_data) res = await self.call_api("get_group_list") - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return res diff --git a/core/api/friend.py b/core/api/friend.py index a3a118b..85dd071 100644 --- a/core/api/friend.py +++ b/core/api/friend.py @@ -4,7 +4,7 @@ 该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息 等相关的 OneBot v11 API 封装。 """ -import json +import orjson from typing import List, Dict, Any from .base import BaseAPI from models.objects import FriendInfo, StrangerInfo @@ -44,10 +44,10 @@ class FriendAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.redis.get(cache_key) if cached_data: - return StrangerInfo(**json.loads(cached_data)) + return StrangerInfo(**orjson.loads(cached_data)) res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) - await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return StrangerInfo(**res) async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]: @@ -64,10 +64,10 @@ class FriendAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.redis.get(cache_key) if cached_data: - return [FriendInfo(**item) for item in json.loads(cached_data)] + return [FriendInfo(**item) for item in orjson.loads(cached_data)] res = await self.call_api("get_friend_list") - await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return [FriendInfo(**item) for item in res] async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]: diff --git a/core/api/group.py b/core/api/group.py index 46a63a9..d5b53b0 100644 --- a/core/api/group.py +++ b/core/api/group.py @@ -5,7 +5,7 @@ 等相关的 OneBot v11 API 封装。 """ from typing import List, Dict, Any, Optional -import json +import orjson from ..managers.redis_manager import redis_manager from .base import BaseAPI from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo @@ -181,10 +181,10 @@ class GroupAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.redis.get(cache_key) if cached_data: - return GroupInfo(**json.loads(cached_data)) + return GroupInfo(**orjson.loads(cached_data)) res = await self.call_api("get_group_info", {"group_id": group_id}) - await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return GroupInfo(**res) async def get_group_list(self) -> Any: @@ -232,10 +232,10 @@ class GroupAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.redis.get(cache_key) if cached_data: - return GroupMemberInfo(**json.loads(cached_data)) + return GroupMemberInfo(**orjson.loads(cached_data)) res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id}) - await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 return GroupMemberInfo(**res) async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]: diff --git a/core/config_loader.py b/core/config_loader.py index 948beca..9bb2d06 100644 --- a/core/config_loader.py +++ b/core/config_loader.py @@ -8,9 +8,8 @@ from pathlib import Path import tomllib from pydantic import ValidationError from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel -from .utils.logger import logger, ModuleLogger +from .utils.logger import ModuleLogger from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError -from .utils.error_codes import ErrorCode, create_error_response class Config: diff --git a/core/managers/admin_manager.py b/core/managers/admin_manager.py index 3cd33ce..92f9a38 100644 --- a/core/managers/admin_manager.py +++ b/core/managers/admin_manager.py @@ -4,7 +4,7 @@ 该模块负责管理机器人的管理员列表。 它现在以 Redis 作为主要数据源,文件仅用作备份。 """ -import json +import orjson import os from typing import Set @@ -66,7 +66,7 @@ class AdminManager(Singleton): try: if os.path.exists(self.data_file): with open(self.data_file, "r", encoding="utf-8") as f: - data = json.load(f) + data = orjson.loads(f.read()) admins = data.get("admins", []) admins_to_migrate = set(int(admin_id) for admin_id in admins) @@ -76,7 +76,7 @@ class AdminManager(Singleton): else: logger.info("admin.json 文件为空或不存在,无需迁移。") - except (json.JSONDecodeError, ValueError) as e: + except ValueError as e: logger.error(f"解析 admin.json 失败,无法迁移: {e}") except Exception as e: logger.error(f"迁移管理员数据到 Redis 失败: {e}") @@ -89,7 +89,7 @@ class AdminManager(Singleton): admins = await self.get_all_admins() admin_list = [str(admin_id) for admin_id in admins] with open(self.data_file, "w", encoding="utf-8") as f: - json.dump({"admins": admin_list}, f, indent=2, ensure_ascii=False) + f.write(orjson.dumps({"admins": admin_list}, indent=2, ensure_ascii=False).decode('utf-8')) logger.debug(f"管理员列表已备份到 {self.data_file}") except Exception as e: logger.error(f"备份管理员列表到 admin.json 失败: {e}") diff --git a/core/managers/browser_manager.py b/core/managers/browser_manager.py index 0ef2036..4753d89 100644 --- a/core/managers/browser_manager.py +++ b/core/managers/browser_manager.py @@ -7,21 +7,23 @@ import asyncio from typing import Optional from playwright.async_api import async_playwright, Browser, Playwright, Page from ..utils.logger import logger +from ..utils.singleton import Singleton -class BrowserManager: +class BrowserManager(Singleton): """ 浏览器管理器(异步单例) """ - _instance = None _playwright: Optional[Playwright] = None _browser: Optional[Browser] = None _page_pool: Optional[asyncio.Queue] = None _pool_size: int = 3 - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance + def __init__(self): + """ + 初始化浏览器管理器 + """ + # 调用父类 __init__ 确保单例初始化 + super().__init__() async def initialize(self): """ diff --git a/core/managers/command_manager.py b/core/managers/command_manager.py index e79e80f..ddfc846 100644 --- a/core/managers/command_manager.py +++ b/core/managers/command_manager.py @@ -7,12 +7,9 @@ """ from typing import Any, Callable, Dict, Optional, Tuple -import os -import base64 from models.events.message import MessageSegment -from models.events.message import MessageSegment from ..config_loader import global_config from ..handlers.event_handler import MessageHandler, NoticeHandler, RequestHandler diff --git a/core/managers/image_manager.py b/core/managers/image_manager.py index b757ca2..b56b7cf 100644 --- a/core/managers/image_manager.py +++ b/core/managers/image_manager.py @@ -10,19 +10,21 @@ from jinja2 import Template from .browser_manager import browser_manager from ..utils.logger import logger +from ..utils.singleton import Singleton -class ImageManager: +class ImageManager(Singleton): """ 图片生成管理器(单例) """ - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance def __init__(self): + """ + 初始化图片生成管理器 + """ + # 检查是否已经初始化 + if hasattr(self, 'template_dir'): + return + # 模板目录 self.template_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "templates") # 临时文件目录 diff --git a/core/managers/permission_manager.py b/core/managers/permission_manager.py index 7fa2910..8986b73 100644 --- a/core/managers/permission_manager.py +++ b/core/managers/permission_manager.py @@ -4,7 +4,7 @@ 该模块负责管理用户权限,支持 admin、op、user 三个权限级别。 以 Redis Hash 作为主要数据源,文件仅用作备份和首次数据迁移。 """ -import json +import orjson import os from typing import Dict @@ -71,7 +71,7 @@ class PermissionManager(Singleton): try: if os.path.exists(self.data_file): with open(self.data_file, "r", encoding="utf-8") as f: - data = json.load(f) + data = orjson.loads(f.read()) perms_to_migrate = data.get("users", {}) if perms_to_migrate: @@ -84,7 +84,7 @@ class PermissionManager(Singleton): else: logger.info("permissions.json 文件为空或不存在,无需迁移。") - except (json.JSONDecodeError, ValueError) as e: + except ValueError as e: logger.error(f"解析 permissions.json 失败,无法迁移: {e}") except Exception as e: logger.error(f"迁移权限数据到 Redis 失败: {e}") @@ -98,7 +98,7 @@ class PermissionManager(Singleton): # Redis 返回的是 bytes,需要解码 users_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in all_perms.items()} with open(self.data_file, "w", encoding="utf-8") as f: - json.dump({"users": users_data}, f, indent=2, ensure_ascii=False) + f.write(orjson.dumps({"users": users_data}, indent=2, ensure_ascii=False).decode('utf-8')) logger.debug(f"权限数据已备份到 {self.data_file}") except Exception as e: logger.error(f"备份权限数据到 permissions.json 失败: {e}") diff --git a/core/managers/plugin_manager.py b/core/managers/plugin_manager.py index 319e571..bd48213 100644 --- a/core/managers/plugin_manager.py +++ b/core/managers/plugin_manager.py @@ -10,28 +10,41 @@ import sys from typing import Set from .command_manager import CommandManager -from ..utils.exceptions import SyncHandlerError, PluginError, PluginLoadError, PluginReloadError, PluginNotFoundError +from ..utils.exceptions import SyncHandlerError, PluginLoadError, PluginReloadError, PluginNotFoundError from ..utils.logger import logger, ModuleLogger -from ..utils.error_codes import ErrorCode, create_error_response +from ..utils.singleton import Singleton # 确保logger在模块级别可见 __all__ = ['PluginManager', 'logger'] -class PluginManager: +class PluginManager(Singleton): """ 插件管理器类 """ - def __init__(self, command_manager: "CommandManager") -> None: + def __init__(self, command_manager: "CommandManager" | None = None) -> None: """ 初始化插件管理器 :param command_manager: CommandManager的实例 """ - self.command_manager = command_manager - self.loaded_plugins: Set[str] = set() - # 创建模块专用日志记录器 - self.logger = ModuleLogger("PluginManager") + # 检查是否已经初始化 + if hasattr(self, '_command_manager'): + return + + # 只有首次初始化时才执行 + if command_manager: + self._command_manager = command_manager + self.loaded_plugins: Set[str] = set() + # 创建模块专用日志记录器 + self.logger = ModuleLogger("PluginManager") + + @property + def command_manager(self): + """ + 获取命令管理器实例 + """ + return self._command_manager def load_all_plugins(self) -> None: """ @@ -99,12 +112,12 @@ class PluginManager: self.logger.warning(f"尝试重载一个未被加载的插件: {full_module_name},将按首次加载处理。") if full_module_name not in sys.modules: - error = PluginNotFoundError( + reload_error = PluginNotFoundError( plugin_name=full_module_name, message="模块未在sys.modules中找到" ) - self.logger.error(f"重载失败: {error.message}") - self.logger.log_custom_exception(error) + self.logger.error(f"重载失败: {reload_error.message}") + self.logger.log_custom_exception(reload_error) return try: diff --git a/core/managers/redis_manager.py b/core/managers/redis_manager.py index afad47d..cd43670 100644 --- a/core/managers/redis_manager.py +++ b/core/managers/redis_manager.py @@ -1,18 +1,20 @@ import redis.asyncio as redis from ..config_loader import global_config as config from ..utils.logger import logger +from ..utils.singleton import Singleton -class RedisManager: +class RedisManager(Singleton): """ Redis 连接管理器(异步单例) """ - _instance = None _redis = None - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance + def __init__(self): + """ + 初始化 Redis 管理器 + """ + # 调用父类 __init__ 确保单例初始化 + super().__init__() async def initialize(self): """ diff --git a/core/utils/__init__.py b/core/utils/__init__.py index 6d8b745..6708178 100644 --- a/core/utils/__init__.py +++ b/core/utils/__init__.py @@ -6,7 +6,6 @@ # 导出核心工具 from .logger import logger from .exceptions import * -from .json_utils import * from .singleton import singleton from .executor import run_in_thread_pool, initialize_executor from .performance import ( diff --git a/core/utils/error_codes.py b/core/utils/error_codes.py index 50e103e..4d3f74a 100644 --- a/core/utils/error_codes.py +++ b/core/utils/error_codes.py @@ -3,6 +3,7 @@ 该模块定义了项目中使用的错误码和统一的错误响应格式,确保所有模块返回一致的错误信息。 """ +from typing import Optional # 错误码定义 class ErrorCode: @@ -142,7 +143,7 @@ def get_error_message(code: int) -> str: return ERROR_MESSAGES.get(code, ERROR_MESSAGES[ErrorCode.UNKNOWN_ERROR]) -def create_error_response(code: int, message: str = None, data: dict = None, request_id: str = None) -> dict: +def create_error_response(code: int, message: Optional[str] = None, data: Optional[dict] = None, request_id: Optional[str] = None) -> dict: """ 创建统一格式的错误响应 @@ -172,7 +173,7 @@ def create_error_response(code: int, message: str = None, data: dict = None, req return response -def exception_to_error_response(exception: Exception, code: int = None, request_id: str = None) -> dict: +def exception_to_error_response(exception: Exception, code: Optional[int] = None, request_id: Optional[str] = None) -> dict: """ 将异常对象转换为统一格式的错误响应 diff --git a/core/utils/json_utils.py b/core/utils/json_utils.py deleted file mode 100644 index c18b40d..0000000 --- a/core/utils/json_utils.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -JSON 工具模块 - -统一使用高性能的 orjson 库进行 JSON 序列化和反序列化。 -如果 orjson 不可用,则回退到标准库 json。 -""" -from typing import Any, Union -import json - -# 在模块加载时检查 orjson 是否可用 -try: - import orjson - _orjson_available = True -except ImportError: - _orjson_available = False - -def dumps(obj: Any) -> str: - """ - 将对象序列化为 JSON 字符串。 - """ - if _orjson_available: - # orjson.dumps 返回 bytes,需要 decode - return orjson.dumps(obj).decode("utf-8") - else: - return json.dumps(obj, ensure_ascii=False) - -def loads(json_str: Union[str, bytes]) -> Any: - """ - 将 JSON 字符串反序列化为对象。 - """ - if _orjson_available: - return orjson.loads(json_str) - else: - return json.loads(json_str) diff --git a/core/utils/performance.py b/core/utils/performance.py index 7e13b88..2dd6cb1 100644 --- a/core/utils/performance.py +++ b/core/utils/performance.py @@ -109,7 +109,7 @@ class PerformanceStats: performance_stats = PerformanceStats() -def timeit(func: Callable = None, *, log_level: int = logging.INFO, collect_stats: bool = True): +def timeit(func: Optional[Callable] = None, *, log_level: int = logging.INFO, collect_stats: bool = True): """ 函数执行时间分析装饰器(支持同步和异步) @@ -261,7 +261,7 @@ class memory_profile: logger.info(f"[内存分析] 使用内存: {memory_used:.2f} MB") -def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1): +def memory_profile_decorator(func: Optional[Callable] = None, *, interval: float = 0.1): """ 内存分析装饰器(支持同步函数) @@ -296,7 +296,7 @@ def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1): return decorator(func) -def performance_monitor(func: Callable = None, *, threshold: float = 1.0): +def performance_monitor(func: Optional[Callable] = None, *, threshold: float = 1.0): """ 性能监控装饰器 仅当函数执行时间超过阈值时记录日志 diff --git a/core/utils/singleton.py b/core/utils/singleton.py index 27604e5..5f2d103 100644 --- a/core/utils/singleton.py +++ b/core/utils/singleton.py @@ -1,7 +1,7 @@ """ 通用单例模式基类 """ -from typing import Any, Dict, Optional, Type, TypeVar +from typing import Any, Dict, Optional, Type, TypeVar, cast T = TypeVar('T') @@ -29,9 +29,9 @@ class Singleton: Returns: T: 单例实例 """ - # 使用全局字典存储实例,避免类型检查问题 + # 使用全局字典存储实例,修复类型检查问题 if cls not in _instance_store: - _instance_store[cls] = super().__new__(cls) + _instance_store[cls] = super(Singleton, cls).__new__(cls) return _instance_store[cls] def __init__(self) -> None: @@ -67,7 +67,7 @@ def singleton(cls: Type[T]) -> Type[T]: nonlocal class_instance if class_instance is None: # 使用super()调用原始类的__new__方法 - class_instance = cls(*args, **kwargs) + class_instance = super(SingletonClass, cls).__new__(cls) return class_instance # 复制类的元数据 diff --git a/core/ws_pool.py b/core/ws_pool.py new file mode 100644 index 0000000..86be046 --- /dev/null +++ b/core/ws_pool.py @@ -0,0 +1,231 @@ +""" +WebSocket 连接池模块 + +该模块实现了 WebSocket 连接池功能,用于管理多个 WebSocket 连接, +提高并发处理能力和连接复用效率。 +""" +import asyncio +import websockets +from websockets.legacy.client import WebSocketClientProtocol +from typing import Optional, Dict, Any, cast +import uuid +from loguru import logger + +from .config_loader import global_config +from .utils.exceptions import WebSocketError, WebSocketConnectionError + + +class WSConnection: + """ + WebSocket 连接包装类 + + 封装单个 WebSocket 连接的状态和操作 + """ + def __init__(self, conn: WebSocketClientProtocol, conn_id: str): + self.conn = conn + self.conn_id = conn_id + self.last_used = asyncio.get_event_loop().time() + self.is_active = True + self._pending_requests: Dict[str, asyncio.Future] = {} + + async def send(self, data: dict): + """ + 发送数据到 WebSocket 连接 + """ + if not self.is_active: + raise WebSocketError(f"连接 {self.conn_id} 已关闭") + + try: + await self.conn.send(data) + self.last_used = asyncio.get_event_loop().time() + except Exception as e: + self.is_active = False + raise WebSocketError(f"发送数据失败: {e}") + + async def recv(self): + """ + 从 WebSocket 连接接收数据 + """ + if not self.is_active: + raise WebSocketError(f"连接 {self.conn_id} 已关闭") + + try: + data = await self.conn.recv() + self.last_used = asyncio.get_event_loop().time() + return data + except Exception as e: + self.is_active = False + raise WebSocketError(f"接收数据失败: {e}") + + async def close(self): + """ + 关闭 WebSocket 连接 + """ + if self.is_active: + self.is_active = False + await self.conn.close() + + +class WSConnectionPool: + """ + WebSocket 连接池 + + 管理多个 WebSocket 连接,提供连接的获取、释放和回收功能 + """ + def __init__(self, pool_size: int = 3, max_idle_time: int = 300): + """ + 初始化连接池 + + :param pool_size: 连接池大小 + :param max_idle_time: 连接最大空闲时间(秒) + """ + self.pool_size = pool_size + self.max_idle_time = max_idle_time + self.pool: asyncio.Queue[WSConnection] = asyncio.Queue(maxsize=pool_size) + self._closed = False + self._cleanup_task: Optional[asyncio.Task] = None + + # 从全局配置读取参数 + self.url = global_config.napcat_ws.uri + self.token = global_config.napcat_ws.token + self.reconnect_interval = global_config.napcat_ws.reconnect_interval + + logger.info(f"WebSocket 连接池初始化完成,大小: {pool_size}") + + async def initialize(self): + """ + 初始化连接池,创建初始连接 + """ + if self._closed: + raise WebSocketError("连接池已关闭") + + # 启动连接清理任务 + self._cleanup_task = asyncio.create_task(self._cleanup_idle_connections()) + + # 创建初始连接 + for _ in range(self.pool_size): + try: + conn = await self._create_connection() + await self.pool.put(conn) + logger.info(f"WebSocket 连接 {conn.conn_id} 已创建并加入连接池") + except Exception as e: + logger.error(f"创建初始连接失败: {e}") + + async def _create_connection(self) -> WSConnection: + """ + 创建新的 WebSocket 连接 + """ + headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} + + try: + conn_id = str(uuid.uuid4()) + websocket_raw = await websockets.connect( + self.url, additional_headers=headers + ) + websocket = cast(WebSocketClientProtocol, websocket_raw) + + conn = WSConnection(websocket, conn_id) + logger.info(f"WebSocket 连接 {conn_id} 已建立") + return conn + except Exception as e: + raise WebSocketConnectionError(f"创建 WebSocket 连接失败: {e}") + + async def get_connection(self) -> WSConnection: + """ + 从连接池获取一个连接 + """ + if self._closed: + raise WebSocketError("连接池已关闭") + + try: + # 尝试从连接池获取连接 + conn = await asyncio.wait_for(self.pool.get(), timeout=5) + + # 检查连接是否活跃 + if not conn.is_active: + logger.warning(f"连接 {conn.conn_id} 已失效,重新创建") + return await self._create_connection() + + return conn + except asyncio.TimeoutError: + # 连接池为空,创建新连接 + logger.warning("连接池为空,创建临时连接") + return await self._create_connection() + except Exception as e: + raise WebSocketError(f"获取连接失败: {e}") + + async def release_connection(self, conn: WSConnection): + """ + 释放连接回连接池 + """ + if self._closed: + await conn.close() + return + + if not conn.is_active: + logger.warning(f"连接 {conn.conn_id} 已失效,不返回连接池") + return + + try: + if self.pool.full(): + # 连接池已满,关闭该连接 + await conn.close() + logger.info(f"连接池已满,关闭连接 {conn.conn_id}") + else: + await self.pool.put(conn) + logger.debug(f"连接 {conn.conn_id} 已返回连接池") + except Exception as e: + logger.error(f"释放连接失败: {e}") + await conn.close() + + async def _cleanup_idle_connections(self): + """ + 清理空闲连接任务 + """ + while not self._closed: + await asyncio.sleep(60) # 每分钟检查一次 + + try: + # 检查连接池中的连接 + new_pool = asyncio.Queue(maxsize=self.pool_size) + current_time = asyncio.get_event_loop().time() + + while not self.pool.empty(): + conn = await self.pool.get() + + if current_time - conn.last_used > self.max_idle_time: + # 连接空闲时间过长,关闭 + await conn.close() + logger.info(f"清理空闲连接 {conn.conn_id}") + else: + # 放回新队列 + await new_pool.put(conn) + + # 替换原连接池 + self.pool = new_pool + except Exception as e: + logger.error(f"清理空闲连接失败: {e}") + + async def close(self): + """ + 关闭连接池 + """ + if self._closed: + return + + self._closed = True + + # 停止清理任务 + if self._cleanup_task: + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass + + # 关闭所有连接 + while not self.pool.empty(): + conn = await self.pool.get() + await conn.close() + + logger.info("WebSocket 连接池已关闭") \ No newline at end of file diff --git a/docs/core-concepts/architecture.md b/docs/core-concepts/architecture.md index 20dcb17..9d42c90 100644 --- a/docs/core-concepts/architecture.md +++ b/docs/core-concepts/architecture.md @@ -54,3 +54,118 @@ graph LR 别几把开多个实例。。。 * **Browser Pool**: 浏览器页面提前开好,用完洗干净放回去 * **Connection Pool**: Redis 和 HTTP 请求都用连接池 + +## 4. 技术栈全景 + +NEO Bot 的“骨架”是由一堆现代 Python 库和技术堆起来的。下面这张清单能让你一眼看清整个项目的技术选型。 + +### 编程语言与运行时 +* **Python 3.14**: 镀铬酸钾创项目的时候用的 Python 3.14 3.14兼容JIT,那就这样吧 +* **JIT (Just-In-Time)**: 启动时加 `-X jit` 参数,运行时把热点代码编译成机器码 +* **Mypyc (AOT)**: 核心模块(`core/ws.py`, `core/managers/*.py`)编译成C扩展,机器码运行 + +### 异步与网络 +* **asyncio**: Python 原生异步框架,所有 IO 操作都是非阻塞的 +* **uvloop (Linux)**: 替代 asyncio 默认事件循环,性能更高 +* **IOCP (Windows)**: Windows 上的高性能 IO 完成端口 +* **aiohttp**: 异步 HTTP 客户端/服务器,用于 API 请求和 WebSocket 通信 +* **websockets**: 纯粹的 WebSocket 客户端/服务器库 +* **Playwright**: 浏览器自动化工具,负责截图、页面渲染 + +### 数据与存储 +* **Redis**: 内存数据库,用于缓存帮助图片、会话状态等 +* **orjson**: Rust 编写的 JSON 序列化库,比标准 `json` 快很多 +* **Pydantic**: 数据验证与设置管理,配置文件、API 请求/响应都靠它 + +### 工具与工具链 +* **Loguru**: 结构化日志记录,输出漂亮且支持文件轮转 +* **Watchdog**: 文件系统监控,实现插件热重载 +* **Jinja2**: 模板引擎,渲染 HTML 页面然后转为图片 +* **Pillow**: 图像处理库,负责图片格式转换、尺寸调整 +* **BeautifulSoup4**: HTML 解析,B站、抖音等链接解析插件在用 +* **httpx**: 异步 HTTP 客户端,某些插件用它发请求 + +### 测试与开发 +* **Pytest**: 测试框架,写单元测试、集成测试 +* **Docker**: 容器化,沙箱执行用户代码时可能用到 +* **cryptography**: 加密解密,处理一些安全相关的操作 + +### 架构模式 +* **Singleton (单例)**: 全局唯一实例,所有管理器都是单例 +* **Connection Pool (连接池)**: Redis 连接、HTTP 会话都复用 +* **Plugin System (插件系统)**: 动态导入、装饰器注册,一个 `.py` 文件就是一个插件 + +## 5. Python 动态语言特性运用 + +Python 是一门“动态”语言,这意味着你可以在运行时做很多静态语言做不到的事情。NEO Bot 大量利用了这些特性,让框架变得灵活、易扩展。 + +### 装饰器 (Decorator) +* **何用**: 给函数“贴上标签”,告诉框架这个函数是干什么的 +* **何处**: + * `@matcher.command("echo")` – 注册一个消息指令 + * `@matcher.on_message()` – 注册一个通用消息处理器 + * `@matcher.on_notice()` – 注册一个通知事件处理器 +* **何原理**: 装饰器本质上是一个高阶函数,它接收被装饰的函数,然后把它“注册”到某个管理器里 + +### 动态导入 (Dynamic Import) +* **何用**: 不需要在代码开头写死 `import`,运行时根据情况加载模块 +* **何处**: `PluginManager.load_all_plugins()` 用 `importlib.import_module()` 扫描 `plugins/` 目录,找到 `.py` 文件就导入 +* **何原理**: Python 的模块系统是完全动态的,`import` 语句实际上调用了 `__import__()` 函数 + +### 自省 (Introspection) +* **何用**: 让代码能“看到”自己的结构,比如函数属于哪个模块、有哪些参数 +* **何处**: + * `inspect.getmodule(func)` – 获取函数所在的模块名,用于记录插件来源 + * `func.__name__`, `func.__module__` – 获取函数名和模块名 +* **何原理**: Python 把几乎所有元信息都存在对象的 `__dict__` 里,你可以随时翻看 + +### 鸭子类型 (Duck Typing) +* **何用**: “如果它走起来像鸭子,叫起来像鸭子,那它就是鸭子。”——不检查类型,只检查行为 +* **何处**: + * 事件处理器不要求事件对象必须是某个类,只要它有 `post_type`、`user_id` 等属性就行 + * 插件不需要继承某个基类,只要它有 `__plugin_meta__` 字典就行 +* **何原理**: Python 的变量没有类型,类型是对象自己的事。只要对象有你需要的方法或属性,你就可以调用它 + +### 反射 (Reflection) +* **何用**: 在运行时检查、修改对象的结构 +* **何处**: + * `getattr(module, "__plugin_meta__")` – 获取插件的元数据字典 + * `hasattr(event, "raw_message")` – 检查事件对象是否有某个属性 + * `setattr()` – 动态设置属性(虽然用得少) +* **何原理**: Python 的对象本质上就是字典(`__dict__`),`getattr`/`setattr` 就是对这个字典的操作 + +### 元编程 (Metaprogramming) +* **何用**: 在代码运行时改变代码的行为 +* **何处**: + * `Singleton` 基类重写 `__new__` 方法,控制实例创建,确保全局只有一个实例 + * 装饰器在函数定义时修改函数,给它添加额外逻辑 +* **何原理**: Python 的类也是对象(类型对象),你可以通过修改类来影响它所有实例的行为 + +### 上下文管理器 (Context Manager) +* **何用**: 安全地获取和释放资源,比如文件、网络连接、浏览器页面 +* **何处**: + * `async with browser_manager.get_page() as page:` – 从页面池获取一个页面,用完后自动放回 + * `async with aiohttp.ClientSession() as session:` – 发起 HTTP 请求后自动关闭会话 +* **何原理**: `__enter__`/`__exit__`(同步)或 `__aenter__`/`__aexit__`(异步)协议 + +### 描述符 (Descriptor) +* **何用**: 控制属性访问的逻辑,比如把方法伪装成属性 +* **何处**: + * `@property` – 把方法变成只读属性,比如 `PluginManager.command_manager` + * `@property.setter` – 给属性设置值时的自定义逻辑 +* **何原理**: 描述符是一个实现了 `__get__`、`__set__` 或 `__delete__` 方法的类 + +### 猴子补丁 (Monkey Patching) +* **何用**: 在运行时修改模块、类或对象,通常用于测试或修复第三方库 +* **何处**: 测试中可能会用 `unittest.mock.patch` 临时替换某个函数,模拟它的行为 +* **何原理**: Python 的模块和类都是可变的,你可以直接给它们赋值新属性 + +### eval/exec +* **何用**: 执行字符串形式的 Python 代码 +* **何处**: `code_py.py` 插件中,用户发送的代码片段会被 `exec()` 执行,实现代码沙箱功能 +* **何原理**: Python 解释器本身就是一个运行时环境,`eval()` 用于表达式,`exec()` 用于语句 + +### 类型提示 (Type Hints) +* **何用**: 虽然 Python 是动态类型,但类型提示能让代码更清晰,工具(如 Mypy)也能做静态检查 +* **何处**: 几乎所有函数和方法的参数、返回值都加了类型提示,这让 Mypyc 编译成为可能 +* **何原理**: 类型提示只是注解,运行时通常被忽略(除非你用 `typing` 模块做检查) diff --git a/docs/development-standards.md b/docs/development-standards.md new file mode 100644 index 0000000..a7f4c37 --- /dev/null +++ b/docs/development-standards.md @@ -0,0 +1,357 @@ +# NEO Bot 开发规范与公约 + +写代码很简单,但写出**高性能、不炸裂、好维护**的代码需要遵守规矩。 + +本文档定义了 NEO Bot 项目的开发守则、编码公约、注意事项和代码规范。所有贡献者和插件开发者都**必须**遵循这些规范,确保机器人稳定运行、代码质量统一。 + +> 如果你觉得规范太麻烦,可以问问镀铬酸钾,他会给你一对一教学。。。但最好还是遵守规矩。 + +**补充阅读**: +- [插件开发最佳实践](./plugin-development/best-practices.md) - 必读!写插件的基本规矩 +- [项目结构](./project-structure.md) - 了解代码组织 +- [核心概念](./core-concepts/architecture.md) - 理解框架设计 + +## 1. 开发守则(基本原则) + +### 1.1 异步优先原则 +- **绝对不要阻塞事件循环**:NeoBot 采用单线程异步架构,任何同步阻塞操作都会导致整个机器人卡死。 + - **禁止**:`time.sleep()`、同步 `requests`、密集 CPU 计算 + - **必须**:使用 `await asyncio.sleep()`、异步 HTTP 客户端、线程池执行同步任务 + +- **异步任务处理**:长时间运行的任务应使用 `run_in_thread_pool` 或 `asyncio.create_task` 执行,避免阻塞主循环。 + +### 1.2 资源管理原则 +- **连接复用**:禁止重复创建连接和资源实例。 + - HTTP 请求:使用全局 `aiohttp` session 或插件提供的 `get_session()` + - 浏览器操作:必须通过 `browser_manager.get_page()` 获取页面实例 + - Redis 连接:通过 `redis_manager` 单例访问 + +- **资源池化**:浏览器页面、数据库连接等资源必须使用框架提供的池化机制。 + +### 1.3 性能优化原则 +- **缓存策略**:频繁访问的外部数据必须添加缓存。 + - 短期缓存(<1小时):使用 Redis 或内存缓存 + - 长期缓存:考虑持久化存储 + +- **懒加载**:大型资源或初始化成本高的组件应延迟加载。 + +### 1.4 错误处理原则 +- **异常捕获**:所有插件代码都应妥善处理异常,避免插件崩溃影响机器人运行。 +- **友好提示**:向用户返回清晰、友好的错误信息,避免暴露内部细节。 +- **日志记录**:所有重要操作和错误都应记录日志,使用 `ModuleLogger` 进行结构化日志记录。 + +### 1.5 安全性原则 +- **输入验证**:所有用户输入都必须验证和清理,防止注入攻击。 +- **代码执行安全**:使用沙箱环境执行用户代码,隔离系统资源。 +- **权限控制**:严格遵循权限管理系统,禁止越权操作。 + +### 1.6 跨平台兼容性原则 +NEO Bot 需要在 **Windows 开发环境**和 **Linux 生产环境**中都能正常运行。 + +- **路径处理**: + - 使用 `pathlib.Path` 处理文件路径,避免手动拼接字符串。 + - 使用 `/` 作为路径分隔符(Python 会自动转换)。 + - 禁止使用硬编码的路径分隔符(如 `\\` 或 `/`)。 + +- **系统依赖**: + - 避免使用平台特定的系统调用。 + - 如果必须使用,通过 `sys.platform` 检测平台并提供备选方案。 + +- **环境变量**: + - 通过 `global_config` 获取配置,而不是直接读取环境变量。 + - 敏感信息(如 API 密钥)必须通过配置管理。 + +- **文件权限**: + - 在 Linux 上注意文件权限设置,确保 Bot 有读写权限。 + - 临时文件应放在系统临时目录(`tempfile.gettempdir()`)。 + +## 2. 公约(编码约定) + +### 2.1 项目结构公约 +- **插件位置**:所有插件必须放置在 `plugins/` 目录下,单个 `.py` 文件或包含 `__init__.py` 的目录。 +- **模块导入**:遵循标准导入顺序:标准库 → 第三方库 → 本地模块。 +- **配置访问**:通过 `global_config` 单例访问配置,禁止硬编码配置值。 + +### 2.2 单例管理器使用公约 +NEO Bot 的核心是**单例管理器**(`core/managers/` 目录下的类)。所有全局资源都必须通过管理器访问。 + +- **禁止重复创建**:严禁自己实例化管理器类,必须通过导入的单例对象访问。 + - ✅ `from core.managers.redis_manager import redis_manager` + - ❌ `RedisManager()` (错误!会创建新实例) + +- **资源池化**:浏览器页面、数据库连接等资源必须使用管理器提供的池化接口。 + - ✅ `await browser_manager.get_page()` + - ❌ `playwright.chromium.launch()` (错误!会创建新浏览器进程) + +- **数据一致性**:单例管理器确保全局数据一致性,不要绕过管理器直接操作底层资源。 + +### 2.2.1 单例模式实现机制 + +NEO Bot 提供了两种单例模式实现方式,位于 `core/utils/singleton.py`: + +#### 1. Singleton 基类(继承方式) +```python +from core.utils.singleton import Singleton + +class MyManager(Singleton): + """通过继承 Singleton 基类实现单例""" + + def __init__(self, config: dict): + """ + 初始化管理器 + + Args: + config: 配置字典 + """ + # 调用父类 __init__ 确保单例初始化 + super().__init__() + + # 检查是否已经初始化(防止 __init__ 被多次调用) + if hasattr(self, '_my_initialized') and self._my_initialized: + return + + # 执行一次性初始化逻辑 + self.config = config + self.resource = None + self._initialize_resource() + + # 标记为已初始化 + self._my_initialized = True + + def _initialize_resource(self): + """初始化资源(只执行一次)""" + self.resource = initialize_resource(self.config) + + async def cleanup(self): + """清理资源(单例管理器应实现清理方法)""" + if self.resource: + await self.resource.close() +``` + +**特性**: +- 通过重写 `__new__` 方法确保每个类只有一个实例 +- 自动处理重复初始化问题,但建议子类添加额外的初始化检查 +- 使用全局字典存储实例,避免类型检查问题 +- 支持带参数的 `__init__` 方法 + +#### 2. @singleton 装饰器(装饰器方式) +```python +from core.utils.singleton import singleton + +@singleton +class MyManager: + """通过装饰器实现单例""" + + def __init__(self, config): + self.config = config + self.resource = None + + async def initialize(self): + self.resource = await load_resource() +``` + +**特性**: +- 将普通类转换为单例类,无需修改类继承关系 +- 保持原始类的元数据(名称、文档字符串等) +- 适用于无法修改基类的现有类 + +#### 3. 使用建议 +- **新管理器类**:优先使用 **Singleton 基类继承方式**,结构更清晰 +- **现有类转换**:使用 **@singleton 装饰器**,无需重构 +- **线程安全**:两种方式都假设在单线程异步环境中使用,如需线程安全请自行加锁 +- **导入方式**:单例类应该通过模块级别的实例变量导出,如: + ```python + # redis_manager.py + class RedisManager(Singleton): + ... + + redis_manager = RedisManager() # 创建并导出单例实例 + ``` + +#### 4. 重要注意事项 +- **避免循环导入**:单例类的导入应谨慎处理,避免循环依赖 +- **初始化时机**:单例在第一次导入时创建,确保所需依赖已就绪 +- **__init__ 调用语义**:虽然实例是单例,但 `__init__` 方法可能被多次调用(如重新导入时)。应添加额外检查确保一次性逻辑只执行一次。 +- **资源清理**:单例管理器应在程序退出时清理资源,实现 `cleanup()` 方法 + +### 2.3 命名公约 +- **文件命名**:使用小写字母和下划线,例如 `my_plugin.py`。 +- **类命名**:使用 `PascalCase`,例如 `CommandManager`。 +- **函数/方法命名**:使用 `snake_case`,例如 `handle_message`。 +- **常量命名**:使用 `UPPER_SNAKE_CASE`,例如 `MAX_RETRY_COUNT`。 +- **变量命名**:使用 `snake_case`,具有描述性,避免单字母变量(循环变量除外)。 + +### 2.4 类型提示公约 +- **全面使用**:所有函数、方法、类属性都应提供类型提示。**这是强制要求**,因为框架开启了 Mypyc 编译。 +- **性能优化**:类型提示不仅帮助发现 Bug,还能让 Mypyc 生成更高效的机器码。 +- **返回类型**:明确指定返回类型,包括 `None`。 +- **复杂类型**:使用 `typing` 模块中的泛型,如 `List[str]`、`Dict[str, Any]`。 +- **可选参数**:使用 `Optional[...]` 或默认值 `= None`。 + +**示例**: +```python +# 好的写法 +async def handle(event: MessageEvent, args: list[str]) -> None: + ... + +# 不好写法(会导致编译警告) +async def handle(event, args): + ... +``` + +### 2.5 异常处理公约 +- **自定义异常**:使用框架提供的自定义异常类,避免抛出通用的 `Exception`。 +- **异常链**:保留原始异常信息,使用 `raise CustomError(...) from e`。 +- **资源清理**:使用 `try...finally` 或上下文管理器确保资源释放。 + +### 2.6 日志记录公约 +- **模块化日志**:每个模块使用 `ModuleLogger("ModuleName")` 创建专用日志记录器。 +- **日志级别**: + - `DEBUG`:调试信息,详细操作记录 + - `INFO`:常规操作记录 + - `WARNING`:预期内的异常或潜在问题 + - `ERROR`:操作失败但可恢复的错误 + - `CRITICAL`:系统级错误,需要立即关注 + +## 3. 注意事项(常见陷阱) + +### 3.1 异步编程陷阱 +- **忘记 await**:异步函数调用必须使用 `await`,否则任务不会执行。 +- **阻塞循环**:在异步函数中执行同步阻塞操作会冻结整个事件循环。 +- **任务泄漏**:创建的异步任务必须被妥善管理,避免内存泄漏。 + +### 3.2 资源管理陷阱 +- **连接泄漏**:未关闭的 HTTP 连接、数据库连接会导致资源耗尽。 +- **文件句柄泄漏**:打开的文件必须显式关闭或使用上下文管理器。 +- **缓存雪崩**:大量缓存同时过期可能导致系统负载激增。 + +### 3.3 性能陷阱 +- **N+1 查询**:避免在循环中执行数据库或 API 查询,使用批量操作。 +- **内存泄漏**:大型数据结构长时间驻留内存,应定期清理。 +- **重复计算**:相同的计算结果应缓存,避免重复计算。 + +### 3.4 安全性陷阱 +- **SQL 注入**:使用参数化查询或 ORM,禁止拼接 SQL 字符串。 +- **XSS 攻击**:渲染用户输入时必须进行 HTML 转义。 +- **路径遍历**:用户提供的文件路径必须进行规范化验证。 + +## 4. 代码规范(详细指南) + +### 4.1 文档字符串规范(强制要求) + +**所有代码必须包含完整的文档字符串**,这是项目质量保证的基础。缺少文档字符串的代码将在审查中被拒绝。 + +- **模块级文档**:每个模块顶部应有文档字符串,描述模块功能和主要接口。 +- **类级文档**:每个类应有文档字符串,描述类的职责、使用方法和示例。 +- **函数/方法级文档**:每个公共函数和方法必须有文档字符串,包含参数说明、返回值和异常信息。 + +**参数注释要求**: +1. 每个参数都必须有类型提示和简要说明 +2. 返回值必须明确说明类型和含义 +3. 可能抛出的异常必须列出 +4. 复杂的函数应提供使用示例 + +**标准格式示例:** +```python +def process_data(data: List[str], timeout: int = 30) -> Dict[str, Any]: + """ + 处理数据并返回结果。 + + Args: + data: 待处理的数据列表 + timeout: 操作超时时间,单位秒 + + Returns: + 处理结果的字典,包含状态和详情 + + Raises: + TimeoutError: 处理超时时抛出 + ValueError: 数据格式错误时抛出 + + Example: + >>> result = process_data(["item1", "item2"]) + >>> print(result["status"]) + """ +``` + +### 4.2 函数设计规范 +- **单一职责**:每个函数只做一件事,保持功能简洁。 +- **参数数量**:函数参数不宜过多(建议 ≤5),过多时考虑使用 `dataclass` 或 `TypedDict`。 +- **默认参数**:避免使用可变对象作为默认参数,使用 `None` 代替。 + +### 4.3 类设计规范 +- **单一职责**:每个类应有明确的单一职责。 +- **组合优于继承**:优先使用组合而非继承来复用功能。 +- **属性访问控制**:使用 `@property` 装饰器控制属性访问,隐藏内部实现。 + +### 4.4 错误处理规范 +- **错误码统一**:使用框架定义的 `ErrorCode` 枚举,避免自定义魔法数字。 +- **错误响应格式**:使用 `exception_to_error_response` 生成统一错误响应。 +- **用户友好消息**:错误消息应同时包含技术细节(日志)和用户友好提示(界面)。 + +### 4.5 测试规范 +- **测试覆盖率**:核心功能应达到 80% 以上的测试覆盖率。 +- **异步测试**:使用 `pytest-asyncio` 进行异步测试。 +- **测试隔离**:测试用例之间应相互独立,避免依赖执行顺序。 + +## 5. 提交与协作规范 + +### 5.1 Git 提交规范 +- **提交信息格式**:遵循 Conventional Commits 规范 + ``` + (): + + + +