Files
NeoBot/plugins/code_py.py
2026-01-02 19:41:11 +08:00

90 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
code_py插件
输入/code py回车再加上python代码机器人就会执行代码并返回执行结果。
"""
import asyncio
import os
import sys
import tempfile
from typing import Tuple
from core.bot import Bot
from core.command_manager import matcher
from models import MessageEvent
__plugin_meta__ = {
"name": "code_py",
"description": "提供执行python代码的功能",
"usage": "/code py [python代码] - 执行python代码",
}
@matcher.command("code_py")
async def execute_python_code(bot: Bot, event: MessageEvent, args: list[str]):
if not args:
await event.reply("请提供要执行的Python代码。用法/code_py [python代码]")
return
code = " ".join(args)
async def run_code_in_subprocess(
code_str: str, timeout: float = 5.0
) -> Tuple[str, str]:
# 这里用临时文件
with tempfile.NamedTemporaryFile(
"w", suffix=".py", delete=False, encoding="utf-8"
) as tf:
tf.write(code_str)
tf_path = tf.name
try:
proc = await asyncio.create_subprocess_exec(
sys.executable,
tf_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
out_bytes, err_bytes = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
return "", f"执行超时(>{timeout}s"
return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore")
finally:
try:
os.remove(tf_path)
except Exception:
pass
try:
stdout, stderr = await run_code_in_subprocess(code, timeout=5.0)
except Exception as e:
await event.reply(f"执行失败:{e}")
return
# 优先显示 stderr如果 stderr 为空则显示 stdout
resp = stderr.strip() or stdout.strip() or "(无输出)"
# 限制返回长度,避免过长消息
MAX = 1500
if len(resp) > MAX:
resp = resp[:MAX] + "\n...输出被截断..."
nodes = [
bot.build_forward_node(user_id=event.self_id, nickname="机器人", message=code),
bot.build_forward_node(
user_id=event.self_id, nickname="机器人", message="执行结果:\n" + resp
),
]
try:
await bot.send_forwarded_messages(event, nodes)
except Exception as e:
await event.reply(f"发送失败: {e}")