Files
NeoBot/core/managers/redis_manager.py
2026-01-19 14:24:52 +08:00

70 lines
2.0 KiB
Python

import redis.asyncio as redis
from ..config_loader import global_config as config
from ..utils.logger import logger
class RedisManager:
    """Asynchronous Redis connection manager implemented as a singleton.

    Every ``RedisManager()`` call returns the same object. No connection is
    attempted at construction time; callers must ``await initialize()`` once
    before using :attr:`redis`, :meth:`get` or :meth:`set`.
    """

    # The one shared instance handed out by __new__.
    _instance = None
    # The verified Redis client, or None while uninitialized / after a failure.
    _redis = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def initialize(self) -> None:
        """Create the Redis client from configuration and health-check it.

        Connection settings (host, port, db, password) are read from
        ``config.redis``. The client is only stored on the manager after a
        successful ``PING``; if the ping reports failure or anything raises,
        the error is logged, the half-built client is closed, and the manager
        stays uninitialized so that :attr:`redis` keeps raising
        ``ConnectionError``. Failures are never propagated to the caller.

        Calling this again once a client is installed is a no-op.
        """
        if self._redis is not None:
            return

        # Keep the client local until it has passed the health check, so an
        # unverified connection is never observable through ``self.redis``.
        client = None
        try:
            redis_config = config.redis
            host = redis_config.host
            port = redis_config.port
            db = redis_config.db
            password = redis_config.password

            logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")

            client = redis.Redis(
                host=host,
                port=port,
                db=db,
                password=password,
                decode_responses=True,
                ssl=False,
            )

            if await client.ping():
                self._redis = client
                logger.success("Redis 连接成功!")
                return

            logger.error("Redis 连接失败: PING 命令无响应")
        except Exception as e:
            logger.exception(f"Redis 初始化时发生未知错误: {e}")

        # Failure path (falsy ping or exception): make sure nothing is
        # published and release whatever the client may have allocated.
        self._redis = None
        await self._close_quietly(client)

    @staticmethod
    async def _close_quietly(client) -> None:
        """Best-effort close of a client that failed initialization.

        ``client`` may be ``None`` when the failure happened before the
        client object was created. Errors raised while closing are logged
        and suppressed because the caller is already handling a failure.
        """
        if client is None:
            return
        # Newer redis-py exposes ``aclose()`` and deprecates ``close()``;
        # older releases only provide ``close()``. Use whichever exists.
        closer = getattr(client, "aclose", None) or getattr(client, "close", None)
        if closer is None:
            return
        try:
            await closer()
        except Exception as e:
            logger.warning(f"关闭 Redis 客户端时出错: {e}")

    @property
    def redis(self):
        """Return the initialized Redis client.

        Raises:
            ConnectionError: if :meth:`initialize` has not been called or
                did not manage to establish a working connection.
        """
        if self._redis is None:
            raise ConnectionError("Redis 未初始化或连接失败,请先调用 initialize()")
        return self._redis

    async def get(self, name):
        """Return the value stored under key ``name``.

        Delegates to the client's ``get``; raises ``ConnectionError`` when
        the manager is not initialized.
        """
        return await self.redis.get(name)

    async def set(self, name, value, ex=None):
        """Store ``value`` under key ``name``.

        ``ex`` is forwarded unchanged to the client's ``set`` as its ``ex``
        argument. Returns whatever the client's ``set`` returns; raises
        ``ConnectionError`` when the manager is not initialized.
        """
        return await self.redis.set(name, value, ex=ex)
# Global Redis manager instance shared by importers of this module.
# Constructing it does not open a connection; initialize() must be awaited first.
redis_manager = RedisManager()