163 lines
5.5 KiB
Python
163 lines
5.5 KiB
Python
"""
|
||
code_py插件
|
||
|
||
输入/code py回车再加上python代码,机器人就会执行代码并返回执行结果。
|
||
"""
|
||
|
||
import asyncio
|
||
import re
|
||
import sys
|
||
import tempfile
|
||
import os
|
||
from typing import Tuple, Set
|
||
|
||
from core.bot import Bot
|
||
from core.command_manager import matcher
|
||
from models import MessageEvent
|
||
|
||
__plugin_meta__ = {
|
||
"name": "code_py",
|
||
"description": "提供执行python代码的功能",
|
||
"usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码",
|
||
}
|
||
|
||
# --- 安全配置:危险模块黑名单 ---
|
||
DANGEROUS_MODULES = [
|
||
"os", "sys", "subprocess", "shutil", "socket", "requests", "urllib",
|
||
"http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing",
|
||
"asyncio",
|
||
]
|
||
|
||
# 编译后的正则表达式,用于分割语句
|
||
STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]')
|
||
|
||
def is_code_safe(code: str) -> Tuple[bool, str]:
|
||
"""
|
||
检查代码中是否包含危险的模块导入。
|
||
"""
|
||
statements = STATEMENT_SPLIT_PATTERN.split(code)
|
||
for statement in statements:
|
||
statement = statement.strip()
|
||
if not statement: continue
|
||
parts = statement.split()
|
||
if not parts: continue
|
||
if parts[0] == 'from' and len(parts) > 1:
|
||
module_name = parts[1].strip()
|
||
if module_name in DANGEROUS_MODULES:
|
||
return False, f"检测到不允许的模块导入:'{module_name}'"
|
||
elif parts[0] == 'import' and len(parts) > 1:
|
||
modules_str = ' '.join(parts[1:])
|
||
imported_modules = [m.strip() for m in modules_str.split(',')]
|
||
for module_name in imported_modules:
|
||
actual_module_name = module_name.split()[0]
|
||
if actual_module_name in DANGEROUS_MODULES:
|
||
return False, f"检测到不允许的模块导入:'{actual_module_name}'"
|
||
return True, ""
|
||
|
||
async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]:
|
||
"""
|
||
在子进程中安全地执行Python代码。
|
||
"""
|
||
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf:
|
||
tf.write(code_str)
|
||
tf_path = tf.name
|
||
try:
|
||
proc = await asyncio.create_subprocess_exec(
|
||
sys.executable, tf_path,
|
||
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
||
)
|
||
try:
|
||
out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||
except asyncio.TimeoutError:
|
||
proc.kill()
|
||
await proc.communicate()
|
||
return "", f"执行超时(>{timeout}s)"
|
||
return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore")
|
||
finally:
|
||
try:
|
||
os.remove(tf_path)
|
||
except Exception:
|
||
pass
|
||
|
||
async def process_and_reply(bot: Bot, event: MessageEvent, code: str):
|
||
"""
|
||
核心处理逻辑:安全检查、执行代码并回复结果。
|
||
"""
|
||
safe, message = is_code_safe(code)
|
||
if not safe:
|
||
await event.reply(f"代码安全检查未通过:\n{message}")
|
||
return
|
||
|
||
try:
|
||
stdout, stderr = await run_code_in_subprocess(code, timeout=10.0)
|
||
except Exception as e:
|
||
await event.reply(f"执行失败:{e}")
|
||
return
|
||
|
||
resp = stderr.strip() or stdout.strip() or "(无输出)"
|
||
MAX = 1500
|
||
if len(resp) > MAX:
|
||
resp = resp[:MAX] + "\n...输出被截断..."
|
||
|
||
nodes = [
|
||
bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code),
|
||
bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp),
|
||
]
|
||
try:
|
||
await bot.send_forwarded_messages(event, nodes)
|
||
except Exception as e:
|
||
await event.reply(f"结果发送失败: {e}\n\n{resp}")
|
||
|
||
# --- 交互式会话状态 ---
|
||
# 使用集合存储正在等待代码输入的用户标识
|
||
waiting_users: Set[str] = set()
|
||
|
||
def get_session_id(event: MessageEvent) -> str:
|
||
"""根据事件类型生成唯一的会话ID"""
|
||
if hasattr(event, 'group_id'):
|
||
# 群聊会话ID
|
||
return f"group_{event.group_id}-{event.user_id}"
|
||
else:
|
||
# 私聊会话ID
|
||
return f"private_{event.user_id}"
|
||
|
||
@matcher.command("code_py")
|
||
async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]):
|
||
# 模式一:快速执行单行代码
|
||
if args:
|
||
code = " ".join(args)
|
||
await process_and_reply(bot, event, code)
|
||
return
|
||
|
||
# 模式二:进入交互模式
|
||
session_id = get_session_id(event)
|
||
if session_id in waiting_users:
|
||
await event.reply("您已经有一个正在等待输入的code会话了,请直接发送代码。")
|
||
return
|
||
|
||
waiting_users.add(session_id)
|
||
await event.reply("请在下一条消息中发送要执行的Python代码块。(发送“取消”可退出)")
|
||
|
||
@matcher.on_message()
|
||
async def handle_code_input(bot: Bot, event: MessageEvent):
|
||
session_id = get_session_id(event)
|
||
|
||
# 检查用户是否处于等待状态
|
||
if session_id in waiting_users:
|
||
# 从等待集合中移除,无论输入是什么
|
||
waiting_users.remove(session_id)
|
||
|
||
# 处理取消操作
|
||
if event.raw_message.strip() == "取消":
|
||
await event.reply("已取消输入。")
|
||
return True # 消费事件
|
||
|
||
# 执行代码
|
||
await process_and_reply(bot, event, event.raw_message)
|
||
return True # 消费事件,防止被其他指令匹配
|
||
|
||
# 如果用户不在等待状态,则不处理
|
||
return False
|
||
|
||
|