Files
NeoBot/tests/test_executor.py
K2cr2O1 caf5b06097 refactor(managers): 重构单例管理器实现并优化代码结构
feat(ws_pool): 新增 WebSocket 连接池实现

perf(json): 使用 orjson 替代标准 json 库提升性能

style: 清理未使用的导入和冗余代码

docs: 更新架构文档和开发规范

test: 添加 WebSocket 连接池测试用例

fix(plugins): 修复自动审批插件 API 调用参数格式
2026-01-22 16:23:03 +08:00

192 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import docker
from core.utils.executor import CodeExecutor
# Mock configuration object
@pytest.fixture
def mock_config():
    """Provide a ``MagicMock`` standing in for the application config.

    Only the ``docker`` section is populated: no remote base URL, the
    ``sandbox:latest`` image, a timeout of 5, a concurrency limit of 2 and
    TLS verification disabled.
    """
    fake_config = MagicMock()
    fake_config.docker.configure_mock(
        base_url=None,
        sandbox_image="sandbox:latest",
        timeout=5,
        concurrency_limit=2,
        tls_verify=False,
    )
    return fake_config
@pytest.fixture
def mock_docker_client():
    """Patch ``docker.from_env`` and yield the fake client it returns.

    The patch stays active for the whole test because the fixture yields
    from inside the ``with`` block.
    """
    fake_client = MagicMock()
    with patch("docker.from_env", return_value=fake_client):
        yield fake_client
@pytest.fixture
def executor(mock_config, mock_docker_client):
    """Build a ``CodeExecutor`` from the mocked configuration.

    ``mock_docker_client`` is requested so that ``docker.from_env`` is
    patched while the executor is constructed and used by the test.
    """
    code_executor = CodeExecutor(mock_config)
    return code_executor
def test_init_success(mock_config, mock_docker_client):
    """Initialization succeeds when a Docker client can be obtained."""
    instance = CodeExecutor(mock_config)

    # Construction must have pinged the (mocked) daemon exactly once.
    mock_docker_client.ping.assert_called_once()
    assert instance.docker_client is not None
def test_init_docker_error(mock_config):
    """Initialization leaves ``docker_client`` as ``None`` when Docker fails."""
    docker_failure = docker.errors.DockerException("Docker error")
    with patch("docker.from_env", side_effect=docker_failure):
        broken = CodeExecutor(mock_config)
    assert broken.docker_client is None
def test_init_remote_docker(mock_config):
    """A configured ``base_url`` makes the executor build a ``DockerClient``."""
    mock_config.docker.base_url = "tcp://1.2.3.4:2375"
    with patch("docker.DockerClient") as client_cls:
        remote = CodeExecutor(mock_config)
        # The client class must have been instantiated exactly once.
        client_cls.assert_called_once()
    assert remote.docker_client is not None
@pytest.mark.asyncio
async def test_add_task_success(executor):
    """Adding a task puts exactly one item on the executor's queue."""
    on_done = AsyncMock()
    await executor.add_task("print('hello')", on_done)
    queued = executor.task_queue.qsize()
    assert queued == 1
@pytest.mark.asyncio
async def test_add_task_no_docker(mock_config):
    """Adding a task raises ``RuntimeError`` when Docker never initialized."""
    on_done = AsyncMock()
    with patch("docker.from_env", side_effect=docker.errors.DockerException):
        offline = CodeExecutor(mock_config)
        with pytest.raises(RuntimeError, match="Docker环境未就绪"):
            await offline.add_task("print('hello')", on_done)
@pytest.mark.asyncio
async def test_worker_success(executor):
    """The worker runs a queued task and passes its output to the callback.

    The container run is stubbed to return ``b"hello"``; the callback is
    expected to receive the text ``"hello"``.
    """
    # Replace the real container run with a stub.
    container_stub = MagicMock(return_value=b"hello")
    executor._run_in_container = container_stub

    on_done = AsyncMock()
    await executor.add_task("print('hello')", on_done)

    # Start the worker and block until the queue has been fully processed.
    running = asyncio.create_task(executor.worker())
    await executor.task_queue.join()

    # Check the reported result.
    on_done.assert_called_with("hello")

    # Stop the worker; cancellation is the expected way for it to end.
    running.cancel()
    try:
        await running
    except asyncio.CancelledError:
        pass
