* 滚木

* feat: 重构核心架构,增强类型安全与插件管理

本次提交对核心模块进行了深度重构,引入 Pydantic 增强配置管理的类型安全性,并全面优化了插件管理系统。

主要变更详情:

1. 核心架构与配置
   - 重构配置加载模块:引入 Pydantic 模型 (`core/config_models.py`),提供严格的配置项类型检查、验证及默认值管理。
   - 统一模块结构:规范化模块导入路径,移除冗余的 `__init__.py` 文件,提升项目结构的清晰度。
   - 性能优化:集成 Redis 缓存支持 (`RedisManager`),有效降低高频 API 调用开销,提升响应速度。

2. 插件系统升级
   - 实现热重载机制:新增插件文件变更监听功能,支持开发过程中自动重载插件,提升开发效率。
   - 优化生命周期管理:改进插件加载与卸载逻辑,支持精确卸载指定插件及其关联的命令、事件处理器和定时任务。

3. 功能特性增强
   - 新增媒体 API:引入 `MediaAPI` 模块,封装图片、语音等富媒体资源的获取与处理接口。
   - 完善权限体系:重构权限管理系统,实现管理员与操作员的分级控制,支持更细粒度的命令权限校验。

4. 代码质量与稳定性
   - 全面类型修复:解决 `mypy` 静态类型检查发现的大量类型错误(包括 `CommandManager`、`EventFactory` 及 `Bot` API 签名不匹配问题)。
   - 增强错误处理:优化消息处理管道的异常捕获机制,完善关键路径的日志记录,提升系统运行稳定性。

* feat: 添加测试用例并优化代码结构

refactor(permission_manager): 调整初始化顺序和逻辑
fix(admin_manager): 修复初始化逻辑和目录创建问题
feat(ws): 优化Bot实例初始化条件
feat(message): 增强MessageSegment功能并添加测试
feat(events): 支持字符串格式的消息解析
test: 添加核心功能测试用例
refactor(plugin_manager): 改进插件路径处理
style: 清理无用导入和代码
chore: 更新依赖项

* refactor(handler): 移除TYPE_CHECKING并直接导入Bot类

简化类型注解,直接导入Bot类而非使用TYPE_CHECKING条件导入,提高代码可读性和维护性

* fix(command_manager): 修复插件卸载时元信息移除不精确的问题

修复 CommandManager 中 unload_plugin 方法移除插件元信息时使用 startswith 导致可能误删其他插件的问题,改为精确匹配
同时调整相关测试用例验证精确匹配行为

* refactor: 清理未使用的导入和更新文档结构

docs: 添加config_models.py到项目结构文档
docs: 调整数据目录位置到core/data下
docs: 更新权限管理器文档描述

* 文档更新

* 更新thpic插件 支持一次返回多张图

* feat: 添加测试覆盖率并修复相关问题

refactor(redis_manager): 移除冗余的ConnectionError处理
refactor(event_handler): 优化Bot类型注解
refactor(factory): 移除未使用的GroupCardNoticeEvent

test: 添加全面的单元测试覆盖
- 添加test_import.py测试模块导入
- 添加test_debug.py测试插件加载调试
- 添加test_plugin_error.py测试错误处理
- 添加test_config_loader.py测试配置加载
- 添加test_redis_manager.py测试Redis管理
- 添加test_bot.py测试Bot功能
- 扩展test_models.py测试消息模型
- 添加test_plugin_manager_coverage.py测试插件管理
- 添加test_executor.py测试代码执行器
- 添加test_ws.py测试WebSocket
- 添加test_api.py测试API接口
- 添加test_core_managers.py测试核心管理模块

fix(plugin_manager): 修复插件加载日志变量问题

覆盖率已到达86%(忽略插件)

* 更新/help指令,现在会发送图片

* feat(help): 重构帮助系统为图片渲染模式

添加浏览器管理器和图片管理器,用于通过 Playwright 渲染帮助菜单为图片
重构命令管理器以支持图片缓存和同步功能
添加 HTML 模板用于帮助菜单渲染

* build: 更新依赖文件 requirements.txt

* build: 更新依赖文件

* feat: 添加性能优化和架构文档,更新依赖和核心模块

refactor(browser_manager): 实现页面池机制以提升性能
refactor(image_manager): 添加模板缓存并集成页面池
refactor(bili_parser): 迁移到异步HTTP请求并实现会话复用
docs: 新增性能优化、架构设计和最佳实践文档
chore: 更新requirements.txt添加新依赖

