From 3fcac59ef9c2a8d93e2e805a4036de5d3276bca2 Mon Sep 17 00:00:00 2001 From: K2cr2O1 <2221577113@qq.com> Date: Fri, 2 Jan 2026 20:10:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96codepy=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/__init__.py | 2 +- core/command_manager.py | 63 +++++++++----- main.py | 2 +- plugins/code_py.py | 177 ++++++++++++++++++++++++++++------------ 4 files changed, 170 insertions(+), 74 deletions(-) diff --git a/core/__init__.py b/core/__init__.py index 2717750..032d0c6 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -1,6 +1,6 @@ from .command_manager import matcher from .config_loader import global_config from .plugin_manager import PluginDataManager -from .WS import WS +from .ws import WS __all__ = ["WS", "matcher", "global_config", "PluginDataManager"] diff --git a/core/command_manager.py b/core/command_manager.py index f95053a..ec875b2 100644 --- a/core/command_manager.py +++ b/core/command_manager.py @@ -36,13 +36,15 @@ class CommandManager: Args: prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。 """ + # --- 初始化所有处理器列表 --- self.prefixes = prefixes - self.commands: Dict[str, Callable] = {} # 存储消息指令处理器 - self.notice_handlers: List[Dict] = [] # 存储通知事件处理器 - self.request_handlers: List[Dict] = [] # 存储请求事件处理器 - self.plugins: Dict[str, Dict[str, Any]] = {} # 存储已加载插件的元数据 + self.commands: Dict[str, Callable] = {} + self.message_handlers: List[Callable] = [] + self.notice_handlers: List[Dict] = [] + self.request_handlers: List[Dict] = [] + self.plugins: Dict[str, Dict[str, Any]] = {} - # --- 注册内置 help 指令 --- + # --- 注册内置指令 --- self.commands["help"] = self._help_command self.plugins["core.help"] = { "name": "帮助", @@ -50,6 +52,26 @@ class CommandManager: "usage": "/help", } + def on_message(self) -> Callable: + """ + 装饰器:用于注册一个通用的消息处理器。 + + 被此装饰器注册的函数,会在每次收到消息时(在指令匹配前)被调用。 + 如果函数返回 True,则表示该消息已被“消费”,后续的指令匹配将不会进行。 + + Example: + @matcher.on_message() + async def code_input_handler(bot, event): + if is_waiting_for_code(event.user_id): + await process_code(event.raw_message) + return True # 消费事件 + """ + def decorator(func: Callable) -> Callable: + self.message_handlers.append(func) + return func + return decorator + + async def _help_command(self, bot, event): """ 内置的 `/help` 命令的实现。 @@ -164,21 +186,21 @@ class CommandManager: async def handle_message(self, bot, event): """ - 处理消息事件,解析并分发指令。 - - 该方法会检查消息是否以已配置的命令前缀开头,如果是,则解析出 - 指令名称和参数,并调用对应的处理器。 - - Args: - bot: Bot 实例。 - event: 消息事件对象。 + 处理消息事件,优先执行通用处理器,然后解析并分发指令。 """ + # --- 1. 执行通用消息处理器 --- + for handler in self.message_handlers: + # 如果任何一个处理器返回 True,则中断后续处理 + consumed = await self._run_handler(handler, bot, event) + if consumed: + return + + # --- 2. 检查并执行指令 --- if not event.raw_message: return raw_text = event.raw_message.strip() - # 1. 检查前缀 prefix_found = None for p in self.prefixes: if raw_text.startswith(p): @@ -188,7 +210,6 @@ class CommandManager: if not prefix_found: return - # 2. 拆分指令和参数 full_cmd = raw_text[len(prefix_found) :].split() if not full_cmd: return @@ -196,7 +217,6 @@ class CommandManager: cmd_name = full_cmd[0] args = full_cmd[1:] - # 3. 查找并执行 if cmd_name in self.commands: func = self.commands[cmd_name] await self._run_handler(func, bot, event, args) @@ -227,7 +247,7 @@ class CommandManager: async def _run_handler(self, func: Callable, bot, event, args: List[str] = None): """ - 智能执行事件处理器。 + 智能执行事件处理器,并返回事件是否被消费。 该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参数 (如 `bot`, `event`, `args`),实现了依赖注入。 @@ -237,7 +257,9 @@ class CommandManager: bot: Bot 实例。 event: 事件对象。 args (List[str], optional): 指令参数列表(仅对消息事件有效)。 - Defaults to None. + + Returns: + bool: 如果处理器函数返回 True,则返回 True,否则返回 False。 """ sig = inspect.signature(func) params = sig.parameters @@ -250,8 +272,9 @@ class CommandManager: if "args" in params and args is not None: kwargs["args"] = args - # 执行函数 - await func(**kwargs) + # 执行函数并获取返回值 + result = await func(**kwargs) + return result is True # --- 全局单例 --- diff --git a/main.py b/main.py index 1d9d6cc..d4c723c 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,7 @@ from watchdog.events import FileSystemEventHandler # 初始化日志系统,必须在其他 core 模块导入之前执行 from core.logger import logger -from core import WS +from core.ws import WS from core.plugin_manager import load_all_plugins diff --git a/plugins/code_py.py b/plugins/code_py.py index 5af3e4c..cb53f2b 100644 --- a/plugins/code_py.py +++ b/plugins/code_py.py @@ -5,10 +5,11 @@ code_py插件 """ import asyncio -import os +import re import sys import tempfile -from typing import Tuple +import os +from typing import Tuple, Set from core.bot import Bot from core.command_manager import matcher @@ -17,73 +18,145 @@ from models import MessageEvent __plugin_meta__ = { "name": "code_py", "description": "提供执行python代码的功能", - "usage": "/code py [python代码] - 执行python代码", + "usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码", } +# --- 安全配置:危险模块黑名单 --- +DANGEROUS_MODULES = [ + "os", "sys", "subprocess", "shutil", "socket", "requests", "urllib", + "http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing", + "asyncio", +] -@matcher.command("code_py") -async def execute_python_code(bot: Bot, event: MessageEvent, args: list[str]): - if not args: - await event.reply("请提供要执行的Python代码。用法:/code_py [python代码]") +# 编译后的正则表达式,用于分割语句 +STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]') + +def is_code_safe(code: str) -> Tuple[bool, str]: + """ + 检查代码中是否包含危险的模块导入。 + """ + statements = STATEMENT_SPLIT_PATTERN.split(code) + for statement in statements: + statement = statement.strip() + if not statement: continue + parts = statement.split() + if not parts: continue + if parts[0] == 'from' and len(parts) > 1: + module_name = parts[1].strip() + if module_name in DANGEROUS_MODULES: + return False, f"检测到不允许的模块导入:'{module_name}'" + elif parts[0] == 'import' and len(parts) > 1: + modules_str = ' '.join(parts[1:]) + imported_modules = [m.strip() for m in modules_str.split(',')] + for module_name in imported_modules: + actual_module_name = module_name.split()[0] + if actual_module_name in DANGEROUS_MODULES: + return False, f"检测到不允许的模块导入:'{actual_module_name}'" + return True, "" + +async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]: + """ + 在子进程中安全地执行Python代码。 + """ + with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf: + tf.write(code_str) + tf_path = tf.name + try: + proc = await asyncio.create_subprocess_exec( + sys.executable, tf_path, + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + try: + out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + return "", f"执行超时(>{timeout}s)" + return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore") + finally: + try: + os.remove(tf_path) + except Exception: + pass + +async def process_and_reply(bot: Bot, event: MessageEvent, code: str): + """ + 核心处理逻辑:安全检查、执行代码并回复结果。 + """ + safe, message = is_code_safe(code) + if not safe: + await event.reply(f"代码安全检查未通过:\n{message}") return - code = " ".join(args) - - async def run_code_in_subprocess( - code_str: str, timeout: float = 5.0 - ) -> Tuple[str, str]: - # 这里用临时文件 - with tempfile.NamedTemporaryFile( - "w", suffix=".py", delete=False, encoding="utf-8" - ) as tf: - tf.write(code_str) - tf_path = tf.name - - try: - proc = await asyncio.create_subprocess_exec( - sys.executable, - tf_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - try: - out_bytes, err_bytes = await asyncio.wait_for( - proc.communicate(), timeout=timeout - ) - except asyncio.TimeoutError: - proc.kill() - await proc.communicate() - return "", f"执行超时(>{timeout}s)" - - return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore") - finally: - try: - os.remove(tf_path) - except Exception: - pass - try: - stdout, stderr = await run_code_in_subprocess(code, timeout=5.0) + stdout, stderr = await run_code_in_subprocess(code, timeout=10.0) except Exception as e: await event.reply(f"执行失败:{e}") return - # 优先显示 stderr,如果 stderr 为空则显示 stdout resp = stderr.strip() or stdout.strip() or "(无输出)" - - # 限制返回长度,避免过长消息 MAX = 1500 if len(resp) > MAX: resp = resp[:MAX] + "\n...输出被截断..." nodes = [ - bot.build_forward_node(user_id=event.self_id, nickname="机器人", message=code), - bot.build_forward_node( - user_id=event.self_id, nickname="机器人", message="执行结果:\n" + resp - ), + bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code), + bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp), ] - try: await bot.send_forwarded_messages(event, nodes) except Exception as e: - await event.reply(f"发送失败: {e}") + await event.reply(f"结果发送失败: {e}\n\n{resp}") + +# --- 交互式会话状态 --- +# 使用集合存储正在等待代码输入的用户标识 +waiting_users: Set[str] = set() + +def get_session_id(event: MessageEvent) -> str: + """根据事件类型生成唯一的会话ID""" + if hasattr(event, 'group_id'): + # 群聊会话ID + return f"group_{event.group_id}-{event.user_id}" + else: + # 私聊会话ID + return f"private_{event.user_id}" + +@matcher.command("code_py") +async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]): + # 模式一:快速执行单行代码 + if args: + code = " ".join(args) + await process_and_reply(bot, event, code) + return + + # 模式二:进入交互模式 + session_id = get_session_id(event) + if session_id in waiting_users: + await event.reply("您已经有一个正在等待输入的code会话了,请直接发送代码。") + return + + waiting_users.add(session_id) + await event.reply("请在下一条消息中发送要执行的Python代码块。(发送“取消”可退出)") + +@matcher.on_message() +async def handle_code_input(bot: Bot, event: MessageEvent): + session_id = get_session_id(event) + + # 检查用户是否处于等待状态 + if session_id in waiting_users: + # 从等待集合中移除,无论输入是什么 + waiting_users.remove(session_id) + + # 处理取消操作 + if event.raw_message.strip() == "取消": + await event.reply("已取消输入。") + return True # 消费事件 + + # 执行代码 + await process_and_reply(bot, event, event.raw_message) + return True # 消费事件,防止被其他指令匹配 + + # 如果用户不在等待状态,则不处理 + return False + +