feat: 重构核心架构,增强类型安全与插件管理
本次提交对核心模块进行了深度重构,引入 Pydantic 增强配置管理的类型安全性,并全面优化了插件管理系统。 主要变更详情: 1. 核心架构与配置 - 重构配置加载模块:引入 Pydantic 模型 (`core/config_models.py`),提供严格的配置项类型检查、验证及默认值管理。 - 统一模块结构:规范化模块导入路径,移除冗余的 `__init__.py` 文件,提升项目结构的清晰度。 - 性能优化:集成 Redis 缓存支持 (`RedisManager`),有效降低高频 API 调用开销,提升响应速度。 2. 插件系统升级 - 实现热重载机制:新增插件文件变更监听功能,支持开发过程中自动重载插件,提升开发效率。 - 优化生命周期管理:改进插件加载与卸载逻辑,支持精确卸载指定插件及其关联的命令、事件处理器和定时任务。 3. 功能特性增强 - 新增媒体 API:引入 `MediaAPI` 模块,封装图片、语音等富媒体资源的获取与处理接口。 - 完善权限体系:重构权限管理系统,实现管理员与操作员的分级控制,支持更细粒度的命令权限校验。 4. 代码质量与稳定性 - 全面类型修复:解决 `mypy` 静态类型检查发现的大量类型错误(包括 `CommandManager`、`EventFactory` 及 `Bot` API 签名不匹配问题)。 - 增强错误处理:优化消息处理管道的异常捕获机制,完善关键路径的日志记录,提升系统运行稳定性。
This commit is contained in:
@@ -13,64 +13,17 @@
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
from functools import total_ordering
|
||||
from typing import Dict
|
||||
from typing import Dict, Optional
|
||||
|
||||
from ..utils.logger import logger
|
||||
from ..utils.singleton import Singleton
|
||||
from .admin_manager import admin_manager
|
||||
from ..permission import Permission
|
||||
|
||||
|
||||
@total_ordering
|
||||
class Permission:
|
||||
"""
|
||||
权限封装类
|
||||
|
||||
封装了权限的名称和等级,并提供了比较方法。
|
||||
使用 @total_ordering 装饰器可以自动生成所有的比较运算符。
|
||||
"""
|
||||
def __init__(self, name: str, level: int):
|
||||
"""
|
||||
初始化权限对象
|
||||
|
||||
Args:
|
||||
name (str): 权限名称 (e.g., "admin", "op")
|
||||
level (int): 权限等级,数字越大权限越高
|
||||
"""
|
||||
self.name = name
|
||||
self.level = level
|
||||
|
||||
def __eq__(self, other):
|
||||
"""
|
||||
判断权限是否相等
|
||||
"""
|
||||
if not isinstance(other, Permission):
|
||||
return NotImplemented
|
||||
return self.level == other.level
|
||||
|
||||
def __lt__(self, other):
|
||||
"""
|
||||
判断权限是否小于另一个权限
|
||||
"""
|
||||
if not isinstance(other, Permission):
|
||||
return NotImplemented
|
||||
return self.level < other.level
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
返回权限的字符串表示(即权限名称)
|
||||
"""
|
||||
return self.name
|
||||
|
||||
|
||||
# 定义全局权限常量
|
||||
ADMIN = Permission("admin", 3)
|
||||
OP = Permission("op", 2)
|
||||
USER = Permission("user", 1)
|
||||
|
||||
# 用于从字符串名称查找权限对象的字典
|
||||
_PERMISSIONS: Dict[str, Permission] = {
|
||||
p.name: p for p in [ADMIN, OP, USER]
|
||||
p.value: p for p in Permission
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +42,7 @@ class PermissionManager(Singleton):
|
||||
如果已经初始化过,则直接返回。
|
||||
"""
|
||||
super().__init__()
|
||||
if not self._initialized:
|
||||
if hasattr(self, '_initialized') and self._initialized:
|
||||
return
|
||||
|
||||
# 权限数据文件路径
|
||||
@@ -111,6 +64,7 @@ class PermissionManager(Singleton):
|
||||
self.load()
|
||||
|
||||
logger.info("权限管理器初始化完成")
|
||||
self._initialized = True
|
||||
|
||||
def load(self) -> None:
|
||||
"""
|
||||
@@ -164,12 +118,12 @@ class PermissionManager(Singleton):
|
||||
"""
|
||||
# 首先,通过 AdminManager 检查是否为管理员
|
||||
if await admin_manager.is_admin(user_id):
|
||||
return ADMIN
|
||||
return Permission.ADMIN
|
||||
|
||||
# 如果不是管理员,则从 permissions.json 中查找
|
||||
user_id_str = str(user_id)
|
||||
level_name = self._data["users"].get(user_id_str, USER.name)
|
||||
return _PERMISSIONS.get(level_name, USER)
|
||||
level_name = self._data["users"].get(user_id_str, Permission.USER.value)
|
||||
return _PERMISSIONS.get(level_name, Permission.USER)
|
||||
|
||||
def set_user_permission(self, user_id: int, permission: Permission) -> None:
|
||||
"""
|
||||
@@ -182,13 +136,13 @@ class PermissionManager(Singleton):
|
||||
Raises:
|
||||
ValueError: 如果权限对象无效
|
||||
"""
|
||||
if not isinstance(permission, Permission) or permission.name not in _PERMISSIONS:
|
||||
if not isinstance(permission, Permission):
|
||||
raise ValueError(f"无效的权限对象: {permission}")
|
||||
|
||||
user_id_str = str(user_id)
|
||||
self._data["users"][user_id_str] = permission.name
|
||||
self._data["users"][user_id_str] = permission.value
|
||||
self.save()
|
||||
logger.info(f"设置用户 {user_id} 的权限级别为 {permission.name}")
|
||||
logger.info(f"设置用户 {user_id} 的权限级别为 {permission.value}")
|
||||
|
||||
def remove_user(self, user_id: int) -> None:
|
||||
"""
|
||||
@@ -214,17 +168,17 @@ class PermissionManager(Singleton):
|
||||
Returns:
|
||||
bool: 如果用户权限 >= 所需权限,返回 True,否则返回 False
|
||||
"""
|
||||
# 如果传入的是字符串,先转换为 Permission 对象
|
||||
if isinstance(required_permission, str):
|
||||
required_permission = _PERMISSIONS.get(required_permission.lower())
|
||||
if not required_permission:
|
||||
# 如果是无效的权限字符串,默认拒绝
|
||||
logger.warning(f"检测到无效的权限检查字符串: {required_permission}")
|
||||
return False
|
||||
|
||||
user_permission = await self.get_user_permission(user_id)
|
||||
return user_permission >= required_permission
|
||||
|
||||
def get_all_user_permissions(self) -> Dict[str, str]:
|
||||
"""
|
||||
获取所有已配置的用户权限
|
||||
|
||||
:return: 一个包含所有用户权限的字典
|
||||
"""
|
||||
return self._data["users"].copy()
|
||||
|
||||
def get_all_users(self) -> Dict[str, str]:
|
||||
"""
|
||||
获取所有设置了权限的用户及其级别名称
|
||||
@@ -243,22 +197,22 @@ class PermissionManager(Singleton):
|
||||
logger.info("已清空所有权限设置")
|
||||
|
||||
|
||||
# 全局权限管理器实例
|
||||
permission_manager = PermissionManager()
|
||||
|
||||
def require_admin(func):
|
||||
"""
|
||||
一个装饰器,用于限制命令只能由管理员执行。
|
||||
"""
|
||||
from functools import wraps
|
||||
from models.events.message import MessageEvent
|
||||
from core.managers import permission_manager
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(event: MessageEvent, *args, **kwargs):
|
||||
user_id = event.user_id
|
||||
if await permission_manager.check_permission(user_id, ADMIN):
|
||||
if await permission_manager.check_permission(user_id, Permission.ADMIN):
|
||||
return await func(event, *args, **kwargs)
|
||||
else:
|
||||
await event.reply("抱歉,您没有权限执行此命令。")
|
||||
# 假设 event 对象有 reply 方法
|
||||
if hasattr(event, "reply"):
|
||||
await event.reply("抱歉,您没有权限执行此命令。")
|
||||
return None
|
||||
return wrapper
|
||||
|
||||
Reference in New Issue
Block a user