refactor(permission): 重构权限管理系统,合并管理员管理功能

- 将 admin_manager 功能整合到 permission_manager 中,统一管理
- 采用文件为主、Redis 为辅的架构,确保数据一致性
- 实现原子操作机制,防止数据损坏
- 更新文档说明新的权限管理机制
- 调整相关模块引用和编译配置
This commit is contained in:
2026-01-23 08:37:12 +08:00
parent f53117a250
commit 2fafdb004b
11 changed files with 429 additions and 218 deletions

2
.gitignore vendored
View File

@@ -147,4 +147,4 @@ build/
scratch_files/
/config.toml
/core/data/TEMP/*
/core/data/*

View File

@@ -1,3 +1,8 @@
{
"users": {}
"users": {
"123456789": "op",
"888888": "op",
"2221577113": "admin",
"999999": "user"
}
}

View File

@@ -4,7 +4,6 @@
这个包集中了机器人核心的单例管理器。
通过从这里导入,可以确保在整个应用中访问到的都是同一个实例。
"""
from .admin_manager import AdminManager
from .command_manager import matcher as command_manager
from .permission_manager import PermissionManager
from .plugin_manager import PluginManager
@@ -14,10 +13,7 @@ from .image_manager import ImageManager
# --- 实例化所有单例管理器 ---
# 管理员管理
admin_manager = AdminManager()
# 权限管理器
# 权限管理器(包含了管理员管理功能)
permission_manager = PermissionManager()
# 命令与事件管理器 (别名 matcher)
@@ -37,7 +33,6 @@ browser_manager = BrowserManager()
image_manager = ImageManager()
__all__ = [
"admin_manager",
"permission_manager",
"command_manager",
"matcher",

View File

@@ -1,150 +0,0 @@
"""
管理员管理器模块
该模块负责管理机器人的管理员列表。
它现在以 Redis 作为主要数据源,文件仅用作备份。
"""
import orjson
import os
from typing import Set
from ..utils.logger import logger
from ..utils.singleton import Singleton
from .redis_manager import redis_manager
class AdminManager(Singleton):
"""
管理员管理器类
以 Redis Set 作为管理员列表的唯一真实来源,提供高速的读写能力。
文件 (admin.json) 仅用于首次启动时的数据迁移和作为灾备。
"""
_REDIS_KEY = "neobot:admins" # 用于存储管理员集合的 Redis 键
def __init__(self):
"""
初始化 AdminManager
"""
if hasattr(self, '_initialized') and self._initialized:
return
# 管理员数据文件路径,主要用于备份和首次迁移
self.data_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"data",
"admin.json"
)
os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
logger.info("管理员管理器初始化完成")
super().__init__()
async def initialize(self):
"""
异步初始化,检查 Redis 数据,如果为空则尝试从文件迁移
"""
try:
# 检查 Redis 中是否已存在数据
if await redis_manager.redis.exists(self._REDIS_KEY):
admin_count = await redis_manager.redis.scard(self._REDIS_KEY)
logger.info(f"Redis 中已存在管理员数据,共 {admin_count} 位。")
else:
# Redis 为空,尝试从文件迁移
logger.info("Redis 中未找到管理员数据,尝试从 admin.json 文件迁移...")
await self._migrate_from_file_to_redis()
except Exception as e:
logger.error(f"初始化管理员数据时发生错误: {e}")
async def _migrate_from_file_to_redis(self):
"""
从 admin.json 加载管理员列表并存入 Redis
这通常只在首次启动或 Redis 数据丢失时执行一次
"""
admins_to_migrate = set()
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
admins = data.get("admins", [])
admins_to_migrate = set(int(admin_id) for admin_id in admins)
if admins_to_migrate:
await redis_manager.redis.sadd(self._REDIS_KEY, *admins_to_migrate)
logger.success(f"成功从文件迁移 {len(admins_to_migrate)} 位管理员到 Redis。")
else:
logger.info("admin.json 文件为空或不存在,无需迁移。")
except ValueError as e:
logger.error(f"解析 admin.json 失败,无法迁移: {e}")
except Exception as e:
logger.error(f"迁移管理员数据到 Redis 失败: {e}")
async def _save_to_file_backup(self):
"""
将 Redis 中的管理员列表备份到 admin.json
"""
try:
admins = await self.get_all_admins()
admin_list = [str(admin_id) for admin_id in admins]
with open(self.data_file, "w", encoding="utf-8") as f:
f.write(orjson.dumps({"admins": admin_list}, indent=2, ensure_ascii=False).decode('utf-8'))
logger.debug(f"管理员列表已备份到 {self.data_file}")
except Exception as e:
logger.error(f"备份管理员列表到 admin.json 失败: {e}")
async def is_admin(self, user_id: int) -> bool:
"""
检查用户是否为管理员(直接从 Redis 读取)
"""
try:
return await redis_manager.redis.sismember(self._REDIS_KEY, user_id)
except Exception as e:
logger.error(f"从 Redis 检查管理员权限失败: {e}")
return False
async def add_admin(self, user_id: int) -> bool:
"""
添加管理员到 Redis并更新文件备份
"""
try:
# sadd 返回成功添加的成员数量1 表示成功0 表示已存在
if await redis_manager.redis.sadd(self._REDIS_KEY, user_id) == 1:
logger.info(f"已添加新管理员 {user_id} 到 Redis")
await self._save_to_file_backup() # 更新备份
return True
return False # 用户已经是管理员
except Exception as e:
logger.error(f"添加管理员 {user_id} 到 Redis 失败: {e}")
return False
async def remove_admin(self, user_id: int) -> bool:
"""
从 Redis 移除管理员,并更新文件备份
"""
try:
# srem 返回成功移除的成员数量1 表示成功0 表示不存在
if await redis_manager.redis.srem(self._REDIS_KEY, user_id) == 1:
logger.info(f"已从 Redis 移除管理员 {user_id}")
await self._save_to_file_backup() # 更新备份
return True
return False # 用户不是管理员
except Exception as e:
logger.error(f"从 Redis 移除管理员 {user_id} 失败: {e}")
return False
async def get_all_admins(self) -> Set[int]:
"""
从 Redis 获取所有管理员的集合
"""
try:
admins = await redis_manager.redis.smembers(self._REDIS_KEY)
return {int(admin_id) for admin_id in admins}
except Exception as e:
logger.error(f"从 Redis 获取所有管理员失败: {e}")
return set()
# 全局 AdminManager 实例
admin_manager = AdminManager()

View File

@@ -2,15 +2,15 @@
权限管理器模块
该模块负责管理用户权限,支持 admin、op、user 三个权限级别。
Redis Hash 作为主要数据源,文件仅用作备份和首次数据迁移
permissions.json 文件作为主要数据源Redis 用于加速访问
"""
import orjson
import os
from typing import Dict
import json
from typing import Dict, Set
from ..utils.logger import logger
from ..utils.singleton import Singleton
from .admin_manager import admin_manager
from .redis_manager import redis_manager
from ..permission import Permission
@@ -25,10 +25,11 @@ class PermissionManager(Singleton):
"""
权限管理器类
Redis Hash 作为权限数据的唯一真实来源,提供高速的读写能力
文件 (permissions.json) 仅用于首次启动时的数据迁移和作为灾备
permissions.json 文件作为权限数据的主要来源Redis 用于高速缓存访问
所有写操作会同时更新文件和Redis缓存确保数据一致性
"""
_REDIS_KEY = "neobot:permissions" # 用于存储用户权限的 Redis Hash 键
_REDIS_ADMINS_KEY = "neobot:admins" # 用于存储管理员列表的 Redis 键
def __init__(self):
"""
@@ -37,7 +38,7 @@ class PermissionManager(Singleton):
if hasattr(self, '_initialized') and self._initialized:
return
# 权限数据文件路径,主要用于备份和首次迁移
# 权限数据文件路径,作为主要数据源
self.data_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
@@ -46,23 +47,81 @@ class PermissionManager(Singleton):
)
os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
# 如果文件不存在,创建默认文件
if not os.path.exists(self.data_file):
default_data = {"users": {}}
with open(self.data_file, "w", encoding="utf-8") as f:
f.write(json.dumps(default_data, indent=2, ensure_ascii=False))
logger.info(f"已创建默认权限文件: {self.data_file}")
logger.info("权限管理器初始化完成")
super().__init__()
async def initialize(self):
"""
异步初始化,检查 Redis 数据,如果为空则尝试从文件迁移
异步初始化,以 permissions.json 文件内容为主,同步到 Redis 缓存
"""
try:
if not await redis_manager.redis.exists(self._REDIS_KEY):
logger.info("Redis 中未找到权限数据,尝试从 permissions.json 文件迁移...")
await self._migrate_from_file_to_redis()
else:
perm_count = await redis_manager.redis.hlen(self._REDIS_KEY)
logger.info(f"Redis 中已存在权限数据,共 {perm_count} 条。")
# 总是以文件内容为主,强制同步到 Redis
logger.info(" permissions.json 文件内容为准,同步到 Redis 缓存...")
await self._sync_file_to_redis()
# 检查 Redis 中的数据量
perm_count = await redis_manager.redis.hlen(self._REDIS_KEY)
admin_count = await redis_manager.redis.scard(self._REDIS_ADMINS_KEY)
logger.info(f"Redis 缓存已同步,权限数据: {perm_count} 条,管理员: {admin_count} 位。")
except Exception as e:
logger.error(f"初始化权限数据时发生错误: {e}")
async def _sync_file_to_redis(self):
"""
将 permissions.json 文件内容同步到 Redis 缓存
"""
try:
# 清空 Redis 中的现有数据
await redis_manager.redis.delete(self._REDIS_KEY)
await redis_manager.redis.delete(self._REDIS_ADMINS_KEY)
# 从文件加载数据
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
users = data.get("users", {})
if users:
# 分离普通权限和管理员权限
normal_perms = {}
admin_ids = set()
for user_id, level_name in users.items():
if level_name == Permission.ADMIN.value:
admin_ids.add(user_id)
else:
normal_perms[user_id] = level_name
# 使用 pipeline 批量写入普通权限
if normal_perms:
async with redis_manager.redis.pipeline(transaction=True) as pipe:
for user_id, level_name in normal_perms.items():
pipe.hset(self._REDIS_KEY, user_id, level_name)
await pipe.execute()
# 使用 pipeline 批量写入管理员
if admin_ids:
await redis_manager.redis.sadd(self._REDIS_ADMINS_KEY, *admin_ids)
logger.success(f"成功同步 {len(users)} 条权限数据到 Redis (普通权限: {len(normal_perms)}, 管理员: {len(admin_ids)})")
else:
logger.info("permissions.json 文件中没有权限数据,已清空 Redis 缓存。")
else:
logger.warning(f"权限文件 {self.data_file} 不存在,已清空 Redis 缓存。")
except ValueError as e:
logger.error(f"解析 permissions.json 失败: {e}")
except Exception as e:
logger.error(f"同步文件到 Redis 失败: {e}")
async def _migrate_from_file_to_redis(self):
"""
从 permissions.json 加载权限数据并存入 Redis Hash
@@ -89,16 +148,53 @@ class PermissionManager(Singleton):
except Exception as e:
logger.error(f"迁移权限数据到 Redis 失败: {e}")
async def _migrate_admins_from_file_to_redis(self):
"""
从 permissions.json 加载管理员列表并存入 Redis
"""
admins_to_migrate = set()
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
# 从 users 字段中查找权限为 admin 的用户
users = data.get("users", {})
for user_id, level_name in users.items():
if level_name == Permission.ADMIN.value:
admins_to_migrate.add(user_id)
# 同时兼容旧版的 admins 字段(如果存在的话)
old_admins = data.get("admins", [])
for admin_id in old_admins:
admins_to_migrate.add(str(admin_id))
if admins_to_migrate:
await redis_manager.redis.sadd(self._REDIS_ADMINS_KEY, *admins_to_migrate)
logger.success(f"成功从文件迁移 {len(admins_to_migrate)} 位管理员到 Redis。")
else:
logger.info("permissions.json 文件中没有管理员数据,无需迁移。")
except ValueError as e:
logger.error(f"解析 permissions.json 失败,无法迁移管理员数据: {e}")
except Exception as e:
logger.error(f"迁移管理员数据到 Redis 失败: {e}")
async def _save_to_file_backup(self):
"""
将 Redis 中的权限数据完整备份到 permissions.json
将 Redis 中的权限数据和管理员列表完整备份到 permissions.json
"""
try:
all_perms = await redis_manager.redis.hgetall(self._REDIS_KEY)
# Redis 返回的是 bytes需要解码
users_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in all_perms.items()}
# 由于Redis连接已设置decode_responses=True所以直接使用字符串
users_data = {k: v for k, v in all_perms.items()}
# 获取Redis中的管理员列表并合并到数据中
all_admins = await redis_manager.redis.smembers(self._REDIS_ADMINS_KEY)
for admin_id in all_admins:
users_data[admin_id] = Permission.ADMIN.value # 管理员拥有最高权限
with open(self.data_file, "w", encoding="utf-8") as f:
f.write(orjson.dumps({"users": users_data}, indent=2, ensure_ascii=False).decode('utf-8'))
f.write(json.dumps({"users": users_data}, indent=2, ensure_ascii=False))
logger.debug(f"权限数据已备份到 {self.data_file}")
except Exception as e:
logger.error(f"备份权限数据到 permissions.json 失败: {e}")
@@ -109,13 +205,16 @@ class PermissionManager(Singleton):
优先检查是否为机器人管理员,然后从 Redis 查询。
"""
if await admin_manager.is_admin(user_id):
return Permission.ADMIN
# 检查用户是否为管理员Redis Set 中的存在性检查)
try:
if await redis_manager.redis.sismember(self._REDIS_ADMINS_KEY, str(user_id)):
return Permission.ADMIN
except Exception as e:
logger.error(f"从 Redis 检查管理员权限失败: {e}")
try:
level_name_bytes = await redis_manager.redis.hget(self._REDIS_KEY, str(user_id))
if level_name_bytes:
level_name = level_name_bytes.decode('utf-8')
level_name = await redis_manager.redis.hget(self._REDIS_KEY, str(user_id))
if level_name:
return _PERMISSIONS.get(level_name, Permission.USER)
except Exception as e:
logger.error(f"从 Redis 获取用户 {user_id} 权限失败: {e}")
@@ -124,28 +223,62 @@ class PermissionManager(Singleton):
async def set_user_permission(self, user_id: int, permission: Permission) -> None:
"""
在 Redis 中设置指定用户的权限级别,更新文件备份
设置指定用户的权限级别,首先更新文件,然后同步到 Redis 缓存
"""
if not isinstance(permission, Permission):
raise ValueError(f"无效的权限对象: {permission}")
try:
await redis_manager.redis.hset(self._REDIS_KEY, str(user_id), permission.value)
await self._save_to_file_backup()
logger.info(f"已在 Redis 中设置用户 {user_id} 的权限为 {permission.value}")
# 首先从文件加载当前数据
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
else:
data = {"users": {}}
# 更新权限数据
data["users"][str(user_id)] = permission.value
# 原子性写入文件
temp_file = self.data_file + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
os.replace(temp_file, self.data_file) # 原子操作
# 同步到 Redis
await self._sync_file_to_redis()
logger.info(f"已设置用户 {user_id} 的权限为 {permission.value},并同步到 Redis")
except Exception as e:
logger.error(f"在 Redis 中设置用户 {user_id} 权限失败: {e}")
logger.error(f"设置用户 {user_id} 权限失败: {e}")
async def remove_user(self, user_id: int) -> None:
"""
Redis 中移除指定用户的权限设置,并更新文件备份
权限设置中移除指定用户,首先更新文件,然后同步到 Redis 缓存
"""
try:
if await redis_manager.redis.hdel(self._REDIS_KEY, str(user_id)):
await self._save_to_file_backup()
logger.info(f"已从 Redis 中移除用户 {user_id} 的权限设置")
# 首先从文件加载当前数据
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
else:
data = {"users": {}}
# 从权限数据中移除用户
user_id_str = str(user_id)
if user_id_str in data["users"]:
del data["users"][user_id_str]
# 原子性写入文件
temp_file = self.data_file + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
os.replace(temp_file, self.data_file) # 原子操作
# 同步到 Redis
await self._sync_file_to_redis()
logger.info(f"已从权限设置中移除用户 {user_id},并同步到 Redis")
except Exception as e:
logger.error(f"从 Redis 移除用户 {user_id} 权限失败: {e}")
logger.error(f"移除用户 {user_id} 权限失败: {e}")
async def check_permission(self, user_id: int, required_permission: Permission) -> bool:
"""
@@ -162,19 +295,20 @@ class PermissionManager(Singleton):
async def get_all_user_permissions(self) -> Dict[str, str]:
"""
获取所有已配置的用户权限(合并 Redis 和 AdminManager
获取所有已配置的用户权限(合并普通权限和管理员
"""
permissions = {}
try:
# 从 Redis 获取基础权限
all_perms = await redis_manager.redis.hgetall(self._REDIS_KEY)
permissions = {k.decode('utf-8'): v.decode('utf-8') for k, v in all_perms.items()}
# 由于Redis连接已设置decode_responses=True所以直接使用字符串
permissions = {k: v for k, v in all_perms.items()}
except Exception as e:
logger.error(f"从 Redis 获取所有权限失败: {e}")
# 合并 AdminManager 中的管理员ADMIN 权限覆盖一切
# 获取 Redis 中的管理员列表并添加到权限字典中
try:
admins = await admin_manager.get_all_admins()
admins = await redis_manager.redis.smembers(self._REDIS_ADMINS_KEY)
for admin_id in admins:
permissions[str(admin_id)] = Permission.ADMIN.value
except Exception as e:
@@ -182,16 +316,115 @@ class PermissionManager(Singleton):
return permissions
async def clear_all(self) -> None:
async def is_admin(self, user_id: int) -> bool:
"""
清空 Redis 中的所有权限设置,并更新备份文件
检查用户是否为管理员
"""
try:
await redis_manager.redis.delete(self._REDIS_KEY)
await self._save_to_file_backup()
logger.info("已清空 Redis 中的所有权限设置")
return await redis_manager.redis.sismember(self._REDIS_ADMINS_KEY, str(user_id))
except Exception as e:
logger.error(f"清空 Redis 权限数据失败: {e}")
logger.error(f" Redis 检查管理员权限失败: {e}")
return False
async def add_admin(self, user_id: int) -> bool:
"""
添加管理员,首先更新文件,然后同步到 Redis 缓存
"""
try:
# 首先从文件加载当前数据
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
else:
data = {"users": {}}
user_id_str = str(user_id)
# 检查用户是否已经是管理员
if data["users"].get(user_id_str) == Permission.ADMIN.value:
return False # 用户已经是管理员
# 更新权限数据为管理员
data["users"][user_id_str] = Permission.ADMIN.value
# 原子性写入文件
temp_file = self.data_file + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
os.replace(temp_file, self.data_file) # 原子操作
# 同步到 Redis
await self._sync_file_to_redis()
logger.info(f"已添加新管理员 {user_id},并同步到 Redis")
return True
except Exception as e:
logger.error(f"添加管理员 {user_id} 失败: {e}")
return False
async def remove_admin(self, user_id: int) -> bool:
"""
从管理员列表中移除用户,首先更新文件,然后同步到 Redis 缓存
"""
try:
# 首先从文件加载当前数据
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = orjson.loads(f.read())
else:
data = {"users": {}}
user_id_str = str(user_id)
# 检查用户是否是管理员
if data["users"].get(user_id_str) != Permission.ADMIN.value:
return False # 用户不是管理员
# 将管理员降级为普通用户(或者可以选择完全移除权限)
# 这里我们将其设置为USER权限
data["users"][user_id_str] = Permission.USER.value
# 原子性写入文件
temp_file = self.data_file + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
os.replace(temp_file, self.data_file) # 原子操作
# 同步到 Redis
await self._sync_file_to_redis()
logger.info(f"已从管理员列表中移除用户 {user_id},并同步到 Redis")
return True
except Exception as e:
logger.error(f"移除管理员 {user_id} 失败: {e}")
return False
async def get_all_admins(self) -> Set[int]:
"""
从 Redis 获取所有管理员的集合
"""
try:
admins = await redis_manager.redis.smembers(self._REDIS_ADMINS_KEY)
return {int(admin_id) for admin_id in admins}
except Exception as e:
logger.error(f"从 Redis 获取所有管理员失败: {e}")
return set()
async def clear_all(self) -> None:
"""
清空所有权限设置,首先更新文件,然后同步到 Redis 缓存
"""
try:
# 创建空的权限数据
empty_data = {"users": {}}
# 原子性写入文件
temp_file = self.data_file + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(json.dumps(empty_data, indent=2, ensure_ascii=False))
os.replace(temp_file, self.data_file) # 原子操作
# 同步到 Redis
await self._sync_file_to_redis()
logger.info("已清空所有权限设置,并同步到 Redis")
except Exception as e:
logger.error(f"清空权限数据失败: {e}")
def require_admin(func):
@@ -205,7 +438,7 @@ def require_admin(func):
@wraps(func)
async def wrapper(event: MessageEvent, *args, **kwargs):
user_id = event.user_id
if await permission_manager.check_permission(user_id, Permission.ADMIN):
if await permission_manager.is_admin(user_id):
return await func(event, *args, **kwargs)
else:
# 假设 event 对象有 reply 方法

View File

@@ -0,0 +1,135 @@
# Redis 原子操作与数据一致性
## 概述
NEO Bot 的权限管理系统采用了文件为主、Redis 为辅的架构设计,确保数据可靠性和高性能访问的平衡。为了保证数据一致性,系统在所有写操作中都实现了原子操作机制。
## 设计理念
### 以文件为权威数据源
- **主数据源**: `core/data/permissions.json` 作为权限数据的权威来源
- **缓存层**: Redis 作为高速缓存,提供快速访问能力
- **一致性保障**: 所有写操作都以文件为准,再同步到 Redis
### 原子操作实现
所有权限管理的写操作都遵循以下原子操作模式:
1. **读取当前状态**: 从 `permissions.json` 读取当前数据
2. **内存中修改**: 在内存中完成数据修改
3. **原子写入**: 使用临时文件 + 原子重命名的方式写入磁盘
4. **缓存同步**: 将修改同步到 Redis 缓存
## 核心实现细节
### 原子文件写入
```python
# 原子写入操作示例
temp_file = self.data_file + ".tmp"
with open(temp_file, "w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
os.replace(temp_file, self.data_file) # 原子操作
```
- 使用临时文件避免写入过程中数据损坏
- `os.replace()` 确保操作的原子性
- 即使在写入过程中断电,也不会破坏原文件
### Redis 同步机制
```python
# 同步文件内容到 Redis
async def _sync_file_to_redis(self):
# 清空 Redis 中的现有数据
await redis_manager.redis.delete(self._REDIS_KEY)
await redis_manager.redis.delete(self._REDIS_ADMINS_KEY)
# 从文件加载数据并同步到 Redis
# ...
```
- 使用 Redis 管道操作提高批量写入效率
- 确保 Redis 与文件数据的一致性
## 支持的操作
### 权限管理操作
- `set_user_permission()`: 设置用户权限
- `remove_user()`: 移除用户权限
- `add_admin()`: 添加管理员
- `remove_admin()`: 移除管理员
- `clear_all()`: 清空所有权限
### 数据隔离
- 普通权限存储在 Redis Hash (`neobot:permissions`) 中
- 管理员列表存储在 Redis Set (`neobot:admins`) 中
- 避免数据冲突,提高查询效率
## 性能优化
### 读取优化
- 读取操作直接从 Redis 缓存获取,毫秒级响应
- 减少磁盘 I/O提升系统性能
### 批量操作
- 使用 Redis 管道进行批量操作
- 减少网络往返次数,提高吞吐量
## 错误处理
### 异常恢复
- 文件写入失败时,保留原数据不丢失
- Redis 操作失败时,不影响文件数据
- 提供详细的错误日志便于排查
### 数据校验
- 写入前校验数据格式
- 防止非法数据进入系统
## 最佳实践
### 插件开发建议
```python
# 在插件中使用权限管理器
from core.managers import permission_manager
from core.permission import Permission
# 查询权限
user_perm = await permission_manager.get_user_permission(user_id)
# 检查权限
if await permission_manager.check_permission(user_id, Permission.ADMIN):
# 执行管理员操作
pass
```
### 高并发场景
- 系统设计支持高并发读取
- 写操作频率较低,适合权限管理场景
- Redis 缓存有效缓解数据库压力
## 故障恢复
### Redis 故障
- Redis 不可用时,系统仍可通过文件数据提供服务
- Redis 恢复后,自动从文件同步最新数据
### 文件损坏
- 系统定期备份权限数据
- 可从历史备份中恢复数据
## 总结
通过以文件为权威数据源、Redis 为缓存层的设计结合原子操作机制NEO Bot 的权限管理系统在保证数据可靠性的同时,提供了高性能的访问能力。这种设计既满足了数据一致性的要求,又兼顾了系统性能的需求。

View File

@@ -30,14 +30,9 @@
* **管啥**:
* **划分三六九等**: `ADMIN`, `OP`, `USER` 这些等级都是它定的。
* **管理权限**: 谁有啥权限,都记在 `core/data/permissions.json` 里。
* **会自动变通**: 查权限的时候,它会把 `AdminManager` 里的超管也当成 `ADMIN`
### 3. `AdminManager` (`admin_manager`)
* **怎么找**: `from core.managers.admin_manager import admin_manager`
* **管啥**:
* **钦差大臣**: 专门管机器人的超级管理员,增删改查都在这。
* **三级缓存**: 内存 -> Redis -> 文件
* **管理员管理**: 超级管理员的增删改查也在这里,统一管理
* **双重存储**: 普通权限存储在 Redis Hash 中,管理员列表存储在 Redis Set 中。
* **原子操作**: 所有写操作都采用原子操作,确保数据一致性。详见 [Redis 原子操作与数据一致性](./redis-atomic-operations.md)
### 4. `PluginManager`

View File

@@ -17,6 +17,7 @@
* [性能优化](./core-concepts/performance.md): 页面池、JIT、Mypyc...
* [消息流](./core-concepts/event-flow.md): 看看一条消息从被接收到被回复是如何运行的
* [核心](./core-concepts/singleton-managers.md): `matcher`, `browser_manager`... 认识这些核心模块。
* [Redis 原子操作与数据一致性](./core-concepts/redis-atomic-operations.md): 权限管理系统的原子操作实现,确保数据一致性
* [错误处理](./core-concepts/error-handling.md): 了解系统的错误处理机制和错误码定义。
### 3. API 参考

View File

@@ -14,9 +14,8 @@ from watchdog.events import FileSystemEventHandler
from core.utils.logger import logger
# 核心模块导入
from core.managers.admin_manager import admin_manager
from core.ws import WS
from core.managers import plugin_manager, matcher
from core.managers import plugin_manager, matcher, permission_manager
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
@@ -172,8 +171,8 @@ async def main():
# 同步帮助图片
await matcher.sync_help_pic()
# 初始化管理员管理
await admin_manager.initialize()
# 初始化权限管理器(包含了管理员管理功能)
await permission_manager.initialize()
# 初始化浏览器管理器 (使用页面池)
await browser_manager.init_pool(size=3)

View File

@@ -46,10 +46,9 @@ def main():
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/managers/permission_manager.py', # 权限管理(包含管理员管理功能)
'core/ws.py', # WebSocket 核心
'core/managers/plugin_manager.py', # 插件管理器
'core/managers/plugin_manager.py', # 插件管理器
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
]

View File

@@ -26,8 +26,7 @@ modules = [
# 核心管理模块
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/managers/permission_manager.py', # 权限管理(包含管理员管理功能)
'core/managers/plugin_manager.py', # 插件管理器
# 核心基础模块