refactor(managers): 重构单例管理器实现并优化代码结构

feat(ws_pool): 新增 WebSocket 连接池实现

perf(json): 使用 orjson 替代标准 json 库提升性能

style: 清理未使用的导入和冗余代码

docs: 更新架构文档和开发规范

test: 添加 WebSocket 连接池测试用例

fix(plugins): 修复自动审批插件 API 调用参数格式
This commit is contained in:
2026-01-22 16:23:03 +08:00
parent d7d732ff4d
commit caf5b06097
42 changed files with 1285 additions and 261 deletions

View File

@@ -12,7 +12,7 @@ WebSocket 连接。它是整个机器人框架的底层通信基础。
- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
"""
import asyncio
import json
import orjson
from typing import Any, Dict, Optional, cast
import uuid
@@ -25,11 +25,12 @@ from .bot import Bot
from .config_loader import global_config
from .managers.command_manager import matcher
from .utils.executor import CodeExecutor
from .utils.logger import logger, ModuleLogger
from .utils.logger import ModuleLogger
from .utils.exceptions import (
WebSocketError, WebSocketConnectionError, WebSocketAuthenticationError
WebSocketError, WebSocketConnectionError
)
from .utils.error_codes import ErrorCode, create_error_response
from .ws_pool import WSConnectionPool
class WS:
@@ -37,11 +38,14 @@ class WS:
WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。
"""
def __init__(self, code_executor: Optional[CodeExecutor] = None) -> None:
def __init__(self, code_executor: Optional[CodeExecutor] = None, use_pool: bool = True) -> None:
"""
初始化 WebSocket 客户端。
从全局配置中读取 WebSocket URI、访问令牌Token和重连间隔。
:param code_executor: 代码执行器实例
:param use_pool: 是否使用连接池
"""
# 读取参数
cfg = global_config.napcat_ws
@@ -55,6 +59,8 @@ class WS:
self.bot: Bot | None = None
self.self_id: int | None = None
self.code_executor = code_executor
self.use_pool = use_pool
self.pool: Optional[WSConnectionPool] = None
# 创建模块专用日志记录器
self.logger = ModuleLogger("WebSocket")
@@ -68,46 +74,112 @@ class WS:
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
if self.use_pool:
# 使用连接池模式
self.pool = WSConnectionPool(pool_size=3)
await self.pool.initialize()
self.logger.success("WebSocket 连接池初始化完成")
# 启动连接池监听循环
await self._pool_listen_loop()
else:
# 单连接模式
while True:
try:
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
conn_error = WebSocketConnectionError(
message=f"WebSocket连接失败: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {conn_error.message}")
self.logger.log_custom_exception(conn_error)
except Exception as e:
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _pool_listen_loop(self):
"""
连接池模式下的监听循环
"""
while True:
try:
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except websockets.exceptions.AuthenticationError as e:
error = WebSocketAuthenticationError(
message=f"WebSocket认证失败: {str(e)}",
code=ErrorCode.WS_AUTH_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {error.message}")
self.logger.log_custom_exception(error)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
error = WebSocketConnectionError(
message=f"连接断开或服务器拒绝访问: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.warning(f"连接失败: {error.message}")
# 从连接池获取一个连接
conn = await self.pool.get_connection()
try:
# 监听连接上的消息
async for message in conn.conn:
await self._handle_message(message, conn)
except Exception as e:
self.logger.error(f"连接 {conn.conn_id} 监听异常: {e}")
finally:
# 释放连接回连接池
await self.pool.release_connection(conn)
except Exception as e:
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
self.logger.error(f"连接池监听循环异常: {e}")
await asyncio.sleep(self.reconnect_interval)
async def _handle_message(self, message: str, conn):
"""
处理从连接池获取的消息
"""
try:
data = orjson.loads(message)
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
# 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id)
if not future.done():
future.set_result(data)
return
# 2. 处理上报事件
# 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件
if "post_type" in data:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except orjson.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
# 如果message是bytes类型需要先解码
decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message
self.logger.debug(f"原始消息: {decoded_message}")
except Exception as e:
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"解析消息异常: {error.message}")
self.logger.log_custom_exception(error)
async def _listen_loop(self, websocket_connection: WebSocketClientProtocol) -> None:
"""
@@ -121,7 +193,7 @@ class WS:
"""
async for message in websocket_connection:
try:
data = json.loads(message)
data = orjson.loads(message)
# 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
@@ -138,14 +210,16 @@ class WS:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except json.JSONDecodeError as e:
except orjson.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
self.logger.debug(f"原始消息: {message}")
# 如果message是bytes类型需要先解码
decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message
self.logger.debug(f"原始消息: {decoded_message}")
except Exception as e:
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
@@ -236,48 +310,93 @@ class WS:
dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
表示失败的字典。
"""
if not self.ws:
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
if self.use_pool:
# 使用连接池模式
if not self.pool:
self.logger.error("调用 API 失败: WebSocket 连接池未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接池未初始化",
data={"action": action, "params": params}
)
# 从连接池获取一个连接
conn = await self.pool.get_connection()
try:
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
from websockets.protocol import State
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
if getattr(self.ws, "state", None) is not State.OPEN:
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
try:
await conn.send(orjson.dumps(payload))
result = await asyncio.wait_for(future, timeout=30.0)
return result
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)
finally:
# 释放连接回连接池
await self.pool.release_connection(conn)
else:
# 单连接模式
if not self.ws:
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
from websockets.protocol import State
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
if getattr(self.ws, "state", None) is not State.OPEN:
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
try:
await self.ws.send(json.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
try:
await self.ws.send(orjson.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)

View File

@@ -4,7 +4,7 @@
该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、
状态设置等相关的 OneBot v11 API 封装。
"""
import json
import orjson
from typing import Dict, Any
from .base import BaseAPI
from models.objects import LoginInfo, VersionInfo, Status
@@ -30,10 +30,10 @@ class AccountAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return LoginInfo(**json.loads(cached_data))
return LoginInfo(**orjson.loads(cached_data))
res = await self.call_api("get_login_info")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return LoginInfo(**res)
async def get_version_info(self) -> VersionInfo:
@@ -43,7 +43,7 @@ class AccountAPI(BaseAPI):
Returns:
VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。
"""
res = await self.call_api("get_version_info")
res = await self.call_api("get_friend_list")
return VersionInfo(**res)
async def get_status(self) -> Status:
@@ -189,10 +189,10 @@ class AccountAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return json.loads(cached_data)
return orjson.loads(cached_data)
res = await self.call_api("get_friend_list")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return res
async def get_group_list(self, no_cache: bool = False) -> list:
@@ -209,9 +209,9 @@ class AccountAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return json.loads(cached_data)
return orjson.loads(cached_data)
res = await self.call_api("get_group_list")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return res

View File

@@ -4,7 +4,7 @@
该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息
等相关的 OneBot v11 API 封装。
"""
import json
import orjson
from typing import List, Dict, Any
from .base import BaseAPI
from models.objects import FriendInfo, StrangerInfo
@@ -44,10 +44,10 @@ class FriendAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return StrangerInfo(**json.loads(cached_data))
return StrangerInfo(**orjson.loads(cached_data))
res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache})
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return StrangerInfo(**res)
async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]:
@@ -64,10 +64,10 @@ class FriendAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return [FriendInfo(**item) for item in json.loads(cached_data)]
return [FriendInfo(**item) for item in orjson.loads(cached_data)]
res = await self.call_api("get_friend_list")
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return [FriendInfo(**item) for item in res]
async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]:

