refactor(redis_manager): 移除冗余的ConnectionError处理 refactor(event_handler): 优化Bot类型注解 refactor(factory): 移除未使用的GroupCardNoticeEvent test: 添加全面的单元测试覆盖 - 添加test_import.py测试模块导入 - 添加test_debug.py测试插件加载调试 - 添加test_plugin_error.py测试错误处理 - 添加test_config_loader.py测试配置加载 - 添加test_redis_manager.py测试Redis管理 - 添加test_bot.py测试Bot功能 - 扩展test_models.py测试消息模型 - 添加test_plugin_manager_coverage.py测试插件管理 - 添加test_executor.py测试代码执行器 - 添加test_ws.py测试WebSocket - 添加test_api.py测试API接口 - 添加test_core_managers.py测试核心管理模块 fix(plugin_manager): 修复插件加载日志变量问题 覆盖率已到达86%(忽略插件)
138 lines
5.0 KiB
Python
138 lines
5.0 KiB
Python
import pytest
|
|
from unittest.mock import MagicMock, patch, AsyncMock
|
|
from core.managers.redis_manager import RedisManager
|
|
|
|
|
|
class TestRedisManager:
    """Unit tests for the ``RedisManager`` singleton.

    Tests that need a pristine manager use the ``fresh_singleton`` fixture
    and then construct ``RedisManager()`` themselves, so the instance is
    created while the patches installed for that test are active.
    """

    @pytest.fixture
    def fresh_singleton(self):
        """Clear the class-level singleton state before and after a test.

        Assigning ``None`` directly replaces the former
        ``hasattr``/``del``/``hasattr``/assign sequence, whose net effect was
        the same assignment. The state is cleared again on teardown so that
        a manager holding a mock connection is not left behind in
        ``RedisManager._instance`` for whichever test runs next.
        """
        RedisManager._instance = None
        RedisManager._redis = None
        yield
        RedisManager._instance = None
        RedisManager._redis = None

    @pytest.fixture
    def mock_config(self):
        """Patch the ``config`` object used by the manager module.

        Yields the mock so a test can inspect or adjust the Redis settings.
        """
        with patch('core.managers.redis_manager.config') as config_mock:
            config_mock.redis.host = "localhost"
            config_mock.redis.port = 6379
            config_mock.redis.db = 0
            config_mock.redis.password = "test_password"
            yield config_mock

    def test_singleton_pattern(self):
        """Two constructions must return the very same object."""
        first = RedisManager()
        second = RedisManager()
        assert first is second

    @pytest.mark.asyncio
    @pytest.mark.usefixtures("fresh_singleton", "mock_config")
    async def test_initialize_success(self):
        """``initialize()`` builds the client from config and pings it."""
        with patch('core.managers.redis_manager.redis') as mock_redis_module:
            # Stand-in client returned by the patched ``redis.Redis``.
            mock_redis = AsyncMock()
            mock_redis.ping.return_value = True
            mock_redis_module.Redis.return_value = mock_redis

            manager = RedisManager()
            await manager.initialize()

            # The client must be created with exactly the patched settings.
            mock_redis_module.Redis.assert_called_once_with(
                host="localhost",
                port=6379,
                db=0,
                password="test_password",
                decode_responses=True,
            )
            mock_redis.ping.assert_called_once()
            assert manager._redis is mock_redis

    @pytest.mark.asyncio
    @pytest.mark.usefixtures("fresh_singleton", "mock_config")
    async def test_initialize_connection_error(self):
        """A failing client constructor leaves the manager unconnected."""
        with patch('core.managers.redis_manager.redis') as mock_redis_module:
            # Simulate the connection attempt blowing up.
            mock_redis_module.Redis.side_effect = Exception("Connection refused")

            manager = RedisManager()
            # Must not propagate the error to the caller.
            await manager.initialize()

            assert manager._redis is None

    @pytest.mark.usefixtures("fresh_singleton")
    def test_redis_property_uninitialized(self):
        """Reading ``redis`` before a connection exists raises ``ConnectionError``."""
        manager = RedisManager()
        manager._redis = None

        # NOTE: ``match`` is applied as a regular expression, so the trailing
        # ``()`` is an empty group rather than literal parentheses; the
        # pattern still matches a message that contains them.
        with pytest.raises(ConnectionError, match="Redis 未初始化或连接失败,请先调用 initialize()"):
            _ = manager.redis

    @pytest.mark.asyncio
    @pytest.mark.usefixtures("fresh_singleton")
    async def test_get_method(self):
        """``get`` forwards the key to the client and returns its result."""
        manager = RedisManager()
        mock_redis = AsyncMock()
        mock_redis.get.return_value = "test_value"
        manager._redis = mock_redis

        result = await manager.get("test_key")

        assert result == "test_value"
        mock_redis.get.assert_called_once_with("test_key")

    @pytest.mark.asyncio
    @pytest.mark.usefixtures("fresh_singleton")
    async def test_set_method(self):
        """``set`` forwards key, value and expiry to the client."""
        manager = RedisManager()
        mock_redis = AsyncMock()
        mock_redis.set.return_value = True
        manager._redis = mock_redis

        result = await manager.set("test_key", "test_value", ex=3600)

        assert result is True
        mock_redis.set.assert_called_once_with("test_key", "test_value", ex=3600)