From 54f74d0e7345ac4969e58a71c04d793e2ffb15ab Mon Sep 17 00:00:00 2001 From: K2cr2O1 <2221577113@qq.com> Date: Tue, 6 Jan 2026 22:56:00 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0Docker=E6=B2=99?= =?UTF-8?q?=E7=AE=B1=E4=BB=A3=E7=A0=81=E6=89=A7=E8=A1=8C=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增Docker沙箱执行环境,提供安全隔离的代码执行能力 - 重构code_py插件,使用Docker容器替代子进程执行 - 添加docker配置项和权限检查功能 - 实现代码执行队列和并发控制 - 新增广播插件,仅限管理员使用 --- .gitignore | 1 + config.toml | 12 ++ core/command_manager.py | 4 +- core/config_loader.py | 9 ++ core/event_handler.py | 16 ++- core/executor.py | 197 ++++++++++++++++++++++--- core/permission_manager.py | 27 +++- main.py | 15 ++ plugins/broadcast.py | 75 ++++++++++ plugins/code_py.py | 284 +++++++++++++++++-------------------- requirements.txt | Bin 750 -> 360 bytes sandbox.Dockerfile | 19 +++ 12 files changed, 477 insertions(+), 182 deletions(-) create mode 100644 plugins/broadcast.py create mode 100644 sandbox.Dockerfile diff --git a/.gitignore b/.gitignore index 2729cb2..bb22f85 100644 --- a/.gitignore +++ b/.gitignore @@ -139,3 +139,4 @@ dmypy.json .pytype/ # End of https://www.toptal.com/developers/gitignore/api/python +/ca \ No newline at end of file diff --git a/config.toml b/config.toml index 4846aeb..5955e20 100644 --- a/config.toml +++ b/config.toml @@ -12,3 +12,15 @@ host = "114.66.58.203" port = 1931 db = 0 password = "redis_5dxyJG" + +[docker] +base_url = "tcp://dockertest.k2cro4.my:2375" +sandbox_image = "python-sandbox:latest" +timeout = 10 +concurrency_limit = 5 +tls_verify = true +ca_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/ca.crt" +client_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-cert.pem" +client_key_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-key.pem" + + diff --git a/core/command_manager.py b/core/command_manager.py index c51794a..058cc89 100644 --- a/core/command_manager.py +++ b/core/command_manager.py @@ -69,7 +69,7 @@ class CommandManager: def command( self, - name: str, + *names: str, permission: Optional[Any] = None, override_permission_check: bool = False ) -> Callable: @@ -77,7 +77,7 @@ class CommandManager: 装饰器:注册一个消息指令处理器。 """ return self.message_handler.command( - name, + *names, permission=permission, override_permission_check=override_permission_check ) diff --git a/core/config_loader.py b/core/config_loader.py index 776ddcb..b00ede3 100644 --- a/core/config_loader.py +++ b/core/config_loader.py @@ -73,6 +73,15 @@ class Config: """ return self._data.get("redis", {}) + @property + def docker(self) -> dict: + """ + 获取 Docker 配置 + + :return: 配置字典 + """ + return self._data.get("docker", {}) + # 实例化全局配置对象 global_config = Config() diff --git a/core/event_handler.py b/core/event_handler.py index b355718..8c384f9 100644 --- a/core/event_handler.py +++ b/core/event_handler.py @@ -83,7 +83,7 @@ class MessageHandler(BaseHandler): def command( self, - name: str, + *names: str, permission: Optional[Permission] = None, override_permission_check: bool = False ) -> Callable: @@ -93,11 +93,12 @@ class MessageHandler(BaseHandler): def decorator(func: Callable) -> Callable: if not inspect.iscoroutinefunction(func): raise SyncHandlerError(f"命令处理器 {func.__name__} 必须是异步函数 (async def).") - self.commands[name] = { - "func": func, - "permission": permission, - "override_permission_check": override_permission_check, - } + for name in names: + self.commands[name] = { + "func": func, + "permission": permission, + "override_permission_check": override_permission_check, + } return func return decorator @@ -137,7 +138,8 @@ class MessageHandler(BaseHandler): permission_granted = await permission_manager.check_permission(event.user_id, permission) if not permission_granted and not override_check: - await bot.send(event, f"权限不足,需要 {permission.name} 权限") + permission_name = permission.name if isinstance(permission, Permission) else permission + await bot.send(event, f"权限不足,需要 {permission_name} 权限") return await self._run_handler( diff --git a/core/executor.py b/core/executor.py index 6d691bd..73599d9 100644 --- a/core/executor.py +++ b/core/executor.py @@ -1,27 +1,184 @@ -""" -线程池执行器 - -提供一个全局的线程池和异步接口,用于在事件循环中安全地运行同步函数。 -""" +# -*- coding: utf-8 -*- import asyncio -from concurrent.futures import ThreadPoolExecutor -from functools import partial -from typing import Any, Callable +import docker +from docker.tls import TLSConfig +from typing import Dict, Any, Callable -# 创建一个全局的线程池,可以根据需要调整 max_workers -executor = ThreadPoolExecutor(max_workers=10) +from core.logger import logger -async def run_in_thread_pool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: +class CodeExecutor: """ - 在线程池中异步运行同步函数 + 代码执行引擎,负责管理一个异步任务队列和并发的 Docker 容器执行。 + """ + def __init__(self, bot_instance, config: Dict[str, Any]): + """ + 初始化代码执行引擎。 + :param bot_instance: Bot 实例,用于后续的消息回复。 + :param config: 从 config.toml 加载的配置字典。 + """ + self.bot = bot_instance + self.task_queue = asyncio.Queue() + + # 从传入的配置中读取 Docker 相关设置 + docker_config = config.docker + self.docker_base_url = docker_config.get("base_url") + self.sandbox_image = docker_config.get("sandbox_image", "python-sandbox:latest") + self.timeout = docker_config.get("timeout", 10) + concurrency = docker_config.get("concurrency_limit", 5) + + self.concurrency_limit = asyncio.Semaphore(concurrency) + self.docker_client = None - :param func: 要运行的同步函数 - :param args: 函数的位置参数 - :param kwargs: 函数的关键字参数 - :return: 函数的返回值 + logger.info("[CodeExecutor] 初始化 Docker 客户端...") + try: + if self.docker_base_url: + # 如果配置了远程 Docker 地址,则使用 TLS 选项进行连接 + tls_config = None + if docker_config.get("tls_verify", False): + tls_config = TLSConfig( + ca_cert=docker_config.get("ca_cert_path"), + client_cert=(docker_config.get("client_cert_path"), docker_config.get("client_key_path")), + verify=True + ) + self.docker_client = docker.DockerClient(base_url=self.docker_base_url, tls=tls_config) + else: + # 否则,使用默认的本地环境连接 + self.docker_client = docker.from_env() + + # 检查 Docker 服务是否可用 + self.docker_client.ping() + logger.success("[CodeExecutor] Docker 客户端初始化成功,服务连接正常。") + except docker.errors.DockerException as e: + self.docker_client = None + logger.error(f"无法连接到 Docker 服务,请检查 Docker 是否正在运行: {e}") + except Exception as e: + self.docker_client = None + logger.error(f"初始化 Docker 客户端时发生未知错误: {e}") + + async def add_task(self, code: str, callback: Callable[[str], asyncio.Future]): + """ + 将代码执行任务添加到队列中。 + :param code: 待执行的 Python 代码字符串。 + :param callback: 执行完毕后用于回复结果的回调函数。 + """ + task = {"code": code, "callback": callback} + await self.task_queue.put(task) + logger.info(f"[CodeExecutor] 新的代码执行任务已入队 (队列当前长度: {self.task_queue.qsize()})。") + + async def worker(self): + """ + 后台工作者,不断从队列中取出任务并执行。 + """ + if not self.docker_client: + logger.error("[CodeExecutor] Worker 无法启动,因为 Docker 客户端未初始化。") + return + + logger.info("[CodeExecutor] 代码执行 Worker 已启动,等待任务...") + while True: + task = await self.task_queue.get() + + logger.info("[CodeExecutor] 开始处理代码执行任务。") + + async with self.concurrency_limit: + result_message = "" + try: + loop = asyncio.get_running_loop() + + # 使用 asyncio.wait_for 实现超时控制 + result_bytes = await asyncio.wait_for( + loop.run_in_executor( + None, # 使用默认线程池 + self._run_in_container, + task['code'] + ), + timeout=self.timeout + ) + + output = result_bytes.decode('utf-8').strip() + result_message = output if output else "代码执行完毕,无输出。" + logger.success("[CodeExecutor] 任务成功执行。") + + except docker.errors.ImageNotFound: + logger.error(f"[CodeExecutor] 镜像 '{self.sandbox_image}' 不存在!") + result_message = f"执行失败:沙箱基础镜像 '{self.sandbox_image}' 不存在,请联系管理员构建。" + except docker.errors.ContainerError as e: + error_output = e.stderr.decode('utf-8').strip() + result_message = f"代码执行出错:\n{error_output}" + logger.warning(f"[CodeExecutor] 代码执行时发生错误: {error_output}") + except docker.errors.APIError as e: + logger.error(f"[CodeExecutor] Docker API 错误: {e}") + result_message = "执行失败:与 Docker 服务通信时发生错误,请检查服务状态。" + except asyncio.TimeoutError: + result_message = f"执行超时 (超过 {self.timeout} 秒)。" + logger.warning("[CodeExecutor] 任务执行超时。") + except Exception as e: + logger.exception(f"[CodeExecutor] 执行 Docker 任务时发生未知严重错误: {e}") + result_message = "执行引擎发生内部错误,请联系管理员。" + + # 调用回调函数回复结果 + await task['callback'](result_message) + + self.task_queue.task_done() + + def _run_in_container(self, code: str) -> bytes: + """ + 同步函数:在 Docker 容器中运行代码。 + 此函数通过手动管理容器生命周期来提高稳定性。 + """ + container = None + try: + # 1. 创建容器 + container = self.docker_client.containers.create( + image=self.sandbox_image, + command=["python", "-c", code], + mem_limit='128m', + cpu_shares=512, + network_disabled=True, + log_config={'type': 'json-file', 'config': {'max-size': '1m'}}, + ) + # 2. 启动容器 + container.start() + + # 3. 等待容器执行完成 + # 主超时由 asyncio.wait_for 控制,这里的 timeout 是一个额外的保险 + result = container.wait(timeout=self.timeout + 5) + + # 4. 获取日志 + stdout = container.logs(stdout=True, stderr=False) + stderr = container.logs(stdout=False, stderr=True) + + # 5. 检查退出码,如果不为 0,则手动抛出 ContainerError + if result.get('StatusCode', 0) != 0: + raise docker.errors.ContainerError( + container, result['StatusCode'], f"python -c '{code}'", self.sandbox_image, stderr + ) + + return stdout + + finally: + # 6. 确保容器总是被移除 + if container: + try: + container.remove(force=True) + except docker.errors.NotFound: + # 如果容器因为某些原因已经消失,也沒关系 + pass + except Exception as e: + logger.error(f"[CodeExecutor] 强制移除容器 {container.id} 时失败: {e}") + +def initialize_executor(bot_instance, config: Dict[str, Any]): + """ + 初始化并返回一个 CodeExecutor 实例。 + """ + return CodeExecutor(bot_instance, config) + +async def run_in_thread_pool(sync_func, *args, **kwargs): + """ + 在线程池中运行同步阻塞函数,以避免阻塞 asyncio 事件循环。 + :param sync_func: 同步函数 + :param args: 位置参数 + :param kwargs: 关键字参数 + :return: 同步函数的返回值 """ loop = asyncio.get_running_loop() - # 使用 functools.partial 绑定函数和参数,以便传递给 run_in_executor - func_to_run = partial(func, *args, **kwargs) - # loop.run_in_executor 会返回一个 awaitable 对象 - return await loop.run_in_executor(executor, func_to_run) + return await loop.run_in_executor(None, lambda: sync_func(*args, **kwargs)) diff --git a/core/permission_manager.py b/core/permission_manager.py index c79a18d..917e753 100644 --- a/core/permission_manager.py +++ b/core/permission_manager.py @@ -227,6 +227,14 @@ class PermissionManager: Returns: bool: 如果用户权限 >= 所需权限,返回 True,否则返回 False """ + # 如果传入的是字符串,先转换为 Permission 对象 + if isinstance(required_permission, str): + required_permission = _PERMISSIONS.get(required_permission.lower()) + if not required_permission: + # 如果是无效的权限字符串,默认拒绝 + logger.warning(f"检测到无效的权限检查字符串: {required_permission}") + return False + user_permission = await self.get_user_permission(user_id) return user_permission >= required_permission @@ -249,4 +257,21 @@ class PermissionManager: # 全局权限管理器实例 -permission_manager = PermissionManager() \ No newline at end of file +permission_manager = PermissionManager() + +def require_admin(func): + """ + 一个装饰器,用于限制命令只能由管理员执行。 + """ + from functools import wraps + from models.events.message import MessageEvent + + @wraps(func) + async def wrapper(event: MessageEvent, *args, **kwargs): + user_id = event.user_id + if await permission_manager.check_permission(user_id, ADMIN): + return await func(event, *args, **kwargs) + else: + await event.reply("抱歉,您没有权限执行此命令。") + return None + return wrapper diff --git a/main.py b/main.py index 1445f1f..0589512 100644 --- a/main.py +++ b/main.py @@ -108,6 +108,21 @@ async def main(): try: bot = WS() + + # 初始化代码执行器 + from core.config_loader import global_config as config + from core.executor import initialize_executor + code_executor = initialize_executor(bot, config) + bot.bot.code_executor = code_executor # 将执行器实例附加到 bot.bot 对象上 + + # 启动代码执行器的后台 worker + logger.debug("[Main] 检查是否需要启动代码执行 Worker...") + if code_executor and code_executor.docker_client: + logger.info("[Main] Docker 连接成功,正在启动代码执行 Worker...") + asyncio.create_task(code_executor.worker()) + else: + logger.warning("[Main] 未启动代码执行 Worker,因为 Docker 客户端未初始化或连接失败。") + await bot.connect() finally: if observer.is_alive(): diff --git a/plugins/broadcast.py b/plugins/broadcast.py new file mode 100644 index 0000000..37cfd32 --- /dev/null +++ b/plugins/broadcast.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +""" +管理员专用的广播插件 +功能: +- 仅限管理员在私聊中调用。 +- 通过回复一条消息并发送指令,将该消息转发给机器人所在的所有群聊。 +- 此插件不写入 __plugin_meta__,保持隐藏。 +""" +from core.command_manager import matcher +from models import MessageEvent +from core.permission_manager import ADMIN +from core.logger import logger + +@matcher.command("broadcast", "广播", permission=ADMIN) +async def broadcast_message(event: MessageEvent): + """ + 广播指令处理器。 + + :param event: 消息事件对象。 + """ + # 1. 检查是否为私聊消息 + if event.group_id: + # 在群聊中调用时,静默处理,不予响应 + return + + # 2. 检查是否回复了某条消息 + reply = event.reply + if not reply: + await event.reply("请通过“回复”一条您想广播的消息来使用此功能。") + return + + # 3. 获取机器人所在的群聊列表 + bot = event.bot + try: + group_list = await bot.get_group_list() + if not group_list: + await event.reply("机器人目前没有加入任何群聊。") + return + except Exception as e: + logger.error(f"[Broadcast] 获取群聊列表失败: {e}") + await event.reply(f"获取群聊列表时发生错误,无法广播。错误信息: {e}") + return + + # 4. 遍历所有群聊并转发消息 + success_count = 0 + failed_count = 0 + total_groups = len(group_list) + + await event.reply(f"准备向 {total_groups} 个群聊广播消息,请稍候...") + + for group in group_list: + group_id = group.get("group_id") + if not group_id: + continue + + try: + # 直接转发被回复的消息 + await bot.forward_message( + group_id=group_id, + message_id=reply.message_id + ) + success_count += 1 + logger.info(f"[Broadcast] 已成功将消息转发至群聊: {group_id}") + except Exception as e: + failed_count += 1 + logger.error(f"[Broadcast] 转发消息至群聊 {group_id} 失败: {e}") + + # 5. 向管理员报告结果 + report_message = ( + f"广播任务完成。\n" + f"总群聊数: {total_groups}\n" + f"成功: {success_count}\n" + f"失败: {failed_count}" + ) + await event.reply(report_message) diff --git a/plugins/code_py.py b/plugins/code_py.py index 2fa2988..055b034 100644 --- a/plugins/code_py.py +++ b/plugins/code_py.py @@ -1,176 +1,156 @@ -""" -code_py插件 - -输入/code py回车再加上python代码,机器人就会执行代码并返回执行结果。 -""" - +# -*- coding: utf-8 -*- +import html +import textwrap import asyncio -import re -import sys -import tempfile -import os -from typing import Tuple, Set +from typing import Dict -from core.bot import Bot from core.command_manager import matcher -from core.executor import run_in_thread_pool from models import MessageEvent +from core.permission_manager import ADMIN +from core.logger import logger __plugin_meta__ = { - "name": "code_py", - "description": "提供执行python代码的功能", - "usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码", + "name": "Python 代码执行", + "description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和转发回复。", + "usage": "/py <单行代码>\n/code_py <单行代码>\n/py (进入多行输入模式)", } -# --- 安全配置:危险模块和内置函数黑名单 --- -DANGEROUS_MODULES = [ - "os", "sys", "subprocess", "shutil", "socket", "requests", "urllib", - "http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing", - "asyncio", -] -DANGEROUS_BUILTINS = [ - "__import__", "open", "exec", "eval", "compile", "input", "breakpoint" -] +# --- 会话状态管理 --- +# 结构: {(user_id, group_id): asyncio.TimerHandle} +multi_line_sessions: Dict[tuple, asyncio.TimerHandle] = {} -# 编译后的正则表达式,用于分割语句 -STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]') -# 编译后的正则表达式,用于查找危险的内置函数调用 -BUILTIN_CALL_PATTERN = re.compile(r'\b(' + '|'.join(DANGEROUS_BUILTINS) + r')\s*\(') - -def is_code_safe(code: str) -> Tuple[bool, str]: +async def reply_as_forward(event: MessageEvent, input_code: str, output_result: str): """ - 检查代码中是否包含危险的模块导入或内置函数调用。 + 将输入和输出打包成转发消息进行回复。 + 参考 forward_test.py 的实现,兼容私聊和群聊。 """ - # 1. 检查危险的内置函数 - found_builtins = BUILTIN_CALL_PATTERN.search(code) - if found_builtins: - return False, f"检测到不允许的内置函数调用:'{found_builtins.group(1)}'" - - # 2. 检查危险的模块导入 - statements = STATEMENT_SPLIT_PATTERN.split(code) - for statement in statements: - statement = statement.strip() - if not statement: - continue - parts = statement.split() - if not parts: - continue - if parts[0] == 'from' and len(parts) > 1: - module_name = parts[1].strip() - if module_name in DANGEROUS_MODULES: - return False, f"检测到不允许的模块导入:'{module_name}'" - elif parts[0] == 'import' and len(parts) > 1: - modules_str = ' '.join(parts[1:]) - imported_modules = [m.strip() for m in modules_str.split(',')] - for module_name in imported_modules: - actual_module_name = module_name.split()[0] - if actual_module_name in DANGEROUS_MODULES: - return False, f"检测到不允许的模块导入:'{actual_module_name}'" - return True, "" - -async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]: - """ - 在子进程中安全地执行Python代码。 - """ - with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf: - tf.write(code_str) - tf_path = tf.name - try: - proc = await asyncio.create_subprocess_exec( - sys.executable, tf_path, - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - try: - out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout) - except asyncio.TimeoutError: - proc.kill() - await proc.communicate() - return "", f"执行超时(>{timeout}s)" - return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore") - finally: - try: - os.remove(tf_path) - except Exception: - pass - -async def process_and_reply(bot: Bot, event: MessageEvent, code: str): - """ - 核心处理逻辑:安全检查、执行代码并回复结果。 - """ - safe, message = await run_in_thread_pool(is_code_safe, code) - if not safe: - await event.reply(f"代码安全检查未通过:\n{message}") - return - - try: - stdout, stderr = await run_code_in_subprocess(code, timeout=10.0) - except Exception as e: - await event.reply(f"执行失败:{e}") - return - - resp = stderr.strip() or stdout.strip() or "(无输出)" - MAX = 1500 - if len(resp) > MAX: - resp = resp[:MAX] + "\n...输出被截断..." - + bot = event.bot + + # 1. 构建消息节点列表 nodes = [ - bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code), - bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp), + bot.build_forward_node( + user_id=event.user_id, + nickname=event.sender.nickname or str(event.user_id), + message=f"--- Your Code ---\n{input_code}" + ), + bot.build_forward_node( + user_id=event.self_id, + nickname="Code Executor", + message=f"--- Execution Result ---\n{output_result}" + ) ] + try: + # 2. 发送合并转发消息 await bot.send_forwarded_messages(event, nodes) except Exception as e: - await event.reply(f"结果发送失败: {e}\n\n{resp}") + logger.error(f"[code_py] 发送转发消息失败: {e}") + # 降级为普通消息回复 + await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}") -# --- 交互式会话状态 --- -# 使用集合存储正在等待代码输入的用户标识 -waiting_users: Set[str] = set() - -def get_session_id(event: MessageEvent) -> str: - """根据事件类型生成唯一的会话ID""" - if hasattr(event, 'group_id'): - # 群聊会话ID - return f"group_{event.group_id}-{event.user_id}" - else: - # 私聊会话ID - return f"private_{event.user_id}" - -@matcher.command("code_py") -async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]): - # 模式一:快速执行单行代码 - if args: - code = " ".join(args) - await process_and_reply(bot, event, code) +async def execute_code(event: MessageEvent, code: str): + """ + 核心代码执行逻辑。 + """ + code_executor = getattr(event.bot, 'code_executor', None) + if not code_executor or not code_executor.docker_client: + await event.reply("代码执行服务当前不可用,请检查 Docker 连接配置。") return - # 模式二:进入交互模式 - session_id = get_session_id(event) - if session_id in waiting_users: - await event.reply("您已经有一个正在等待输入的code会话了,请直接发送代码。") - return + # 修改 add_task,让它能直接接收回复函数 + await code_executor.add_task( + code, + lambda result: reply_as_forward(event, code, result) + ) + await event.reply("代码已提交至沙箱执行队列,请稍候...") + +def cleanup_session(session_key: tuple): + """ + 清理超时的会话。 + """ + if session_key in multi_line_sessions: + del multi_line_sessions[session_key] + logger.info(f"[code_py] 会话 {session_key} 已超时,自动取消。") + +def normalize_code(code: str) -> str: + """ + 规范化用户输入的 Python 代码字符串。 + + 主要处理两个问题: + 1. 对消息中可能存在的 HTML 实体进行解码 (e.g., [ -> [)。 + 2. 移除整个代码块的公共前导缩进,以修复因复制粘贴导致的多余缩进。 + + :param code: 原始代码字符串。 + :return: 规范化后的代码字符串。 + """ + # 1. 解码 HTML 实体 + code = html.unescape(code) + + # 2. 移除公共前导缩进 + try: + code = textwrap.dedent(code) + except Exception: + # 在某些情况下(例如,不一致的缩进),dedent 可能会失败, + # 但我们不希望因此中断流程,所以捕获异常并继续。 + pass - waiting_users.add(session_id) - await event.reply("请在下一条消息中发送要执行的Python代码块。(发送“取消”可退出)") + return code.strip() + + +@matcher.command("py", "python", "code_py", permission=ADMIN) +async def code_py_main(event: MessageEvent, args: list[str]): + """ + /py 命令的主入口。 + - 如果有参数,直接执行。 + - 如果没有参数,开启多行输入模式。 + """ + code_to_run = " ".join(args) + + if code_to_run: + # 单行模式,对代码进行规范化处理 + normalized_code = normalize_code(code_to_run) + if not normalized_code: + await event.reply("代码为空或格式错误,请输入有效的代码。") + return + await execute_code(event, normalized_code) + else: + # 多行模式 + # 使用 getattr 兼容私聊和群聊 + session_key = (event.user_id, getattr(event, 'group_id', 'private')) + + # 如果上一个会话的超时任务还在,先取消它 + if session_key in multi_line_sessions: + multi_line_sessions[session_key].cancel() + + await event.reply("已进入多行代码输入模式,请直接发送你的代码。\n(60秒内无操作将自动取消)") + + # 设置 60 秒超时 + loop = asyncio.get_running_loop() + timeout_handler = loop.call_later( + 60, + cleanup_session, + session_key + ) + multi_line_sessions[session_key] = timeout_handler @matcher.on_message() -async def handle_code_input(bot: Bot, event: MessageEvent): - session_id = get_session_id(event) - - # 检查用户是否处于等待状态 - if session_id in waiting_users: - # 从等待集合中移除,无论输入是什么 - waiting_users.remove(session_id) +async def handle_multi_line_code(event: MessageEvent): + """ + 通用消息处理器,用于捕获多行模式下的代码输入。 + """ + # 使用 getattr 兼容私聊和群聊 + session_key = (event.user_id, getattr(event, 'group_id', 'private')) + if session_key in multi_line_sessions: + # 取消超时任务 + multi_line_sessions[session_key].cancel() + del multi_line_sessions[session_key] + + # 对多行代码进行规范化处理 + normalized_code = normalize_code(event.raw_message) - # 处理取消操作 - if event.raw_message.strip() == "取消": - await event.reply("已取消输入。") - return True # 消费事件 - - # 执行代码 - await process_and_reply(bot, event, event.raw_message) - return True # 消费事件,防止被其他指令匹配 - - # 如果用户不在等待状态,则不处理 - return False - + if not normalized_code: + await event.reply("捕获到的代码为空或格式错误,已取消输入。") + return + await execute_code(event, normalized_code) + return True # 消费事件,防止其他处理器响应 diff --git a/requirements.txt b/requirements.txt index 17a9c33e3864882cb7217cb519675a201e0d8f3d..81bbec89684e9c693f0a5a1a37e498102247c737 100644 GIT binary patch literal 360 zcmXX?Npiy=5WMr3Py)2%!dqNOnpl((U=}OR>(f)ojh>!fn3Y^_{;P+YdLFGEr5dFX zYsGtzgVbW9f(37_9`q!Yk_xlKl}ha+rgFOAf2de%uoPO+cPoQ!INzrR}=ZhbWh@**~ZTFN%OI@P!P6*6{xnM>9u>u+y%ynOE7ZK zg3Jpxr+n#cjA%6W$&rij7fz&@=x`!q^3U8OdQWyEVN3VD<>RQYu*3= diff --git a/sandbox.Dockerfile b/sandbox.Dockerfile new file mode 100644 index 0000000..41f2bc3 --- /dev/null +++ b/sandbox.Dockerfile @@ -0,0 +1,19 @@ +# 使用一个轻量级的 Python 官方镜像作为基础 +FROM python:3.11-slim + +# 创建一个低权限的用户来运行代码,增加安全性 +# -S: 创建一个系统用户 (没有 home 目录) +# -u: 指定用户ID +# -g: 指定组ID +RUN groupadd -g 1001 sandbox && useradd -u 1001 -g sandbox -s /bin/sh -r sandbox + +# 创建一个工作目录,用于存放和执行用户的代码 +WORKDIR /sandbox +# 将目录所有权交给沙箱用户 +RUN chown sandbox:sandbox /sandbox + +# 切换到沙箱用户 +USER sandbox + +# 默认的启动命令是 python,这样容器启动时可以直接执行 .py 文件 +CMD ["python"]