View File

@@ -5,7 +5,7 @@
等相关的 OneBot v11 API 封装。
"""
from typing import List, Dict, Any, Optional
import json
import orjson
from ..managers.redis_manager import redis_manager
from .base import BaseAPI
from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo
@@ -181,10 +181,10 @@ class GroupAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return GroupInfo(**json.loads(cached_data))
return GroupInfo(**orjson.loads(cached_data))
res = await self.call_api("get_group_info", {"group_id": group_id})
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return GroupInfo(**res)
async def get_group_list(self) -> Any:
@@ -232,10 +232,10 @@ class GroupAPI(BaseAPI):
if not no_cache:
cached_data = await redis_manager.redis.get(cache_key)
if cached_data:
return GroupMemberInfo(**json.loads(cached_data))
return GroupMemberInfo(**orjson.loads(cached_data))
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id})
await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
await redis_manager.redis.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return GroupMemberInfo(**res)
async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]:

View File

@@ -8,9 +8,8 @@ from pathlib import Path
import tomllib
from pydantic import ValidationError
from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel
from .utils.logger import logger, ModuleLogger
from .utils.logger import ModuleLogger
from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError
from .utils.error_codes import ErrorCode, create_error_response
class Config:

View File

@@ -4,7 +4,7 @@
该模块负责管理机器人的管理员列表。
它现在以 Redis 作为主要数据源,文件仅用作备份。
"""
import json
import orjson
import os
from typing import Set
@@ -66,7 +66,7 @@ class AdminManager(Singleton):
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = json.load(f)
data = orjson.loads(f.read())
admins = data.get("admins", [])
admins_to_migrate = set(int(admin_id) for admin_id in admins)
@@ -76,7 +76,7 @@ class AdminManager(Singleton):
else:
logger.info("admin.json 文件为空或不存在,无需迁移。")
except (json.JSONDecodeError, ValueError) as e:
except ValueError as e:
logger.error(f"解析 admin.json 失败,无法迁移: {e}")
except Exception as e:
logger.error(f"迁移管理员数据到 Redis 失败: {e}")
@@ -89,7 +89,7 @@ class AdminManager(Singleton):
admins = await self.get_all_admins()
admin_list = [str(admin_id) for admin_id in admins]
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump({"admins": admin_list}, f, indent=2, ensure_ascii=False)
f.write(orjson.dumps({"admins": admin_list}, indent=2, ensure_ascii=False).decode('utf-8'))
logger.debug(f"管理员列表已备份到 {self.data_file}")
except Exception as e:
logger.error(f"备份管理员列表到 admin.json 失败: {e}")