* docs: 更新文档内容并优化语言风格

重构所有文档内容,使用更简洁直接的语言风格
更新架构、插件开发、部署等核心文档
优化代码示例和图表说明
统一术语和格式规范

* docs: 更新文档内容,简化语言并修正格式

- 简化插件开发指南中的描述,移除冗余内容
- 调整部署文档中的Python版本说明
- 优化最佳实践文档的措辞和格式
- 更新性能优化文档,删除不准确的数据
- 重构核心概念文档,使用更简洁的语言
- 修正README中的项目描述和技术栈说明
- 更新快速上手文档,简化安装步骤
- 调整事件流转文档的描述方式
- 简化架构文档内容
- 更新指令处理文档,添加参数注入示例
- 优化单例管理器文档的表述

* refactor(core): 优化权限管理和事件模型

- 重构 AdminManager 和 PermissionManager 以 Redis 为主要数据源
- 为所有事件模型添加 slots=True 提升性能
- 更新文档说明 Mypyc 编译注意事项
- 清理测试和调试文件
- 移动静态资源到 web_static 目录

* feat: 添加模块编译脚本和导出依赖功能

refactor(events): 移除数据类的slots参数以提升兼容性
build: 更新requirements.txt依赖列表

* docs: 更新性能优化文档并修复命令管理器帮助输出

更新性能优化相关文档,详细说明 Python 3.14 JIT 编译器的使用方法和原理,补充与 Mypyc 的互补策略。同时修复命令管理器中帮助信息的输出方式,移除图片发送仅保留文本输出。

调整部署文档结构,明确两种性能优化方案(AOT 和 JIT)的配置方法和适用场景。完善架构文档中关于 JIT 的原理和启用方式说明。

* feat(help): 重构帮助菜单界面并优化样式

refactor(bili_parser): 修复 API 响应 content-type 问题
fix(command_manager): 添加帮助图片获取的错误处理
docs(deployment): 简化部署文档并移除 JIT 相关内容

* feat: 新增自动同意请求插件和API文档

docs: 更新文档结构和内容

* refactor(scripts): 重构并优化脚本文件结构

feat(scripts): 添加Python环境检查脚本
feat(scripts): 增强依赖导出脚本功能
perf(plugins/bili_parser): 优化B站解析器性能和代码结构
style(plugins/bili_parser): 统一代码风格和常量命名

* fix(scripts): 修复编码问题并添加错误追踪

在compile_machine_code.py中添加utf-8编码设置以避免潜在编码问题
添加traceback.print_exc()以在编译失败时打印完整错误堆栈
更新.gitignore以忽略config.toml文件

* feat(性能分析): 实现性能分析工具模块并添加相关测试

添加性能分析工具模块,包括时间测量、内存分析和性能统计功能
添加测试文件和示例配置,完善性能分析工具的使用场景
在工具模块中实现单例装饰器并导出到__init__.py

* feat(douyin_parser): 新增抖音视频解析插件

refactor(performance): 移除未使用的asyncio导入并优化性能测试
style(compile_modules): 修正字符串引号格式
chore: 删除废弃的编译脚本和临时文件
fix(bili_parser): 增强B站链接解析的健壮性
refactor(singleton): 重构单例模式实现
docs: 更新配置文件和事件模型注释

* feat: 添加抖音视频解析插件并优化代码结构

添加抖音视频解析插件,支持自动解析抖音分享链接并提取视频信息。优化现有代码结构,包括:
- 重构单例模式实现
- 移除未使用的导入和文件
- 修复性能测试脚本中的异步调用
- 优化消息事件模型中的权限常量定义
- 改进编译脚本的错误处理
- 增强B站解析插件的稳定性

同时清理了多个废弃脚本和临时文件,提升代码可维护性。

* 1

* Delete core/data/temp/help_menu.png

* fix(权限管理): 增强权限检查的类型安全并修复权限引用

修复权限检查中可能传入非Permission类型导致的错误,将echo插件的权限引用从MessageEvent.ADMIN迁移到Permission.ADMIN

* redis取消tls

* feat(github_parser): 添加GitHub仓库信息查询功能