@pytest.mark.asyncio
async def test_worker_timeout(executor):
    """The worker reports a timeout message when a task overruns the limit.

    The executor's timeout is shrunk to 10 ms and ``_run_in_container`` is
    stubbed with a call that blocks for 50 ms, so the task cannot finish in
    time. The callback must then receive the timeout message.
    """
    import time

    executor.timeout = 0.01
    # The stub blocks with ``time.sleep`` rather than ``asyncio.sleep``: per
    # the original author's notes, the worker runs ``_run_in_container`` in a
    # thread via ``run_in_executor`` and relies on ``asyncio.wait_for`` to
    # time out, so the stub must be synchronous and simply not finish in time.
    executor._run_in_container = MagicMock(
        side_effect=lambda _code: time.sleep(0.05)
    )

    callback = AsyncMock()
    await executor.add_task("print('hello')", callback)

    worker_task = asyncio.create_task(executor.worker())
    try:
        await executor.task_queue.join()
        callback.assert_called_with(f"执行超时 (超过 {executor.timeout} 秒)。")
    finally:
        # Always stop the worker, even if the assertion above fails, so no
        # task is left running after the test.
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass
@pytest.mark.asyncio
async def test_worker_docker_errors(executor):
    """The worker turns Docker errors into messages for the callback.

    Two failures are exercised in turn: ``ImageNotFound`` (missing sandbox
    image) and ``ContainerError`` (the executed code failed).
    """

    async def run_failing_task(error):
        """Queue one task whose container run raises *error*; return its callback."""
        executor._run_in_container = MagicMock(side_effect=error)
        callback = AsyncMock()
        await executor.add_task("code", callback)
        worker_task = asyncio.create_task(executor.worker())
        try:
            await executor.task_queue.join()
        finally:
            worker_task.cancel()
            try:
                await worker_task
            except asyncio.CancelledError:
                # CancelledError derives from BaseException (Python 3.8+),
                # so ``except Exception`` would let it escape the test.
                pass
        return callback

    # ImageNotFound
    callback = await run_failing_task(docker.errors.ImageNotFound("Image not found"))
    callback.assert_called_with(
        f"执行失败:沙箱基础镜像 '{executor.sandbox_image}' 不存在,请联系管理员构建。"
    )

    # ContainerError
    callback = await run_failing_task(
        docker.errors.ContainerError("container", 1, "cmd", "image", b"Error output")
    )
    callback.assert_called_with("代码执行出错:\nError output")
def test_run_in_container_success(executor):
    """``_run_in_container`` returns stdout and removes the container on exit code 0."""
    container = MagicMock(
        **{
            "wait.return_value": {"StatusCode": 0},
            "logs.side_effect": [b"output", b""],  # stdout, stderr
        }
    )
    executor.docker_client.containers.create.return_value = container

    output = executor._run_in_container("print('hello')")

    assert output == b"output"
    container.start.assert_called_once()
    container.remove.assert_called_with(force=True)
def test_run_in_container_failure(executor):
    """``_run_in_container`` raises ``ContainerError`` on a non-zero exit code."""
    container = MagicMock(
        **{
            "wait.return_value": {"StatusCode": 1},
            "logs.side_effect": [b"", b"Error"],  # stdout, stderr
        }
    )
    executor.docker_client.containers.create.return_value = container

    with pytest.raises(docker.errors.ContainerError):
        executor._run_in_container("bad code")

    # The container must still be force-removed after the failure.
    container.remove.assert_called_with(force=True)
def test_run_in_container_no_client(executor):
    """``_run_in_container`` raises ``DockerException`` when no client is set."""
    executor.docker_client = None
    expected_error = docker.errors.DockerException
    with pytest.raises(expected_error):
        executor._run_in_container("code")