import pytest from unittest.mock import MagicMock, patch, AsyncMock from core.managers.redis_manager import RedisManager class TestRedisManager: def test_singleton_pattern(self): """测试单例模式。""" instance1 = RedisManager() instance2 = RedisManager() assert instance1 is instance2 @pytest.mark.asyncio async def test_initialize_success(self): """测试 Redis 初始化成功。""" # 重置单例 if hasattr(RedisManager, "_instance"): del RedisManager._instance # 确保类有 _instance 属性 if not hasattr(RedisManager, "_instance"): RedisManager._instance = None # 重置 Redis 连接 RedisManager._redis = None # 模拟全局配置 with patch('core.managers.redis_manager.config') as mock_config: mock_config.redis.host = "localhost" mock_config.redis.port = 6379 mock_config.redis.db = 0 mock_config.redis.password = "test_password" # 模拟 Redis 客户端 with patch('core.managers.redis_manager.redis') as mock_redis_module: mock_redis = AsyncMock() mock_redis.ping.return_value = True mock_redis_module.Redis.return_value = mock_redis manager = RedisManager() await manager.initialize() # 验证 Redis 连接 mock_redis_module.Redis.assert_called_once_with( host="localhost", port=6379, db=0, password="test_password", decode_responses=True ) mock_redis.ping.assert_called_once() assert manager._redis is mock_redis @pytest.mark.asyncio async def test_initialize_connection_error(self): """测试 Redis 连接失败。""" # 重置单例 if hasattr(RedisManager, "_instance"): del RedisManager._instance # 确保类有 _instance 属性 if not hasattr(RedisManager, "_instance"): RedisManager._instance = None # 重置 Redis 连接 RedisManager._redis = None # 模拟全局配置 with patch('core.managers.redis_manager.config') as mock_config: mock_config.redis.host = "localhost" mock_config.redis.port = 6379 mock_config.redis.db = 0 mock_config.redis.password = "test_password" # 模拟 Redis 连接错误 with patch('core.managers.redis_manager.redis') as mock_redis_module: mock_redis_module.Redis.side_effect = Exception("Connection refused") manager = RedisManager() await manager.initialize() # 验证 Redis 未初始化 assert manager._redis is None def test_redis_property_uninitialized(self): """测试 Redis 属性在未初始化时抛出异常。""" # 重置单例 if hasattr(RedisManager, "_instance"): del RedisManager._instance # 确保类有 _instance 属性 if not hasattr(RedisManager, "_instance"): RedisManager._instance = None # 重置 Redis 连接 RedisManager._redis = None manager = RedisManager() manager._redis = None with pytest.raises(ConnectionError, match="Redis 未初始化或连接失败,请先调用 initialize()"): _ = manager.redis @pytest.mark.asyncio async def test_get_method(self): """测试 get 方法。""" # 重置单例 if hasattr(RedisManager, "_instance"): del RedisManager._instance # 确保类有 _instance 属性 if not hasattr(RedisManager, "_instance"): RedisManager._instance = None # 重置 Redis 连接 RedisManager._redis = None manager = RedisManager() mock_redis = AsyncMock() mock_redis.get.return_value = "test_value" manager._redis = mock_redis result = await manager.get("test_key") assert result == "test_value" mock_redis.get.assert_called_once_with("test_key") @pytest.mark.asyncio async def test_set_method(self): """测试 set 方法。""" # 重置单例 if hasattr(RedisManager, "_instance"): del RedisManager._instance # 确保类有 _instance 属性 if not hasattr(RedisManager, "_instance"): RedisManager._instance = None # 重置 Redis 连接 RedisManager._redis = None manager = RedisManager() mock_redis = AsyncMock() mock_redis.set.return_value = True manager._redis = mock_redis result = await manager.set("test_key", "test_value", ex=3600) assert result is True mock_redis.set.assert_called_once_with("test_key", "test_value", ex=3600)