View File

@@ -7,21 +7,23 @@ import asyncio
from typing import Optional
from playwright.async_api import async_playwright, Browser, Playwright, Page
from ..utils.logger import logger
from ..utils.singleton import Singleton
class BrowserManager:
class BrowserManager(Singleton):
"""
浏览器管理器(异步单例)
"""
_instance = None
_playwright: Optional[Playwright] = None
_browser: Optional[Browser] = None
_page_pool: Optional[asyncio.Queue] = None
_pool_size: int = 3
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
初始化浏览器管理器
"""
# 调用父类 __init__ 确保单例初始化
super().__init__()
async def initialize(self):
"""

View File

@@ -7,12 +7,9 @@
"""
from typing import Any, Callable, Dict, Optional, Tuple
import os
import base64
from models.events.message import MessageSegment
from models.events.message import MessageSegment
from ..config_loader import global_config
from ..handlers.event_handler import MessageHandler, NoticeHandler, RequestHandler

View File

@@ -10,19 +10,21 @@ from jinja2 import Template
from .browser_manager import browser_manager
from ..utils.logger import logger
from ..utils.singleton import Singleton
class ImageManager:
class ImageManager(Singleton):
"""
图片生成管理器(单例)
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
初始化图片生成管理器
"""
# 检查是否已经初始化
if hasattr(self, 'template_dir'):
return
# 模板目录
self.template_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "templates")
# 临时文件目录

View File

@@ -4,7 +4,7 @@
该模块负责管理用户权限,支持 admin、op、user 三个权限级别。
以 Redis Hash 作为主要数据源,文件仅用作备份和首次数据迁移。
"""
import json
import orjson
import os
from typing import Dict
@@ -71,7 +71,7 @@ class PermissionManager(Singleton):
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = json.load(f)
data = orjson.loads(f.read())
perms_to_migrate = data.get("users", {})
if perms_to_migrate:
@@ -84,7 +84,7 @@ class PermissionManager(Singleton):
else:
logger.info("permissions.json 文件为空或不存在,无需迁移。")
except (json.JSONDecodeError, ValueError) as e:
except ValueError as e:
logger.error(f"解析 permissions.json 失败,无法迁移: {e}")
except Exception as e:
logger.error(f"迁移权限数据到 Redis 失败: {e}")
@@ -98,7 +98,7 @@ class PermissionManager(Singleton):
# Redis 返回的是 bytes需要解码
users_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in all_perms.items()}
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump({"users": users_data}, f, indent=2, ensure_ascii=False)
f.write(orjson.dumps({"users": users_data}, indent=2, ensure_ascii=False).decode('utf-8'))
logger.debug(f"权限数据已备份到 {self.data_file}")
except Exception as e:
logger.error(f"备份权限数据到 permissions.json 失败: {e}")

