feat: 添加测试覆盖率并修复相关问题
refactor(redis_manager): 移除冗余的ConnectionError处理 refactor(event_handler): 优化Bot类型注解 refactor(factory): 移除未使用的GroupCardNoticeEvent test: 添加全面的单元测试覆盖 - 添加test_import.py测试模块导入 - 添加test_debug.py测试插件加载调试 - 添加test_plugin_error.py测试错误处理 - 添加test_config_loader.py测试配置加载 - 添加test_redis_manager.py测试Redis管理 - 添加test_bot.py测试Bot功能 - 扩展test_models.py测试消息模型 - 添加test_plugin_manager_coverage.py测试插件管理 - 添加test_executor.py测试代码执行器 - 添加test_ws.py测试WebSocket - 添加test_api.py测试API接口 - 添加test_core_managers.py测试核心管理模块 fix(plugin_manager): 修复插件加载日志变量问题 覆盖率已到达86%(忽略插件)
This commit is contained in:
138
tests/test_redis_manager.py
Normal file
138
tests/test_redis_manager.py
Normal file
@@ -0,0 +1,138 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch, AsyncMock
|
||||
from core.managers.redis_manager import RedisManager
|
||||
|
||||
|
||||
class TestRedisManager:
|
||||
def test_singleton_pattern(self):
|
||||
"""测试单例模式。"""
|
||||
instance1 = RedisManager()
|
||||
instance2 = RedisManager()
|
||||
assert instance1 is instance2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialize_success(self):
|
||||
"""测试 Redis 初始化成功。"""
|
||||
# 重置单例
|
||||
if hasattr(RedisManager, "_instance"):
|
||||
del RedisManager._instance
|
||||
# 确保类有 _instance 属性
|
||||
if not hasattr(RedisManager, "_instance"):
|
||||
RedisManager._instance = None
|
||||
# 重置 Redis 连接
|
||||
RedisManager._redis = None
|
||||
|
||||
# 模拟全局配置
|
||||
with patch('core.managers.redis_manager.config') as mock_config:
|
||||
mock_config.redis.host = "localhost"
|
||||
mock_config.redis.port = 6379
|
||||
mock_config.redis.db = 0
|
||||
mock_config.redis.password = "test_password"
|
||||
|
||||
# 模拟 Redis 客户端
|
||||
with patch('core.managers.redis_manager.redis') as mock_redis_module:
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.ping.return_value = True
|
||||
mock_redis_module.Redis.return_value = mock_redis
|
||||
|
||||
manager = RedisManager()
|
||||
await manager.initialize()
|
||||
|
||||
# 验证 Redis 连接
|
||||
mock_redis_module.Redis.assert_called_once_with(
|
||||
host="localhost",
|
||||
port=6379,
|
||||
db=0,
|
||||
password="test_password",
|
||||
decode_responses=True
|
||||
)
|
||||
mock_redis.ping.assert_called_once()
|
||||
assert manager._redis is mock_redis
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_initialize_connection_error(self):
|
||||
"""测试 Redis 连接失败。"""
|
||||
# 重置单例
|
||||
if hasattr(RedisManager, "_instance"):
|
||||
del RedisManager._instance
|
||||
# 确保类有 _instance 属性
|
||||
if not hasattr(RedisManager, "_instance"):
|
||||
RedisManager._instance = None
|
||||
# 重置 Redis 连接
|
||||
RedisManager._redis = None
|
||||
|
||||
# 模拟全局配置
|
||||
with patch('core.managers.redis_manager.config') as mock_config:
|
||||
mock_config.redis.host = "localhost"
|
||||
mock_config.redis.port = 6379
|
||||
mock_config.redis.db = 0
|
||||
mock_config.redis.password = "test_password"
|
||||
|
||||
# 模拟 Redis 连接错误
|
||||
with patch('core.managers.redis_manager.redis') as mock_redis_module:
|
||||
mock_redis_module.Redis.side_effect = Exception("Connection refused")
|
||||
|
||||
manager = RedisManager()
|
||||
await manager.initialize()
|
||||
|
||||
# 验证 Redis 未初始化
|
||||
assert manager._redis is None
|
||||
|
||||
def test_redis_property_uninitialized(self):
|
||||
"""测试 Redis 属性在未初始化时抛出异常。"""
|
||||
# 重置单例
|
||||
if hasattr(RedisManager, "_instance"):
|
||||
del RedisManager._instance
|
||||
# 确保类有 _instance 属性
|
||||
if not hasattr(RedisManager, "_instance"):
|
||||
RedisManager._instance = None
|
||||
# 重置 Redis 连接
|
||||
RedisManager._redis = None
|
||||
|
||||
manager = RedisManager()
|
||||
manager._redis = None
|
||||
|
||||
with pytest.raises(ConnectionError, match="Redis 未初始化或连接失败,请先调用 initialize()"):
|
||||
_ = manager.redis
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_method(self):
|
||||
"""测试 get 方法。"""
|
||||
# 重置单例
|
||||
if hasattr(RedisManager, "_instance"):
|
||||
del RedisManager._instance
|
||||
# 确保类有 _instance 属性
|
||||
if not hasattr(RedisManager, "_instance"):
|
||||
RedisManager._instance = None
|
||||
# 重置 Redis 连接
|
||||
RedisManager._redis = None
|
||||
|
||||
manager = RedisManager()
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.get.return_value = "test_value"
|
||||
manager._redis = mock_redis
|
||||
|
||||
result = await manager.get("test_key")
|
||||
assert result == "test_value"
|
||||
mock_redis.get.assert_called_once_with("test_key")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_set_method(self):
|
||||
"""测试 set 方法。"""
|
||||
# 重置单例
|
||||
if hasattr(RedisManager, "_instance"):
|
||||
del RedisManager._instance
|
||||
# 确保类有 _instance 属性
|
||||
if not hasattr(RedisManager, "_instance"):
|
||||
RedisManager._instance = None
|
||||
# 重置 Redis 连接
|
||||
RedisManager._redis = None
|
||||
|
||||
manager = RedisManager()
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.set.return_value = True
|
||||
manager._redis = mock_redis
|
||||
|
||||
result = await manager.set("test_key", "test_value", ex=3600)
|
||||
assert result is True
|
||||
mock_redis.set.assert_called_once_with("test_key", "test_value", ex=3600)
|
||||
Reference in New Issue
Block a user