""" 权限管理器模块 该模块负责管理用户权限,支持 admin、op、user 三个权限级别。 权限数据存储在 `permissions.json` 文件中,格式为: { "users": { "123456": "admin", "789012": "op", "345678": "user" } } """ import json import os from typing import Dict from ..utils.logger import logger from ..utils.singleton import Singleton from .admin_manager import admin_manager from ..permission import Permission # 用于从字符串名称查找权限对象的字典 _PERMISSIONS: Dict[str, Permission] = { p.value: p for p in Permission } class PermissionManager(Singleton): """ 权限管理器类 负责加载、保存和查询用户权限数据。 使用单例模式,确保全局只有一个权限管理器实例。 """ def __init__(self): """ 初始化权限管理器 如果已经初始化过,则直接返回。 """ if hasattr(self, '_initialized') and self._initialized: return # 权限数据文件路径 self.data_file = os.path.join( os.path.dirname(os.path.abspath(__file__)), "..", "data", "permissions.json" ) # 确保数据目录存在 data_dir = os.path.dirname(self.data_file) os.makedirs(data_dir, exist_ok=True) # 权限数据存储结构:{"users": {"user_id": "level_name"}} self._data: Dict[str, Dict[str, str]] = {"users": {}} # 加载现有数据 self.load() logger.info("权限管理器初始化完成") super().__init__() def load(self) -> None: """ 从文件加载权限数据 如果文件不存在,则创建空文件并初始化默认数据结构。 """ try: if os.path.exists(self.data_file): with open(self.data_file, "r", encoding="utf-8") as f: data = json.load(f) # 兼容旧格式 if "users" in data: self._data["users"] = data["users"] else: self._data["users"] = {} logger.debug(f"权限数据已从 {self.data_file} 加载") else: # 文件不存在,创建空文件 self.save() logger.debug(f"创建空的权限数据文件: {self.data_file}") except json.JSONDecodeError as e: logger.error(f"权限数据文件格式错误: {e}") # 文件损坏,重置为空数据 self._data["users"] = {} self.save() except Exception as e: logger.error(f"加载权限数据失败: {e}") self._data["users"] = {} def save(self) -> None: """ 将权限数据保存到文件 """ try: with open(self.data_file, "w", encoding="utf-8") as f: json.dump(self._data, f, indent=2, ensure_ascii=False) logger.debug(f"权限数据已保存到 {self.data_file}") except Exception as e: logger.error(f"保存权限数据失败: {e}") async def get_user_permission(self, user_id: int) -> Permission: """ 获取指定用户的权限对象 Args: user_id (int): 用户 QQ 号 Returns: Permission: 用户的权限对象,如果用户不存在则返回默认级别 USER """ # 首先,通过 AdminManager 检查是否为管理员 if await admin_manager.is_admin(user_id): return Permission.ADMIN # 如果不是管理员,则从 permissions.json 中查找 user_id_str = str(user_id) level_name = self._data["users"].get(user_id_str, Permission.USER.value) return _PERMISSIONS.get(level_name, Permission.USER) def set_user_permission(self, user_id: int, permission: Permission) -> None: """ 设置指定用户的权限级别 Args: user_id (int): 用户 QQ 号 permission (Permission): 权限对象 Raises: ValueError: 如果权限对象无效 """ if not isinstance(permission, Permission): raise ValueError(f"无效的权限对象: {permission}") user_id_str = str(user_id) self._data["users"][user_id_str] = permission.value self.save() logger.info(f"设置用户 {user_id} 的权限级别为 {permission.value}") def remove_user(self, user_id: int) -> None: """ 移除指定用户的权限设置,恢复为默认级别 Args: user_id (int): 用户 QQ 号 """ user_id_str = str(user_id) if user_id_str in self._data["users"]: del self._data["users"][user_id_str] self.save() logger.info(f"移除用户 {user_id} 的权限设置") async def check_permission(self, user_id: int, required_permission: Permission) -> bool: """ 检查用户是否具有指定权限级别 Args: user_id (int): 用户 QQ 号 required_permission (Permission): 所需的权限对象 Returns: bool: 如果用户权限 >= 所需权限,返回 True,否则返回 False """ user_permission = await self.get_user_permission(user_id) return user_permission >= required_permission async def get_all_user_permissions(self) -> Dict[str, str]: """ 获取所有已配置的用户权限(包括 AdminManager 中的管理员) :return: 一个包含所有用户权限的字典 """ permissions = self._data["users"].copy() # 合并 AdminManager 中的管理员 admins = await admin_manager.get_all_admins() for admin_id in admins: permissions[str(admin_id)] = Permission.ADMIN.value return permissions def get_all_users(self) -> Dict[str, str]: """ 获取所有设置了权限的用户及其级别名称 Returns: Dict[str, str]: 用户ID到权限级别名称的映射 """ return self._data["users"].copy() def clear_all(self) -> None: """ 清空所有权限设置 """ self._data["users"].clear() self.save() logger.info("已清空所有权限设置") def require_admin(func): """ 一个装饰器,用于限制命令只能由管理员执行。 """ from functools import wraps from models.events.message import MessageEvent from core.managers import permission_manager @wraps(func) async def wrapper(event: MessageEvent, *args, **kwargs): user_id = event.user_id if await permission_manager.check_permission(user_id, Permission.ADMIN): return await func(event, *args, **kwargs) else: # 假设 event 对象有 reply 方法 if hasattr(event, "reply"): await event.reply("抱歉,您没有权限执行此命令。") return None return wrapper