""" code_py插件 输入/code py回车再加上python代码,机器人就会执行代码并返回执行结果。 """ import asyncio import re import sys import tempfile import os from typing import Tuple, Set from core.bot import Bot from core.command_manager import matcher from core.executor import run_in_thread_pool from models import MessageEvent __plugin_meta__ = { "name": "code_py", "description": "提供执行python代码的功能", "usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码", } # --- 安全配置:危险模块和内置函数黑名单 --- DANGEROUS_MODULES = [ "os", "sys", "subprocess", "shutil", "socket", "requests", "urllib", "http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing", "asyncio", ] DANGEROUS_BUILTINS = [ "__import__", "open", "exec", "eval", "compile", "input", "breakpoint" ] # 编译后的正则表达式,用于分割语句 STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]') # 编译后的正则表达式,用于查找危险的内置函数调用 BUILTIN_CALL_PATTERN = re.compile(r'\b(' + '|'.join(DANGEROUS_BUILTINS) + r')\s*\(') def is_code_safe(code: str) -> Tuple[bool, str]: """ 检查代码中是否包含危险的模块导入或内置函数调用。 """ # 1. 检查危险的内置函数 found_builtins = BUILTIN_CALL_PATTERN.search(code) if found_builtins: return False, f"检测到不允许的内置函数调用:'{found_builtins.group(1)}'" # 2. 检查危险的模块导入 statements = STATEMENT_SPLIT_PATTERN.split(code) for statement in statements: statement = statement.strip() if not statement: continue parts = statement.split() if not parts: continue if parts[0] == 'from' and len(parts) > 1: module_name = parts[1].strip() if module_name in DANGEROUS_MODULES: return False, f"检测到不允许的模块导入:'{module_name}'" elif parts[0] == 'import' and len(parts) > 1: modules_str = ' '.join(parts[1:]) imported_modules = [m.strip() for m in modules_str.split(',')] for module_name in imported_modules: actual_module_name = module_name.split()[0] if actual_module_name in DANGEROUS_MODULES: return False, f"检测到不允许的模块导入:'{actual_module_name}'" return True, "" async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]: """ 在子进程中安全地执行Python代码。 """ with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf: tf.write(code_str) tf_path = tf.name try: proc = await asyncio.create_subprocess_exec( sys.executable, tf_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) try: out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.communicate() return "", f"执行超时(>{timeout}s)" return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore") finally: try: os.remove(tf_path) except Exception: pass async def process_and_reply(bot: Bot, event: MessageEvent, code: str): """ 核心处理逻辑:安全检查、执行代码并回复结果。 """ safe, message = await run_in_thread_pool(is_code_safe, code) if not safe: await event.reply(f"代码安全检查未通过:\n{message}") return try: stdout, stderr = await run_code_in_subprocess(code, timeout=10.0) except Exception as e: await event.reply(f"执行失败:{e}") return resp = stderr.strip() or stdout.strip() or "(无输出)" MAX = 1500 if len(resp) > MAX: resp = resp[:MAX] + "\n...输出被截断..." nodes = [ bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code), bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp), ] try: await bot.send_forwarded_messages(event, nodes) except Exception as e: await event.reply(f"结果发送失败: {e}\n\n{resp}") # --- 交互式会话状态 --- # 使用集合存储正在等待代码输入的用户标识 waiting_users: Set[str] = set() def get_session_id(event: MessageEvent) -> str: """根据事件类型生成唯一的会话ID""" if hasattr(event, 'group_id'): # 群聊会话ID return f"group_{event.group_id}-{event.user_id}" else: # 私聊会话ID return f"private_{event.user_id}" @matcher.command("code_py") async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]): # 模式一:快速执行单行代码 if args: code = " ".join(args) await process_and_reply(bot, event, code) return # 模式二:进入交互模式 session_id = get_session_id(event) if session_id in waiting_users: await event.reply("您已经有一个正在等待输入的code会话了,请直接发送代码。") return waiting_users.add(session_id) await event.reply("请在下一条消息中发送要执行的Python代码块。(发送“取消”可退出)") @matcher.on_message() async def handle_code_input(bot: Bot, event: MessageEvent): session_id = get_session_id(event) # 检查用户是否处于等待状态 if session_id in waiting_users: # 从等待集合中移除,无论输入是什么 waiting_users.remove(session_id) # 处理取消操作 if event.raw_message.strip() == "取消": await event.reply("已取消输入。") return True # 消费事件 # 执行代码 await process_and_reply(bot, event, event.raw_message) return True # 消费事件,防止被其他指令匹配 # 如果用户不在等待状态,则不处理 return False