Files
NeoBot/core/managers/redis_manager.py
K2cr2O1 d458413e4b feat: 添加状态监控插件和Redis原子操作支持
- 新增 `/status` 指令,展示机器人运行状态和系统指标
- 实现Redis Lua脚本支持原子化计数器操作
- 添加消息收发统计功能
- 完善文档,包括插件开发和性能优化指南
- 重构WebSocket连接池,增加健康检查机制
- 移除旧版编译脚本,优化项目结构
2026-01-23 15:54:45 +08:00

94 lines
2.8 KiB
Python

import redis.asyncio as redis
from ..config_loader import global_config as config
from ..utils.logger import logger
from ..utils.singleton import Singleton
class RedisManager(Singleton):
"""
Redis 连接管理器(异步单例)
"""
_redis = None
def __init__(self):
"""
初始化 Redis 管理器
"""
# 调用父类 __init__ 确保单例初始化
super().__init__()
async def initialize(self):
"""
异步初始化 Redis 连接并进行健康检查
"""
if self._redis is None:
try:
redis_config = config.redis
host = redis_config.host
port = redis_config.port
db = redis_config.db
password = redis_config.password
logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")
self._redis = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True,
ssl=False
)
if await self._redis.ping():
logger.success("Redis 连接成功!")
else:
logger.error("Redis 连接失败: PING 命令无响应")
except Exception as e:
logger.exception(f"Redis 初始化时发生未知错误: {e}")
self._redis = None
@property
def redis(self):
"""
获取 Redis 连接实例
"""
if self._redis is None:
raise ConnectionError("Redis 未初始化或连接失败,请先调用 initialize()")
return self._redis
async def get(self, name):
"""
获取指定键的值
"""
return await self.redis.get(name)
async def set(self, name, value, ex=None):
"""
设置指定键的值
"""
return await self.redis.set(name, value, ex=ex)
async def execute_lua_script(self, script: str, keys: list, args: list):
"""
以原子方式执行 Lua 脚本
Args:
script (str): 要执行的 Lua 脚本字符串
keys (list): 脚本中使用的 Redis 键 (KEYS[1], KEYS[2], ...)
args (list): 传递给脚本的参数 (ARGV[1], ARGV[2], ...)
Returns:
Any: 脚本的返回值
"""
try:
# redis-py 内部会自动处理脚本的缓存 (EVAL/EVALSHA)
lua_script = self.redis.register_script(script)
return await lua_script(keys=keys, args=args)
except Exception as e:
logger.error(f"执行 Lua 脚本失败: {e}")
logger.debug(f"脚本内容: {script}")
raise
# 全局 Redis 管理器实例
redis_manager = RedisManager()