This commit is contained in:
2026-01-22 01:58:16 +08:00
13 changed files with 1157 additions and 121 deletions

View File

@@ -25,7 +25,11 @@ from .bot import Bot
from .config_loader import global_config
from .managers.command_manager import matcher
from .utils.executor import CodeExecutor
from .utils.logger import logger
from .utils.logger import logger, ModuleLogger
from .utils.exceptions import (
WebSocketError, WebSocketConnectionError, WebSocketAuthenticationError
)
from .utils.error_codes import ErrorCode, create_error_response
class WS:
@@ -45,11 +49,15 @@ class WS:
self.token = cfg.token
self.reconnect_interval = cfg.reconnect_interval
# 初始化状态
self.ws: Optional[WebSocketClientProtocol] = None
self._pending_requests: Dict[str, asyncio.Future] = {}
self._pending_requests: Dict[str, asyncio.Future] = {} # echo: future
self.bot: Bot | None = None
self.self_id: int | None = None
self.code_executor = code_executor
# 创建模块专用日志记录器
self.logger = ModuleLogger("WebSocket")
async def connect(self) -> None:
"""
@@ -62,24 +70,43 @@ class WS:
while True:
try:
logger.info(f"正在尝试连接至 NapCat: {self.url}")
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
logger.success("连接成功!")
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except websockets.exceptions.AuthenticationError as e:
error = WebSocketAuthenticationError(
message=f"WebSocket认证失败: {str(e)}",
code=ErrorCode.WS_AUTH_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {error.message}")
self.logger.log_custom_exception(error)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
logger.warning(f"连接断开或服务器拒绝访问: {e}")
error = WebSocketConnectionError(
message=f"连接断开或服务器拒绝访问: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.warning(f"连接失败: {error.message}")
except Exception as e:
logger.exception(f"运行异常: {e}")
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
logger.info(f"{self.reconnect_interval}秒后尝试重连...")
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket_connection: WebSocketClientProtocol) -> None:
@@ -111,8 +138,22 @@ class WS:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except json.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
self.logger.debug(f"原始消息: {message}")
except Exception as e:
logger.exception(f"解析消息异常: {e}")
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"解析消息异常: {error.message}")
self.logger.log_custom_exception(error)
async def on_event(self, event_data: Dict[str, Any]) -> None:
"""
@@ -136,17 +177,17 @@ class WS:
if self.bot is None and hasattr(event, 'self_id'):
self.self_id = event.self_id
self.bot = Bot(self)
logger.success(f"Bot 实例初始化完成: self_id={self.self_id}")
self.logger.success(f"Bot 实例初始化完成: self_id={self.self_id}")
# 将代码执行器注入到 Bot 和执行器自身
if self.code_executor:
self.bot.code_executor = self.code_executor
self.code_executor.bot = self.bot
logger.info("代码执行器已成功注入 Bot 实例。")
self.logger.info("代码执行器已成功注入 Bot 实例。")
# 如果 bot 尚未初始化,则不处理后续事件
if self.bot is None:
logger.warning("Bot 尚未初始化,跳过事件处理。")
self.logger.warning("Bot 尚未初始化,跳过事件处理。")
return
event.bot = self.bot # 注入 Bot 实例
@@ -157,23 +198,28 @@ class WS:
message_type = getattr(event, "message_type", "Unknown")
user_id = getattr(event, "user_id", "Unknown")
raw_message = getattr(event, "raw_message", "")
logger.info(f"[消息] {message_type} | {user_id}({sender_name}): {raw_message}")
self.logger.info(f"[消息] {message_type} | {user_id}({sender_name}): {raw_message}")
elif event.post_type == "notice":
notice_type = getattr(event, "notice_type", "Unknown")
logger.info(f"[通知] {notice_type}")
self.logger.info(f"[通知] {notice_type}")
elif event.post_type == "request":
request_type = getattr(event, "request_type", "Unknown")
logger.info(f"[请求] {request_type}")
self.logger.info(f"[请求] {request_type}")
elif event.post_type == "meta_event":
meta_event_type = getattr(event, "meta_event_type", "Unknown")
logger.debug(f"[元事件] {meta_event_type}")
self.logger.debug(f"[元事件] {meta_event_type}")
# 分发事件
await matcher.handle_event(self.bot, event)
except Exception as e:
logger.exception(f"事件处理异常: {e}")
self.logger.exception(f"事件处理异常: {str(e)}")
error = WebSocketError(
message=f"事件处理异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.log_custom_exception(error)
async def call_api(self, action: str, params: Optional[Dict[Any, Any]] = None) -> Dict[Any, Any]:
"""
@@ -191,14 +237,22 @@ class WS:
表示失败的字典
"""
if not self.ws:
logger.error("调用 API 失败: WebSocket 未初始化")
return {"status": "failed", "msg": "websocket not initialized"}
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
from websockets.protocol import State
if getattr(self.ws, "state", None) is not State.OPEN:
logger.error("调用 API 失败: WebSocket 连接未打开")
return {"status": "failed", "msg": "websocket is not open"}
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
@@ -207,12 +261,23 @@ class WS:
future = loop.create_future()
self._pending_requests[echo_id] = future
await self.ws.send(json.dumps(payload))
try:
await self.ws.send(json.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
logger.warning(f"API 调用超时: action={action}, params={params}")
return {"status": "failed", "retcode": -1, "msg": "api timeout"}
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)

View File

@@ -8,7 +8,9 @@ from pathlib import Path
import tomllib
from pydantic import ValidationError
from .config_models import ConfigModel, NapCatWSModel, BotModel, RedisModel, DockerModel
from .utils.logger import logger
from .utils.logger import logger, ModuleLogger
from .utils.exceptions import ConfigError, ConfigNotFoundError, ConfigValidationError
from .utils.error_codes import ErrorCode, create_error_response
class Config:
@@ -24,36 +26,66 @@ class Config:
"""
self.path = Path(file_path)
self._model: ConfigModel
# 创建模块专用日志记录器
self.logger = ModuleLogger("ConfigLoader")
self.load()
def load(self):
"""
加载并验证配置文件
:raises FileNotFoundError: 如果配置文件不存在
:raises ValidationError: 如果配置格式不正确
:raises ConfigNotFoundError: 如果配置文件不存在
:raises ConfigValidationError: 如果配置格式不正确
:raises ConfigError: 如果加载配置时发生其他错误
"""
if not self.path.exists():
logger.error(f"配置文件 {self.path} 未找到!")
raise FileNotFoundError(f"配置文件 {self.path} 未找到!")
error = ConfigNotFoundError(message=f"配置文件 {self.path} 未找到!")
self.logger.error(f"配置加载失败: {error.message}")
self.logger.log_custom_exception(error)
raise error
try:
logger.info(f"正在从 {self.path} 加载配置...")
self.logger.info(f"正在从 {self.path} 加载配置...")
with open(self.path, "rb") as f:
raw_config = tomllib.load(f)
self._model = ConfigModel(**raw_config)
logger.success("配置加载并验证成功!")
self.logger.success("配置加载并验证成功!")
except ValidationError as e:
logger.error("配置验证失败,请检查 `config.toml` 文件中的以下错误:")
error_details = []
for error in e.errors():
field = " -> ".join(map(str, error["loc"]))
logger.error(f" - 字段 '{field}': {error['msg']}")
raise
error_msg = f"字段 '{field}': {error['msg']}"
error_details.append(error_msg)
validation_error = ConfigValidationError(
message="配置验证失败",
original_error=e
)
self.logger.error("配置验证失败,请检查 `config.toml` 文件中的以下错误:")
for detail in error_details:
self.logger.error(f" - {detail}")
self.logger.log_custom_exception(validation_error)
raise validation_error
except tomllib.TOMLDecodeError as e:
error = ConfigError(
message=f"TOML解析错误: {str(e)}",
original_error=e
)
self.logger.error(f"加载配置文件时发生TOML解析错误: {error.message}")
self.logger.log_custom_exception(error)
raise error
except Exception as e:
logger.exception(f"加载配置文件时发生未知错误: {e}")
raise
error = ConfigError(
message=f"加载配置文件时发生未知错误: {str(e)}",
original_error=e
)
self.logger.exception(f"加载配置文件时发生未知错误: {error.message}")
self.logger.log_custom_exception(error)
raise error
# 通过属性访问配置
@property

View File

@@ -10,8 +10,9 @@ import sys
from typing import Set
from .command_manager import CommandManager
from ..utils.exceptions import SyncHandlerError
from ..utils.logger import logger
from ..utils.exceptions import SyncHandlerError, PluginError, PluginLoadError, PluginReloadError, PluginNotFoundError
from ..utils.logger import logger, ModuleLogger
from ..utils.error_codes import ErrorCode, create_error_response
# 确保logger在模块级别可见
__all__ = ['PluginManager', 'logger']
@@ -29,6 +30,8 @@ class PluginManager:
"""
self.command_manager = command_manager
self.loaded_plugins: Set[str] = set()
# 创建模块专用日志记录器
self.logger = ModuleLogger("PluginManager")
def load_all_plugins(self) -> None:
"""
@@ -45,10 +48,10 @@ class PluginManager:
package_name = "plugins"
if not os.path.exists(plugin_dir):
logger.error(f"插件目录不存在: {plugin_dir}")
self.logger.error(f"插件目录不存在: {plugin_dir}")
return
logger.info(f"正在从 {package_name} 加载插件 (路径: {plugin_dir})...")
self.logger.info(f"正在从 {package_name} 加载插件 (路径: {plugin_dir})...")
for _, module_name, is_pkg in pkgutil.iter_modules([plugin_dir]):
full_module_name = f"{package_name}.{module_name}"
@@ -70,23 +73,38 @@ class PluginManager:
self.loaded_plugins.add(full_module_name)
type_str = "" if is_pkg else "文件"
logger.success(f" [{type_str}] 成功{action}: {module_name}")
self.logger.success(f" [{type_str}] 成功{action}: {module_name}")
except SyncHandlerError as e:
logger.error(f" 插件 {module_name} 加载失败: {e} (跳过此插件)")
except Exception as e:
logger.exception(
f" 加载插件 {module_name} 失败: {e}"
error = PluginLoadError(
plugin_name=module_name,
message=f"同步处理器错误: {str(e)}",
original_error=e
)
self.logger.error(f" 插件 {module_name} 加载失败: {error.message} (跳过此插件)")
self.logger.log_custom_exception(error)
except Exception as e:
error = PluginLoadError(
plugin_name=module_name,
message=f"未知错误: {str(e)}",
original_error=e
)
self.logger.exception(f" 加载插件 {module_name} 失败: {error.message}")
self.logger.log_custom_exception(error)
def reload_plugin(self, full_module_name: str) -> None:
"""
精确重载单个插件。
"""
if full_module_name not in self.loaded_plugins:
logger.warning(f"尝试重载一个未被加载的插件: {full_module_name},将按首次加载处理。")
self.logger.warning(f"尝试重载一个未被加载的插件: {full_module_name},将按首次加载处理。")
if full_module_name not in sys.modules:
logger.error(f"重载失败: 模块 {full_module_name} 未在 sys.modules 中找到。")
error = PluginNotFoundError(
plugin_name=full_module_name,
message="模块未在sys.modules中找到"
)
self.logger.error(f"重载失败: {error.message}")
self.logger.log_custom_exception(error)
return
try:
@@ -97,6 +115,20 @@ class PluginManager:
meta = getattr(module, "__plugin_meta__")
self.command_manager.plugins[full_module_name] = meta
logger.success(f"插件 {full_module_name} 已成功重载。")
self.logger.success(f"插件 {full_module_name} 已成功重载。")
except SyncHandlerError as e:
error = PluginReloadError(
plugin_name=full_module_name,
message=f"同步处理器错误: {str(e)}",
original_error=e
)
self.logger.error(f"重载插件 {full_module_name} 失败: {error.message}")
self.logger.log_custom_exception(error)
except Exception as e:
logger.exception(f"重载插件 {full_module_name} 时发生错误: {e}")
error = PluginReloadError(
plugin_name=full_module_name,
message=f"未知错误: {str(e)}",
original_error=e
)
self.logger.exception(f"重载插件 {full_module_name} 时发生错误: {error.message}")
self.logger.log_custom_exception(error)

234
core/utils/error_codes.py Normal file
View File

@@ -0,0 +1,234 @@
"""
错误码和统一响应格式模块
该模块定义了项目中使用的错误码和统一的错误响应格式,确保所有模块返回一致的错误信息。
"""
# 错误码定义
class ErrorCode:
"""
错误码枚举类,包含所有系统错误码的定义。
错误码规则:
- 1xxx: 系统级错误
- 2xxx: WebSocket相关错误
- 3xxx: 插件相关错误
- 4xxx: 配置相关错误
- 5xxx: 权限相关错误
- 6xxx: 命令相关错误
- 7xxx: Redis相关错误
- 8xxx: 浏览器管理器相关错误
- 9xxx: 代码执行相关错误
"""
# 系统级错误
SUCCESS = 0 # 成功
UNKNOWN_ERROR = 1000 # 未知错误
INVALID_PARAMETER = 1001 # 参数无效
DATABASE_ERROR = 1002 # 数据库错误
NETWORK_ERROR = 1003 # 网络错误
TIMEOUT_ERROR = 1004 # 超时错误
RESOURCE_EXHAUSTED = 1005 # 资源耗尽
# WebSocket相关错误
WS_CONNECTION_FAILED = 2000 # WebSocket连接失败
WS_AUTH_FAILED = 2001 # WebSocket认证失败
WS_DISCONNECTED = 2002 # WebSocket已断开
WS_MESSAGE_ERROR = 2003 # WebSocket消息错误
# 插件相关错误
PLUGIN_LOAD_FAILED = 3000 # 插件加载失败
PLUGIN_RELOAD_FAILED = 3001 # 插件重载失败
PLUGIN_NOT_FOUND = 3002 # 插件未找到
PLUGIN_INVALID = 3003 # 插件无效
PLUGIN_DEPENDENCY_ERROR = 3004 # 插件依赖错误
# 配置相关错误
CONFIG_NOT_FOUND = 4000 # 配置文件未找到
CONFIG_PARSE_ERROR = 4001 # 配置解析错误
CONFIG_VALIDATION_ERROR = 4002 # 配置验证错误
CONFIG_KEY_NOT_FOUND = 4003 # 配置项未找到
# 权限相关错误
PERMISSION_DENIED = 5000 # 权限不足
NOT_ADMIN = 5001 # 不是管理员
USER_BANNED = 5002 # 用户已被禁止
# 命令相关错误
COMMAND_NOT_FOUND = 6000 # 命令未找到
COMMAND_PARAM_ERROR = 6001 # 命令参数错误
COMMAND_EXECUTE_ERROR = 6002 # 命令执行错误
COMMAND_TIMEOUT = 6003 # 命令执行超时
# Redis相关错误
REDIS_CONNECTION_FAILED = 7000 # Redis连接失败
REDIS_OPERATION_ERROR = 7001 # Redis操作错误
# 浏览器管理器相关错误
BROWSER_INIT_FAILED = 8000 # 浏览器初始化失败
BROWSER_POOL_ERROR = 8001 # 浏览器池错误
BROWSER_OPERATION_ERROR = 8002 # 浏览器操作错误
# 代码执行相关错误
CODE_EXECUTE_ERROR = 9000 # 代码执行错误
CODE_SECURITY_ERROR = 9001 # 代码安全错误
# 错误码到错误消息的映射
ERROR_MESSAGES = {
# 系统级错误
ErrorCode.SUCCESS: "操作成功",
ErrorCode.UNKNOWN_ERROR: "未知错误",
ErrorCode.INVALID_PARAMETER: "参数无效",
ErrorCode.DATABASE_ERROR: "数据库错误",
ErrorCode.NETWORK_ERROR: "网络错误",
ErrorCode.TIMEOUT_ERROR: "操作超时",
ErrorCode.RESOURCE_EXHAUSTED: "资源耗尽",
# WebSocket相关错误
ErrorCode.WS_CONNECTION_FAILED: "WebSocket连接失败",
ErrorCode.WS_AUTH_FAILED: "WebSocket认证失败",
ErrorCode.WS_DISCONNECTED: "WebSocket已断开连接",
ErrorCode.WS_MESSAGE_ERROR: "WebSocket消息格式错误",
# 插件相关错误
ErrorCode.PLUGIN_LOAD_FAILED: "插件加载失败",
ErrorCode.PLUGIN_RELOAD_FAILED: "插件重载失败",
ErrorCode.PLUGIN_NOT_FOUND: "插件未找到",
ErrorCode.PLUGIN_INVALID: "插件无效",
ErrorCode.PLUGIN_DEPENDENCY_ERROR: "插件依赖错误",
# 配置相关错误
ErrorCode.CONFIG_NOT_FOUND: "配置文件未找到",
ErrorCode.CONFIG_PARSE_ERROR: "配置文件解析错误",
ErrorCode.CONFIG_VALIDATION_ERROR: "配置验证失败",
ErrorCode.CONFIG_KEY_NOT_FOUND: "配置项未找到",
# 权限相关错误
ErrorCode.PERMISSION_DENIED: "权限不足",
ErrorCode.NOT_ADMIN: "需要管理员权限",
ErrorCode.USER_BANNED: "用户已被禁止操作",
# 命令相关错误
ErrorCode.COMMAND_NOT_FOUND: "命令未找到",
ErrorCode.COMMAND_PARAM_ERROR: "命令参数错误",
ErrorCode.COMMAND_EXECUTE_ERROR: "命令执行错误",
ErrorCode.COMMAND_TIMEOUT: "命令执行超时",
# Redis相关错误
ErrorCode.REDIS_CONNECTION_FAILED: "Redis连接失败",
ErrorCode.REDIS_OPERATION_ERROR: "Redis操作错误",
# 浏览器管理器相关错误
ErrorCode.BROWSER_INIT_FAILED: "浏览器初始化失败",
ErrorCode.BROWSER_POOL_ERROR: "浏览器池错误",
ErrorCode.BROWSER_OPERATION_ERROR: "浏览器操作错误",
# 代码执行相关错误
ErrorCode.CODE_EXECUTE_ERROR: "代码执行错误",
ErrorCode.CODE_SECURITY_ERROR: "代码存在安全风险",
}
def get_error_message(code: int) -> str:
"""
根据错误码获取错误消息
Args:
code: 错误码
Returns:
str: 错误消息
"""
return ERROR_MESSAGES.get(code, ERROR_MESSAGES[ErrorCode.UNKNOWN_ERROR])
def create_error_response(code: int, message: str = None, data: dict = None, request_id: str = None) -> dict:
"""
创建统一格式的错误响应
Args:
code: 错误码
message: 错误消息(可选,如果未提供则使用默认消息)
data: 附加数据(可选)
request_id: 请求ID可选用于追踪请求
Returns:
dict: 统一格式的错误响应
"""
error_message = message if message is not None else get_error_message(code)
response = {
"code": code,
"message": error_message,
"success": code == ErrorCode.SUCCESS,
}
if data is not None:
response["data"] = data
if request_id is not None:
response["request_id"] = request_id
return response
def exception_to_error_response(exception: Exception, code: int = None, request_id: str = None) -> dict:
"""
将异常对象转换为统一格式的错误响应
Args:
exception: 异常对象
code: 错误码(可选,如果未提供则根据异常类型自动推断)
request_id: 请求ID可选用于追踪请求
Returns:
dict: 统一格式的错误响应
"""
# 从自定义异常类中提取错误码
if hasattr(exception, "code") and exception.code is not None:
code = exception.code
# 如果仍未找到错误码,则根据异常类型推断
if code is None:
from .exceptions import (
WebSocketError, PluginError, ConfigError, PermissionError,
CommandError, RedisError, BrowserManagerError, CodeExecutionError
)
if isinstance(exception, WebSocketError):
code = ErrorCode.WS_CONNECTION_FAILED
elif isinstance(exception, PluginError):
code = ErrorCode.PLUGIN_LOAD_FAILED
elif isinstance(exception, ConfigError):
code = ErrorCode.CONFIG_PARSE_ERROR
elif isinstance(exception, PermissionError):
code = ErrorCode.PERMISSION_DENIED
elif isinstance(exception, CommandError):
code = ErrorCode.COMMAND_EXECUTE_ERROR
elif isinstance(exception, RedisError):
code = ErrorCode.REDIS_OPERATION_ERROR
elif isinstance(exception, BrowserManagerError):
code = ErrorCode.BROWSER_OPERATION_ERROR
elif isinstance(exception, CodeExecutionError):
code = ErrorCode.CODE_EXECUTE_ERROR
else:
code = ErrorCode.UNKNOWN_ERROR
# 获取错误消息
message = str(exception)
# 如果异常有原始错误,也包含在响应中
data = None
if hasattr(exception, "original_error") and exception.original_error is not None:
data = {"original_error": str(exception.original_error)}
return create_error_response(code, message, data, request_id)
# 将错误码导出以便其他模块使用
__all__ = [
"ErrorCode",
"get_error_message",
"create_error_response",
"exception_to_error_response"
]

View File

@@ -1,5 +1,7 @@
"""
自定义异常模块
该模块定义了项目中使用的各种自定义异常类,用于提供更精确、更友好的错误提示。
"""
class SyncHandlerError(Exception):
@@ -7,3 +9,213 @@ class SyncHandlerError(Exception):
当尝试注册同步函数作为异步事件处理器时抛出此异常。
"""
pass
class WebSocketError(Exception):
"""
WebSocket相关错误的基类。
Args:
message: 错误消息
code: 错误代码(可选)
original_error: 原始异常对象(可选)
"""
def __init__(self, message, code=None, original_error=None):
self.message = message
self.code = code
self.original_error = original_error
super().__init__(message)
class WebSocketConnectionError(WebSocketError):
"""
WebSocket连接失败时抛出此异常。
"""
pass
class WebSocketAuthenticationError(WebSocketError):
"""
WebSocket认证失败时抛出此异常。
"""
pass
class PluginError(Exception):
"""
插件相关错误的基类。
Args:
plugin_name: 插件名称
message: 错误消息
original_error: 原始异常对象(可选)
"""
def __init__(self, plugin_name, message, original_error=None):
self.plugin_name = plugin_name
self.message = message
self.original_error = original_error
super().__init__(f"插件 {plugin_name}: {message}")
class PluginLoadError(PluginError):
"""
插件加载失败时抛出此异常。
"""
pass
class PluginReloadError(PluginError):
"""
插件重载失败时抛出此异常。
"""
pass
class PluginNotFoundError(PluginError):
"""
找不到指定插件时抛出此异常。
"""
pass
class ConfigError(Exception):
"""
配置相关错误的基类。
Args:
section: 配置部分名称
key: 配置项名称
message: 错误消息
"""
def __init__(self, section=None, key=None, message=None):
self.section = section
self.key = key
self.message = message
if section and key and message:
super().__init__(f"配置错误 [{section}.{key}]: {message}")
elif section and message:
super().__init__(f"配置错误 [{section}]: {message}")
else:
super().__init__(message or "配置错误")
class ConfigNotFoundError(ConfigError):
"""
配置文件不存在时抛出此异常。
"""
pass
class ConfigValidationError(ConfigError):
"""
配置验证失败时抛出此异常。
"""
pass
class PermissionError(Exception):
"""
权限相关错误的基类。
Args:
user_id: 用户ID
operation: 操作名称
message: 错误消息
"""
def __init__(self, user_id=None, operation=None, message=None):
self.user_id = user_id
self.operation = operation
self.message = message
if user_id and operation and message:
super().__init__(f"权限错误 [用户 {user_id}]: 无权限执行操作 {operation} - {message}")
elif user_id and operation:
super().__init__(f"权限错误 [用户 {user_id}]: 无权限执行操作 {operation}")
else:
super().__init__(message or "权限错误")
class CommandError(Exception):
"""
命令处理相关错误的基类。
Args:
command: 命令名称
message: 错误消息
original_error: 原始异常对象(可选)
"""
def __init__(self, command=None, message=None, original_error=None):
self.command = command
self.message = message
self.original_error = original_error
if command and message:
super().__init__(f"命令错误 [{command}]: {message}")
else:
super().__init__(message or "命令错误")
class CommandNotFoundError(CommandError):
"""
找不到指定命令时抛出此异常。
"""
pass
class CommandParameterError(CommandError):
"""
命令参数错误时抛出此异常。
"""
pass
class RedisError(Exception):
"""
Redis相关错误的基类。
Args:
message: 错误消息
original_error: 原始异常对象(可选)
"""
def __init__(self, message, original_error=None):
self.message = message
self.original_error = original_error
super().__init__(message)
class BrowserManagerError(Exception):
"""
浏览器管理器相关错误的基类。
Args:
message: 错误消息
original_error: 原始异常对象(可选)
"""
def __init__(self, message, original_error=None):
self.message = message
self.original_error = original_error
super().__init__(message)
class BrowserPoolError(BrowserManagerError):
"""
浏览器池相关错误时抛出此异常。
"""
pass
class CodeExecutionError(Exception):
"""
代码执行相关错误的基类。
Args:
message: 错误消息
code: 执行的代码(可选)
original_error: 原始异常对象(可选)
"""
def __init__(self, message, code=None, original_error=None):
self.message = message
self.code = code
self.original_error = original_error
super().__init__(message)

View File

@@ -4,25 +4,40 @@
该模块负责初始化和配置 loguru 日志记录器,为整个应用程序提供统一的日志记录接口。
"""
import sys
import os
from pathlib import Path
from loguru import logger
# 定义日志格式
# 定义日志格式添加进程ID和线程ID作为上下文信息
LOG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<magenta>PID {process} TID {thread}</magenta> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level>"
)
# 开发环境日志格式(更详细)
DEBUG_LOG_FORMAT = (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<magenta>PID {process} TID {thread}</magenta> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"<yellow>Module: {module}</yellow> | "
"<level>{message}</level>"
)
# 移除 loguru 默认的处理器
logger.remove()
# 获取当前环境
ENVIRONMENT = os.getenv("NEOBOT_ENV", "development")
# 添加控制台输出处理器
logger.add(
sys.stderr,
level="INFO",
format=LOG_FORMAT,
level="INFO" if ENVIRONMENT == "production" else "DEBUG",
format=LOG_FORMAT if ENVIRONMENT == "production" else DEBUG_LOG_FORMAT,
colorize=True,
enqueue=True # 异步写入
)
@@ -36,7 +51,7 @@ log_file_path = log_dir / "{time:YYYY-MM-DD}.log"
logger.add(
log_file_path,
level="DEBUG",
format=LOG_FORMAT,
format=DEBUG_LOG_FORMAT,
colorize=False,
rotation="00:00", # 每天午夜创建新文件
retention="7 days", # 保留最近 7 天的日志
@@ -46,5 +61,77 @@ logger.add(
diagnose=True # 添加异常诊断信息
)
# 导出配置好的 logger
__all__ = ["logger"]
# 为自定义异常添加专门的日志记录方法
def log_exception(exc, module_name="unknown", level="error"):
"""
记录自定义异常的详细信息
Args:
exc: 异常对象
module_name: 模块名称(可选)
level: 日志级别(可选,默认为 "error"
"""
log_func = getattr(logger, level)
log_func(f"模块 {module_name} 发生异常: {exc}")
# 如果异常对象有原始异常,也记录原始异常信息
if hasattr(exc, "original_error") and exc.original_error:
log_func(f"原始异常: {exc.original_error}")
# 如果是配置错误,记录配置相关信息
if hasattr(exc, "section") and hasattr(exc, "key"):
log_func(f"配置信息: 部分={exc.section}, 键={exc.key}")
# 如果是插件错误,记录插件名称
if hasattr(exc, "plugin_name"):
log_func(f"插件名称: {exc.plugin_name}")
# 如果是命令错误,记录命令名称
if hasattr(exc, "command"):
log_func(f"命令名称: {exc.command}")
# 如果是权限错误记录用户ID和操作
if hasattr(exc, "user_id") and hasattr(exc, "operation"):
log_func(f"权限信息: 用户ID={exc.user_id}, 操作={exc.operation}")
# 为不同模块提供日志工具
class ModuleLogger:
"""
模块专用日志记录器
Args:
module_name: 模块名称
"""
def __init__(self, module_name):
self.module_name = module_name
def debug(self, message):
logger.debug(f"[{self.module_name}] {message}")
def info(self, message):
logger.info(f"[{self.module_name}] {message}")
def success(self, message):
logger.success(f"[{self.module_name}] {message}")
def warning(self, message):
logger.warning(f"[{self.module_name}] {message}")
def error(self, message):
logger.error(f"[{self.module_name}] {message}")
def exception(self, message, exc_info=True):
logger.exception(f"[{self.module_name}] {message}", exc_info=exc_info)
def log_custom_exception(self, exc, level="error"):
"""
记录自定义异常
Args:
exc: 异常对象
level: 日志级别
"""
log_exception(exc, self.module_name, level)
# 导出配置好的 logger 和工具函数
__all__ = ["logger", "log_exception", "ModuleLogger"]