View File

@@ -10,28 +10,41 @@ import sys
from typing import Set
from .command_manager import CommandManager
from ..utils.exceptions import SyncHandlerError, PluginError, PluginLoadError, PluginReloadError, PluginNotFoundError
from ..utils.exceptions import SyncHandlerError, PluginLoadError, PluginReloadError, PluginNotFoundError
from ..utils.logger import logger, ModuleLogger
from ..utils.error_codes import ErrorCode, create_error_response
from ..utils.singleton import Singleton
# 确保logger在模块级别可见
__all__ = ['PluginManager', 'logger']
class PluginManager:
class PluginManager(Singleton):
"""
插件管理器类
"""
def __init__(self, command_manager: "CommandManager") -> None:
def __init__(self, command_manager: "CommandManager" | None = None) -> None:
"""
初始化插件管理器
:param command_manager: CommandManager的实例
"""
self.command_manager = command_manager
self.loaded_plugins: Set[str] = set()
# 创建模块专用日志记录器
self.logger = ModuleLogger("PluginManager")
# 检查是否已经初始化
if hasattr(self, '_command_manager'):
return
# 只有首次初始化时才执行
if command_manager:
self._command_manager = command_manager
self.loaded_plugins: Set[str] = set()
# 创建模块专用日志记录器
self.logger = ModuleLogger("PluginManager")
@property
def command_manager(self):
"""
获取命令管理器实例
"""
return self._command_manager
def load_all_plugins(self) -> None:
"""
@@ -99,12 +112,12 @@ class PluginManager:
self.logger.warning(f"尝试重载一个未被加载的插件: {full_module_name},将按首次加载处理。")
if full_module_name not in sys.modules:
error = PluginNotFoundError(
reload_error = PluginNotFoundError(
plugin_name=full_module_name,
message="模块未在sys.modules中找到"
)
self.logger.error(f"重载失败: {error.message}")
self.logger.log_custom_exception(error)
self.logger.error(f"重载失败: {reload_error.message}")
self.logger.log_custom_exception(reload_error)
return
try:

View File

@@ -1,18 +1,20 @@
import redis.asyncio as redis
from ..config_loader import global_config as config
from ..utils.logger import logger
from ..utils.singleton import Singleton
class RedisManager:
class RedisManager(Singleton):
"""
Redis 连接管理器(异步单例)
"""
_instance = None
_redis = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
初始化 Redis 管理器
"""
# 调用父类 __init__ 确保单例初始化
super().__init__()
async def initialize(self):
"""

View File

@@ -6,7 +6,6 @@
# 导出核心工具
from .logger import logger
from .exceptions import *
from .json_utils import *
from .singleton import singleton
from .executor import run_in_thread_pool, initialize_executor
from .performance import (

View File

@@ -3,6 +3,7 @@
该模块定义了项目中使用的错误码和统一的错误响应格式,确保所有模块返回一致的错误信息。
"""
from typing import Optional
# 错误码定义
class ErrorCode:
@@ -142,7 +143,7 @@ def get_error_message(code: int) -> str:
return ERROR_MESSAGES.get(code, ERROR_MESSAGES[ErrorCode.UNKNOWN_ERROR])
def create_error_response(code: int, message: str = None, data: dict = None, request_id: str = None) -> dict:
def create_error_response(code: int, message: Optional[str] = None, data: Optional[dict] = None, request_id: Optional[str] = None) -> dict:
"""
创建统一格式的错误响应
@@ -172,7 +173,7 @@ def create_error_response(code: int, message: str = None, data: dict = None, req
return response
def exception_to_error_response(exception: Exception, code: int = None, request_id: str = None) -> dict:
def exception_to_error_response(exception: Exception, code: Optional[int] = None, request_id: Optional[str] = None) -> dict:
"""
将异常对象转换为统一格式的错误响应

View File

