* feat: 整合开发历史

* codepy安全性升级

* 优化一些东西

* 再次优化

* 更新一下 requirements.txt

* CQ码支持以及视频解析

* hotfix

* 更新DEV readme.md

* feat: 添加Docker沙箱代码执行功能

- 新增Docker沙箱执行环境,提供安全隔离的代码执行能力
- 重构code_py插件,使用Docker容器替代子进程执行
- 添加docker配置项和权限检查功能
- 实现代码执行队列和并发控制
- 新增广播插件,仅限管理员使用
This commit is contained in:
镀铬酸钾
2026-01-06 22:59:50 +08:00
committed by GitHub
parent 5b3cd5bbd0
commit aaf4a896dd
13 changed files with 620 additions and 1160 deletions

1
.gitignore vendored
View File

@@ -139,3 +139,4 @@ dmypy.json
.pytype/
# End of https://www.toptal.com/developers/gitignore/api/python
/ca

1121
README.md

File diff suppressed because it is too large Load Diff

View File

@@ -12,3 +12,15 @@ host = "114.66.58.203"
port = 1931
db = 0
password = "redis_5dxyJG"
[docker]
base_url = "tcp://dockertest.k2cro4.my:2375"
sandbox_image = "python-sandbox:latest"
timeout = 10
concurrency_limit = 5
tls_verify = true
ca_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/ca.crt"
client_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-cert.pem"
client_key_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-key.pem"

View File

@@ -69,7 +69,7 @@ class CommandManager:
def command(
self,
name: str,
*names: str,
permission: Optional[Any] = None,
override_permission_check: bool = False
) -> Callable:
@@ -77,7 +77,7 @@ class CommandManager:
装饰器:注册一个消息指令处理器。
"""
return self.message_handler.command(
name,
*names,
permission=permission,
override_permission_check=override_permission_check
)

View File

@@ -73,6 +73,15 @@ class Config:
"""
return self._data.get("redis", {})
@property
def docker(self) -> dict:
"""
获取 Docker 配置
:return: 配置字典
"""
return self._data.get("docker", {})
# 实例化全局配置对象
global_config = Config()

View File

