移除code_py.py中py命令的ADMIN权限限制,使其对所有用户可用 在jrcd.py中为handle_jrcd和handle_bbcd命令添加对特定事件ID(831797331)的过滤
201 lines
7.5 KiB
Python
201 lines
7.5 KiB
Python
# -*- coding: utf-8 -*-
|
||
import html
|
||
import textwrap
|
||
import asyncio
|
||
from typing import Dict
|
||
import datetime
|
||
import sys
|
||
|
||
from core.managers.command_manager import matcher
|
||
from models.events.message import MessageEvent
|
||
from core.permission import Permission
|
||
from core.utils.logger import logger
|
||
from core.managers.image_manager import image_manager
|
||
from models.message import MessageSegment
|
||
|
||
__plugin_meta__ = {
|
||
"name": "Python 代码执行",
|
||
"description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和图片输出。",
|
||
"usage": "/py <单行代码>\n/code_py <单行代码>\n/py (进入多行输入模式)",
|
||
}
|
||
|
||
# --- 会话状态管理 ---
|
||
# 结构: {(user_id, group_id): asyncio.TimerHandle}
|
||
multi_line_sessions: Dict[tuple, asyncio.TimerHandle] = {}
|
||
|
||
async def generate_and_send_code_image(event: MessageEvent, input_code: str, output_result: str):
|
||
"""
|
||
生成代码执行结果的图片并发送,如果发送失败则降级为文本消息。
|
||
|
||
Args:
|
||
event (MessageEvent): 消息事件对象
|
||
input_code (str): 用户输入的代码
|
||
output_result (str): 代码执行结果
|
||
"""
|
||
try:
|
||
# 准备模板数据
|
||
user_nickname = event.sender.nickname if event.sender else str(event.user_id)
|
||
user_id = event.user_id
|
||
avatar_initial = user_nickname[0] if user_nickname else "U"
|
||
|
||
# 构建QQ头像URL
|
||
qq_avatar_url = f"https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
|
||
|
||
template_data = {
|
||
"user_nickname": user_nickname,
|
||
"user_id": user_id,
|
||
"avatar_initial": avatar_initial,
|
||
"qq_avatar_url": qq_avatar_url,
|
||
"code": input_code,
|
||
"result": output_result,
|
||
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"execution_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
|
||
"result_title": "执行成功" if "Traceback" not in output_result and "Error" not in output_result else "执行出错",
|
||
"result_class": "result-success" if "Traceback" not in output_result and "Error" not in output_result else "result-error"
|
||
}
|
||
|
||
# 渲染模板为图片
|
||
image_base64 = await image_manager.render_template_to_base64(
|
||
template_name="code_execution.html",
|
||
data=template_data,
|
||
output_name=f"code_execution_{event.user_id}_{int(datetime.datetime.now().timestamp())}.png",
|
||
quality=90,
|
||
image_type="png"
|
||
)
|
||
|
||
if image_base64:
|
||
# 发送图片
|
||
await event.reply(MessageSegment.image(image_base64))
|
||
else:
|
||
# 如果图片生成失败,降级为文本消息
|
||
await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[code_py] 生成代码执行图片失败: {e}")
|
||
# 降级为文本消息
|
||
await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}")
|
||
|
||
async def execute_code(event: MessageEvent, code: str):
|
||
"""
|
||
核心代码执行逻辑。
|
||
|
||
Args:
|
||
event (MessageEvent): 消息事件对象
|
||
code (str): 要执行的Python代码
|
||
"""
|
||
code_executor = getattr(event.bot, 'code_executor', None)
|
||
if not code_executor or not code_executor.docker_client:
|
||
await event.reply("代码执行服务当前不可用,请检查 Docker 连接配置。")
|
||
return
|
||
|
||
# 定义一个包装回调函数,确保正确处理异步操作和异常
|
||
async def callback_wrapper(result):
|
||
try:
|
||
await generate_and_send_code_image(event, code, result)
|
||
except Exception as e:
|
||
logger.error(f"[code_py] 执行回调时发生错误: {e}")
|
||
# 即使回调失败,也要确保任务被标记为完成
|
||
# 降级为简单文本回复
|
||
try:
|
||
await event.reply(f"代码执行结果:\n{result}")
|
||
except Exception as reply_error:
|
||
logger.error(f"[code_py] 发送降级回复时也失败: {reply_error}")
|
||
|
||
await code_executor.add_task(
|
||
code,
|
||
callback_wrapper
|
||
)
|
||
await event.reply("代码已提交至沙箱执行队列,请稍候...")
|
||
|
||
def cleanup_session(session_key: tuple):
|
||
"""
|
||
清理超时的会话。
|
||
"""
|
||
if session_key in multi_line_sessions:
|
||
del multi_line_sessions[session_key]
|
||
logger.info(f"[code_py] 会话 {session_key} 已超时,自动取消。")
|
||
|
||
def normalize_code(code: str) -> str:
|
||
"""
|
||
规范化用户输入的 Python 代码字符串。
|
||
|
||
主要处理两个问题:
|
||
1. 对消息中可能存在的 HTML 实体进行解码 (e.g., [ -> [)。
|
||
2. 移除整个代码块的公共前导缩进,以修复因复制粘贴导致的多余缩进。
|
||
|
||
:param code: 原始代码字符串。
|
||
:return: 规范化后的代码字符串。
|
||
"""
|
||
# 1. 解码 HTML 实体
|
||
code = html.unescape(code)
|
||
|
||
# 2. 移除公共前导缩进
|
||
try:
|
||
code = textwrap.dedent(code)
|
||
except Exception:
|
||
# 在某些情况下(例如,不一致的缩进),dedent 可能会失败,
|
||
# 但我们不希望因此中断流程,所以捕获异常并继续。
|
||
pass
|
||
|
||
return code.strip()
|
||
|
||
|
||
@matcher.command("py", "python", "code_py")
|
||
async def code_py_main(event: MessageEvent, args: list[str]):
|
||
"""
|
||
/py 命令的主入口。
|
||
- 如果有参数,直接执行。
|
||
- 如果没有参数,开启多行输入模式。
|
||
"""
|
||
code_to_run = " ".join(args)
|
||
|
||
if code_to_run:
|
||
# 单行模式,对代码进行规范化处理
|
||
normalized_code = normalize_code(code_to_run)
|
||
if not normalized_code:
|
||
await event.reply("代码为空或格式错误,请输入有效的代码。")
|
||
return
|
||
await execute_code(event, normalized_code)
|
||
else:
|
||
# 多行模式
|
||
# 使用 getattr 兼容私聊和群聊
|
||
session_key = (event.user_id, getattr(event, 'group_id', 'private'))
|
||
|
||
# 如果上一个会话的超时任务还在,先取消它
|
||
if session_key in multi_line_sessions:
|
||
multi_line_sessions[session_key].cancel()
|
||
|
||
await event.reply("已进入多行代码输入模式,请直接发送你的代码。\n(60秒内无操作将自动取消)")
|
||
|
||
# 设置 60 秒超时
|
||
loop = asyncio.get_running_loop()
|
||
timeout_handler = loop.call_later(
|
||
60,
|
||
cleanup_session,
|
||
session_key
|
||
)
|
||
multi_line_sessions[session_key] = timeout_handler
|
||
|
||
@matcher.on_message()
|
||
async def handle_multi_line_code(event: MessageEvent):
|
||
"""
|
||
通用消息处理器,用于捕获多行模式下的代码输入。
|
||
"""
|
||
# 使用 getattr 兼容私聊和群聊
|
||
session_key = (event.user_id, getattr(event, 'group_id', 'private'))
|
||
if session_key in multi_line_sessions:
|
||
# 取消超时任务
|
||
multi_line_sessions[session_key].cancel()
|
||
del multi_line_sessions[session_key]
|
||
|
||
# 对多行代码进行规范化处理
|
||
normalized_code = normalize_code(event.raw_message)
|
||
|
||
if not normalized_code:
|
||
await event.reply("捕获到的代码为空或格式错误,已取消输入。")
|
||
return
|
||
|
||
await execute_code(event, normalized_code)
|
||
return True # 消费事件,防止其他处理器响应
|