- 新增github_parser插件,支持通过命令或自动解析链接查询GitHub仓库信息
- 添加github_repo.html模板用于渲染仓库信息图片
- 优化图片管理器支持高质量截图和CSS缩放
- 重构消息事件类权限常量定义方式
- 更新帮助页面样式为三列布局并优化响应式设计

* feat(web_parser): 新增通用web链接解析插件框架

refactor: 重构B站、抖音、GitHub解析器为模块化结构

fix(executor): 增强docker容器错误处理和回调稳定性

style(templates): 优化帮助页面和代码执行结果的样式

perf(web_parser): 添加API缓存和消息去重机制

docs: 更新插件元信息和注释

chore: 移除旧的独立解析器插件文件

* refactor(managers): 重构单例管理器实现并优化代码结构

feat(ws_pool): 新增 WebSocket 连接池实现

perf(json): 使用 orjson 替代标准 json 库提升性能

style: 清理未使用的导入和冗余代码

docs: 更新架构文档和开发规范

test: 添加 WebSocket 连接池测试用例

fix(plugins): 修复自动审批插件 API 调用参数格式

---------

Co-authored-by: baby20162016 <2185823427@qq.com>
Co-authored-by: web vscode <youremail@example.com>
This commit is contained in:
镀铬酸钾
2026-01-22 16:25:13 +08:00
committed by GitHub
parent 8a6af1ea2a
commit 12d1eb3438
42 changed files with 1285 additions and 261 deletions

View File