@@ -83,7 +83,7 @@ class MessageHandler(BaseHandler):
def command(
self,
name: str,
*names: str,
permission: Optional[Permission] = None,
override_permission_check: bool = False
) -> Callable:
@@ -93,11 +93,12 @@ class MessageHandler(BaseHandler):
def decorator(func: Callable) -> Callable:
if not inspect.iscoroutinefunction(func):
raise SyncHandlerError(f"命令处理器 {func.__name__} 必须是异步函数 (async def).")
self.commands[name] = {
"func": func,
"permission": permission,
"override_permission_check": override_permission_check,
}
for name in names:
self.commands[name] = {
"func": func,
"permission": permission,
"override_permission_check": override_permission_check,
}
return func
return decorator
@@ -137,7 +138,8 @@ class MessageHandler(BaseHandler):
permission_granted = await permission_manager.check_permission(event.user_id, permission)
if not permission_granted and not override_check:
await bot.send(event, f"权限不足,需要 {permission.name} 权限")
permission_name = permission.name if isinstance(permission, Permission) else permission
await bot.send(event, f"权限不足,需要 {permission_name} 权限")
return
await self._run_handler(

View File

@@ -1,27 +1,184 @@
"""
线程池执行器
提供一个全局的线程池和异步接口,用于在事件循环中安全地运行同步函数。
"""
# -*- coding: utf-8 -*-
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable
import docker
from docker.tls import TLSConfig
from typing import Dict, Any, Callable
# 创建一个全局的线程池,可以根据需要调整 max_workers
executor = ThreadPoolExecutor(max_workers=10)
from core.logger import logger
async def run_in_thread_pool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
class CodeExecutor:
"""
在线程池中异步运行同步函数
代码执行引擎,负责管理一个异步任务队列和并发的 Docker 容器执行。
"""
def __init__(self, bot_instance, config: Dict[str, Any]):
"""
初始化代码执行引擎。
:param bot_instance: Bot 实例,用于后续的消息回复。
:param config: 从 config.toml 加载的配置字典。
"""
self.bot = bot_instance
self.task_queue = asyncio.Queue()
# 从传入的配置中读取 Docker 相关设置
docker_config = config.docker
self.docker_base_url = docker_config.get("base_url")
self.sandbox_image = docker_config.get("sandbox_image", "python-sandbox:latest")
self.timeout = docker_config.get("timeout", 10)
concurrency = docker_config.get("concurrency_limit", 5)
self.concurrency_limit = asyncio.Semaphore(concurrency)
self.docker_client = None
:param func: 要运行的同步函数
:param args: 函数的位置参数
:param kwargs: 函数的关键字参数
:return: 函数的返回值
logger.info("[CodeExecutor] 初始化 Docker 客户端...")
try:
if self.docker_base_url:
# 如果配置了远程 Docker 地址,则使用 TLS 选项进行连接
tls_config = None
if docker_config.get("tls_verify", False):
tls_config = TLSConfig(
ca_cert=docker_config.get("ca_cert_path"),
client_cert=(docker_config.get("client_cert_path"), docker_config.get("client_key_path")),
verify=True
)
self.docker_client = docker.DockerClient(base_url=self.docker_base_url, tls=tls_config)
else:
# 否则,使用默认的本地环境连接
self.docker_client = docker.from_env()
# 检查 Docker 服务是否可用
self.docker_client.ping()
logger.success("[CodeExecutor] Docker 客户端初始化成功,服务连接正常。")
except docker.errors.DockerException as e:
self.docker_client = None
logger.error(f"无法连接到 Docker 服务,请检查 Docker 是否正在运行: {e}")
except Exception as e:
self.docker_client = None
logger.error(f"初始化 Docker 客户端时发生未知错误: {e}")
async def add_task(self, code: str, callback: Callable[[str], asyncio.Future]):
"""
将代码执行任务添加到队列中。
:param code: 待执行的 Python 代码字符串。
:param callback: 执行完毕后用于回复结果的回调函数。
"""
task = {"code": code, "callback": callback}
await self.task_queue.put(task)
logger.info(f"[CodeExecutor] 新的代码执行任务已入队 (队列当前长度: {self.task_queue.qsize()})。")
async def worker(self):
"""
后台工作者,不断从队列中取出任务并执行。
"""
if not self.docker_client:
logger.error("[CodeExecutor] Worker 无法启动,因为 Docker 客户端未初始化。")
return
logger.info("[CodeExecutor] 代码执行 Worker 已启动,等待任务...")
while True:
task = await self.task_queue.get()
logger.info("[CodeExecutor] 开始处理代码执行任务。")
async with self.concurrency_limit:
result_message = ""
try:
loop = asyncio.get_running_loop()
# 使用 asyncio.wait_for 实现超时控制
result_bytes = await asyncio.wait_for(
loop.run_in_executor(
None, # 使用默认线程池
self._run_in_container,
task['code']
),
timeout=self.timeout
)
output = result_bytes.decode('utf-8').strip()
result_message = output if output else "代码执行完毕,无输出。"
logger.success("[CodeExecutor] 任务成功执行。")
except docker.errors.ImageNotFound:
logger.error(f"[CodeExecutor] 镜像 '{self.sandbox_image}' 不存在!")
result_message = f"执行失败:沙箱基础镜像 '{self.sandbox_image}' 不存在,请联系管理员构建。"
except docker.errors.ContainerError as e:
error_output = e.stderr.decode('utf-8').strip()
result_message = f"代码执行出错:\n{error_output}"
logger.warning(f"[CodeExecutor] 代码执行时发生错误: {error_output}")
except docker.errors.APIError as e:
logger.error(f"[CodeExecutor] Docker API 错误: {e}")
result_message = "执行失败:与 Docker 服务通信时发生错误,请检查服务状态。"
except asyncio.TimeoutError:
result_message = f"执行超时 (超过 {self.timeout} 秒)。"
logger.warning("[CodeExecutor] 任务执行超时。")
except Exception as e:
logger.exception(f"[CodeExecutor] 执行 Docker 任务时发生未知严重错误: {e}")
result_message = "执行引擎发生内部错误,请联系管理员。"
# 调用回调函数回复结果
await task['callback'](result_message)
self.task_queue.task_done()
def _run_in_container(self, code: str) -> bytes:
"""
同步函数:在 Docker 容器中运行代码。
此函数通过手动管理容器生命周期来提高稳定性。
"""
container = None
try:
# 1. 创建容器
container = self.docker_client.containers.create(
image=self.sandbox_image,
command=["python", "-c", code],
mem_limit='128m',
cpu_shares=512,
network_disabled=True,
log_config={'type': 'json-file', 'config': {'max-size': '1m'}},
)
# 2. 启动容器
container.start()
# 3. 等待容器执行完成
# 主超时由 asyncio.wait_for 控制,这里的 timeout 是一个额外的保险
result = container.wait(timeout=self.timeout + 5)
# 4. 获取日志
stdout = container.logs(stdout=True, stderr=False)
stderr = container.logs(stdout=False, stderr=True)
# 5. 检查退出码,如果不为 0则手动抛出 ContainerError
if result.get('StatusCode', 0) != 0:
raise docker.errors.ContainerError(
container, result['StatusCode'], f"python -c '{code}'", self.sandbox_image, stderr
)
return stdout
finally:
# 6. 确保容器总是被移除
if container:
try:
container.remove(force=True)
except docker.errors.NotFound:
# 如果容器因为某些原因已经消失,也沒关系
pass
except Exception as e:
logger.error(f"[CodeExecutor] 强制移除容器 {container.id} 时失败: {e}")
def initialize_executor(bot_instance, config: Dict[str, Any]):
"""
初始化并返回一个 CodeExecutor 实例。
"""
return CodeExecutor(bot_instance, config)
async def run_in_thread_pool(sync_func, *args, **kwargs):
"""
在线程池中运行同步阻塞函数,以避免阻塞 asyncio 事件循环。
:param sync_func: 同步函数
:param args: 位置参数
:param kwargs: 关键字参数
:return: 同步函数的返回值
"""
loop = asyncio.get_running_loop()
# 使用 functools.partial 绑定函数和参数,以便传递给 run_in_executor
func_to_run = partial(func, *args, **kwargs)
# loop.run_in_executor 会返回一个 awaitable 对象
return await loop.run_in_executor(executor, func_to_run)
return await loop.run_in_executor(None, lambda: sync_func(*args, **kwargs))

View File

@@ -227,6 +227,14 @@ class PermissionManager:
Returns:
bool: 如果用户权限 >= 所需权限,返回 True否则返回 False
"""
# 如果传入的是字符串,先转换为 Permission 对象
if isinstance(required_permission, str):
required_permission = _PERMISSIONS.get(required_permission.lower())
if not required_permission:
# 如果是无效的权限字符串,默认拒绝
logger.warning(f"检测到无效的权限检查字符串: {required_permission}")
return False
user_permission = await self.get_user_permission(user_id)
return user_permission >= required_permission
@@ -249,4 +257,21 @@ class PermissionManager:
# 全局权限管理器实例
permission_manager = PermissionManager()
permission_manager = PermissionManager()
def require_admin(func):
"""
一个装饰器,用于限制命令只能由管理员执行。
"""
from functools import wraps
from models.events.message import MessageEvent
@wraps(func)
async def wrapper(event: MessageEvent, *args, **kwargs):
user_id = event.user_id
if await permission_manager.check_permission(user_id, ADMIN):
return await func(event, *args, **kwargs)
else:
await event.reply("抱歉,您没有权限执行此命令。")
return None
return wrapper

15
main.py
View File

@@ -108,6 +108,21 @@ async def main():
try:
bot = WS()
# 初始化代码执行器
from core.config_loader import global_config as config
from core.executor import initialize_executor
code_executor = initialize_executor(bot, config)
bot.bot.code_executor = code_executor # 将执行器实例附加到 bot.bot 对象上
# 启动代码执行器的后台 worker
logger.debug("[Main] 检查是否需要启动代码执行 Worker...")
if code_executor and code_executor.docker_client:
logger.info("[Main] Docker 连接成功,正在启动代码执行 Worker...")
asyncio.create_task(code_executor.worker())
else:
logger.warning("[Main] 未启动代码执行 Worker因为 Docker 客户端未初始化或连接失败。")
await bot.connect()
finally:
if observer.is_alive():

75
plugins/broadcast.py Normal file
View File

@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
"""
管理员专用的广播插件
功能:
- 仅限管理员在私聊中调用。
- 通过回复一条消息并发送指令,将该消息转发给机器人所在的所有群聊。
- 此插件不写入 __plugin_meta__保持隐藏。
"""
from core.command_manager import matcher
from models import MessageEvent
from core.permission_manager import ADMIN
from core.logger import logger
@matcher.command("broadcast", "广播", permission=ADMIN)
async def broadcast_message(event: MessageEvent):
"""
广播指令处理器。
:param event: 消息事件对象。
"""
# 1. 检查是否为私聊消息
if event.group_id:
# 在群聊中调用时,静默处理,不予响应
return
# 2. 检查是否回复了某条消息
reply = event.reply
if not reply:
await event.reply("请通过“回复”一条您想广播的消息来使用此功能。")
return
# 3. 获取机器人所在的群聊列表
bot = event.bot
try:
group_list = await bot.get_group_list()
if not group_list:
await event.reply("机器人目前没有加入任何群聊。")
return
except Exception as e:
logger.error(f"[Broadcast] 获取群聊列表失败: {e}")
await event.reply(f"获取群聊列表时发生错误,无法广播。错误信息: {e}")
return
# 4. 遍历所有群聊并转发消息
success_count = 0
failed_count = 0
total_groups = len(group_list)
await event.reply(f"准备向 {total_groups} 个群聊广播消息,请稍候...")
for group in group_list:
group_id = group.get("group_id")
if not group_id:
continue
try:
# 直接转发被回复的消息
await bot.forward_message(
group_id=group_id,
message_id=reply.message_id
)
success_count += 1
logger.info(f"[Broadcast] 已成功将消息转发至群聊: {group_id}")
except Exception as e:
failed_count += 1
logger.error(f"[Broadcast] 转发消息至群聊 {group_id} 失败: {e}")
# 5. 向管理员报告结果
report_message = (
f"广播任务完成。\n"
f"总群聊数: {total_groups}\n"
f"成功: {success_count}\n"
f"失败: {failed_count}"
)
await event.reply(report_message)

View File

@@ -1,176 +1,156 @@
"""
code_py插件
输入/code py回车再加上python代码机器人就会执行代码并返回执行结果。
"""
# -*- coding: utf-8 -*-
import html
import textwrap
import asyncio
import re
import sys
import tempfile
import os
from typing import Tuple, Set
from typing import Dict
from core.bot import Bot
from core.command_manager import matcher
from core.executor import run_in_thread_pool
from models import MessageEvent
from core.permission_manager import ADMIN
from core.logger import logger
__plugin_meta__ = {
"name": "code_py",
"description": "提供执行python代码的功能",
"usage": "/code_py - 进入交互模式,等待输入代码\n/code_py [单行代码] - 快速执行单行代码",
"name": "Python 代码执行",
"description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和转发回复。",
"usage": "/py <单行代码>\n/code_py <单行代码>\n/py (进入多行输入模式)",
}
# --- 安全配置:危险模块和内置函数黑名单 ---
DANGEROUS_MODULES = [
"os", "sys", "subprocess", "shutil", "socket", "requests", "urllib",
"http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing",
"asyncio",
]
DANGEROUS_BUILTINS = [
"__import__", "open", "exec", "eval", "compile", "input", "breakpoint"
]
# --- 会话状态管理 ---
# 结构: {(user_id, group_id): asyncio.TimerHandle}
multi_line_sessions: Dict[tuple, asyncio.TimerHandle] = {}
# 编译后的正则表达式,用于分割语句
STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]')
# 编译后的正则表达式,用于查找危险的内置函数调用
BUILTIN_CALL_PATTERN = re.compile(r'\b(' + '|'.join(DANGEROUS_BUILTINS) + r')\s*\(')
def is_code_safe(code: str) -> Tuple[bool, str]:
async def reply_as_forward(event: MessageEvent, input_code: str, output_result: str):
"""
检查代码中是否包含危险的模块导入或内置函数调用
将输入和输出打包成转发消息进行回复
参考 forward_test.py 的实现,兼容私聊和群聊。
"""
# 1. 检查危险的内置函数
found_builtins = BUILTIN_CALL_PATTERN.search(code)
if found_builtins:
return False, f"检测到不允许的内置函数调用:'{found_builtins.group(1)}'"
# 2. 检查危险的模块导入
statements = STATEMENT_SPLIT_PATTERN.split(code)
for statement in statements:
statement = statement.strip()
if not statement:
continue
parts = statement.split()
if not parts:
continue
if parts[0] == 'from' and len(parts) > 1:
module_name = parts[1].strip()
if module_name in DANGEROUS_MODULES:
return False, f"检测到不允许的模块导入:'{module_name}'"
elif parts[0] == 'import' and len(parts) > 1:
modules_str = ' '.join(parts[1:])
imported_modules = [m.strip() for m in modules_str.split(',')]
for module_name in imported_modules:
actual_module_name = module_name.split()[0]
if actual_module_name in DANGEROUS_MODULES:
return False, f"检测到不允许的模块导入:'{actual_module_name}'"
return True, ""
async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]:
"""
在子进程中安全地执行Python代码。
"""
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf:
tf.write(code_str)
tf_path = tf.name
try:
proc = await asyncio.create_subprocess_exec(
sys.executable, tf_path,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
return "", f"执行超时(>{timeout}s"
return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore")
finally:
try:
os.remove(tf_path)
except Exception:
pass
async def process_and_reply(bot: Bot, event: MessageEvent, code: str):
"""
核心处理逻辑:安全检查、执行代码并回复结果。
"""
safe, message = await run_in_thread_pool(is_code_safe, code)
if not safe:
await event.reply(f"代码安全检查未通过:\n{message}")
return
try:
stdout, stderr = await run_code_in_subprocess(code, timeout=10.0)
except Exception as e:
await event.reply(f"执行失败:{e}")
return
resp = stderr.strip() or stdout.strip() or "(无输出)"
MAX = 1500
if len(resp) > MAX:
resp = resp[:MAX] + "\n...输出被截断..."
bot = event.bot
# 1. 构建消息节点列表
nodes = [
bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code),
bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp),
bot.build_forward_node(
user_id=event.user_id,
nickname=event.sender.nickname or str(event.user_id),
message=f"--- Your Code ---\n{input_code}"
),
bot.build_forward_node(
user_id=event.self_id,
nickname="Code Executor",
message=f"--- Execution Result ---\n{output_result}"
)
]
try:
# 2. 发送合并转发消息
await bot.send_forwarded_messages(event, nodes)
except Exception as e:
await event.reply(f"结果发送失败: {e}\n\n{resp}")
logger.error(f"[code_py] 发送转发消息失败: {e}")
# 降级为普通消息回复
await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}")
# --- 交互式会话状态 ---
# 使用集合存储正在等待代码输入的用户标识
waiting_users: Set[str] = set()
def get_session_id(event: MessageEvent) -> str:
"""根据事件类型生成唯一的会话ID"""
if hasattr(event, 'group_id'):
# 群聊会话ID
return f"group_{event.group_id}-{event.user_id}"
else:
# 私聊会话ID
return f"private_{event.user_id}"
@matcher.command("code_py")
async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]):
# 模式一:快速执行单行代码
if args:
code = " ".join(args)
await process_and_reply(bot, event, code)
async def execute_code(event: MessageEvent, code: str):
"""
核心代码执行逻辑。
"""
code_executor = getattr(event.bot, 'code_executor', None)
if not code_executor or not code_executor.docker_client:
await event.reply("代码执行服务当前不可用,请检查 Docker 连接配置。")
return
# 模式二:进入交互模式
session_id = get_session_id(event)
if session_id in waiting_users:
await event.reply("您已经有一个正在等待输入的code会话了请直接发送代码。")
return
# 修改 add_task让它能直接接收回复函数
await code_executor.add_task(
code,
lambda result: reply_as_forward(event, code, result)
)
await event.reply("代码已提交至沙箱执行队列,请稍候...")
def cleanup_session(session_key: tuple):
"""
清理超时的会话。
"""
if session_key in multi_line_sessions:
del multi_line_sessions[session_key]
logger.info(f"[code_py] 会话 {session_key} 已超时,自动取消。")
def normalize_code(code: str) -> str:
"""
规范化用户输入的 Python 代码字符串。
主要处理两个问题:
1. 对消息中可能存在的 HTML 实体进行解码 (e.g., &#91; -> [)。
2. 移除整个代码块的公共前导缩进,以修复因复制粘贴导致的多余缩进。
:param code: 原始代码字符串。
:return: 规范化后的代码字符串。
"""
# 1. 解码 HTML 实体
code = html.unescape(code)
# 2. 移除公共前导缩进
try:
code = textwrap.dedent(code)
except Exception:
# 在某些情况下例如不一致的缩进dedent 可能会失败,
# 但我们不希望因此中断流程,所以捕获异常并继续。
pass
waiting_users.add(session_id)
await event.reply("请在下一条消息中发送要执行的Python代码块。发送“取消”可退出")
return code.strip()
@matcher.command("py", "python", "code_py", permission=ADMIN)
async def code_py_main(event: MessageEvent, args: list[str]):
"""
/py 命令的主入口。
- 如果有参数,直接执行。
- 如果没有参数,开启多行输入模式。
"""
code_to_run = " ".join(args)
if code_to_run:
# 单行模式,对代码进行规范化处理
normalized_code = normalize_code(code_to_run)
if not normalized_code:
await event.reply("代码为空或格式错误,请输入有效的代码。")
return
await execute_code(event, normalized_code)
else:
# 多行模式
# 使用 getattr 兼容私聊和群聊
session_key = (event.user_id, getattr(event, 'group_id', 'private'))
# 如果上一个会话的超时任务还在,先取消它
if session_key in multi_line_sessions:
multi_line_sessions[session_key].cancel()
await event.reply("已进入多行代码输入模式,请直接发送你的代码。\n(60秒内无操作将自动取消)")
# 设置 60 秒超时
loop = asyncio.get_running_loop()
timeout_handler = loop.call_later(
60,
cleanup_session,
session_key
)
multi_line_sessions[session_key] = timeout_handler
@matcher.on_message()
async def handle_code_input(bot: Bot, event: MessageEvent):
session_id = get_session_id(event)
# 检查用户是否处于等待状态
if session_id in waiting_users:
# 从等待集合中移除,无论输入是什么
waiting_users.remove(session_id)
async def handle_multi_line_code(event: MessageEvent):
"""
通用消息处理器,用于捕获多行模式下的代码输入。
"""
# 使用 getattr 兼容私聊和群聊
session_key = (event.user_id, getattr(event, 'group_id', 'private'))
if session_key in multi_line_sessions:
# 取消超时任务
multi_line_sessions[session_key].cancel()
del multi_line_sessions[session_key]
# 对多行代码进行规范化处理
normalized_code = normalize_code(event.raw_message)
# 处理取消操作
if event.raw_message.strip() == "取消":
await event.reply("已取消输入。")
return True # 消费事件
# 执行代码
await process_and_reply(bot, event, event.raw_message)
return True # 消费事件,防止被其他指令匹配
# 如果用户不在等待状态,则不处理
return False
if not normalized_code:
await event.reply("捕获到的代码为空或格式错误,已取消输入。")
return
await execute_code(event, normalized_code)
return True # 消费事件,防止其他处理器响应

Binary file not shown.

19
sandbox.Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# 使用一个轻量级的 Python 官方镜像作为基础
FROM python:3.11-slim
# 创建一个低权限的用户来运行代码,增加安全性
# -S: 创建一个系统用户 (没有 home 目录)
# -u: 指定用户ID
# -g: 指定组ID
RUN groupadd -g 1001 sandbox && useradd -u 1001 -g sandbox -s /bin/sh -r sandbox
# 创建一个工作目录,用于存放和执行用户的代码
WORKDIR /sandbox
# 将目录所有权交给沙箱用户
RUN chown sandbox:sandbox /sandbox
# 切换到沙箱用户
USER sandbox
# 默认的启动命令是 python这样容器启动时可以直接执行 .py 文件
CMD ["python"]