refactor(redis_manager): 移除冗余的ConnectionError处理 refactor(event_handler): 优化Bot类型注解 refactor(factory): 移除未使用的GroupCardNoticeEvent test: 添加全面的单元测试覆盖 - 添加test_import.py测试模块导入 - 添加test_debug.py测试插件加载调试 - 添加test_plugin_error.py测试错误处理 - 添加test_config_loader.py测试配置加载 - 添加test_redis_manager.py测试Redis管理 - 添加test_bot.py测试Bot功能 - 扩展test_models.py测试消息模型 - 添加test_plugin_manager_coverage.py测试插件管理 - 添加test_executor.py测试代码执行器 - 添加test_ws.py测试WebSocket - 添加test_api.py测试API接口 - 添加test_core_managers.py测试核心管理模块 fix(plugin_manager): 修复插件加载日志变量问题 覆盖率已到达86%(忽略插件)
69 lines
2.0 KiB
Python
69 lines
2.0 KiB
Python
import redis.asyncio as redis
|
|
from ..config_loader import global_config as config
|
|
from ..utils.logger import logger
|
|
|
|
class RedisManager:
|
|
"""
|
|
Redis 连接管理器(异步单例)
|
|
"""
|
|
_instance = None
|
|
_redis = None
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
cls._instance = super().__new__(cls)
|
|
return cls._instance
|
|
|
|
async def initialize(self):
|
|
"""
|
|
异步初始化 Redis 连接并进行健康检查
|
|
"""
|
|
if self._redis is None:
|
|
try:
|
|
redis_config = config.redis
|
|
host = redis_config.host
|
|
port = redis_config.port
|
|
db = redis_config.db
|
|
password = redis_config.password
|
|
|
|
logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")
|
|
|
|
self._redis = redis.Redis(
|
|
host=host,
|
|
port=port,
|
|
db=db,
|
|
password=password,
|
|
decode_responses=True
|
|
)
|
|
if await self._redis.ping():
|
|
logger.success("Redis 连接成功!")
|
|
else:
|
|
logger.error("Redis 连接失败: PING 命令无响应")
|
|
except Exception as e:
|
|
logger.exception(f"Redis 初始化时发生未知错误: {e}")
|
|
self._redis = None
|
|
|
|
@property
|
|
def redis(self):
|
|
"""
|
|
获取 Redis 连接实例
|
|
"""
|
|
if self._redis is None:
|
|
raise ConnectionError("Redis 未初始化或连接失败,请先调用 initialize()")
|
|
return self._redis
|
|
|
|
async def get(self, name):
|
|
"""
|
|
获取指定键的值
|
|
"""
|
|
return await self.redis.get(name)
|
|
|
|
async def set(self, name, value, ex=None):
|
|
"""
|
|
设置指定键的值
|
|
"""
|
|
return await self.redis.set(name, value, ex=ex)
|
|
|
|
# 全局 Redis 管理器实例
|
|
redis_manager = RedisManager()
|