@@ -12,7 +12,7 @@ WebSocket 连接。它是整个机器人框架的底层通信基础。
- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
"""
import asyncio
import json
import orjson
from typing import Any, Dict, Optional, cast
import uuid
@@ -25,11 +25,12 @@ from .bot import Bot
from .config_loader import global_config
from .managers.command_manager import matcher
from .utils.executor import CodeExecutor
from .utils.logger import logger, ModuleLogger
from .utils.logger import ModuleLogger
from .utils.exceptions import (
WebSocketError, WebSocketConnectionError, WebSocketAuthenticationError
WebSocketError, WebSocketConnectionError
)
from .utils.error_codes import ErrorCode, create_error_response
from .ws_pool import WSConnectionPool
class WS:
@@ -37,11 +38,14 @@ class WS:
WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。
"""
def __init__(self, code_executor: Optional[CodeExecutor] = None) -> None:
def __init__(self, code_executor: Optional[CodeExecutor] = None, use_pool: bool = True) -> None:
"""
初始化 WebSocket 客户端。
从全局配置中读取 WebSocket URI、访问令牌Token和重连间隔。
:param code_executor: 代码执行器实例
:param use_pool: 是否使用连接池
"""
# 读取参数
cfg = global_config.napcat_ws
@@ -55,6 +59,8 @@ class WS:
self.bot: Bot | None = None
self.self_id: int | None = None
self.code_executor = code_executor
self.use_pool = use_pool
self.pool: Optional[WSConnectionPool] = None
# 创建模块专用日志记录器
self.logger = ModuleLogger("WebSocket")
@@ -68,46 +74,112 @@ class WS:
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
if self.use_pool:
# 使用连接池模式
self.pool = WSConnectionPool(pool_size=3)
await self.pool.initialize()
self.logger.success("WebSocket 连接池初始化完成")
# 启动连接池监听循环
await self._pool_listen_loop()
else:
# 单连接模式
while True:
try:
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
conn_error = WebSocketConnectionError(
message=f"WebSocket连接失败: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {conn_error.message}")
self.logger.log_custom_exception(conn_error)
except Exception as e:
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _pool_listen_loop(self):
"""
连接池模式下的监听循环
"""
while True:
try:
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except websockets.exceptions.AuthenticationError as e:
error = WebSocketAuthenticationError(
message=f"WebSocket认证失败: {str(e)}",
code=ErrorCode.WS_AUTH_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {error.message}")
self.logger.log_custom_exception(error)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
error = WebSocketConnectionError(
message=f"连接断开或服务器拒绝访问: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.warning(f"连接失败: {error.message}")
# 从连接池获取一个连接
conn = await self.pool.get_connection()
try:
# 监听连接上的消息
async for message in conn.conn:
await self._handle_message(message, conn)
except Exception as e:
self.logger.error(f"连接 {conn.conn_id} 监听异常: {e}")
finally:
# 释放连接回连接池
await self.pool.release_connection(conn)
except Exception as e:
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
self.logger.error(f"连接池监听循环异常: {e}")
await asyncio.sleep(self.reconnect_interval)
async def _handle_message(self, message: str, conn):
"""
处理从连接池获取的消息
"""
try:
data = orjson.loads(message)
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
# 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id)
if not future.done():
future.set_result(data)
return
# 2. 处理上报事件
# 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件
if "post_type" in data:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except orjson.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
# 如果message是bytes类型需要先解码
decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message
self.logger.debug(f"原始消息: {decoded_message}")
except Exception as e:
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"解析消息异常: {error.message}")
self.logger.log_custom_exception(error)
async def _listen_loop(self, websocket_connection: WebSocketClientProtocol) -> None:
"""
@@ -121,7 +193,7 @@ class WS:
"""
async for message in websocket_connection:
try:
data = json.loads(message)
data = orjson.loads(message)
# 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
@@ -138,14 +210,16 @@ class WS:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except json.JSONDecodeError as e:
except orjson.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
self.logger.debug(f"原始消息: {message}")
# 如果message是bytes类型需要先解码
decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message
self.logger.debug(f"原始消息: {decoded_message}")
except Exception as e:
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
@@ -236,48 +310,93 @@ class WS:
dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
表示失败的字典。
"""
if not self.ws:
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
if self.use_pool:
# 使用连接池模式
if not self.pool:
self.logger.error("调用 API 失败: WebSocket 连接池未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接池未初始化",
data={"action": action, "params": params}
)
# 从连接池获取一个连接
conn = await self.pool.get_connection()
try:
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
from websockets.protocol import State
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
if getattr(self.ws, "state", None) is not State.OPEN:
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
try:
await conn.send(orjson.dumps(payload))
result = await asyncio.wait_for(future, timeout=30.0)
return result
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)
finally:
# 释放连接回连接池
await self.pool.release_connection(conn)
else:
# 单连接模式
if not self.ws:
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
from websockets.protocol import State
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
if getattr(self.ws, "state", None) is not State.OPEN:
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
try:
await self.ws.send(json.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
try:
await self.ws.send(orjson.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)

View File

@@ -4,7 +4,7 @@
该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、
状态设置等相关的 OneBot v11 API 封装。
"""
import json
import orjson
from typing import Dict, Any
from .base import BaseAPI
from models.objects import LoginInfo, VersionInfo, Status
@@ -30,10 +30,10 @@ class AccountAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return LoginInfo(**json.loads(cached_data))
return LoginInfo(**orjson.loads(cached_data))
res = await self.call_api("get_login_info")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return LoginInfo(**res)
async def get_version_info(self) -> VersionInfo:
@@ -43,7 +43,7 @@ class AccountAPI(BaseAPI):
Returns:
VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。
"""
res = await self.call_api("get_version_info")
res = await self.call_api("get_friend_list")
return VersionInfo(**res)
async def get_status(self) -> Status:
@@ -189,10 +189,10 @@ class AccountAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return json.loads(cached_data)
return orjson.loads(cached_data)
res = await self.call_api("get_friend_list")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return res
async def get_group_list(self, no_cache: bool = False) -> list:
@@ -209,9 +209,9 @@ class AccountAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return json.loads(cached_data)
return orjson.loads(cached_data)
res = await self.call_api("get_group_list")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return res

View File

@@ -4,7 +4,7 @@
该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息
等相关的 OneBot v11 API 封装。
"""
import json
import orjson
from typing import List, Dict, Any
from .base import BaseAPI
from models.objects import FriendInfo, StrangerInfo
@@ -44,10 +44,10 @@ class FriendAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return StrangerInfo(**json.loads(cached_data))
return StrangerInfo(**orjson.loads(cached_data))
res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache})
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return StrangerInfo(**res)
async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]:
@@ -64,10 +64,10 @@ class FriendAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return [FriendInfo(**item) for item in json.loads(cached_data)]
return [FriendInfo(**item) for item in orjson.loads(cached_data)]
res = await self.call_api("get_friend_list")
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return [FriendInfo(**item) for item in res]
async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]:

View File

