Files
NeoBot/plugins/code_py.py
K2cr2O1 1420d0f0b2 feat(web_parser): 新增通用web链接解析插件框架
refactor: 重构B站、抖音、GitHub解析器为模块化结构

fix(executor): 增强docker容器错误处理和回调稳定性

style(templates): 优化帮助页面和代码执行结果的样式

perf(web_parser): 添加API缓存和消息去重机制

docs: 更新插件元信息和注释

chore: 移除旧的独立解析器插件文件
2026-01-22 01:58:13 +08:00

201 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import html
import textwrap
import asyncio
from typing import Dict
import datetime
import sys
from core.managers.command_manager import matcher
from models.events.message import MessageEvent
from core.permission import Permission
from core.utils.logger import logger
from core.managers.image_manager import image_manager
from models.message import MessageSegment
__plugin_meta__ = {
"name": "Python 代码执行",
"description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和图片输出。",
"usage": "/py <单行代码>\n/code_py <单行代码>\n/py (进入多行输入模式)",
}
# --- 会话状态管理 ---
# 结构: {(user_id, group_id): asyncio.TimerHandle}
multi_line_sessions: Dict[tuple, asyncio.TimerHandle] = {}
async def generate_and_send_code_image(event: MessageEvent, input_code: str, output_result: str):
"""
生成代码执行结果的图片并发送,如果发送失败则降级为文本消息。
Args:
event (MessageEvent): 消息事件对象
input_code (str): 用户输入的代码
output_result (str): 代码执行结果
"""
try:
# 准备模板数据
user_nickname = event.sender.nickname if event.sender else str(event.user_id)
user_id = event.user_id
avatar_initial = user_nickname[0] if user_nickname else "U"
# 构建QQ头像URL
qq_avatar_url = f"https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
template_data = {
"user_nickname": user_nickname,
"user_id": user_id,
"avatar_initial": avatar_initial,
"qq_avatar_url": qq_avatar_url,
"code": input_code,
"result": output_result,
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"execution_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"result_title": "执行成功" if "Traceback" not in output_result and "Error" not in output_result else "执行出错",
"result_class": "result-success" if "Traceback" not in output_result and "Error" not in output_result else "result-error"
}
# 渲染模板为图片
image_base64 = await image_manager.render_template_to_base64(
template_name="code_execution.html",
data=template_data,
output_name=f"code_execution_{event.user_id}_{int(datetime.datetime.now().timestamp())}.png",
quality=90,
image_type="png"
)
if image_base64:
# 发送图片
await event.reply(MessageSegment.image(image_base64))
else:
# 如果图片生成失败,降级为文本消息
await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}")
except Exception as e:
logger.error(f"[code_py] 生成代码执行图片失败: {e}")
# 降级为文本消息
await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}")
async def execute_code(event: MessageEvent, code: str):
"""
核心代码执行逻辑。
Args:
event (MessageEvent): 消息事件对象
code (str): 要执行的Python代码
"""
code_executor = getattr(event.bot, 'code_executor', None)
if not code_executor or not code_executor.docker_client:
await event.reply("代码执行服务当前不可用,请检查 Docker 连接配置。")
return
# 定义一个包装回调函数,确保正确处理异步操作和异常
async def callback_wrapper(result):
try:
await generate_and_send_code_image(event, code, result)
except Exception as e:
logger.error(f"[code_py] 执行回调时发生错误: {e}")
# 即使回调失败,也要确保任务被标记为完成
# 降级为简单文本回复
try:
await event.reply(f"代码执行结果:\n{result}")
except Exception as reply_error:
logger.error(f"[code_py] 发送降级回复时也失败: {reply_error}")
await code_executor.add_task(
code,
callback_wrapper
)
await event.reply("代码已提交至沙箱执行队列,请稍候...")
def cleanup_session(session_key: tuple):
"""
清理超时的会话。
"""
if session_key in multi_line_sessions:
del multi_line_sessions[session_key]
logger.info(f"[code_py] 会话 {session_key} 已超时,自动取消。")
def normalize_code(code: str) -> str:
"""
规范化用户输入的 Python 代码字符串。
主要处理两个问题:
1. 对消息中可能存在的 HTML 实体进行解码 (e.g., &#91; -> [)。
2. 移除整个代码块的公共前导缩进,以修复因复制粘贴导致的多余缩进。
:param code: 原始代码字符串。
:return: 规范化后的代码字符串。
"""
# 1. 解码 HTML 实体
code = html.unescape(code)
# 2. 移除公共前导缩进
try:
code = textwrap.dedent(code)
except Exception:
# 在某些情况下例如不一致的缩进dedent 可能会失败,
# 但我们不希望因此中断流程,所以捕获异常并继续。
pass
return code.strip()
@matcher.command("py", "python", "code_py", permission=Permission.ADMIN)
async def code_py_main(event: MessageEvent, args: list[str]):
"""
/py 命令的主入口。
- 如果有参数,直接执行。
- 如果没有参数,开启多行输入模式。
"""
code_to_run = " ".join(args)
if code_to_run:
# 单行模式,对代码进行规范化处理
normalized_code = normalize_code(code_to_run)
if not normalized_code:
await event.reply("代码为空或格式错误,请输入有效的代码。")
return
await execute_code(event, normalized_code)
else:
# 多行模式
# 使用 getattr 兼容私聊和群聊
session_key = (event.user_id, getattr(event, 'group_id', 'private'))
# 如果上一个会话的超时任务还在,先取消它
if session_key in multi_line_sessions:
multi_line_sessions[session_key].cancel()
await event.reply("已进入多行代码输入模式,请直接发送你的代码。\n(60秒内无操作将自动取消)")
# 设置 60 秒超时
loop = asyncio.get_running_loop()
timeout_handler = loop.call_later(
60,
cleanup_session,
session_key
)
multi_line_sessions[session_key] = timeout_handler
@matcher.on_message()
async def handle_multi_line_code(event: MessageEvent):
"""
通用消息处理器,用于捕获多行模式下的代码输入。
"""
# 使用 getattr 兼容私聊和群聊
session_key = (event.user_id, getattr(event, 'group_id', 'private'))
if session_key in multi_line_sessions:
# 取消超时任务
multi_line_sessions[session_key].cancel()
del multi_line_sessions[session_key]
# 对多行代码进行规范化处理
normalized_code = normalize_code(event.raw_message)
if not normalized_code:
await event.reply("捕获到的代码为空或格式错误,已取消输入。")
return
await execute_code(event, normalized_code)
return True # 消费事件,防止其他处理器响应