- 新增 `/status` 指令,展示机器人运行状态和系统指标 - 实现Redis Lua脚本支持原子化计数器操作 - 添加消息收发统计功能 - 完善文档,包括插件开发和性能优化指南 - 重构WebSocket连接池,增加健康检查机制 - 移除旧版编译脚本,优化项目结构
94 lines
2.8 KiB
Python
94 lines
2.8 KiB
Python
import redis.asyncio as redis
|
|
from ..config_loader import global_config as config
|
|
from ..utils.logger import logger
|
|
from ..utils.singleton import Singleton
|
|
|
|
class RedisManager(Singleton):
|
|
"""
|
|
Redis 连接管理器(异步单例)
|
|
"""
|
|
_redis = None
|
|
|
|
def __init__(self):
|
|
"""
|
|
初始化 Redis 管理器
|
|
"""
|
|
# 调用父类 __init__ 确保单例初始化
|
|
super().__init__()
|
|
|
|
async def initialize(self):
|
|
"""
|
|
异步初始化 Redis 连接并进行健康检查
|
|
"""
|
|
if self._redis is None:
|
|
try:
|
|
redis_config = config.redis
|
|
host = redis_config.host
|
|
port = redis_config.port
|
|
db = redis_config.db
|
|
password = redis_config.password
|
|
|
|
logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")
|
|
|
|
self._redis = redis.Redis(
|
|
host=host,
|
|
port=port,
|
|
db=db,
|
|
password=password,
|
|
decode_responses=True,
|
|
ssl=False
|
|
)
|
|
if await self._redis.ping():
|
|
logger.success("Redis 连接成功!")
|
|
else:
|
|
logger.error("Redis 连接失败: PING 命令无响应")
|
|
except Exception as e:
|
|
logger.exception(f"Redis 初始化时发生未知错误: {e}")
|
|
self._redis = None
|
|
|
|
@property
|
|
def redis(self):
|
|
"""
|
|
获取 Redis 连接实例
|
|
"""
|
|
if self._redis is None:
|
|
raise ConnectionError("Redis 未初始化或连接失败,请先调用 initialize()")
|
|
return self._redis
|
|
|
|
async def get(self, name):
|
|
"""
|
|
获取指定键的值
|
|
"""
|
|
return await self.redis.get(name)
|
|
|
|
async def set(self, name, value, ex=None):
|
|
"""
|
|
设置指定键的值
|
|
"""
|
|
return await self.redis.set(name, value, ex=ex)
|
|
|
|
async def execute_lua_script(self, script: str, keys: list, args: list):
|
|
"""
|
|
以原子方式执行 Lua 脚本
|
|
|
|
Args:
|
|
script (str): 要执行的 Lua 脚本字符串
|
|
keys (list): 脚本中使用的 Redis 键 (KEYS[1], KEYS[2], ...)
|
|
args (list): 传递给脚本的参数 (ARGV[1], ARGV[2], ...)
|
|
|
|
Returns:
|
|
Any: 脚本的返回值
|
|
"""
|
|
try:
|
|
# redis-py 内部会自动处理脚本的缓存 (EVAL/EVALSHA)
|
|
lua_script = self.redis.register_script(script)
|
|
return await lua_script(keys=keys, args=args)
|
|
except Exception as e:
|
|
logger.error(f"执行 Lua 脚本失败: {e}")
|
|
logger.debug(f"脚本内容: {script}")
|
|
raise
|
|
|
|
|
|
# 全局 Redis 管理器实例
|
|
redis_manager = RedisManager()
|