@@ -1,34 +0,0 @@
"""
JSON 工具模块
统一使用高性能的 orjson 库进行 JSON 序列化和反序列化。
如果 orjson 不可用,则回退到标准库 json。
"""
from typing import Any, Union
import json
# 在模块加载时检查 orjson 是否可用
try:
import orjson
_orjson_available = True
except ImportError:
_orjson_available = False
def dumps(obj: Any) -> str:
"""
将对象序列化为 JSON 字符串。
"""
if _orjson_available:
# orjson.dumps 返回 bytes需要 decode
return orjson.dumps(obj).decode("utf-8")
else:
return json.dumps(obj, ensure_ascii=False)
def loads(json_str: Union[str, bytes]) -> Any:
"""
将 JSON 字符串反序列化为对象。
"""
if _orjson_available:
return orjson.loads(json_str)
else:
return json.loads(json_str)

View File

@@ -109,7 +109,7 @@ class PerformanceStats:
performance_stats = PerformanceStats()
def timeit(func: Callable = None, *, log_level: int = logging.INFO, collect_stats: bool = True):
def timeit(func: Optional[Callable] = None, *, log_level: int = logging.INFO, collect_stats: bool = True):
"""
函数执行时间分析装饰器(支持同步和异步)
@@ -261,7 +261,7 @@ class memory_profile:
logger.info(f"[内存分析] 使用内存: {memory_used:.2f} MB")
def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1):
def memory_profile_decorator(func: Optional[Callable] = None, *, interval: float = 0.1):
"""
内存分析装饰器(支持同步函数)
@@ -296,7 +296,7 @@ def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1):
return decorator(func)
def performance_monitor(func: Callable = None, *, threshold: float = 1.0):
def performance_monitor(func: Optional[Callable] = None, *, threshold: float = 1.0):
"""
性能监控装饰器
仅当函数执行时间超过阈值时记录日志

View File

@@ -1,7 +1,7 @@
"""
通用单例模式基类
"""
from typing import Any, Dict, Optional, Type, TypeVar
from typing import Any, Dict, Optional, Type, TypeVar, cast
T = TypeVar('T')
@@ -29,9 +29,9 @@ class Singleton:
Returns:
T: 单例实例
"""
# 使用全局字典存储实例,避免类型检查问题
# 使用全局字典存储实例,修复类型检查问题
if cls not in _instance_store:
_instance_store[cls] = super().__new__(cls)
_instance_store[cls] = super(Singleton, cls).__new__(cls)
return _instance_store[cls]
def __init__(self) -> None:
@@ -67,7 +67,7 @@ def singleton(cls: Type[T]) -> Type[T]:
nonlocal class_instance
if class_instance is None:
# 使用super()调用原始类的__new__方法
class_instance = cls(*args, **kwargs)
class_instance = super(SingletonClass, cls).__new__(cls)
return class_instance
# 复制类的元数据

231
core/ws_pool.py Normal file
View File

