Files
NeoBot/plugins/code_py.py
2026-01-04 19:38:47 +08:00

166 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
code_py插件
输入/code py回车再加上python代码机器人就会执行代码并返回执行结果。
"""
import asyncio
import re
import sys
import tempfile
import os
from typing import Tuple, Set
from core.bot import Bot
from core.command_manager import matcher
from core.executor import run_in_thread_pool
from models import MessageEvent
__plugin_meta__ = {
"name": "code_py",
"description": "提供执行python代码的功能",
"usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码",
}
# --- 安全配置:危险模块黑名单 ---
DANGEROUS_MODULES = [
"os", "sys", "subprocess", "shutil", "socket", "requests", "urllib",
"http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing",
"asyncio",
]
# 编译后的正则表达式,用于分割语句
STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]')
def is_code_safe(code: str) -> Tuple[bool, str]:
"""
检查代码中是否包含危险的模块导入。
"""
statements = STATEMENT_SPLIT_PATTERN.split(code)
for statement in statements:
statement = statement.strip()
if not statement:
continue
parts = statement.split()
if not parts:
continue
if parts[0] == 'from' and len(parts) > 1:
module_name = parts[1].strip()
if module_name in DANGEROUS_MODULES:
return False, f"检测到不允许的模块导入:'{module_name}'"
elif parts[0] == 'import' and len(parts) > 1:
modules_str = ' '.join(parts[1:])
imported_modules = [m.strip() for m in modules_str.split(',')]
for module_name in imported_modules:
actual_module_name = module_name.split()[0]
if actual_module_name in DANGEROUS_MODULES:
return False, f"检测到不允许的模块导入:'{actual_module_name}'"
return True, ""
async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]:
"""
在子进程中安全地执行Python代码。
"""
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf:
tf.write(code_str)
tf_path = tf.name
try:
proc = await asyncio.create_subprocess_exec(
sys.executable, tf_path,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
return "", f"执行超时(>{timeout}s"
return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore")
finally:
try:
os.remove(tf_path)
except Exception:
pass
async def process_and_reply(bot: Bot, event: MessageEvent, code: str):
"""
核心处理逻辑:安全检查、执行代码并回复结果。
"""
safe, message = await run_in_thread_pool(is_code_safe, code)
if not safe:
await event.reply(f"代码安全检查未通过:\n{message}")
return
try:
stdout, stderr = await run_code_in_subprocess(code, timeout=10.0)
except Exception as e:
await event.reply(f"执行失败:{e}")
return
resp = stderr.strip() or stdout.strip() or "(无输出)"
MAX = 1500
if len(resp) > MAX:
resp = resp[:MAX] + "\n...输出被截断..."
nodes = [
bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code),
bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp),
]
try:
await bot.send_forwarded_messages(event, nodes)
except Exception as e:
await event.reply(f"结果发送失败: {e}\n\n{resp}")
# --- 交互式会话状态 ---
# 使用集合存储正在等待代码输入的用户标识
waiting_users: Set[str] = set()
def get_session_id(event: MessageEvent) -> str:
"""根据事件类型生成唯一的会话ID"""
if hasattr(event, 'group_id'):
# 群聊会话ID
return f"group_{event.group_id}-{event.user_id}"
else:
# 私聊会话ID
return f"private_{event.user_id}"
@matcher.command("code_py")
async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]):
# 模式一:快速执行单行代码
if args:
code = " ".join(args)
await process_and_reply(bot, event, code)
return
# 模式二:进入交互模式
session_id = get_session_id(event)
if session_id in waiting_users:
await event.reply("您已经有一个正在等待输入的code会话了请直接发送代码。")
return
waiting_users.add(session_id)
await event.reply("请在下一条消息中发送要执行的Python代码块。发送“取消”可退出")
@matcher.on_message()
async def handle_code_input(bot: Bot, event: MessageEvent):
session_id = get_session_id(event)
# 检查用户是否处于等待状态
if session_id in waiting_users:
# 从等待集合中移除,无论输入是什么
waiting_users.remove(session_id)
# 处理取消操作
if event.raw_message.strip() == "取消":
await event.reply("已取消输入。")
return True # 消费事件
# 执行代码
await process_and_reply(bot, event, event.raw_message)
return True # 消费事件,防止被其他指令匹配
# 如果用户不在等待状态,则不处理
return False