@@ -5,7 +5,7 @@
等相关的 OneBot v11 API 封装。
"""
from typing import List, Dict, Any, Optional
import json
import orjson
from ..managers.redis_manager import redis_manager
from .base import BaseAPI
from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo
@@ -181,10 +181,10 @@ class GroupAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return GroupInfo(**json.loads(cached_data))
return GroupInfo(**orjson.loads(cached_data))
res = await self.call_api("get_group_info", {"group_id": group_id})
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return GroupInfo(**res)
async def get_group_list(self) -> Any:
@@ -232,10 +232,10 @@ class GroupAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return GroupMemberInfo(**json.loads(cached_data))
return GroupMemberInfo(**orjson.loads(cached_data))
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id})
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return GroupMemberInfo(**res)
async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]:

View File

@@ -8,9 +8,8 @@ from pathlib import Path
import tomllib
from pydantic import ValidationError
from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel
from .utils.logger import logger, ModuleLogger
from .utils.logger import ModuleLogger
from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError
from .utils.error_codes import ErrorCode, create_error_response
class Config:

View File

@@ -4,7 +4,7 @@
该模块负责管理机器人的管理员列表。
它现在以 Redis 作为主要数据源,文件仅用作备份。
"""
import json
import orjson
import os
from typing import Set
@@ -66,7 +66,7 @@ class AdminManager(Singleton):
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = json.load(f)
data = orjson.loads(f.read())
admins = data.get("admins", [])
admins_to_migrate = set(int(admin_id) for admin_id in admins)
@@ -76,7 +76,7 @@ class AdminManager(Singleton):
else:
logger.info("admin.json 文件为空或不存在,无需迁移。")
except (json.JSONDecodeError, ValueError) as e:
except ValueError as e:
logger.error(f"解析 admin.json 失败,无法迁移: {e}")
except Exception as e:
logger.error(f"迁移管理员数据到 Redis 失败: {e}")
@@ -89,7 +89,7 @@ class AdminManager(Singleton):
admins = await self.get_all_admins()
admin_list = [str(admin_id) for admin_id in admins]
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump({"admins": admin_list}, f, indent=2, ensure_ascii=False)
f.write(orjson.dumps({"admins": admin_list}, indent=2, ensure_ascii=False).decode('utf-8'))
logger.debug(f"管理员列表已备份到 {self.data_file}")
except Exception as e:
logger.error(f"备份管理员列表到 admin.json 失败: {e}")

View File

@@ -7,21 +7,23 @@ import asyncio
from typing import Optional
from playwright.async_api import async_playwright, Browser, Playwright, Page
from ..utils.logger import logger
from ..utils.singleton import Singleton
class BrowserManager:
class BrowserManager(Singleton):
"""
浏览器管理器(异步单例)
"""
_instance = None
_playwright: Optional[Playwright] = None
_browser: Optional[Browser] = None
_page_pool: Optional[asyncio.Queue] = None
_pool_size: int = 3
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
初始化浏览器管理器
"""
# 调用父类 __init__ 确保单例初始化
super().__init__()
async def initialize(self):
"""

View File

@@ -7,12 +7,9 @@
"""
from typing import Any, Callable, Dict, Optional, Tuple
import os
import base64
from models.events.message import MessageSegment
from models.events.message import MessageSegment
from ..config_loader import global_config
from ..handlers.event_handler import MessageHandler, NoticeHandler, RequestHandler

View File

@@ -10,19 +10,21 @@ from jinja2 import Template
from .browser_manager import browser_manager
from ..utils.logger import logger
from ..utils.singleton import Singleton
class ImageManager:
class ImageManager(Singleton):
"""
图片生成管理器(单例)
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
初始化图片生成管理器
"""
# 检查是否已经初始化
if hasattr(self, 'template_dir'):
return
# 模板目录
self.template_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "templates")
# 临时文件目录

View File

@@ -4,7 +4,7 @@
该模块负责管理用户权限,支持 admin、op、user 三个权限级别。
以 Redis Hash 作为主要数据源,文件仅用作备份和首次数据迁移。
"""
import json
import orjson
import os
from typing import Dict
@@ -71,7 +71,7 @@ class PermissionManager(Singleton):
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = json.load(f)
data = orjson.loads(f.read())
perms_to_migrate = data.get("users", {})
if perms_to_migrate:
@@ -84,7 +84,7 @@ class PermissionManager(Singleton):
else:
logger.info("permissions.json 文件为空或不存在,无需迁移。")
except (json.JSONDecodeError, ValueError) as e:
except ValueError as e:
logger.error(f"解析 permissions.json 失败,无法迁移: {e}")
except Exception as e:
logger.error(f"迁移权限数据到 Redis 失败: {e}")
@@ -98,7 +98,7 @@ class PermissionManager(Singleton):
# Redis 返回的是 bytes需要解码
users_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in all_perms.items()}
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump({"users": users_data}, f, indent=2, ensure_ascii=False)
f.write(orjson.dumps({"users": users_data}, indent=2, ensure_ascii=False).decode('utf-8'))
logger.debug(f"权限数据已备份到 {self.data_file}")
except Exception as e:
logger.error(f"备份权限数据到 permissions.json 失败: {e}")

