import redis from .config_loader import global_config as config from .logger import logger class RedisManager: """ Redis 连接管理器 """ _pool = None _client = None @classmethod def initialize(cls): """ 初始化 Redis 连接并进行健康检查 """ if cls._pool is None: try: host = config.redis['host'] port = config.redis['port'] db = config.redis['db'] password = config.redis.get('password') logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}") cls._pool = redis.ConnectionPool( host=host, port=port, db=db, password=password, decode_responses=True ) cls._client = redis.Redis(connection_pool=cls._pool) if cls._client.ping(): logger.success("Redis 连接成功!") else: logger.error("Redis 连接失败: PING 命令无响应") except redis.exceptions.ConnectionError as e: logger.error(f"Redis 连接失败: {e}") cls._pool = None cls._client = None except Exception as e: logger.exception(f"Redis 初始化时发生未知错误: {e}") cls._pool = None cls._client = None @classmethod def get_redis(cls): """ 获取 Redis 连接 :return: Redis 连接实例 """ if cls._client is None: # 理论上 initialize 应该在程序启动时被调用,这里作为备用 cls.initialize() return cls._client # 在模块加载时直接初始化 RedisManager.initialize() redis_client = RedisManager.get_redis()