* 滚木 * feat: 重构核心架构,增强类型安全与插件管理 本次提交对核心模块进行了深度重构,引入 Pydantic 增强配置管理的类型安全性,并全面优化了插件管理系统。 主要变更详情: 1. 核心架构与配置 - 重构配置加载模块:引入 Pydantic 模型 (`core/config_models.py`),提供严格的配置项类型检查、验证及默认值管理。 - 统一模块结构:规范化模块导入路径,移除冗余的 `__init__.py` 文件,提升项目结构的清晰度。 - 性能优化:集成 Redis 缓存支持 (`RedisManager`),有效降低高频 API 调用开销,提升响应速度。 2. 插件系统升级 - 实现热重载机制:新增插件文件变更监听功能,支持开发过程中自动重载插件,提升开发效率。 - 优化生命周期管理:改进插件加载与卸载逻辑,支持精确卸载指定插件及其关联的命令、事件处理器和定时任务。 3. 功能特性增强 - 新增媒体 API:引入 `MediaAPI` 模块,封装图片、语音等富媒体资源的获取与处理接口。 - 完善权限体系:重构权限管理系统,实现管理员与操作员的分级控制,支持更细粒度的命令权限校验。 4. 代码质量与稳定性 - 全面类型修复:解决 `mypy` 静态类型检查发现的大量类型错误(包括 `CommandManager`、`EventFactory` 及 `Bot` API 签名不匹配问题)。 - 增强错误处理:优化消息处理管道的异常捕获机制,完善关键路径的日志记录,提升系统运行稳定性。 * feat: 添加测试用例并优化代码结构 refactor(permission_manager): 调整初始化顺序和逻辑 fix(admin_manager): 修复初始化逻辑和目录创建问题 feat(ws): 优化Bot实例初始化条件 feat(message): 增强MessageSegment功能并添加测试 feat(events): 支持字符串格式的消息解析 test: 添加核心功能测试用例 refactor(plugin_manager): 改进插件路径处理 style: 清理无用导入和代码 chore: 更新依赖项
162 lines
5.5 KiB
Python
162 lines
5.5 KiB
Python
"""
|
|
管理员管理器模块
|
|
|
|
该模块负责管理机器人的管理员列表。
|
|
它实现了文件和 Redis 缓存之间的数据同步,并提供了一套清晰的 API
|
|
供其他模块调用。
|
|
"""
|
|
import json
|
|
import os
|
|
from typing import Set
|
|
|
|
from ..utils.logger import logger
|
|
from ..utils.singleton import Singleton
|
|
from .redis_manager import redis_manager
|
|
|
|
|
|
class AdminManager(Singleton):
|
|
"""
|
|
管理员管理器类
|
|
|
|
负责加载、缓存和管理管理员列表。
|
|
使用单例模式,确保全局只有一个实例。
|
|
"""
|
|
_REDIS_KEY = "neobot:admins" # 用于存储管理员集合的 Redis 键
|
|
def __init__(self):
|
|
"""
|
|
初始化 AdminManager
|
|
"""
|
|
if hasattr(self, '_initialized') and self._initialized:
|
|
return
|
|
|
|
# 管理员数据文件路径
|
|
self.data_file = os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
"..",
|
|
"data",
|
|
"admin.json"
|
|
)
|
|
|
|
self._admins: Set[int] = set()
|
|
|
|
# 确保数据目录存在
|
|
os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
|
|
|
|
logger.info("管理员管理器初始化完成")
|
|
super().__init__()
|
|
|
|
async def initialize(self):
|
|
"""
|
|
异步初始化,加载数据并同步到 Redis
|
|
"""
|
|
await self._load_from_file()
|
|
await self._sync_to_redis()
|
|
logger.info("管理员数据加载并同步到 Redis 完成")
|
|
|
|
async def _load_from_file(self):
|
|
"""
|
|
从 admin.json 加载管理员列表
|
|
"""
|
|
try:
|
|
if os.path.exists(self.data_file):
|
|
with open(self.data_file, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
admins = data.get("admins", [])
|
|
self._admins = set(int(admin_id) for admin_id in admins)
|
|
logger.debug(f"从 {self.data_file} 加载了 {len(self._admins)} 位管理员")
|
|
else:
|
|
# 如果文件不存在,创建一个空的
|
|
self._admins = set()
|
|
await self._save_to_file()
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
logger.error(f"加载或解析 admin.json 失败: {e}")
|
|
self._admins = set()
|
|
|
|
async def _save_to_file(self):
|
|
"""
|
|
将当前管理员列表保存回 admin.json
|
|
"""
|
|
try:
|
|
# 确保目录存在
|
|
os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
|
|
# 将 set 转换为 list 以便 JSON 序列化
|
|
admin_list = [str(admin_id) for admin_id in self._admins]
|
|
with open(self.data_file, "w", encoding="utf-8") as f:
|
|
json.dump({"admins": admin_list}, f, indent=2, ensure_ascii=False)
|
|
logger.debug(f"管理员列表已保存到 {self.data_file}")
|
|
except Exception as e:
|
|
logger.error(f"保存 admin.json 失败: {e}")
|
|
|
|
async def _sync_to_redis(self):
|
|
"""
|
|
将内存中的管理员集合同步到 Redis
|
|
"""
|
|
from core.managers.redis_manager import redis_manager
|
|
try:
|
|
# 首先清空旧的集合
|
|
await redis_manager.redis.delete(self._REDIS_KEY)
|
|
if self._admins:
|
|
# 将所有管理员ID添加到集合中
|
|
await redis_manager.redis.sadd(self._REDIS_KEY, *self._admins)
|
|
logger.debug(f"已将 {len(self._admins)} 位管理员同步到 Redis")
|
|
except Exception as e:
|
|
logger.error(f"同步管理员到 Redis 失败: {e}")
|
|
|
|
async def is_admin(self, user_id: int) -> bool:
|
|
"""
|
|
检查用户是否为管理员(从 Redis 缓存读取)
|
|
"""
|
|
|
|
try:
|
|
return await redis_manager.redis.sismember(self._REDIS_KEY, user_id)
|
|
except Exception as e:
|
|
logger.error(f"从 Redis 检查管理员权限失败: {e}")
|
|
# Redis 失败时,回退到内存检查
|
|
return user_id in self._admins
|
|
|
|
async def add_admin(self, user_id: int) -> bool:
|
|
"""
|
|
添加管理员,并同步到文件和 Redis
|
|
"""
|
|
from .redis_manager import redis_manager
|
|
if user_id in self._admins:
|
|
return False # 用户已经是管理员
|
|
|
|
self._admins.add(user_id)
|
|
await self._save_to_file()
|
|
try:
|
|
await redis_manager.redis.sadd(self._REDIS_KEY, user_id)
|
|
logger.info(f"已添加新管理员 {user_id} 并更新缓存")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"添加管理员 {user_id} 到 Redis 失败: {e}")
|
|
return False
|
|
|
|
async def remove_admin(self, user_id: int) -> bool:
|
|
"""
|
|
移除管理员,并同步到文件和 Redis
|
|
"""
|
|
from .redis_manager import redis_manager
|
|
if user_id not in self._admins:
|
|
return False # 用户不是管理员
|
|
|
|
self._admins.remove(user_id)
|
|
await self._save_to_file()
|
|
try:
|
|
await redis_manager.redis.srem(self._REDIS_KEY, user_id)
|
|
logger.info(f"已移除管理员 {user_id} 并更新缓存")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"从 Redis 移除管理员 {user_id} 失败: {e}")
|
|
return False
|
|
|
|
async def get_all_admins(self) -> Set[int]:
|
|
"""
|
|
获取所有管理员的集合
|
|
"""
|
|
return self._admins.copy()
|
|
|
|
|
|
# 全局 AdminManager 实例
|
|
admin_manager = AdminManager()
|