From aaf4a896dd8b3c3f3987f50c6811d3fc8df889ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=95=80=E9=93=AC=E9=85=B8=E9=92=BE?= <148796996+K2cr2O1@users.noreply.github.com> Date: Tue, 6 Jan 2026 22:59:50 +0800 Subject: [PATCH] Dev (#26) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 整合开发历史 * codepy安全性升级 * 优化一些东西 * 再次优化 * 更新一下 requirements.txt * CQ码支持以及视频解析 * hotfix * 更新DEV readme.md * feat: 添加Docker沙箱代码执行功能 - 新增Docker沙箱执行环境,提供安全隔离的代码执行能力 - 重构code_py插件,使用Docker容器替代子进程执行 - 添加docker配置项和权限检查功能 - 实现代码执行队列和并发控制 - 新增广播插件,仅限管理员使用 --- .gitignore | 1 + README.md | 1121 +++++------------------------------- config.toml | 12 + core/command_manager.py | 4 +- core/config_loader.py | 9 + core/event_handler.py | 16 +- core/executor.py | 197 ++++++- core/permission_manager.py | 27 +- main.py | 15 + plugins/broadcast.py | 75 +++ plugins/code_py.py | 284 +++++---- requirements.txt | Bin 750 -> 360 bytes sandbox.Dockerfile | 19 + 13 files changed, 620 insertions(+), 1160 deletions(-) create mode 100644 plugins/broadcast.py create mode 100644 sandbox.Dockerfile diff --git a/.gitignore b/.gitignore index 2729cb2..bb22f85 100644 --- a/.gitignore +++ b/.gitignore @@ -139,3 +139,4 @@ dmypy.json .pytype/ # End of https://www.toptal.com/developers/gitignore/api/python +/ca \ No newline at end of file diff --git a/README.md b/README.md index 5c299b3..ec20324 100644 --- a/README.md +++ b/README.md @@ -1,267 +1,117 @@ # Calglau BOT by NEO Bot Framework -## 📖 概述 +> **[INTERNAL USE ONLY]** +> +> 本仓库为 Calglau BOT 的内部开发版本,请遵守相关保密协议。 -NEO 是一个基于 Python 的现代化 OneBot 11 协议机器人框架,专为需要高性能、可扩展性和开发效率的团队设计。该框架通过 WebSocket 与各种 OneBot 实现端(如 NapCatQQ、LLOneBot 等)通信,提供了一套完整的机器人开发解决方案。 +**Powered by NEO Bot Framework** -### 设计理念 +## 📖 项目概述 -NEO 框架的设计遵循以下核心理念: +**Calglau BOT** 是一个基于 NEO Bot Framework 构建的、功能丰富的 QQ 机器人。它被设计为一个模块化、易于扩展的内部工具,通过插件化的方式集成了多种实用与娱乐功能。 -1. **开发者友好**:简洁的 API 设计、完整的类型提示和详细的文档,让开发者能够快速上手和高效开发 -2. **架构清晰**:采用模块化设计,分离关注点,使代码易于维护和扩展 -3. **高性能异步**:基于 `asyncio` 和 `websockets` 构建,支持高并发消息处理 -4. **类型安全**:全面使用 Python 类型系统,提供编译时类型检查,减少运行时错误 -5. **热重载支持**:支持插件热重载,开发过程中修改代码无需重启机器人 +本项目旨在提供一个稳定、高性能且开发体验优秀的机器人平台,服务于我们的社群管理和日常自动化需求。 -### 核心价值 +### ✨ 核心特性 -- **快速原型开发**:通过简洁的装饰器语法快速定义指令和事件处理器 -- **生产环境就绪**:内置断线重连、错误处理和性能监控机制 -- **可扩展架构**:支持自定义插件、中间件和权限系统 -- **现代化开发体验**:支持热重载、类型提示和完整的 API 文档 +* **模块化插件架构**:所有功能均以独立插件形式存在于 `plugins/` 目录,易于开发、维护和热重载。 +* **高性能异步核心**:基于 `asyncio` 和 `websockets`,确保在高并发消息下依然响应迅速。 +* **开发者友好**:内置插件热重载,修改代码无需重启;完整的类型提示和清晰的 API 设计,提升开发效率。 +* **集成 Redis 缓存**:自动缓存常用 API 调用(如群信息),减少重复请求,提升响应速度。 +* **内置帮助系统**:通过 `/help` 指令可自动生成并展示所有已加载插件的功能说明。 -### 适用场景 +### 🛠️ 技术栈 -- QQ 群机器人管理 -- 自动化客服与问答系统 -- 游戏社区管理 -- 团队内部工具集成 -- 教育与培训辅助 +* **核心框架**: Python 3.8+ & NEO Bot Framework +* **异步库**: `asyncio` +* **网络通信**: `websockets` (OneBot v11) +* **缓存**: `Redis` +* **日志**: `Loguru` +* **文件监控**: `watchdog` (用于热重载) -## ✨ 特性 - -* **OneBot 11 标准支持**:完整支持 OneBot 11 的消息、通知、请求和元事件。 -* **类型安全**:基于 `dataclasses` 的强类型事件模型,开发体验更佳。 -* **插件系统**:轻量级的装饰器风格插件系统,支持指令 (`@matcher.command`) 和事件监听 (`@matcher.on_notice`, `@matcher.on_request`)。 -* **插件元数据与内置帮助**:插件可通过 `__plugin_meta__` 变量进行自我描述。框架核心内置了 `/help` 指令,可自动收集并展示所有插件的帮助信息,无需手动维护。 -* **🔥 热重载支持**:内置文件监控,修改 `plugins` 下的代码自动重载,无需重启,极大提升调试效率。 -* **异步核心**:基于 `asyncio` 和 `websockets` 的高性能异步核心。 -* **自动重连**:内置 WebSocket 断线重连机制。 - -## ⚡️ 性能优化 - -### Redis 缓存机制 -为了提升响应速度并减少对 OneBot API 的重复调用,框架核心集成了一套基于 Redis 的缓存系统。对于一些不频繁变更的数据(如群信息、好友列表等),首次查询后会将其缓存至 Redis,在缓存有效期内(默认为 1 小时),后续请求将直接从 Redis 读取,极大提升了性能。 - -#### 工作原理 -- **自动缓存**:框架会自动缓存特定 API 的调用结果。 -- **缓存键**:缓存键根据 API 名称和关键参数(如 `group_id`, `user_id`)生成,确保唯一性。 -- **过期时间**:默认缓存 1 小时,之后会自动失效,下次调用时将重新从 OneBot 实现端获取最新数据。 - -#### 受影响的 API -以下核心 API 已默认启用缓存: -- `get_group_info` -- `get_group_member_info` -- `get_friend_list` -- `get_stranger_info` -- `get_login_info` - -#### 如何绕过缓存 -在某些场景下,你可能需要获取实时数据而非缓存数据。为此,所有受缓存影响的 API 方法都增加了一个 `no_cache: bool = False` 的可选参数。 - -当你需要强制从服务器获取最新信息时,只需在调用时传入 `no_cache=True` 即可。 - -**示例:** -```python -# 正常调用,会使用缓存 -group_info = await bot.get_group_info(group_id=12345) - -# 强制获取最新信息,不使用缓存 -latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True) -``` - -### `__slots__` 内存优化 -框架内的所有数据模型(包括事件、消息段、API 返回对象等)均已启用 `__slots__ = True` 优化。这可以显著减少每个对象实例的内存占用,特别是在处理大量事件和数据时,能够有效降低机器人的整体内存消耗。 - - -## 📝 待办事项 (TODO) - -### API 封装 -- [x] **消息相关** - - `delete_msg`: 撤回消息 - - `get_msg`: 获取消息 - - `get_forward_msg`: 获取合并转发消息 - - `send_like`: 发送点赞 -- [x] **群组管理** - - `set_group_kick`: 群组踢人 - - `set_group_ban`: 群组单人禁言 - - `set_group_anonymous_ban`: 群组匿名禁言 - - `set_group_whole_ban`: 群组全员禁言 - - `set_group_admin`: 群组设置管理员 - - `set_group_anonymous`: 群组匿名 - - `set_group_card`: 设置群名片(群备注) - - `set_group_name`: 设置群名 - - `set_group_leave`: 退出群组 - - `set_group_special_title`: 设置群组专属头衔 -- [x] **群组信息** - - `get_group_info`: 获取群信息 - - `get_group_list`: 获取群列表 - - `get_group_member_info`: 获取群成员信息 - - `get_group_member_list`: 获取群成员列表 - - `get_group_honor_info`: 获取群荣誉信息 -- [x] **用户相关** - - `get_login_info`: 获取登录号信息 - - `get_stranger_info`: 获取陌生人信息 - - `get_friend_list`: 获取好友列表 -- [x] **请求处理** - - `set_friend_add_request`: 处理加好友请求 - - `set_group_add_request`: 处理加群请求/邀请 -- [x] **系统/其他** - - `get_version_info`: 获取版本信息 - - `get_status`: 获取状态 - - `can_send_image`: 检查是否可以发送图片 - - `can_send_record`: 检查是否可以发送语音 - - `clean_cache`: 清理缓存 - -### 待实现 API -- [ ] **Web 凭证类** - - `get_cookies` - - `get_csrf_token` - - `get_credentials` -- [ ] **文件/资源信息** - - `get_image` - - `get_record` - - `get_file` -- [ ] **系统控制** - - `set_restart` -- [ ] **扩展功能** - - [x] `send_forward_msg`: 发送合并转发消息 - -### 其他改进 -- [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。 -- [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。 -- [x] **权限系统**: 实现基础的权限管理(超级管理员、群管理员等)。 -- [x] **日志系统优化**: 引入 `loguru` 进行日志记录,支持文件输出和日志级别控制。 -- [x] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。 -- [x] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。 +--- ## 📂 项目结构 ``` . -├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载) -│ ├── admin.py # 管理员插件 -│ ├── code_py.py # Python 代码执行插件 -│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令 -│ ├── forward_test.py # 示例插件:演示合并转发消息 -│ ├── jrcd.py # 娱乐插件:/jrcd 和 /bbcd -│ └── thpic.py # 图片插件:/thpic -├── core/ # 核心框架代码 -│ ├── api/ # API 模块抽象层 -│ ├── bot.py # Bot 实例与 API 封装 -│ ├── admin_manager.py # 管理员管理模块 -│ ├── command_manager.py # 命令与事件分发器 -│ ├── config_loader.py # 配置加载器 -│ ├── event_handler.py # 事件处理器 -│ ├── executor.py # 插件执行器 -│ ├── logger.py # 日志系统 -│ ├── permission_manager.py # 权限管理器 -│ ├── plugin_manager.py # 插件加载与管理 -│ ├── redis_manager.py # Redis 连接管理器 -│ └── ws.py # WebSocket 客户端核心 -├── data/ # 数据存储目录 -│ ├── admin.json # 管理员配置文件 -│ └── permissions.json # 权限数据 -├── html/ # HTML 静态文件 +├── plugins/ # 插件目录,所有机器人的功能模块都在这里 +│ ├── admin.py +│ ├── bili_parser.py +│ ├── code_py.py +│ ├── echo.py +│ ├── forward_test.py +│ ├── jrcd.py +│ └── thpic.py +├── core/ # NEO 框架核心代码,通常无需修改 +│ ├── api/ +│ ├── bot.py +│ ├── ... +│ └── ws.py +├── data/ # 数据存储目录 (管理员列表, 权限配置) +│ ├── admin.json +│ └── permissions.json +├── html/ # 静态网页文件 │ ├── 404.html │ └── index.html -├── models/ # 数据模型 -│ ├── events/ # OneBot 事件定义 -│ ├── message.py # 消息段定义 -│ ├── objects.py # API 返回对象定义 -│ └── sender.py # 发送者定义 +├── models/ # 数据模型 (事件, 消息段等) +│ ├── ... ├── .gitignore -├── config.toml # 配置文件 -├── main.py # 启动入口(包含热重载监控) -└── requirements.txt # 项目依赖 +├── config.toml # 主配置文件 +├── main.py # 项目启动入口 +└── requirements.txt # Python 依赖 ``` -### 目录结构详细说明 - -#### `plugins/` - 插件目录 -- **功能**存放:所有机器人插件,支持热重载机制 -- **加载机制**:框架会自动扫描此目录下的所有 `.py` 文件,并作为插件加载 -- **插件约定**:每个插件文件应包含 `__plugin_meta__` 字典用于插件元数据定义 -- **热重载**:开发过程中修改插件文件会自动触发重载,无需重启机器人 -- **内置插件**: - - `admin.py` - 管理员管理插件,支持动态添加/移除管理员 - - `code_py.py` - Python 代码执行插件,支持安全的代码执行环境 - - `echo.py` - 示例插件,演示基本指令处理 - - `forward_test.py` - 合并转发消息演示插件 - - `jrcd.py` - 娱乐插件,提供 `/jrcd` 和 `/bbcd` 指令 - - `thpic.py` - 图片插件,提供 `/thpic` 指令返回东方Project图片 - -#### `core/` - 核心框架代码 -- `api/` - API 模块抽象层 - - `base.py` - API 基类定义 - - `message.py` - 消息相关 API 封装 - - `group.py` - 群组管理 API 封装 - - `friend.py` - 好友相关 API 封装 - - `account.py` - 账号相关 API 封装 -- `bot.py` - Bot 核心类,通过 Mixin 模式继承所有 API 功能,提供统一的调用接口 - - `admin_manager.py` - 管理员管理模块,负责管理员的添加、移除和权限验证 - - `command_manager.py` - 命令与事件分发器,负责注册和处理所有指令和事件 -- `config_loader.py` - 配置加载器,读取和解析 `config.toml` 配置文件 -- `event_handler.py` - 事件处理器,负责将原始事件转换为类型化事件对象 -- `executor.py` - 插件执行器,提供线程池执行环境用于执行同步任务 -- `logger.py` - 日志系统,基于 `loguru` 提供高性能日志记录 -- `permission_manager.py` - 权限管理器,管理用户权限级别(admin、op、user) -- `plugin_manager.py` - 插件加载与管理,负责插件的扫描、加载和热重载 -- `redis_manager.py` - Redis 连接管理器,提供异步 Redis 客户端连接池 -- `ws.py` - WebSocket 客户端核心,负责与 OneBot 实现端建立和管理连接 - -#### `data/` - 数据存储目录 -- `admin.json` - 管理员配置文件,存储全局管理员列表 -- `permissions.json` - 权限数据文件,存储用户权限映射关系 - -#### `html/` - HTML 静态文件 -- `404.html` - 404 错误页面 -- `index.html` - 项目主页 HTML 文件,展示项目信息和特性 - -#### `models/` - 数据模型定义 -- `events/` - OneBot 事件定义 - - `base.py` - 事件基类定义 - - `message.py` - 消息事件定义 - - `notice.py` - 通知事件定义 - - `request.py` - 请求事件定义 - - `meta.py` - 元事件定义 - - `factory.py` - 事件工厂类,用于根据 JSON 数据创建对应事件对象 -- `message.py` - 消息段定义,支持文本、图片、表情等多种消息类型 -- `objects.py` - API 返回对象定义,提供强类型化的 API 响应数据模型 -- `sender.py` - 发送者定义,包含用户、群成员等信息 - -#### 根目录文件 -- `.gitignore` - Git 忽略文件配置 -- `config.toml` - 主配置文件,包含 WebSocket 连接、机器人指令前缀、Redis 连接等配置 -- `main.py` - 程序入口文件,负责初始化插件、启动热重载监控和建立 WebSocket 连接 -- `requirements.txt` - Python 依赖包列表 +--- ## 🚀 快速开始 ### 1. 环境准备 -* Python 3.8+ -* OneBot 11 实现端(推荐 [NapCatQQ](https://github.com/NapNeko/NapCatQQ) 或 LLOneBot) +* **Python 3.12 或更高版本** + * **我觉得**: 在开发和调试阶段使用官方的 **CPython** 解释器,以获得最佳的第三方库兼容性和调试体验。 + * **你也可以觉得**: 在生产环境部署时,可以考虑使用 **PyPy** 以获取潜在的性能提升,但这可能会牺牲一定的兼容性。 +* Redis 服务 +* 一个正在运行的 OneBot v11 实现端 (推荐 **NapCatQQ**) ### 2. 安装依赖 +克隆本项目后,在项目根目录执行: ```bash pip install -r requirements.txt ``` -### 3. 配置文件 +### 3. 配置 -修改根目录下的 `config.toml`,配置 WebSocket 连接信息: +**[内部开发]** + +为了方便内部开发和调试,项目中的 `config.toml` 文件已预先配置为连接到官方的 DEV 调试服务器。 + +**因此,在拉取仓库后,您通常无需对 `config.toml` 文件进行任何修改即可直接运行。** + +如果您需要连接到本地或其他特定环境,可以参考以下配置结构进行修改。配置示例: ```toml +# config.toml + [napcat_ws] -uri = "ws://127.0.0.1:30004" # OneBot 实现端的 WebSocket 地址 -token = "your_token" # Access Token (如果有) -reconnect_interval = 5 # 断线重连间隔(秒) +# OneBot 实现端的 WebSocket 地址 +uri = "ws://127.0.0.1:3001" +# Access Token (如果有) +token = "" +# 断线重连间隔(秒) +reconnect_interval = 5 [bot] -command = ["/"] # 指令前缀,支持多个,如 ["/", "#"] +# 机器人指令的起始符号,可配置多个 +command_prefixes = ["/", "!", "!"] + +[redis] +# Redis 连接信息 +host = "127.0.0.1" +port = 6379 +db = 0 +password = "" ``` ### 4. 运行 @@ -269,794 +119,109 @@ command = ["/"] # 指令前缀,支持多个,如 ["/", "#"] ```bash python main.py ``` +机器人启动后,将自动连接到 OneBot 实现端。控制台会输出加载的插件列表和连接状态。 -## 🛠️ 开发指南 +--- -### 🔥 热重载调试 +## 🛠️ 插件开发指南 -项目集成了 `watchdog` 文件监控。在开发过程中,你只需要: -1. 保持 `main.py` 运行。 -2. 修改或新建 `plugins` 目录下的 `.py` 插件文件。 -3. 保存文件。 -4. 控制台会自动提示 `[HotReload] 插件重载完成`,新的逻辑立即生效。 +Calglau BOT 的所有功能都通过插件实现。开发新功能非常简单,并且得益于热重载,你无需在开发过程中频繁重启机器人。 -### 创建新插件 +### 🔥 热重载工作流 -在 `plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。 +1. 保持 `python main.py` 进程运行。 +2. 在 `plugins/` 目录下创建或修改任意 `.py` 文件。 +3. **保存文件**。 +4. 观察控制台输出 `[HotReload] 插件重载完成` 的提示。你的新代码已即时生效。 -### 示例代码 +### 创建一个新插件 -#### 1. 注册消息指令 +1. 在 `plugins/` 目录下新建一个 Python 文件,例如 `weather.py`。 +2. 在该文件中编写你的逻辑。 -使用 `@matcher.command("指令名")` 注册指令。 +#### 1. 定义插件元数据 (`__plugin_meta__`) + +为了让 `/help` 指令能自动发现你的插件,请在文件顶部定义 `__plugin_meta__` 字典: ```python -from core.command_manager import matcher -from core.bot import Bot -from models import MessageEvent +# plugins/weather.py -# 注册 /hello 指令 -@matcher.command("hello") -async def handle_hello(bot: Bot, event: MessageEvent, args: list[str]): - # args 是去除指令后的参数列表 - await event.reply("你好!这里是 NEO Bot。") -``` - -#### 2. 监听通知事件 - -使用 `@matcher.on_notice("通知类型")` 监听通知。 - -```python -from core.command_manager import matcher -from core.bot import Bot -from models import GroupIncreaseNoticeEvent - -# 监听群成员增加事件 -@matcher.on_notice("group_increase") -async def welcome_new_member(bot: Bot, event: GroupIncreaseNoticeEvent): - await bot.send_group_msg(event.group_id, f"欢迎新成员 {event.user_id} 加入!") -``` - -#### 3. 监听请求事件 - -使用 `@matcher.on_request("请求类型")` 监听请求。 - -```python -from core.command_manager import matcher -from core.bot import Bot -from models import FriendRequestEvent - -# 自动同意好友请求 -@matcher.on_request("friend") -async def auto_approve_friend(bot: Bot, event: FriendRequestEvent): - await bot.call_api("set_friend_add_request", { - "flag": event.flag, - "approve": True - }) -``` - -#### 4. API 调用方式对比 - -框架提供两种 API 调用方式:**类型化 API**(推荐)和 **通用 API**(备用)。 - -##### 方式一:类型化 API(推荐) -对于已封装的 API,框架提供了类型化的方法,返回数据模型对象而非原始字典: - -```python -from core.command_manager import matcher -from core.bot import Bot -from models import MessageEvent -from models.objects import Group - -@matcher.command("info") -async def get_group_info_typed(bot: Bot, event: MessageEvent, args: list[str]): - # 使用类型化 API,返回 Group 对象 - group: Group = await bot.get_group_info(event.group_id) - await event.reply(f"群名:{group.group_name}\n成员数:{group.member_count}\n创建时间:{group.create_time}") -``` - -##### 方式二:通用 API(备用) -如果框架尚未封装某个 OneBot API,你可以使用 `bot.call_api` 直接调用。这是通用的备用调用方法。 - -```python -from core.command_manager import matcher -from core.bot import Bot -from models import MessageEvent - -@matcher.command("info_legacy") -async def get_group_info_legacy(bot: Bot, event: MessageEvent, args: list[str]): - # 直接调用 get_group_info API - # action: API 名称 - # params: API 参数字典 - resp = await bot.call_api("get_group_info", { - "group_id": event.group_id, - "no_cache": False - }) - - if resp.get("status") == "ok": - group_name = resp["data"]["group_name"] - await event.reply(f"当前群名:{group_name}") -``` - -**建议**:优先使用类型化 API,获得更好的类型安全和代码提示。仅在框架未封装特定 API 时使用通用 API。 - -## 📖 插件开发指南 - -### 插件基本结构 -一个标准的插件文件应该包含以下部分: -1. **模块文档字符串**:描述插件功能 -2. **导入必要的模块**:从 `core` 和 `models` 导入所需类 -3. **使用装饰器注册事件处理器**:`@matcher.command()`, `@matcher.on_notice()`, `@matcher.on_request()` -4. **异步函数实现业务逻辑**:使用 `async def` 定义处理函数 - -### 插件元数据 (`__plugin_meta__`) -为了实现插件的自动发现和帮助信息的自动生成,框架引入了插件元数据机制。你需要在你的插件模块中定义一个名为 `__plugin_meta__` 的字典。 - -`load_all_plugins` 函数在加载插件时会自动读取这个变量,并将其注册到 `CommandManager` 中。`/help` 指令会遍历所有已注册的元数据,生成格式化的帮助信息。 - -一个标准的 `__plugin_meta__` 包含以下字段: - -- `name` (str): 插件的友好名称,例如 "回声"。 -- `description` (str): 对插件功能的简短描述。 -- `usage` (str): 插件的使用方法,可以包含多个指令和它们的说明。 - -**示例:** -```python -# plugins/echo.py - -__plugin_meta__ = { - "name": "回声与交互", - "description": "提供 echo 和 赞我 功能", - "usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞", -} -``` - -### 使用类型化 API -框架现已提供完整的类型化 API 封装,建议优先使用这些封装方法而非原始的 `call_api`: - -| API 方法 | 返回类型 | 说明 | -|---------|---------|------| -| `bot.send_group_msg()` | `Message` | 发送群消息 | -| `bot.get_group_info()` | `Group` | 获取群信息 | -| `bot.get_group_member_info()` | `GroupMember` | 获取群成员信息 | -| `bot.get_friend_list()` | `List[Friend]` | 获取好友列表 | -| `bot.get_login_info()` | `LoginInfo` | 获取登录信息 | -| `bot.get_version_info()` | `VersionInfo` | 获取版本信息 | - -#### 示例:使用类型化 API 重构群信息查询 -```python -from core.command_manager import matcher -from core.bot import Bot -from models import MessageEvent -from models.objects import Group - -@matcher.command("group_info") -async def get_group_info_typed(bot: Bot, event: MessageEvent, args: list[str]): - # 使用类型化 API,返回 Group 对象而非字典 - group: Group = await bot.get_group_info(event.group_id) - await event.reply(f"群名:{group.group_name}\n成员数:{group.member_count}\n创建时间:{group.create_time}") -``` - -### 事件处理模式 -除了基本的消息指令,还可以处理多种事件类型: - -#### 1. 通知事件处理 -```python -from models import GroupCardChangeEvent - -@matcher.on_notice("group_card") -async def handle_group_card_change(bot: Bot, event: GroupCardChangeEvent): - # event.card_new 是新名片,event.card_old 是旧名片 - await bot.send_group_msg(event.group_id, f"成员 {event.user_id} 的名片从 '{event.card_old}' 改为 '{event.card_new}'") -``` - -#### 2. 请求事件处理 -```python -from models import GroupRequestEvent - -@matcher.on_request("group") -async def handle_group_request(bot: Bot, event: GroupRequestEvent): - # 根据请求类型处理 - if event.sub_type == "add": - # 自动同意加群请求 - await bot.set_group_add_request(event.flag, event.sub_type, approve=True) - await bot.send_group_msg(event.group_id, f"已同意用户 {event.user_id} 的加群请求") -``` - -### 错误处理 -建议在插件中添加适当的错误处理,避免单个插件崩溃影响整个机器人: - -```python -@matcher.command("dangerous") -async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]): - try: - # 可能失败的操作 - result = await bot.call_api("some_api", {"param": "value"}) - await event.reply(f"成功:{result}") - except Exception as e: - await event.reply(f"执行失败:{str(e)}") - # 记录日志 - from core.logger import logger - logger.error(f"插件执行错误:{e}", exc_info=True) -``` - -### 处理同步阻塞操作 -为了保持机器人的响应性,所有可能导致长时间阻塞的同步操作都应该在单独的线程池中执行。框架提供了 `run_in_thread_pool` 函数来简化这一过程。 - -**示例:执行同步阻塞任务** -```python -from core.command_manager import matcher -from core.bot import Bot -from models import MessageEvent -from core.executor import run_in_thread_pool -import time - -# 模拟一个耗时的同步操作 -def blocking_task(duration: int): - time.sleep(duration) - return f"阻塞任务完成,耗时 {duration} 秒" - -@matcher.command("block_test") -async def handle_blocking_test(bot: Bot, event: MessageEvent, args: list[str]): - if not args or not args[0].isdigit(): - await event.reply("请提供一个数字作为阻塞时间(秒)。例如:/block_test 5") - return - - duration = int(args[0]) - await event.reply(f"开始执行阻塞任务,耗时 {duration} 秒...") - - # 将同步阻塞任务放入线程池执行 - result = await run_in_thread_pool(blocking_task, duration) - await event.reply(result) -``` - -### 权限管理 -框架内置了基于用户角色的权限管理系统,支持 `admin`(超级管理员)、`op`(操作员)、`user`(普通用户)三个权限级别。权限数据存储在 `data/permissions.json` 文件中。 - -#### 权限级别说明 -- **admin**:最高权限,可以执行所有管理命令,包括添加/移除其他管理员 -- **op**:操作员权限,可以执行大部分管理命令,但不能修改管理员列表 -- **user**:普通用户权限,只能使用基础功能 - -#### 在插件中使用权限控制 -注册命令时可以通过 `permission` 参数指定所需权限级别: - -```python -from models import MessageEvent - -# 只有管理员可以执行此命令 -@matcher.command("admin_only", permission=MessageEvent.ADMIN) -async def admin_command(bot: Bot, event: MessageEvent, args: list[str]): - await event.reply("此命令仅限管理员使用") - -# 操作员及以上权限可以执行 -@matcher.command("op_only", permission=MessageEvent.OP) -async def op_command(bot: Bot, event: MessageEvent, args: list[str]): - await event.reply("此命令需要操作员权限") - -# 所有用户都可以执行(默认) -@matcher.command("public") -async def public_command(bot: Bot, event: MessageEvent, args: list[str]): - await event.reply("所有用户都可以使用此命令") -``` - -#### 动态权限检查 -如果需要更复杂的权限逻辑,可以使用 `override_permission_check=True` 参数,然后在函数中手动检查权限: - -```python -@matcher.command( - "special", - permission=MessageEvent.OP, - override_permission_check=True -) -async def special_command(bot: Bot, event: MessageEvent, permission_granted: bool): - if not permission_granted: - await event.reply("权限不足!") - return - - # 额外的权限逻辑 - if event.user_id == 123456: - await event.reply("特殊用户,允许执行") - else: - await event.reply("普通用户,拒绝执行") -``` - -### 使用 Redis 进行数据缓存 -框架集成了 Redis 客户端,提供了便捷的异步接口用于数据缓存和持久化。Redis 连接管理器会自动管理连接池,你可以在插件中直接使用。 - -#### 基本用法 -```python -from core.redis_manager import redis_manager - -@matcher.command("cache") -async def cache_example(bot: Bot, event: MessageEvent, args: list[str]): - # 设置缓存 - await redis_manager.set("user:123:name", "张三") - - # 获取缓存 - name = await redis_manager.get("user:123:name") - - # 设置带过期时间的缓存(单位:秒) - await redis_manager.setex("temp:data", 3600, "临时数据") - - # 删除缓存 - await redis_manager.delete("user:123:name") - - await event.reply(f"用户名:{name}") -``` - -#### 使用哈希表(Hash) -```python -# 设置哈希字段 -await redis_manager.hset("user:123", "age", 20) -await redis_manager.hset("user:123", "city", "北京") - -# 获取哈希字段 -age = await redis_manager.hget("user:123", "age") -user_data = await redis_manager.hgetall("user:123") - -# 删除哈希字段 -await redis_manager.hdel("user:123", "city") -``` - -#### 使用列表(List) -```python -# 向列表添加元素 -await redis_manager.lpush("recent:actions", "login") -await redis_manager.rpush("recent:actions", "logout") - -# 获取列表范围 -actions = await redis_manager.lrange("recent:actions", 0, 9) - -# 获取列表长度 -length = await redis_manager.llen("recent:actions") -``` - -### 插件数据管理 -对于需要持久化存储配置或数据的插件,框架提供了 `PluginDataManager` 类,可以方便地管理 JSON 格式的数据文件。 - -#### 基本用法 -```python -from core.plugin_manager import PluginDataManager - -# 初始化数据管理器 -data_manager = PluginDataManager("weather_plugin") - -@matcher.command("weather_set") -async def set_weather_config(bot: Bot, event: MessageEvent, args: list[str]): - if len(args) < 2: - await event.reply("用法:/weather_set <城市> <温度>") - return - - city = args[0] - temperature = args[1] - - # 保存配置 - await data_manager.set(city, temperature) - await event.reply(f"已设置 {city} 的温度为 {temperature}℃") - -@matcher.command("weather_get") -async def get_weather_config(bot: Bot, event: MessageEvent, args: list[str]): - if not args: - await event.reply("用法:/weather_get <城市>") - return - - city = args[0] - - # 读取配置 - temperature = data_manager.get(city) - if temperature: - await event.reply(f"{city} 的温度是 {temperature}℃") - else: - await event.reply(f"未找到 {city} 的温度配置") -``` - -#### 数据文件位置 -插件数据文件保存在 `plugins/data/` 目录下,每个插件对应一个独立的 JSON 文件。例如 `weather_plugin` 插件的数据文件为 `plugins/data/weather_plugin.json`。 - -### 插件开发最佳实践 -1. **单一职责**:每个插件专注于一个功能领域 -2. **错误处理**:妥善处理可能发生的异常 -3. **类型提示**:为函数参数和返回值添加类型提示 -4. **文档完整**:为每个函数添加文档字符串 -5. **性能考虑**:避免在插件中执行耗时同步操作 -6. **资源清理**:必要时使用 `try...finally` 确保资源释放 - -### 🚀 高性能插件开发规范 (避坑指南) -为了保证整个机器人框架的响应速度和稳定性,所有插件都必须遵循异步、非阻塞的开发原则。任何一个插件中的阻塞操作都可能导致整个机器人卡顿或无响应。 - -以下是必须遵守的核心规范: - -#### 1. 禁止任何形式的同步网络请求 -- **错误示范**: 使用 `requests` 库发起网络请求。 - ```python - import requests - # 错误!这会阻塞整个程序 - response = requests.get("https://api.example.com") - ``` -- **正确做法**: 必须使用异步 HTTP 客户端,如 `aiohttp` 或 `httpx`。 - ```python - import httpx - # 正确,使用 async with 和 await - async with httpx.AsyncClient() as client: - response = await client.get("https://api.example.com") - ``` - -#### 2. 禁止使用 `time.sleep()` -- **错误示范**: 使用 `time.sleep()` 进行等待。 - ```python - import time - # 错误!这会阻塞事件循环 - time.sleep(5) - ``` -- **正确做法**: 必须使用 `asyncio.sleep()`。 - ```python - import asyncio - # 正确,这会将控制权交还给事件循环 - await asyncio.sleep(5) - ``` - -#### 3. 谨慎处理文件 I/O -- 对于读写小型、本地文件,直接使用 `with open(...)` 通常是可接受的。 -- 但对于大型文件或网络文件系统(NFS)上的文件,同步 I/O 可能会导致明显的阻塞。 -- **推荐做法**: 对于可能耗时较长的文件操作,使用 `aiofiles` 库。 - ```python - import aiofiles - async with aiofiles.open('large_file.dat', mode='rb') as f: - contents = await f.read() - ``` - -#### 4. 将 CPU 密集型任务移出事件循环 -- 如果插件需要执行复杂的计算(例如,图像处理、视频转码、数据分析),这些任务会长时间占用 CPU,同样会阻塞事件循环。 -- **正确做法**: 使用 `loop.run_in_executor()` 将这类任务抛到独立的线程池或进程池中执行。 - ```python - import asyncio - - def cpu_bound_task(data): - # 这是一个耗时的同步函数 - # ... 进行大量计算 ... - return "计算结果" - - # 在异步的事件处理器中调用 - loop = asyncio.get_running_loop() - # `None` 表示使用默认的线程池 - result = await loop.run_in_executor(None, cpu_bound_task, "一些数据") - await event.reply(result) - ``` - -遵循以上规范,可以确保您开发的插件不会成为整个机器人应用的性能瓶颈。 - -### 示例:完整插件模板 -```python -""" -天气查询插件 - -提供 /weather 指令,查询指定城市的天气信息。 -""" -from core.command_manager import matcher -from core.bot import Bot -from models import MessageEvent - -# 插件元数据,用于 help 指令 __plugin_meta__ = { "name": "天气查询", - "description": "查询指定城市的天气信息", - "usage": "/weather [城市名称]", + "description": "提供城市天气查询功能。", + "usage": "/weather [城市名] - 查询指定城市的实时天气。", } +``` + +#### 2. 编写指令处理器 + +使用 `@matcher.command()` 装饰器来注册一个聊天指令。 + +```python +# plugins/weather.py +from core.command_manager import matcher +from models import MessageEvent + +# ... (元数据定义) ... @matcher.command("weather") -async def handle_weather(bot: Bot, event: MessageEvent, args: list[str]): +async def handle_weather_command(event: MessageEvent, args: list[str]): """ - 查询天气信息 - - :param bot: Bot 实例 - :param event: 消息事件对象 - :param args: 指令参数列表(城市名称) + 处理 /weather 指令 + :param event: 消息事件对象,用于回复等操作 + :param args: 用户发送的参数列表 (已按空格分割) """ if not args: - await event.reply("请输入城市名称,例如:/weather 北京") + await event.reply("请输入要查询的城市名,例如:/weather 北京") return - city = " ".join(args) - try: - # 这里可以调用天气 API - weather_info = f"{city} 的天气:晴,25℃" - await event.reply(weather_info) - except Exception as e: - await event.reply(f"查询天气失败:{str(e)}") - -# 可以注册多个事件处理器 -@matcher.on_notice("group_increase") -async def welcome_new_member(bot: Bot, event): - await bot.send_group_msg(event.group_id, f"欢迎新成员 {event.user_id} 加入!") -``` - -## 📚 事件模型说明 - -NEO 框架的事件模型是基于 OneBot v11 协议的强类型数据模型,采用 `dataclasses` 和类型注解构建。所有事件都继承自 `OneBotEvent` 基类,并通过事件工厂自动从 JSON 数据创建对应的事件对象。 - -### 事件层次结构 - -``` -OneBotEvent (抽象基类) -├── MetaEvent (元事件) -│ ├── HeartbeatEvent (心跳事件) -│ └── LifeCycleEvent (生命周期事件) -├── MessageEvent (消息事件) -│ ├── PrivateMessageEvent (私聊消息事件) -│ └── GroupMessageEvent (群聊消息事件) -├── NoticeEvent (通知事件) -│ ├── FriendAddNoticeEvent (好友添加通知) -│ ├── FriendRecallNoticeEvent (好友消息撤回通知) -│ ├── GroupRecallNoticeEvent (群消息撤回通知) -│ ├── GroupIncreaseNoticeEvent (群成员增加通知) -│ ├── GroupDecreaseNoticeEvent (群成员减少通知) -│ ├── GroupAdminNoticeEvent (群管理员变动通知) -│ ├── GroupBanNoticeEvent (群禁言通知) -│ ├── GroupUploadNoticeEvent (群文件上传通知) -│ ├── PokeNotifyEvent (戳一戳通知) -│ ├── LuckyKingNotifyEvent (运气王通知) -│ ├── HonorNotifyEvent (群荣誉变更通知) -│ ├── GroupCardNoticeEvent (群成员名片更新通知) -│ ├── OfflineFileNoticeEvent (离线文件通知) -│ ├── ClientStatusNoticeEvent (客户端状态变更通知) -│ └── EssenceNoticeEvent (精华消息变动通知) -└── RequestEvent (请求事件) - ├── FriendRequestEvent (加好友请求) - └── GroupRequestEvent (加群请求/邀请) -``` - -### 事件基类:OneBotEvent - -所有事件的基类,定义了事件的通用属性和方法: - -```python -@dataclass(slots=True) -class OneBotEvent(ABC): - """ - OneBot v11 事件的抽象基类。 + city = args[0] - Attributes: - time (int): 事件发生的时间戳 (秒) - self_id (int): 收到事件的机器人 QQ 号 - _bot (Optional[Bot]): 内部持有的 Bot 实例引用 - """ - time: int - self_id: int - _bot: Optional["Bot"] = field(default=None, init=False) + # 此处应调用天气 API 获取数据 + # (示例代码,省略了真实 API 调用) + weather_data = f"{city}的天气是:晴,25°C。" - @property - @abstractmethod - def post_type(self) -> str: - """事件的上报类型,子类必须重写此属性""" - pass - - @property - def bot(self) -> "Bot": - """获取与此事件关联的 Bot 实例""" - if self._bot is None: - raise ValueError("Bot instance not set for this event") - return self._bot - - @bot.setter - def bot(self, value: "Bot"): - """为事件对象设置关联的 Bot 实例""" - self._bot = value + await event.reply(weather_data) ``` -### 事件类型常量 +#### 3. 监听事件 -框架定义了完整的事件类型常量,用于标识不同种类的事件: - -```python -class EventType: - META = 'meta_event' # 元事件:心跳、生命周期等 - REQUEST = 'request ' # 请求事件:加好友请求、加群请求等 - NOTICE = 'notice' # 通知事件:群成员增加、文件上传等 - MESSAGE = 'message' # 消息事件:私聊消息、群消息等 - MESSAGE_SENT = 'message_sent' # 消息发送事件:机器人自己发送消息的上报 -``` - -### 消息事件 - -消息事件是机器人最常处理的事件类型,框架提供了完整的消息段支持和便捷的回复方法: - -#### MessageEvent (消息事件基类) - -```python -@dataclass -class MessageEvent(OneBotEvent): - message_type: str # 消息类型: private (私聊), group (群聊) - sub_type: str # 消息子类型 - message_id: int # 消息 ID - user_id: int # 发送者 QQ 号 - message: List[MessageSegment] # 消息内容列表 - raw_message: str # 原始消息内容 - font: int # 字体 - sender: Optional[Sender] # 发送者信息 - - @property - def post_type(self) -> str: - return EventType.MESSAGE - - async def reply(self, message: str, auto_escape: bool = False): - """回复消息(抽象方法,由子类实现)""" - raise NotImplementedError -``` - -#### PrivateMessageEvent (私聊消息事件) - -```python -@dataclass -class PrivateMessageEvent(MessageEvent): - async def reply(self, message: str, auto_escape: bool = False): - """回复私聊消息""" - await self.bot.send_private_msg( - user_id=self.user_id, message=message, auto_escape=auto_escape - ) -``` - -#### GroupMessageEvent (群聊消息事件) - -```python -@dataclass -class GroupMessageEvent(MessageEvent): - group_id: int = 0 # 群号 - anonymous: Optional[Anonymous] = None # 匿名信息 - - async def reply(self, message: str, auto_escape: bool = False): - """回复群聊消息""" - await self.bot.send_group_msg( - group_id=self.group_id, message=message, auto_escape=auto_escape - ) -``` - -### 通知事件 - -通知事件用于处理各种系统通知,如群成员变动、文件上传等: - -#### 常用通知事件示例 - -```python -@dataclass -class GroupIncreaseNoticeEvent(GroupNoticeEvent): - """群成员增加通知""" - operator_id: int = 0 # 操作者 QQ 号 - sub_type: str = "" # 子类型: approve (管理员同意入群), invite (管理员邀请入群) - -@dataclass -class GroupRecallNoticeEvent(GroupNoticeEvent): - """群消息撤回通知""" - operator_id: int = 0 # 操作者 QQ 号 - message_id: int = 0 # 被撤回的消息 ID - -@dataclass -class PokeNotifyEvent(NotifyNoticeEvent): - """戳一戳通知""" - target_id: int = 0 # 被戳者 QQ 号 - group_id: int = 0 # 群号 (如果是群内戳一戳) -``` - -### 请求事件 - -请求事件用于处理用户的主动请求,如加好友、加群等: - -```python -@dataclass -class FriendRequestEvent(RequestEvent): - """加好友请求事件""" - user_id: int = 0 # 发送请求的 QQ 号 - comment: str = "" # 验证信息 - flag: str = "" # 请求 flag,用于 API 调用 - -@dataclass -class GroupRequestEvent(RequestEvent): - """加群请求/邀请事件""" - sub_type: str = "" # 子类型: add (加群请求), invite (邀请登录号入群) - group_id: int = 0 # 群号 - user_id: int = 0 # 发送请求的 QQ 号 - comment: str = "" # 验证信息 - flag: str = "" # 请求 flag,用于 API 调用 -``` - -### 元事件 - -元事件用于处理框架自身状态变化,如心跳、生命周期等: - -```python -@dataclass -class HeartbeatEvent(MetaEvent): - """心跳事件,用于确认连接状态""" - meta_event_type: str = 'heartbeat' - status: HeartbeatStatus = field(default_factory=HeartbeatStatus) - interval: int = 0 # 心跳间隔时间(ms) - -@dataclass -class LifeCycleEvent(MetaEvent): - """生命周期事件,用于通知框架生命周期变化""" - meta_event_type: str = 'lifecycle' - sub_type: LifeCycleSubType = LifeCycleSubType.ENABLE # 子类型: enable, disable, connect -``` - -### 事件工厂:EventFactory - -事件工厂是框架的核心组件之一,负责将原始 JSON 数据转换为强类型的事件对象: - -```python -class EventFactory: - @staticmethod - def create_event(data: Dict[str, Any]) -> OneBotEvent: - """根据数据创建事件对象""" - post_type = data.get("post_type") - - if post_type == EventType.MESSAGE or post_type == EventType.MESSAGE_SENT: - return EventFactory._create_message_event(data, common_args) - elif post_type == EventType.NOTICE: - return EventFactory._create_notice_event(data, common_args) - elif post_type == EventType.REQUEST: - return EventFactory._create_request_event(data, common_args) - elif post_type == EventType.META: - return EventFactory._create_meta_event(data, common_args) - else: - raise ValueError(f"Unknown event type: {post_type}") -``` - -### 在插件中使用事件 - -插件可以直接使用这些事件类型来处理各种场景: +除了指令,你还可以监听各种事件,例如新成员入群。 ```python from core.command_manager import matcher +from models import GroupIncreaseNoticeEvent from core.bot import Bot -from models import GroupMessageEvent, PrivateMessageEvent -from models.events.notice import GroupIncreaseNoticeEvent -from models.events.request import FriendRequestEvent -# 处理群消息事件 -@matcher.command("hello") -async def handle_hello(bot: Bot, event: GroupMessageEvent, args: list[str]): - await event.reply(f"你好 {event.sender.nickname}!") - -# 处理私聊消息事件 -@matcher.command("help", permission_level=MessageEvent.USER) -async def handle_help(bot: Bot, event: PrivateMessageEvent, args: list[str]): - await event.reply("这里是帮助信息...") - -# 处理群成员增加通知 @matcher.on_notice("group_increase") -async def handle_group_increase(bot: Bot, event: GroupIncreaseNoticeEvent): - await bot.send_group_msg( - event.group_id, - f"欢迎新成员 {event.user_id} 加入!操作者:{event.operator_id}" - ) - -# 处理加好友请求 -@matcher.on_request("friend") -async def handle_friend_request(bot: Bot, event: FriendRequestEvent): - # 自动同意所有好友请求 - await bot.set_friend_add_request(flag=event.flag, approve=True) - await bot.send_private_msg(event.user_id, "已通过您的好友请求!") +async def welcome_new_member(bot: Bot, event: GroupIncreaseNoticeEvent): + """当有新成员加入群聊时触发""" + welcome_message = f"欢迎新成员 @{event.user_id} 加入本群!" + await bot.send_group_msg(event.group_id, welcome_message) ``` -### 事件处理的优势 +--- -1. **类型安全**:所有事件都有明确的类型定义,IDE 可以提供完整的代码提示和补全 -2. **易于测试**:事件对象可以轻松构造,便于编写单元测试 -3. **数据完整**:所有字段都有类型注解,确保数据的一致性和完整性 -4. **性能优化**:使用 `@dataclass(slots=True)` 减少内存占用,提高属性访问速度 -5. **可扩展性**:可以轻松定义自定义事件类型,扩展框架功能 +## 📦 当前功能插件 -### 常用事件属性速查 +| 插件文件 (`plugins/`) | 功能描述 | +|-----------------------|----------| +| `admin.py` | 机器人管理员权限管理 | +| `bili_parser.py` | 自动解析 Bilibili 视频链接分享卡片 | +| `code_py.py` | 执行 Python 代码片段 (高危,仅限管理员) | +| `echo.py` | 提供 `/echo` 复读和 `/赞我` 功能 | +| `forward_test.py` | 演示如何发送合并转发消息 | +| `jrcd.py` | 娱乐功能:今日人品、牛牛词典 | +| `thpic.py` | 发送一张随机的东方 Project 图片 | -| 事件类型 | 关键属性 | 描述 | -|---------|---------|------| -| **MessageEvent** | `message_type`, `user_id`, `message`, `sender` | 所有消息事件的基类 | -| **PrivateMessageEvent** | 继承自 MessageEvent | 私聊消息事件 | -| **GroupMessageEvent** | `group_id`, `anonymous` | 群聊消息事件,包含群号和匿名信息 | -| **GroupIncreaseNoticeEvent** | `group_id`, `user_id`, `operator_id`, `sub_type` | 群成员增加通知 | -| **RecallGroupNoticeEvent** | `group_id`, `user_id`, `operator_id`, `message_id` | 群消息撤回通知 | -| **FriendRequestEvent** | `user_id`, `comment`, `flag` | 加好友请求事件 | -| **GroupRequestEvent** | `group_id`, `user_id`, `sub_type`, `comment`, `flag` | 加群请求/邀请事件 | -| **HeartbeatEvent** | `status`, `interval` | 心跳事件,用于监控连接状态 | +--- -通过这套完整的事件模型,NEO 框架为开发者提供了强大而灵活的事件处理能力,同时保持了代码的类型安全和良好的开发体验。 +## 🗺️ 路线图 (Roadmap) + +- [ ] **Web 仪表盘**: 开发一个简单的 Web 页面,用于查看机器人状态和插件列表。 +- [ ] **权限系统重构**: 引入更精细化的权限节点,允许按插件或指令控制用户权限。 +- [ ] **数据库集成**: 引入 `SQLite` 或其他数据库,用于需要持久化存储数据的功能。 +- [ ] **新插件开发**: + - [ ] 天气查询插件 + - [ ] GIL实现 + - [ ] coming soon... diff --git a/config.toml b/config.toml index 4846aeb..5955e20 100644 --- a/config.toml +++ b/config.toml @@ -12,3 +12,15 @@ host = "114.66.58.203" port = 1931 db = 0 password = "redis_5dxyJG" + +[docker] +base_url = "tcp://dockertest.k2cro4.my:2375" +sandbox_image = "python-sandbox:latest" +timeout = 10 +concurrency_limit = 5 +tls_verify = true +ca_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/ca.crt" +client_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-cert.pem" +client_key_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-key.pem" + + diff --git a/core/command_manager.py b/core/command_manager.py index c51794a..058cc89 100644 --- a/core/command_manager.py +++ b/core/command_manager.py @@ -69,7 +69,7 @@ class CommandManager: def command( self, - name: str, + *names: str, permission: Optional[Any] = None, override_permission_check: bool = False ) -> Callable: @@ -77,7 +77,7 @@ class CommandManager: 装饰器:注册一个消息指令处理器。 """ return self.message_handler.command( - name, + *names, permission=permission, override_permission_check=override_permission_check ) diff --git a/core/config_loader.py b/core/config_loader.py index 776ddcb..b00ede3 100644 --- a/core/config_loader.py +++ b/core/config_loader.py @@ -73,6 +73,15 @@ class Config: """ return self._data.get("redis", {}) + @property + def docker(self) -> dict: + """ + 获取 Docker 配置 + + :return: 配置字典 + """ + return self._data.get("docker", {}) + # 实例化全局配置对象 global_config = Config() diff --git a/core/event_handler.py b/core/event_handler.py index b355718..8c384f9 100644 --- a/core/event_handler.py +++ b/core/event_handler.py @@ -83,7 +83,7 @@ class MessageHandler(BaseHandler): def command( self, - name: str, + *names: str, permission: Optional[Permission] = None, override_permission_check: bool = False ) -> Callable: @@ -93,11 +93,12 @@ class MessageHandler(BaseHandler): def decorator(func: Callable) -> Callable: if not inspect.iscoroutinefunction(func): raise SyncHandlerError(f"命令处理器 {func.__name__} 必须是异步函数 (async def).") - self.commands[name] = { - "func": func, - "permission": permission, - "override_permission_check": override_permission_check, - } + for name in names: + self.commands[name] = { + "func": func, + "permission": permission, + "override_permission_check": override_permission_check, + } return func return decorator @@ -137,7 +138,8 @@ class MessageHandler(BaseHandler): permission_granted = await permission_manager.check_permission(event.user_id, permission) if not permission_granted and not override_check: - await bot.send(event, f"权限不足,需要 {permission.name} 权限") + permission_name = permission.name if isinstance(permission, Permission) else permission + await bot.send(event, f"权限不足,需要 {permission_name} 权限") return await self._run_handler( diff --git a/core/executor.py b/core/executor.py index 6d691bd..73599d9 100644 --- a/core/executor.py +++ b/core/executor.py @@ -1,27 +1,184 @@ -""" -线程池执行器 - -提供一个全局的线程池和异步接口,用于在事件循环中安全地运行同步函数。 -""" +# -*- coding: utf-8 -*- import asyncio -from concurrent.futures import ThreadPoolExecutor -from functools import partial -from typing import Any, Callable +import docker +from docker.tls import TLSConfig +from typing import Dict, Any, Callable -# 创建一个全局的线程池,可以根据需要调整 max_workers -executor = ThreadPoolExecutor(max_workers=10) +from core.logger import logger -async def run_in_thread_pool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: +class CodeExecutor: """ - 在线程池中异步运行同步函数 + 代码执行引擎,负责管理一个异步任务队列和并发的 Docker 容器执行。 + """ + def __init__(self, bot_instance, config: Dict[str, Any]): + """ + 初始化代码执行引擎。 + :param bot_instance: Bot 实例,用于后续的消息回复。 + :param config: 从 config.toml 加载的配置字典。 + """ + self.bot = bot_instance + self.task_queue = asyncio.Queue() + + # 从传入的配置中读取 Docker 相关设置 + docker_config = config.docker + self.docker_base_url = docker_config.get("base_url") + self.sandbox_image = docker_config.get("sandbox_image", "python-sandbox:latest") + self.timeout = docker_config.get("timeout", 10) + concurrency = docker_config.get("concurrency_limit", 5) + + self.concurrency_limit = asyncio.Semaphore(concurrency) + self.docker_client = None - :param func: 要运行的同步函数 - :param args: 函数的位置参数 - :param kwargs: 函数的关键字参数 - :return: 函数的返回值 + logger.info("[CodeExecutor] 初始化 Docker 客户端...") + try: + if self.docker_base_url: + # 如果配置了远程 Docker 地址,则使用 TLS 选项进行连接 + tls_config = None + if docker_config.get("tls_verify", False): + tls_config = TLSConfig( + ca_cert=docker_config.get("ca_cert_path"), + client_cert=(docker_config.get("client_cert_path"), docker_config.get("client_key_path")), + verify=True + ) + self.docker_client = docker.DockerClient(base_url=self.docker_base_url, tls=tls_config) + else: + # 否则,使用默认的本地环境连接 + self.docker_client = docker.from_env() + + # 检查 Docker 服务是否可用 + self.docker_client.ping() + logger.success("[CodeExecutor] Docker 客户端初始化成功,服务连接正常。") + except docker.errors.DockerException as e: + self.docker_client = None + logger.error(f"无法连接到 Docker 服务,请检查 Docker 是否正在运行: {e}") + except Exception as e: + self.docker_client = None + logger.error(f"初始化 Docker 客户端时发生未知错误: {e}") + + async def add_task(self, code: str, callback: Callable[[str], asyncio.Future]): + """ + 将代码执行任务添加到队列中。 + :param code: 待执行的 Python 代码字符串。 + :param callback: 执行完毕后用于回复结果的回调函数。 + """ + task = {"code": code, "callback": callback} + await self.task_queue.put(task) + logger.info(f"[CodeExecutor] 新的代码执行任务已入队 (队列当前长度: {self.task_queue.qsize()})。") + + async def worker(self): + """ + 后台工作者,不断从队列中取出任务并执行。 + """ + if not self.docker_client: + logger.error("[CodeExecutor] Worker 无法启动,因为 Docker 客户端未初始化。") + return + + logger.info("[CodeExecutor] 代码执行 Worker 已启动,等待任务...") + while True: + task = await self.task_queue.get() + + logger.info("[CodeExecutor] 开始处理代码执行任务。") + + async with self.concurrency_limit: + result_message = "" + try: + loop = asyncio.get_running_loop() + + # 使用 asyncio.wait_for 实现超时控制 + result_bytes = await asyncio.wait_for( + loop.run_in_executor( + None, # 使用默认线程池 + self._run_in_container, + task['code'] + ), + timeout=self.timeout + ) + + output = result_bytes.decode('utf-8').strip() + result_message = output if output else "代码执行完毕,无输出。" + logger.success("[CodeExecutor] 任务成功执行。") + + except docker.errors.ImageNotFound: + logger.error(f"[CodeExecutor] 镜像 '{self.sandbox_image}' 不存在!") + result_message = f"执行失败:沙箱基础镜像 '{self.sandbox_image}' 不存在,请联系管理员构建。" + except docker.errors.ContainerError as e: + error_output = e.stderr.decode('utf-8').strip() + result_message = f"代码执行出错:\n{error_output}" + logger.warning(f"[CodeExecutor] 代码执行时发生错误: {error_output}") + except docker.errors.APIError as e: + logger.error(f"[CodeExecutor] Docker API 错误: {e}") + result_message = "执行失败:与 Docker 服务通信时发生错误,请检查服务状态。" + except asyncio.TimeoutError: + result_message = f"执行超时 (超过 {self.timeout} 秒)。" + logger.warning("[CodeExecutor] 任务执行超时。") + except Exception as e: + logger.exception(f"[CodeExecutor] 执行 Docker 任务时发生未知严重错误: {e}") + result_message = "执行引擎发生内部错误,请联系管理员。" + + # 调用回调函数回复结果 + await task['callback'](result_message) + + self.task_queue.task_done() + + def _run_in_container(self, code: str) -> bytes: + """ + 同步函数:在 Docker 容器中运行代码。 + 此函数通过手动管理容器生命周期来提高稳定性。 + """ + container = None + try: + # 1. 创建容器 + container = self.docker_client.containers.create( + image=self.sandbox_image, + command=["python", "-c", code], + mem_limit='128m', + cpu_shares=512, + network_disabled=True, + log_config={'type': 'json-file', 'config': {'max-size': '1m'}}, + ) + # 2. 启动容器 + container.start() + + # 3. 等待容器执行完成 + # 主超时由 asyncio.wait_for 控制,这里的 timeout 是一个额外的保险 + result = container.wait(timeout=self.timeout + 5) + + # 4. 获取日志 + stdout = container.logs(stdout=True, stderr=False) + stderr = container.logs(stdout=False, stderr=True) + + # 5. 检查退出码,如果不为 0,则手动抛出 ContainerError + if result.get('StatusCode', 0) != 0: + raise docker.errors.ContainerError( + container, result['StatusCode'], f"python -c '{code}'", self.sandbox_image, stderr + ) + + return stdout + + finally: + # 6. 确保容器总是被移除 + if container: + try: + container.remove(force=True) + except docker.errors.NotFound: + # 如果容器因为某些原因已经消失,也沒关系 + pass + except Exception as e: + logger.error(f"[CodeExecutor] 强制移除容器 {container.id} 时失败: {e}") + +def initialize_executor(bot_instance, config: Dict[str, Any]): + """ + 初始化并返回一个 CodeExecutor 实例。 + """ + return CodeExecutor(bot_instance, config) + +async def run_in_thread_pool(sync_func, *args, **kwargs): + """ + 在线程池中运行同步阻塞函数,以避免阻塞 asyncio 事件循环。 + :param sync_func: 同步函数 + :param args: 位置参数 + :param kwargs: 关键字参数 + :return: 同步函数的返回值 """ loop = asyncio.get_running_loop() - # 使用 functools.partial 绑定函数和参数,以便传递给 run_in_executor - func_to_run = partial(func, *args, **kwargs) - # loop.run_in_executor 会返回一个 awaitable 对象 - return await loop.run_in_executor(executor, func_to_run) + return await loop.run_in_executor(None, lambda: sync_func(*args, **kwargs)) diff --git a/core/permission_manager.py b/core/permission_manager.py index c79a18d..917e753 100644 --- a/core/permission_manager.py +++ b/core/permission_manager.py @@ -227,6 +227,14 @@ class PermissionManager: Returns: bool: 如果用户权限 >= 所需权限,返回 True,否则返回 False """ + # 如果传入的是字符串,先转换为 Permission 对象 + if isinstance(required_permission, str): + required_permission = _PERMISSIONS.get(required_permission.lower()) + if not required_permission: + # 如果是无效的权限字符串,默认拒绝 + logger.warning(f"检测到无效的权限检查字符串: {required_permission}") + return False + user_permission = await self.get_user_permission(user_id) return user_permission >= required_permission @@ -249,4 +257,21 @@ class PermissionManager: # 全局权限管理器实例 -permission_manager = PermissionManager() \ No newline at end of file +permission_manager = PermissionManager() + +def require_admin(func): + """ + 一个装饰器,用于限制命令只能由管理员执行。 + """ + from functools import wraps + from models.events.message import MessageEvent + + @wraps(func) + async def wrapper(event: MessageEvent, *args, **kwargs): + user_id = event.user_id + if await permission_manager.check_permission(user_id, ADMIN): + return await func(event, *args, **kwargs) + else: + await event.reply("抱歉,您没有权限执行此命令。") + return None + return wrapper diff --git a/main.py b/main.py index 1445f1f..0589512 100644 --- a/main.py +++ b/main.py @@ -108,6 +108,21 @@ async def main(): try: bot = WS() + + # 初始化代码执行器 + from core.config_loader import global_config as config + from core.executor import initialize_executor + code_executor = initialize_executor(bot, config) + bot.bot.code_executor = code_executor # 将执行器实例附加到 bot.bot 对象上 + + # 启动代码执行器的后台 worker + logger.debug("[Main] 检查是否需要启动代码执行 Worker...") + if code_executor and code_executor.docker_client: + logger.info("[Main] Docker 连接成功,正在启动代码执行 Worker...") + asyncio.create_task(code_executor.worker()) + else: + logger.warning("[Main] 未启动代码执行 Worker,因为 Docker 客户端未初始化或连接失败。") + await bot.connect() finally: if observer.is_alive(): diff --git a/plugins/broadcast.py b/plugins/broadcast.py new file mode 100644 index 0000000..37cfd32 --- /dev/null +++ b/plugins/broadcast.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +""" +管理员专用的广播插件 +功能: +- 仅限管理员在私聊中调用。 +- 通过回复一条消息并发送指令,将该消息转发给机器人所在的所有群聊。 +- 此插件不写入 __plugin_meta__,保持隐藏。 +""" +from core.command_manager import matcher +from models import MessageEvent +from core.permission_manager import ADMIN +from core.logger import logger + +@matcher.command("broadcast", "广播", permission=ADMIN) +async def broadcast_message(event: MessageEvent): + """ + 广播指令处理器。 + + :param event: 消息事件对象。 + """ + # 1. 检查是否为私聊消息 + if event.group_id: + # 在群聊中调用时,静默处理,不予响应 + return + + # 2. 检查是否回复了某条消息 + reply = event.reply + if not reply: + await event.reply("请通过“回复”一条您想广播的消息来使用此功能。") + return + + # 3. 获取机器人所在的群聊列表 + bot = event.bot + try: + group_list = await bot.get_group_list() + if not group_list: + await event.reply("机器人目前没有加入任何群聊。") + return + except Exception as e: + logger.error(f"[Broadcast] 获取群聊列表失败: {e}") + await event.reply(f"获取群聊列表时发生错误,无法广播。错误信息: {e}") + return + + # 4. 遍历所有群聊并转发消息 + success_count = 0 + failed_count = 0 + total_groups = len(group_list) + + await event.reply(f"准备向 {total_groups} 个群聊广播消息,请稍候...") + + for group in group_list: + group_id = group.get("group_id") + if not group_id: + continue + + try: + # 直接转发被回复的消息 + await bot.forward_message( + group_id=group_id, + message_id=reply.message_id + ) + success_count += 1 + logger.info(f"[Broadcast] 已成功将消息转发至群聊: {group_id}") + except Exception as e: + failed_count += 1 + logger.error(f"[Broadcast] 转发消息至群聊 {group_id} 失败: {e}") + + # 5. 向管理员报告结果 + report_message = ( + f"广播任务完成。\n" + f"总群聊数: {total_groups}\n" + f"成功: {success_count}\n" + f"失败: {failed_count}" + ) + await event.reply(report_message) diff --git a/plugins/code_py.py b/plugins/code_py.py index 2fa2988..055b034 100644 --- a/plugins/code_py.py +++ b/plugins/code_py.py @@ -1,176 +1,156 @@ -""" -code_py插件 - -输入/code py回车再加上python代码,机器人就会执行代码并返回执行结果。 -""" - +# -*- coding: utf-8 -*- +import html +import textwrap import asyncio -import re -import sys -import tempfile -import os -from typing import Tuple, Set +from typing import Dict -from core.bot import Bot from core.command_manager import matcher -from core.executor import run_in_thread_pool from models import MessageEvent +from core.permission_manager import ADMIN +from core.logger import logger __plugin_meta__ = { - "name": "code_py", - "description": "提供执行python代码的功能", - "usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码", + "name": "Python 代码执行", + "description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和转发回复。", + "usage": "/py <单行代码>\n/code_py <单行代码>\n/py (进入多行输入模式)", } -# --- 安全配置:危险模块和内置函数黑名单 --- -DANGEROUS_MODULES = [ - "os", "sys", "subprocess", "shutil", "socket", "requests", "urllib", - "http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing", - "asyncio", -] -DANGEROUS_BUILTINS = [ - "__import__", "open", "exec", "eval", "compile", "input", "breakpoint" -] +# --- 会话状态管理 --- +# 结构: {(user_id, group_id): asyncio.TimerHandle} +multi_line_sessions: Dict[tuple, asyncio.TimerHandle] = {} -# 编译后的正则表达式,用于分割语句 -STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]') -# 编译后的正则表达式,用于查找危险的内置函数调用 -BUILTIN_CALL_PATTERN = re.compile(r'\b(' + '|'.join(DANGEROUS_BUILTINS) + r')\s*\(') - -def is_code_safe(code: str) -> Tuple[bool, str]: +async def reply_as_forward(event: MessageEvent, input_code: str, output_result: str): """ - 检查代码中是否包含危险的模块导入或内置函数调用。 + 将输入和输出打包成转发消息进行回复。 + 参考 forward_test.py 的实现,兼容私聊和群聊。 """ - # 1. 检查危险的内置函数 - found_builtins = BUILTIN_CALL_PATTERN.search(code) - if found_builtins: - return False, f"检测到不允许的内置函数调用:'{found_builtins.group(1)}'" - - # 2. 检查危险的模块导入 - statements = STATEMENT_SPLIT_PATTERN.split(code) - for statement in statements: - statement = statement.strip() - if not statement: - continue - parts = statement.split() - if not parts: - continue - if parts[0] == 'from' and len(parts) > 1: - module_name = parts[1].strip() - if module_name in DANGEROUS_MODULES: - return False, f"检测到不允许的模块导入:'{module_name}'" - elif parts[0] == 'import' and len(parts) > 1: - modules_str = ' '.join(parts[1:]) - imported_modules = [m.strip() for m in modules_str.split(',')] - for module_name in imported_modules: - actual_module_name = module_name.split()[0] - if actual_module_name in DANGEROUS_MODULES: - return False, f"检测到不允许的模块导入:'{actual_module_name}'" - return True, "" - -async def run_code_in_subprocess(code_str: str, timeout: float = 10.0) -> Tuple[str, str]: - """ - 在子进程中安全地执行Python代码。 - """ - with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as tf: - tf.write(code_str) - tf_path = tf.name - try: - proc = await asyncio.create_subprocess_exec( - sys.executable, tf_path, - stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - try: - out_bytes, err_bytes = await asyncio.wait_for(proc.communicate(), timeout=timeout) - except asyncio.TimeoutError: - proc.kill() - await proc.communicate() - return "", f"执行超时(>{timeout}s)" - return out_bytes.decode(errors="ignore"), err_bytes.decode(errors="ignore") - finally: - try: - os.remove(tf_path) - except Exception: - pass - -async def process_and_reply(bot: Bot, event: MessageEvent, code: str): - """ - 核心处理逻辑:安全检查、执行代码并回复结果。 - """ - safe, message = await run_in_thread_pool(is_code_safe, code) - if not safe: - await event.reply(f"代码安全检查未通过:\n{message}") - return - - try: - stdout, stderr = await run_code_in_subprocess(code, timeout=10.0) - except Exception as e: - await event.reply(f"执行失败:{e}") - return - - resp = stderr.strip() or stdout.strip() or "(无输出)" - MAX = 1500 - if len(resp) > MAX: - resp = resp[:MAX] + "\n...输出被截断..." - + bot = event.bot + + # 1. 构建消息节点列表 nodes = [ - bot.build_forward_node(user_id=event.self_id, nickname="输入代码", message=code), - bot.build_forward_node(user_id=event.self_id, nickname="执行结果", message=resp), + bot.build_forward_node( + user_id=event.user_id, + nickname=event.sender.nickname or str(event.user_id), + message=f"--- Your Code ---\n{input_code}" + ), + bot.build_forward_node( + user_id=event.self_id, + nickname="Code Executor", + message=f"--- Execution Result ---\n{output_result}" + ) ] + try: + # 2. 发送合并转发消息 await bot.send_forwarded_messages(event, nodes) except Exception as e: - await event.reply(f"结果发送失败: {e}\n\n{resp}") + logger.error(f"[code_py] 发送转发消息失败: {e}") + # 降级为普通消息回复 + await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}") -# --- 交互式会话状态 --- -# 使用集合存储正在等待代码输入的用户标识 -waiting_users: Set[str] = set() - -def get_session_id(event: MessageEvent) -> str: - """根据事件类型生成唯一的会话ID""" - if hasattr(event, 'group_id'): - # 群聊会话ID - return f"group_{event.group_id}-{event.user_id}" - else: - # 私聊会话ID - return f"private_{event.user_id}" - -@matcher.command("code_py") -async def handle_code_command(bot: Bot, event: MessageEvent, args: list[str]): - # 模式一:快速执行单行代码 - if args: - code = " ".join(args) - await process_and_reply(bot, event, code) +async def execute_code(event: MessageEvent, code: str): + """ + 核心代码执行逻辑。 + """ + code_executor = getattr(event.bot, 'code_executor', None) + if not code_executor or not code_executor.docker_client: + await event.reply("代码执行服务当前不可用,请检查 Docker 连接配置。") return - # 模式二:进入交互模式 - session_id = get_session_id(event) - if session_id in waiting_users: - await event.reply("您已经有一个正在等待输入的code会话了,请直接发送代码。") - return + # 修改 add_task,让它能直接接收回复函数 + await code_executor.add_task( + code, + lambda result: reply_as_forward(event, code, result) + ) + await event.reply("代码已提交至沙箱执行队列,请稍候...") + +def cleanup_session(session_key: tuple): + """ + 清理超时的会话。 + """ + if session_key in multi_line_sessions: + del multi_line_sessions[session_key] + logger.info(f"[code_py] 会话 {session_key} 已超时,自动取消。") + +def normalize_code(code: str) -> str: + """ + 规范化用户输入的 Python 代码字符串。 + + 主要处理两个问题: + 1. 对消息中可能存在的 HTML 实体进行解码 (e.g., [ -> [)。 + 2. 移除整个代码块的公共前导缩进,以修复因复制粘贴导致的多余缩进。 + + :param code: 原始代码字符串。 + :return: 规范化后的代码字符串。 + """ + # 1. 解码 HTML 实体 + code = html.unescape(code) + + # 2. 移除公共前导缩进 + try: + code = textwrap.dedent(code) + except Exception: + # 在某些情况下(例如,不一致的缩进),dedent 可能会失败, + # 但我们不希望因此中断流程,所以捕获异常并继续。 + pass - waiting_users.add(session_id) - await event.reply("请在下一条消息中发送要执行的Python代码块。(发送“取消”可退出)") + return code.strip() + + +@matcher.command("py", "python", "code_py", permission=ADMIN) +async def code_py_main(event: MessageEvent, args: list[str]): + """ + /py 命令的主入口。 + - 如果有参数,直接执行。 + - 如果没有参数,开启多行输入模式。 + """ + code_to_run = " ".join(args) + + if code_to_run: + # 单行模式,对代码进行规范化处理 + normalized_code = normalize_code(code_to_run) + if not normalized_code: + await event.reply("代码为空或格式错误,请输入有效的代码。") + return + await execute_code(event, normalized_code) + else: + # 多行模式 + # 使用 getattr 兼容私聊和群聊 + session_key = (event.user_id, getattr(event, 'group_id', 'private')) + + # 如果上一个会话的超时任务还在,先取消它 + if session_key in multi_line_sessions: + multi_line_sessions[session_key].cancel() + + await event.reply("已进入多行代码输入模式,请直接发送你的代码。\n(60秒内无操作将自动取消)") + + # 设置 60 秒超时 + loop = asyncio.get_running_loop() + timeout_handler = loop.call_later( + 60, + cleanup_session, + session_key + ) + multi_line_sessions[session_key] = timeout_handler @matcher.on_message() -async def handle_code_input(bot: Bot, event: MessageEvent): - session_id = get_session_id(event) - - # 检查用户是否处于等待状态 - if session_id in waiting_users: - # 从等待集合中移除,无论输入是什么 - waiting_users.remove(session_id) +async def handle_multi_line_code(event: MessageEvent): + """ + 通用消息处理器,用于捕获多行模式下的代码输入。 + """ + # 使用 getattr 兼容私聊和群聊 + session_key = (event.user_id, getattr(event, 'group_id', 'private')) + if session_key in multi_line_sessions: + # 取消超时任务 + multi_line_sessions[session_key].cancel() + del multi_line_sessions[session_key] + + # 对多行代码进行规范化处理 + normalized_code = normalize_code(event.raw_message) - # 处理取消操作 - if event.raw_message.strip() == "取消": - await event.reply("已取消输入。") - return True # 消费事件 - - # 执行代码 - await process_and_reply(bot, event, event.raw_message) - return True # 消费事件,防止被其他指令匹配 - - # 如果用户不在等待状态,则不处理 - return False - + if not normalized_code: + await event.reply("捕获到的代码为空或格式错误,已取消输入。") + return + await execute_code(event, normalized_code) + return True # 消费事件,防止其他处理器响应 diff --git a/requirements.txt b/requirements.txt index 17a9c33e3864882cb7217cb519675a201e0d8f3d..81bbec89684e9c693f0a5a1a37e498102247c737 100644 GIT binary patch literal 360 zcmXX?Npiy=5WMr3Py)2%!dqNOnpl((U=}OR>(f)ojh>!fn3Y^_{;P+YdLFGEr5dFX zYsGtzgVbW9f(37_9`q!Yk_xlKl}ha+rgFOAf2de%uoPO+cPoQ!INzrR}=ZhbWh@**~ZTFN%OI@P!P6*6{xnM>9u>u+y%ynOE7ZK zg3Jpxr+n#cjA%6W$&rij7fz&@=x`!q^3U8OdQWyEVN3VD<>RQYu*3= diff --git a/sandbox.Dockerfile b/sandbox.Dockerfile new file mode 100644 index 0000000..41f2bc3 --- /dev/null +++ b/sandbox.Dockerfile @@ -0,0 +1,19 @@ +# 使用一个轻量级的 Python 官方镜像作为基础 +FROM python:3.11-slim + +# 创建一个低权限的用户来运行代码,增加安全性 +# -S: 创建一个系统用户 (没有 home 目录) +# -u: 指定用户ID +# -g: 指定组ID +RUN groupadd -g 1001 sandbox && useradd -u 1001 -g sandbox -s /bin/sh -r sandbox + +# 创建一个工作目录,用于存放和执行用户的代码 +WORKDIR /sandbox +# 将目录所有权交给沙箱用户 +RUN chown sandbox:sandbox /sandbox + +# 切换到沙箱用户 +USER sandbox + +# 默认的启动命令是 python,这样容器启动时可以直接执行 .py 文件 +CMD ["python"]