@@ -0,0 +1,231 @@
"""
WebSocket 连接池模块
该模块实现了 WebSocket 连接池功能,用于管理多个 WebSocket 连接,
提高并发处理能力和连接复用效率。
"""
import asyncio
import websockets
from websockets.legacy.client import WebSocketClientProtocol
from typing import Optional, Dict, Any, cast
import uuid
from loguru import logger
from .config_loader import global_config
from .utils.exceptions import WebSocketError, WebSocketConnectionError
class WSConnection:
"""
WebSocket 连接包装类
封装单个 WebSocket 连接的状态和操作
"""
def __init__(self, conn: WebSocketClientProtocol, conn_id: str):
self.conn = conn
self.conn_id = conn_id
self.last_used = asyncio.get_event_loop().time()
self.is_active = True
self._pending_requests: Dict[str, asyncio.Future] = {}
async def send(self, data: dict):
"""
发送数据到 WebSocket 连接
"""
if not self.is_active:
raise WebSocketError(f"连接 {self.conn_id} 已关闭")
try:
await self.conn.send(data)
self.last_used = asyncio.get_event_loop().time()
except Exception as e:
self.is_active = False
raise WebSocketError(f"发送数据失败: {e}")
async def recv(self):
"""
从 WebSocket 连接接收数据
"""
if not self.is_active:
raise WebSocketError(f"连接 {self.conn_id} 已关闭")
try:
data = await self.conn.recv()
self.last_used = asyncio.get_event_loop().time()
return data
except Exception as e:
self.is_active = False
raise WebSocketError(f"接收数据失败: {e}")
async def close(self):
"""
关闭 WebSocket 连接
"""
if self.is_active:
self.is_active = False
await self.conn.close()
class WSConnectionPool:
"""
WebSocket 连接池
管理多个 WebSocket 连接,提供连接的获取、释放和回收功能
"""
def __init__(self, pool_size: int = 3, max_idle_time: int = 300):
"""
初始化连接池
:param pool_size: 连接池大小
:param max_idle_time: 连接最大空闲时间(秒)
"""
self.pool_size = pool_size
self.max_idle_time = max_idle_time
self.pool: asyncio.Queue[WSConnection] = asyncio.Queue(maxsize=pool_size)
self._closed = False
self._cleanup_task: Optional[asyncio.Task] = None
# 从全局配置读取参数
self.url = global_config.napcat_ws.uri
self.token = global_config.napcat_ws.token
self.reconnect_interval = global_config.napcat_ws.reconnect_interval
logger.info(f"WebSocket 连接池初始化完成,大小: {pool_size}")
async def initialize(self):
"""
初始化连接池,创建初始连接
"""
if self._closed:
raise WebSocketError("连接池已关闭")
# 启动连接清理任务
self._cleanup_task = asyncio.create_task(self._cleanup_idle_connections())
# 创建初始连接
for _ in range(self.pool_size):
try:
conn = await self._create_connection()
await self.pool.put(conn)
logger.info(f"WebSocket 连接 {conn.conn_id} 已创建并加入连接池")
except Exception as e:
logger.error(f"创建初始连接失败: {e}")
async def _create_connection(self) -> WSConnection:
"""
创建新的 WebSocket 连接
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
try:
conn_id = str(uuid.uuid4())
websocket_raw = await websockets.connect(
self.url, additional_headers=headers
)
websocket = cast(WebSocketClientProtocol, websocket_raw)
conn = WSConnection(websocket, conn_id)
logger.info(f"WebSocket 连接 {conn_id} 已建立")
return conn
except Exception as e:
raise WebSocketConnectionError(f"创建 WebSocket 连接失败: {e}")
async def get_connection(self) -> WSConnection:
"""
从连接池获取一个连接
"""
if self._closed:
raise WebSocketError("连接池已关闭")
try:
# 尝试从连接池获取连接
conn = await asyncio.wait_for(self.pool.get(), timeout=5)
# 检查连接是否活跃
if not conn.is_active:
logger.warning(f"连接 {conn.conn_id} 已失效,重新创建")
return await self._create_connection()
return conn
except asyncio.TimeoutError:
# 连接池为空,创建新连接
logger.warning("连接池为空,创建临时连接")
return await self._create_connection()
except Exception as e:
raise WebSocketError(f"获取连接失败: {e}")
async def release_connection(self, conn: WSConnection):
"""
释放连接回连接池
"""
if self._closed:
await conn.close()
return
if not conn.is_active:
logger.warning(f"连接 {conn.conn_id} 已失效,不返回连接池")
return
try:
if self.pool.full():
# 连接池已满,关闭该连接
await conn.close()
logger.info(f"连接池已满,关闭连接 {conn.conn_id}")
else:
await self.pool.put(conn)
logger.debug(f"连接 {conn.conn_id} 已返回连接池")
except Exception as e:
logger.error(f"释放连接失败: {e}")
await conn.close()
async def _cleanup_idle_connections(self):
"""
清理空闲连接任务
"""
while not self._closed:
await asyncio.sleep(60) # 每分钟检查一次
try:
# 检查连接池中的连接
new_pool = asyncio.Queue(maxsize=self.pool_size)
current_time = asyncio.get_event_loop().time()
while not self.pool.empty():
conn = await self.pool.get()
if current_time - conn.last_used > self.max_idle_time:
# 连接空闲时间过长,关闭
await conn.close()
logger.info(f"清理空闲连接 {conn.conn_id}")
else:
# 放回新队列
await new_pool.put(conn)
# 替换原连接池
self.pool = new_pool
except Exception as e:
logger.error(f"清理空闲连接失败: {e}")
async def close(self):
"""
关闭连接池
"""
if self._closed:
return
self._closed = True
# 停止清理任务
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# 关闭所有连接
while not self.pool.empty():
conn = await self.pool.get()
await conn.close()
logger.info("WebSocket 连接池已关闭")