View File

@@ -10,28 +10,41 @@ import sys
from typing import Set
from .command_manager import CommandManager
from ..utils.exceptions import SyncHandlerError, PluginError, PluginLoadError, PluginReloadError, PluginNotFoundError
from ..utils.exceptions import SyncHandlerError, PluginLoadError, PluginReloadError, PluginNotFoundError
from ..utils.logger import logger, ModuleLogger
from ..utils.error_codes import ErrorCode, create_error_response
from ..utils.singleton import Singleton
# 确保logger在模块级别可见
__all__ = ['PluginManager', 'logger']
class PluginManager:
class PluginManager(Singleton):
"""
插件管理器类
"""
def __init__(self, command_manager: "CommandManager") -> None:
def __init__(self, command_manager: "CommandManager" | None = None) -> None:
"""
初始化插件管理器
:param command_manager: CommandManager的实例
"""
self.command_manager = command_manager
self.loaded_plugins: Set[str] = set()
# 创建模块专用日志记录器
self.logger = ModuleLogger("PluginManager")
# 检查是否已经初始化
if hasattr(self, '_command_manager'):
return
# 只有首次初始化时才执行
if command_manager:
self._command_manager = command_manager
self.loaded_plugins: Set[str] = set()
# 创建模块专用日志记录器
self.logger = ModuleLogger("PluginManager")
@property
def command_manager(self):
"""
获取命令管理器实例
"""
return self._command_manager
def load_all_plugins(self) -> None:
"""
@@ -99,12 +112,12 @@ class PluginManager:
self.logger.warning(f"尝试重载一个未被加载的插件: {full_module_name},将按首次加载处理。")
if full_module_name not in sys.modules:
error = PluginNotFoundError(
reload_error = PluginNotFoundError(
plugin_name=full_module_name,
message="模块未在sys.modules中找到"
)
self.logger.error(f"重载失败: {error.message}")
self.logger.log_custom_exception(error)
self.logger.error(f"重载失败: {reload_error.message}")
self.logger.log_custom_exception(reload_error)
return
try:

View File

@@ -1,18 +1,20 @@
import redis.asyncio as redis
from ..config_loader import global_config as config
from ..utils.logger import logger
from ..utils.singleton import Singleton
class RedisManager:
class RedisManager(Singleton):
"""
Redis 连接管理器(异步单例)
"""
_instance = None
_redis = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
初始化 Redis 管理器
"""
# 调用父类 __init__ 确保单例初始化
super().__init__()
async def initialize(self):
"""

View File

@@ -6,7 +6,6 @@
# 导出核心工具
from .logger import logger
from .exceptions import *
from .json_utils import *
from .singleton import singleton
from .executor import run_in_thread_pool, initialize_executor
from .performance import (

View File

@@ -3,6 +3,7 @@
该模块定义了项目中使用的错误码和统一的错误响应格式,确保所有模块返回一致的错误信息。
"""
from typing import Optional
# 错误码定义
class ErrorCode:
@@ -142,7 +143,7 @@ def get_error_message(code: int) -> str:
return ERROR_MESSAGES.get(code, ERROR_MESSAGES[ErrorCode.UNKNOWN_ERROR])
def create_error_response(code: int, message: str = None, data: dict = None, request_id: str = None) -> dict:
def create_error_response(code: int, message: Optional[str] = None, data: Optional[dict] = None, request_id: Optional[str] = None) -> dict:
"""
创建统一格式的错误响应
@@ -172,7 +173,7 @@ def create_error_response(code: int, message: str = None, data: dict = None, req
return response
def exception_to_error_response(exception: Exception, code: int = None, request_id: str = None) -> dict:
def exception_to_error_response(exception: Exception, code: Optional[int] = None, request_id: Optional[str] = None) -> dict:
"""
将异常对象转换为统一格式的错误响应

View File

@@ -1,34 +0,0 @@
"""
JSON 工具模块
统一使用高性能的 orjson 库进行 JSON 序列化和反序列化。
如果 orjson 不可用,则回退到标准库 json。
"""
from typing import Any, Union
import json
# 在模块加载时检查 orjson 是否可用
try:
import orjson
_orjson_available = True
except ImportError:
_orjson_available = False
def dumps(obj: Any) -> str:
"""
将对象序列化为 JSON 字符串。
"""
if _orjson_available:
# orjson.dumps 返回 bytes需要 decode
return orjson.dumps(obj).decode("utf-8")
else:
return json.dumps(obj, ensure_ascii=False)
def loads(json_str: Union[str, bytes]) -> Any:
"""
将 JSON 字符串反序列化为对象。
"""
if _orjson_available:
return orjson.loads(json_str)
else:
return json.loads(json_str)

View File

@@ -109,7 +109,7 @@ class PerformanceStats:
performance_stats = PerformanceStats()
def timeit(func: Callable = None, *, log_level: int = logging.INFO, collect_stats: bool = True):
def timeit(func: Optional[Callable] = None, *, log_level: int = logging.INFO, collect_stats: bool = True):
"""
函数执行时间分析装饰器(支持同步和异步)
@@ -261,7 +261,7 @@ class memory_profile:
logger.info(f"[内存分析] 使用内存: {memory_used:.2f} MB")
def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1):
def memory_profile_decorator(func: Optional[Callable] = None, *, interval: float = 0.1):
"""
内存分析装饰器(支持同步函数)
@@ -296,7 +296,7 @@ def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1):
return decorator(func)
def performance_monitor(func: Callable = None, *, threshold: float = 1.0):
def performance_monitor(func: Optional[Callable] = None, *, threshold: float = 1.0):
"""
性能监控装饰器
仅当函数执行时间超过阈值时记录日志

View File

@@ -1,7 +1,7 @@
"""
通用单例模式基类
"""
from typing import Any, Dict, Optional, Type, TypeVar
from typing import Any, Dict, Optional, Type, TypeVar, cast
T = TypeVar('T')
@@ -29,9 +29,9 @@ class Singleton:
Returns:
T: 单例实例
"""
# 使用全局字典存储实例,避免类型检查问题
# 使用全局字典存储实例,修复类型检查问题
if cls not in _instance_store:
_instance_store[cls] = super().__new__(cls)
_instance_store[cls] = super(Singleton, cls).__new__(cls)
return _instance_store[cls]
def __init__(self) -> None:
@@ -67,7 +67,7 @@ def singleton(cls: Type[T]) -> Type[T]:
nonlocal class_instance
if class_instance is None:
# 使用super()调用原始类的__new__方法
class_instance = cls(*args, **kwargs)
class_instance = super(SingletonClass, cls).__new__(cls)
return class_instance
# 复制类的元数据

231
core/ws_pool.py Normal file
View File

@@ -0,0 +1,231 @@
"""
WebSocket 连接池模块
该模块实现了 WebSocket 连接池功能,用于管理多个 WebSocket 连接,
提高并发处理能力和连接复用效率。
"""
import asyncio
import websockets
from websockets.legacy.client import WebSocketClientProtocol
from typing import Optional, Dict, Any, cast
import uuid
from loguru import logger
from .config_loader import global_config
from .utils.exceptions import WebSocketError, WebSocketConnectionError
class WSConnection:
"""
WebSocket 连接包装类
封装单个 WebSocket 连接的状态和操作
"""
def __init__(self, conn: WebSocketClientProtocol, conn_id: str):
self.conn = conn
self.conn_id = conn_id
self.last_used = asyncio.get_event_loop().time()
self.is_active = True
self._pending_requests: Dict[str, asyncio.Future] = {}
async def send(self, data: dict):
"""
发送数据到 WebSocket 连接
"""
if not self.is_active:
raise WebSocketError(f"连接 {self.conn_id} 已关闭")
try:
await self.conn.send(data)
self.last_used = asyncio.get_event_loop().time()
except Exception as e:
self.is_active = False
raise WebSocketError(f"发送数据失败: {e}")
async def recv(self):
"""
从 WebSocket 连接接收数据
"""
if not self.is_active:
raise WebSocketError(f"连接 {self.conn_id} 已关闭")
try:
data = await self.conn.recv()
self.last_used = asyncio.get_event_loop().time()
return data
except Exception as e:
self.is_active = False
raise WebSocketError(f"接收数据失败: {e}")
async def close(self):
"""
关闭 WebSocket 连接
"""
if self.is_active:
self.is_active = False
await self.conn.close()
class WSConnectionPool:
"""
WebSocket 连接池
管理多个 WebSocket 连接,提供连接的获取、释放和回收功能
"""
def __init__(self, pool_size: int = 3, max_idle_time: int = 300):
"""
初始化连接池
:param pool_size: 连接池大小
:param max_idle_time: 连接最大空闲时间(秒)
"""
self.pool_size = pool_size
self.max_idle_time = max_idle_time
self.pool: asyncio.Queue[WSConnection] = asyncio.Queue(maxsize=pool_size)
self._closed = False
self._cleanup_task: Optional[asyncio.Task] = None
# 从全局配置读取参数
self.url = global_config.napcat_ws.uri
self.token = global_config.napcat_ws.token
self.reconnect_interval = global_config.napcat_ws.reconnect_interval
logger.info(f"WebSocket 连接池初始化完成,大小: {pool_size}")
async def initialize(self):
"""
初始化连接池,创建初始连接
"""
if self._closed:
raise WebSocketError("连接池已关闭")
# 启动连接清理任务
self._cleanup_task = asyncio.create_task(self._cleanup_idle_connections())
# 创建初始连接
for _ in range(self.pool_size):
try:
conn = await self._create_connection()
await self.pool.put(conn)
logger.info(f"WebSocket 连接 {conn.conn_id} 已创建并加入连接池")
except Exception as e:
logger.error(f"创建初始连接失败: {e}")
async def _create_connection(self) -> WSConnection:
"""
创建新的 WebSocket 连接
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
try:
conn_id = str(uuid.uuid4())
websocket_raw = await websockets.connect(
self.url, additional_headers=headers
)
websocket = cast(WebSocketClientProtocol, websocket_raw)
conn = WSConnection(websocket, conn_id)
logger.info(f"WebSocket 连接 {conn_id} 已建立")
return conn
except Exception as e:
raise WebSocketConnectionError(f"创建 WebSocket 连接失败: {e}")
async def get_connection(self) -> WSConnection:
"""
从连接池获取一个连接
"""
if self._closed:
raise WebSocketError("连接池已关闭")
try:
# 尝试从连接池获取连接
conn = await asyncio.wait_for(self.pool.get(), timeout=5)
# 检查连接是否活跃
if not conn.is_active:
logger.warning(f"连接 {conn.conn_id} 已失效,重新创建")
return await self._create_connection()
return conn
except asyncio.TimeoutError:
# 连接池为空,创建新连接
logger.warning("连接池为空,创建临时连接")
return await self._create_connection()
except Exception as e:
raise WebSocketError(f"获取连接失败: {e}")
async def release_connection(self, conn: WSConnection):
"""
释放连接回连接池
"""
if self._closed:
await conn.close()
return
if not conn.is_active:
logger.warning(f"连接 {conn.conn_id} 已失效,不返回连接池")
return
try:
if self.pool.full():
# 连接池已满,关闭该连接
await conn.close()
logger.info(f"连接池已满,关闭连接 {conn.conn_id}")
else:
await self.pool.put(conn)
logger.debug(f"连接 {conn.conn_id} 已返回连接池")
except Exception as e:
logger.error(f"释放连接失败: {e}")
await conn.close()
async def _cleanup_idle_connections(self):
"""
清理空闲连接任务
"""
while not self._closed:
await asyncio.sleep(60) # 每分钟检查一次
try:
# 检查连接池中的连接
new_pool = asyncio.Queue(maxsize=self.pool_size)
current_time = asyncio.get_event_loop().time()
while not self.pool.empty():
conn = await self.pool.get()
if current_time - conn.last_used > self.max_idle_time:
# 连接空闲时间过长,关闭
await conn.close()
logger.info(f"清理空闲连接 {conn.conn_id}")
else:
# 放回新队列
await new_pool.put(conn)
# 替换原连接池
self.pool = new_pool
except Exception as e:
logger.error(f"清理空闲连接失败: {e}")
async def close(self):
"""
关闭连接池
"""
if self._closed:
return
self._closed = True
# 停止清理任务
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# 关闭所有连接
while not self.pool.empty():
conn = await self.pool.get()
await conn.close()
logger.info("WebSocket 连接池已关闭")