From a733d3dc4ba3760bbb4f40957830c786a4ba4876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=95=80=E9=93=AC=E9=85=B8=E9=92=BE?= <148796996+K2cr2O1@users.noreply.github.com> Date: Sun, 4 Jan 2026 22:21:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=95=B4=E5=90=88=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E5=8E=86=E5=8F=B2=20(#20)=EF=BC=8C=E5=A4=A7=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E3=80=82=E3=80=82=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- README.md | 680 ++++++++++++++++++++++++------ core/admin_manager.py | 166 ++++++++ core/api/account.py | 2 +- core/api/friend.py | 10 +- core/api/group.py | 10 +- core/command_manager.py | 293 ++++--------- core/event_handler.py | 197 +++++++++ core/exceptions.py | 9 + core/executor.py | 27 ++ core/permission_manager.py | 252 +++++++++++ core/plugin_manager.py | 32 +- core/redis_manager.py | 51 ++- data/admin.json | 3 + data/permissions.json | 3 + html/404.html | 288 +++++++++++++ html/index.html | 387 +++++++++++++++++ main.py | 13 +- models/events/message.py | 6 + plugins/admin.py | 147 +++---- plugins/code_py.py | 13 +- plugins/data/admin.json | 1 - plugins/echo.py | 18 +- plugins/jrcd.py | 7 +- plugins/sync_async_test_plugin.py | 88 ++++ 25 files changed, 2199 insertions(+), 506 deletions(-) create mode 100644 core/admin_manager.py create mode 100644 core/event_handler.py create mode 100644 core/exceptions.py create mode 100644 core/executor.py create mode 100644 core/permission_manager.py create mode 100644 data/admin.json create mode 100644 data/permissions.json create mode 100644 html/404.html create mode 100644 html/index.html delete mode 100644 plugins/data/admin.json create mode 100644 plugins/sync_async_test_plugin.py diff --git a/.gitignore b/.gitignore index 093255f..2729cb2 100644 --- a/.gitignore +++ b/.gitignore @@ -138,4 +138,4 @@ dmypy.json # pytype static type analyzer .pytype/ -# End of https://www.toptal.com/developers/gitignore/api/python \ No newline at end of file +# End of https://www.toptal.com/developers/gitignore/api/python diff --git a/README.md b/README.md index f8ee6ff..99b7656 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ NEO 框架的设计遵循以下核心理念: * **类型安全**:基于 `dataclasses` 的强类型事件模型,开发体验更佳。 * **插件系统**:轻量级的装饰器风格插件系统,支持指令 (`@matcher.command`) 和事件监听 (`@matcher.on_notice`, `@matcher.on_request`)。 * **插件元数据与内置帮助**:插件可通过 `__plugin_meta__` 变量进行自我描述。框架核心内置了 `/help` 指令,可自动收集并展示所有插件的帮助信息,无需手动维护。 -* **🔥 热重载支持**:内置文件监控,修改 `base_plugins` 下的代码自动重载,无需重启,极大提升调试效率。 +* **🔥 热重载支持**:内置文件监控,修改 `plugins` 下的代码自动重载,无需重启,极大提升调试效率。 * **异步核心**:基于 `asyncio` 和 `websockets` 的高性能异步核心。 * **自动重连**:内置 WebSocket 断线重连机制。 @@ -131,43 +131,112 @@ latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True) ### 其他改进 - [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。 - [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。 -- [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制。 -- [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。 -- [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。 -- [ ] **权限系统**: 实现基础的权限管理(如超级管理员、群管理员等)。 +- [x] **权限系统**: 实现基础的权限管理(超级管理员、群管理员等)。 +- [x] **日志系统优化**: 引入 `loguru` 进行日志记录,支持文件输出和日志级别控制。 +- [x] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。 +- [x] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。 ## 📂 项目结构 ``` -NEO/ -├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载) -│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令 -│ ├── forward_test.py # 示例插件:演示合并转发消息的构建和发送 -│ ├── jrcd.py # 娱乐插件:提供 /jrcd 和 /bbcd 指令 -│ └── thpic.py # 图片插件:提供 /thpic 指令,发送随机东方图片 -├── core/ # 核心框架代码 -│ ├── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI) -│ │ ├── __init__.py -│ │ ├── account.py -│ │ ├── base.py -│ │ ├── friend.py -│ │ ├── group.py -│ │ └── message.py -│ ├── bot.py # Bot API 封装,提供 send_group_msg 等方法 -│ ├── command_manager.py # 命令与事件分发器 -│ ├── config_loader.py # 配置加载器 -│ ├── plugin_manager.py # 插件加载与管理 -│ ├── redis_manager.py # Redis 连接管理器 -│ └── ws.py # WebSocket 客户端核心 -├── models/ # 数据模型 -│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta) -│ ├── message.py # 消息段定义 (MessageSegment) -│ └── sender.py # 发送者定义 (Sender) -├── config.toml # 配置文件 -├── main.py # 启动入口(包含热重载监控) -└── requirements.txt # 项目依赖 +. +├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载) +│ ├── admin.py # 管理员插件 +│ ├── code_py.py # Python 代码执行插件 +│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令 +│ ├── forward_test.py # 示例插件:演示合并转发消息 +│ ├── jrcd.py # 娱乐插件:/jrcd 和 /bbcd +│ └── thpic.py # 图片插件:/thpic +├── core/ # 核心框架代码 +│ ├── api/ # API 模块抽象层 +│ ├── bot.py # Bot 实例与 API 封装 +│ ├── admin_manager.py # 管理员管理模块 +│ ├── command_manager.py # 命令与事件分发器 +│ ├── config_loader.py # 配置加载器 +│ ├── event_handler.py # 事件处理器 +│ ├── executor.py # 插件执行器 +│ ├── logger.py # 日志系统 +│ ├── permission_manager.py # 权限管理器 +│ ├── plugin_manager.py # 插件加载与管理 +│ ├── redis_manager.py # Redis 连接管理器 +│ └── ws.py # WebSocket 客户端核心 +├── data/ # 数据存储目录 +│ ├── admin.json # 管理员配置文件 +│ └── permissions.json # 权限数据 +├── html/ # HTML 静态文件 +│ ├── 404.html +│ └── index.html +├── models/ # 数据模型 +│ ├── events/ # OneBot 事件定义 +│ ├── message.py # 消息段定义 +│ ├── objects.py # API 返回对象定义 +│ └── sender.py # 发送者定义 +├── .gitignore +├── config.toml # 配置文件 +├── main.py # 启动入口(包含热重载监控) +└── requirements.txt # 项目依赖 ``` +### 目录结构详细说明 + +#### `plugins/` - 插件目录 +- **功能**存放:所有机器人插件,支持热重载机制 +- **加载机制**:框架会自动扫描此目录下的所有 `.py` 文件,并作为插件加载 +- **插件约定**:每个插件文件应包含 `__plugin_meta__` 字典用于插件元数据定义 +- **热重载**:开发过程中修改插件文件会自动触发重载,无需重启机器人 +- **内置插件**: + - `admin.py` - 管理员管理插件,支持动态添加/移除管理员 + - `code_py.py` - Python 代码执行插件,支持安全的代码执行环境 + - `echo.py` - 示例插件,演示基本指令处理 + - `forward_test.py` - 合并转发消息演示插件 + - `jrcd.py` - 娱乐插件,提供 `/jrcd` 和 `/bbcd` 指令 + - `thpic.py` - 图片插件,提供 `/thpic` 指令返回东方Project图片 + +#### `core/` - 核心框架代码 +- `api/` - API 模块抽象层 + - `base.py` - API 基类定义 + - `message.py` - 消息相关 API 封装 + - `group.py` - 群组管理 API 封装 + - `friend.py` - 好友相关 API 封装 + - `account.py` - 账号相关 API 封装 +- `bot.py` - Bot 核心类,通过 Mixin 模式继承所有 API 功能,提供统一的调用接口 + - `admin_manager.py` - 管理员管理模块,负责管理员的添加、移除和权限验证 + - `command_manager.py` - 命令与事件分发器,负责注册和处理所有指令和事件 +- `config_loader.py` - 配置加载器,读取和解析 `config.toml` 配置文件 +- `event_handler.py` - 事件处理器,负责将原始事件转换为类型化事件对象 +- `executor.py` - 插件执行器,提供线程池执行环境用于执行同步任务 +- `logger.py` - 日志系统,基于 `loguru` 提供高性能日志记录 +- `permission_manager.py` - 权限管理器,管理用户权限级别(admin、op、user) +- `plugin_manager.py` - 插件加载与管理,负责插件的扫描、加载和热重载 +- `redis_manager.py` - Redis 连接管理器,提供异步 Redis 客户端连接池 +- `ws.py` - WebSocket 客户端核心,负责与 OneBot 实现端建立和管理连接 + +#### `data/` - 数据存储目录 +- `admin.json` - 管理员配置文件,存储全局管理员列表 +- `permissions.json` - 权限数据文件,存储用户权限映射关系 + +#### `html/` - HTML 静态文件 +- `404.html` - 404 错误页面 +- `index.html` - 项目主页 HTML 文件,展示项目信息和特性 + +#### `models/` - 数据模型定义 +- `events/` - OneBot 事件定义 + - `base.py` - 事件基类定义 + - `message.py` - 消息事件定义 + - `notice.py` - 通知事件定义 + - `request.py` - 请求事件定义 + - `meta.py` - 元事件定义 + - `factory.py` - 事件工厂类,用于根据 JSON 数据创建对应事件对象 +- `message.py` - 消息段定义,支持文本、图片、表情等多种消息类型 +- `objects.py` - API 返回对象定义,提供强类型化的 API 响应数据模型 +- `sender.py` - 发送者定义,包含用户、群成员等信息 + +#### 根目录文件 +- `.gitignore` - Git 忽略文件配置 +- `config.toml` - 主配置文件,包含 WebSocket 连接、机器人指令前缀、Redis 连接等配置 +- `main.py` - 程序入口文件,负责初始化插件、启动热重载监控和建立 WebSocket 连接 +- `requirements.txt` - Python 依赖包列表 + ## 🚀 快速开始 ### 1. 环境准备 @@ -207,13 +276,13 @@ python main.py 项目集成了 `watchdog` 文件监控。在开发过程中,你只需要: 1. 保持 `main.py` 运行。 -2. 修改或新建 `base_plugins` 目录下的 `.py` 插件文件。 +2. 修改或新建 `plugins` 目录下的 `.py` 插件文件。 3. 保存文件。 4. 控制台会自动提示 `[HotReload] 插件重载完成`,新的逻辑立即生效。 ### 创建新插件 -在 `base_plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。 +在 `plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。 ### 示例代码 @@ -333,7 +402,7 @@ async def get_group_info_legacy(bot: Bot, event: MessageEvent, args: list[str]): **示例:** ```python -# base_plugins/echo.py +# plugins/echo.py __plugin_meta__ = { "name": "回声与交互", @@ -407,10 +476,184 @@ async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]): except Exception as e: await event.reply(f"执行失败:{str(e)}") # 记录日志 - import logging - logging.error(f"插件执行错误:{e}", exc_info=True) + from core.logger import logger + logger.error(f"插件执行错误:{e}", exc_info=True) ``` +### 处理同步阻塞操作 +为了保持机器人的响应性,所有可能导致长时间阻塞的同步操作都应该在单独的线程池中执行。框架提供了 `run_in_thread_pool` 函数来简化这一过程。 + +**示例:执行同步阻塞任务** +```python +from core.command_manager import matcher +from core.bot import Bot +from models import MessageEvent +from core.executor import run_in_thread_pool +import time + +# 模拟一个耗时的同步操作 +def blocking_task(duration: int): + time.sleep(duration) + return f"阻塞任务完成,耗时 {duration} 秒" + +@matcher.command("block_test") +async def handle_blocking_test(bot: Bot, event: MessageEvent, args: list[str]): + if not args or not args[0].isdigit(): + await event.reply("请提供一个数字作为阻塞时间(秒)。例如:/block_test 5") + return + + duration = int(args[0]) + await event.reply(f"开始执行阻塞任务,耗时 {duration} 秒...") + + # 将同步阻塞任务放入线程池执行 + result = await run_in_thread_pool(blocking_task, duration) + await event.reply(result) +``` + +### 权限管理 +框架内置了基于用户角色的权限管理系统,支持 `admin`(超级管理员)、`op`(操作员)、`user`(普通用户)三个权限级别。权限数据存储在 `data/permissions.json` 文件中。 + +#### 权限级别说明 +- **admin**:最高权限,可以执行所有管理命令,包括添加/移除其他管理员 +- **op**:操作员权限,可以执行大部分管理命令,但不能修改管理员列表 +- **user**:普通用户权限,只能使用基础功能 + +#### 在插件中使用权限控制 +注册命令时可以通过 `permission` 参数指定所需权限级别: + +```python +from models import MessageEvent + +# 只有管理员可以执行此命令 +@matcher.command("admin_only", permission=MessageEvent.ADMIN) +async def admin_command(bot: Bot, event: MessageEvent, args: list[str]): + await event.reply("此命令仅限管理员使用") + +# 操作员及以上权限可以执行 +@matcher.command("op_only", permission=MessageEvent.OP) +async def op_command(bot: Bot, event: MessageEvent, args: list[str]): + await event.reply("此命令需要操作员权限") + +# 所有用户都可以执行(默认) +@matcher.command("public") +async def public_command(bot: Bot, event: MessageEvent, args: list[str]): + await event.reply("所有用户都可以使用此命令") +``` + +#### 动态权限检查 +如果需要更复杂的权限逻辑,可以使用 `override_permission_check=True` 参数,然后在函数中手动检查权限: + +```python +@matcher.command( + "special", + permission=MessageEvent.OP, + override_permission_check=True +) +async def special_command(bot: Bot, event: MessageEvent, permission_granted: bool): + if not permission_granted: + await event.reply("权限不足!") + return + + # 额外的权限逻辑 + if event.user_id == 123456: + await event.reply("特殊用户,允许执行") + else: + await event.reply("普通用户,拒绝执行") +``` + +### 使用 Redis 进行数据缓存 +框架集成了 Redis 客户端,提供了便捷的异步接口用于数据缓存和持久化。Redis 连接管理器会自动管理连接池,你可以在插件中直接使用。 + +#### 基本用法 +```python +from core.redis_manager import redis_manager + +@matcher.command("cache") +async def cache_example(bot: Bot, event: MessageEvent, args: list[str]): + # 设置缓存 + await redis_manager.set("user:123:name", "张三") + + # 获取缓存 + name = await redis_manager.get("user:123:name") + + # 设置带过期时间的缓存(单位:秒) + await redis_manager.setex("temp:data", 3600, "临时数据") + + # 删除缓存 + await redis_manager.delete("user:123:name") + + await event.reply(f"用户名:{name}") +``` + +#### 使用哈希表(Hash) +```python +# 设置哈希字段 +await redis_manager.hset("user:123", "age", 20) +await redis_manager.hset("user:123", "city", "北京") + +# 获取哈希字段 +age = await redis_manager.hget("user:123", "age") +user_data = await redis_manager.hgetall("user:123") + +# 删除哈希字段 +await redis_manager.hdel("user:123", "city") +``` + +#### 使用列表(List) +```python +# 向列表添加元素 +await redis_manager.lpush("recent:actions", "login") +await redis_manager.rpush("recent:actions", "logout") + +# 获取列表范围 +actions = await redis_manager.lrange("recent:actions", 0, 9) + +# 获取列表长度 +length = await redis_manager.llen("recent:actions") +``` + +### 插件数据管理 +对于需要持久化存储配置或数据的插件,框架提供了 `PluginDataManager` 类,可以方便地管理 JSON 格式的数据文件。 + +#### 基本用法 +```python +from core.plugin_manager import PluginDataManager + +# 初始化数据管理器 +data_manager = PluginDataManager("weather_plugin") + +@matcher.command("weather_set") +async def set_weather_config(bot: Bot, event: MessageEvent, args: list[str]): + if len(args) < 2: + await event.reply("用法:/weather_set <城市> <温度>") + return + + city = args[0] + temperature = args[1] + + # 保存配置 + await data_manager.set(city, temperature) + await event.reply(f"已设置 {city} 的温度为 {temperature}℃") + +@matcher.command("weather_get") +async def get_weather_config(bot: Bot, event: MessageEvent, args: list[str]): + if not args: + await event.reply("用法:/weather_get <城市>") + return + + city = args[0] + + # 读取配置 + temperature = data_manager.get(city) + if temperature: + await event.reply(f"{city} 的温度是 {temperature}℃") + else: + await event.reply(f"未找到 {city} 的温度配置") +``` + +#### 数据文件位置 +插件数据文件保存在 `plugins/data/` 目录下,每个插件对应一个独立的 JSON 文件。例如 `weather_plugin` 插件的数据文件为 `plugins/data/weather_plugin.json`。 + ### 插件开发最佳实践 1. **单一职责**:每个插件专注于一个功能领域 2. **错误处理**:妥善处理可能发生的异常 @@ -530,97 +773,290 @@ async def welcome_new_member(bot: Bot, event): ## 📚 事件模型说明 -项目采用了基于工厂模式的事件处理系统,所有事件定义在 `models/events/` 下: +NEO 框架的事件模型是基于 OneBot v11 协议的强类型数据模型,采用 `dataclasses` 和类型注解构建。所有事件都继承自 `OneBotEvent` 基类,并通过事件工厂自动从 JSON 数据创建对应的事件对象。 -* **MessageEvent**: 消息事件,包含 `PrivateMessageEvent` 和 `GroupMessageEvent`。支持 `await event.reply()` 快速回复。 -* **NoticeEvent**: 通知事件,如 `FriendAddNoticeEvent`, `GroupRecallNoticeEvent` 等。 -* **RequestEvent**: 请求事件,如 `FriendRequestEvent`, `GroupRequestEvent`。 -* **MetaEvent**: 元事件,如心跳 `HeartbeatEvent`。 - -所有事件均继承自 `OneBotEvent`,并包含 `bot` 属性用于调用 API。 - -## 🏗️ 技术架构 - -NEO 框架采用分层架构设计,各层职责明确,便于维护和扩展: - -### 架构层次 - -1. **通信层 (WebSocket Client)** - - 负责与 OneBot 实现端的 Web Socket连接 - - 实现断线自动重连机制 - - 处理原始消息的收发和协议解析 - -2. **API 抽象层 (API Mixins)** - - 提供类型安全的 API 封装 - - 按功能领域划分:消息、群组、好友、账号 - - 所有 API 返回强类型数据模型对象 - -3. **业务逻辑层 (Bot & Command Manager)** - - Bot 类组合所有 API 功能 - - 指令和事件分发器 - - 插件加载和管理 - -4. **插件层 (Plugins)** - - 支持热重载的插件系统 - - 装饰器风格的注册方式 - - 独立的业务逻辑模块 - -5. **数据模型层 (Models)** - - 基于 dataclasses 的事件模型 - - API 响应数据模型 - - 类型安全的序列化/反序列化 - -### 核心组件交互 +### 事件层次结构 ``` -┌─────────────────────────────────────┐ -│ 插件层 (Plugins) │ -│ @matcher.command() │ -│ @matcher.on_notice() │ -└──────────────┬──────────────────────┘ - │ -┌──────────────▼──────────────────────┐ -│ 业务逻辑层 (Command Manager) │ -│ • 事件分发与路由 │ -│ • 指令参数解析 │ -└──────────────┬──────────────────────┘ - │ -┌──────────────▼──────────────────────┐ -│ Bot 组合类 │ -│ • 继承所有 API Mixin │ -│ • 提供统一接口 │ -└──────────────┬──────────────────────┘ - │ -┌──────────────▼──────────────────────┐ -│ API 抽象层 (Mixin) │ -│ • MessageAPI │ -│ • GroupAPI │ -│ • FriendAPI │ -│ • AccountAPI │ -└──────────────┬──────────────────────┘ - │ -┌──────────────▼──────────────────────┐ -│ 通信层 (WebSocket) │ -│ • 连接管理 │ -│ • 消息编解码 │ -│ • 断线重连 │ -└─────────────────────────────────────┘ +OneBotEvent (抽象基类) +├── MetaEvent (元事件) +│ ├── HeartbeatEvent (心跳事件) +│ └── LifeCycleEvent (生命周期事件) +├── MessageEvent (消息事件) +│ ├── PrivateMessageEvent (私聊消息事件) +│ └── GroupMessageEvent (群聊消息事件) +├── NoticeEvent (通知事件) +│ ├── FriendAddNoticeEvent (好友添加通知) +│ ├── FriendRecallNoticeEvent (好友消息撤回通知) +│ ├── GroupRecallNoticeEvent (群消息撤回通知) +│ ├── GroupIncreaseNoticeEvent (群成员增加通知) +│ ├── GroupDecreaseNoticeEvent (群成员减少通知) +│ ├── GroupAdminNoticeEvent (群管理员变动通知) +│ ├── GroupBanNoticeEvent (群禁言通知) +│ ├── GroupUploadNoticeEvent (群文件上传通知) +│ ├── PokeNotifyEvent (戳一戳通知) +│ ├── LuckyKingNotifyEvent (运气王通知) +│ ├── HonorNotifyEvent (群荣誉变更通知) +│ ├── GroupCardNoticeEvent (群成员名片更新通知) +│ ├── OfflineFileNoticeEvent (离线文件通知) +│ ├── ClientStatusNoticeEvent (客户端状态变更通知) +│ └── EssenceNoticeEvent (精华消息变动通知) +└── RequestEvent (请求事件) + ├── FriendRequestEvent (加好友请求) + └── GroupRequestEvent (加群请求/邀请) ``` -### 设计模式应用 +### 事件基类:OneBotEvent -- **工厂模式**:事件对象的创建和管理 -- **装饰器模式**:插件和指令的注册 -- **组合模式**:Bot 类通过继承组合 API 功能 -- **观察者模式**:事件监听和处理 -- **策略模式**:不同的消息处理策略 +所有事件的基类,定义了事件的通用属性和方法: -### 性能特点 +```python +@dataclass(slots=True) +class OneBotEvent(ABC): + """ + OneBot v11 事件的抽象基类。 + + Attributes: + time (int): 事件发生的时间戳 (秒) + self_id (int): 收到事件的机器人 QQ 号 + _bot (Optional[Bot]): 内部持有的 Bot 实例引用 + """ + time: int + self_id: int + _bot: Optional["Bot"] = field(default=None, init=False) + + @property + @abstractmethod + def post_type(self) -> str: + """事件的上报类型,子类必须重写此属性""" + pass + + @property + def bot(self) -> "Bot": + """获取与此事件关联的 Bot 实例""" + if self._bot is None: + raise ValueError("Bot instance not set for this event") + return self._bot + + @bot.setter + def bot(self, value: "Bot"): + """为事件对象设置关联的 Bot 实例""" + self._bot = value +``` -- **异步非阻塞**:全面基于 asyncio,支持高并发 -- **内存高效**:事件和模型对象使用 dataclasses,内存占用小 -- **快速响应**:插件热重载和事件分发机制确保快速响应 -- **可扩展性**:模块化设计便于功能扩展和定制 +### 事件类型常量 ---- -*Internal Use Only - DOGSOHA ond baby2016 by Fairy-Oracle-Sanctuary* +框架定义了完整的事件类型常量,用于标识不同种类的事件: + +```python +class EventType: + META = 'meta_event' # 元事件:心跳、生命周期等 + REQUEST = 'request ' # 请求事件:加好友请求、加群请求等 + NOTICE = 'notice' # 通知事件:群成员增加、文件上传等 + MESSAGE = 'message' # 消息事件:私聊消息、群消息等 + MESSAGE_SENT = 'message_sent' # 消息发送事件:机器人自己发送消息的上报 +``` + +### 消息事件 + +消息事件是机器人最常处理的事件类型,框架提供了完整的消息段支持和便捷的回复方法: + +#### MessageEvent (消息事件基类) + +```python +@dataclass +class MessageEvent(OneBotEvent): + message_type: str # 消息类型: private (私聊), group (群聊) + sub_type: str # 消息子类型 + message_id: int # 消息 ID + user_id: int # 发送者 QQ 号 + message: List[MessageSegment] # 消息内容列表 + raw_message: str # 原始消息内容 + font: int # 字体 + sender: Optional[Sender] # 发送者信息 + + @property + def post_type(self) -> str: + return EventType.MESSAGE + + async def reply(self, message: str, auto_escape: bool = False): + """回复消息(抽象方法,由子类实现)""" + raise NotImplementedError +``` + +#### PrivateMessageEvent (私聊消息事件) + +```python +@dataclass +class PrivateMessageEvent(MessageEvent): + async def reply(self, message: str, auto_escape: bool = False): + """回复私聊消息""" + await self.bot.send_private_msg( + user_id=self.user_id, message=message, auto_escape=auto_escape + ) +``` + +#### GroupMessageEvent (群聊消息事件) + +```python +@dataclass +class GroupMessageEvent(MessageEvent): + group_id: int = 0 # 群号 + anonymous: Optional[Anonymous] = None # 匿名信息 + + async def reply(self, message: str, auto_escape: bool = False): + """回复群聊消息""" + await self.bot.send_group_msg( + group_id=self.group_id, message=message, auto_escape=auto_escape + ) +``` + +### 通知事件 + +通知事件用于处理各种系统通知,如群成员变动、文件上传等: + +#### 常用通知事件示例 + +```python +@dataclass +class GroupIncreaseNoticeEvent(GroupNoticeEvent): + """群成员增加通知""" + operator_id: int = 0 # 操作者 QQ 号 + sub_type: str = "" # 子类型: approve (管理员同意入群), invite (管理员邀请入群) + +@dataclass +class GroupRecallNoticeEvent(GroupNoticeEvent): + """群消息撤回通知""" + operator_id: int = 0 # 操作者 QQ 号 + message_id: int = 0 # 被撤回的消息 ID + +@dataclass +class PokeNotifyEvent(NotifyNoticeEvent): + """戳一戳通知""" + target_id: int = 0 # 被戳者 QQ 号 + group_id: int = 0 # 群号 (如果是群内戳一戳) +``` + +### 请求事件 + +请求事件用于处理用户的主动请求,如加好友、加群等: + +```python +@dataclass +class FriendRequestEvent(RequestEvent): + """加好友请求事件""" + user_id: int = 0 # 发送请求的 QQ 号 + comment: str = "" # 验证信息 + flag: str = "" # 请求 flag,用于 API 调用 + +@dataclass +class GroupRequestEvent(RequestEvent): + """加群请求/邀请事件""" + sub_type: str = "" # 子类型: add (加群请求), invite (邀请登录号入群) + group_id: int = 0 # 群号 + user_id: int = 0 # 发送请求的 QQ 号 + comment: str = "" # 验证信息 + flag: str = "" # 请求 flag,用于 API 调用 +``` + +### 元事件 + +元事件用于处理框架自身状态变化,如心跳、生命周期等: + +```python +@dataclass +class HeartbeatEvent(MetaEvent): + """心跳事件,用于确认连接状态""" + meta_event_type: str = 'heartbeat' + status: HeartbeatStatus = field(default_factory=HeartbeatStatus) + interval: int = 0 # 心跳间隔时间(ms) + +@dataclass +class LifeCycleEvent(MetaEvent): + """生命周期事件,用于通知框架生命周期变化""" + meta_event_type: str = 'lifecycle' + sub_type: LifeCycleSubType = LifeCycleSubType.ENABLE # 子类型: enable, disable, connect +``` + +### 事件工厂:EventFactory + +事件工厂是框架的核心组件之一,负责将原始 JSON 数据转换为强类型的事件对象: + +```python +class EventFactory: + @staticmethod + def create_event(data: Dict[str, Any]) -> OneBotEvent: + """根据数据创建事件对象""" + post_type = data.get("post_type") + + if post_type == EventType.MESSAGE or post_type == EventType.MESSAGE_SENT: + return EventFactory._create_message_event(data, common_args) + elif post_type == EventType.NOTICE: + return EventFactory._create_notice_event(data, common_args) + elif post_type == EventType.REQUEST: + return EventFactory._create_request_event(data, common_args) + elif post_type == EventType.META: + return EventFactory._create_meta_event(data, common_args) + else: + raise ValueError(f"Unknown event type: {post_type}") +``` + +### 在插件中使用事件 + +插件可以直接使用这些事件类型来处理各种场景: + +```python +from core.command_manager import matcher +from core.bot import Bot +from models import GroupMessageEvent, PrivateMessageEvent +from models.events.notice import GroupIncreaseNoticeEvent +from models.events.request import FriendRequestEvent + +# 处理群消息事件 +@matcher.command("hello") +async def handle_hello(bot: Bot, event: GroupMessageEvent, args: list[str]): + await event.reply(f"你好 {event.sender.nickname}!") + +# 处理私聊消息事件 +@matcher.command("help", permission_level=MessageEvent.USER) +async def handle_help(bot: Bot, event: PrivateMessageEvent, args: list[str]): + await event.reply("这里是帮助信息...") + +# 处理群成员增加通知 +@matcher.on_notice("group_increase") +async def handle_group_increase(bot: Bot, event: GroupIncreaseNoticeEvent): + await bot.send_group_msg( + event.group_id, + f"欢迎新成员 {event.user_id} 加入!操作者:{event.operator_id}" + ) + +# 处理加好友请求 +@matcher.on_request("friend") +async def handle_friend_request(bot: Bot, event: FriendRequestEvent): + # 自动同意所有好友请求 + await bot.set_friend_add_request(flag=event.flag, approve=True) + await bot.send_private_msg(event.user_id, "已通过您的好友请求!") +``` + +### 事件处理的优势 + +1. **类型安全**:所有事件都有明确的类型定义,IDE 可以提供完整的代码提示和补全 +2. **易于测试**:事件对象可以轻松构造,便于编写单元测试 +3. **数据完整**:所有字段都有类型注解,确保数据的一致性和完整性 +4. **性能优化**:使用 `@dataclass(slots=True)` 减少内存占用,提高属性访问速度 +5. **可扩展性**:可以轻松定义自定义事件类型,扩展框架功能 + +### 常用事件属性速查 + +| 事件类型 | 关键属性 | 描述 | +|---------|---------|------| +| **MessageEvent** | `message_type`, `user_id`, `message`, `sender` | 所有消息事件的基类 | +| **PrivateMessageEvent** | 继承自 MessageEvent | 私聊消息事件 | +| **GroupMessageEvent** | `group_id`, `anonymous` | 群聊消息事件,包含群号和匿名信息 | +| **GroupIncreaseNoticeEvent** | `group_id`, `user_id`, `operator_id`, `sub_type` | 群成员增加通知 | +| **RecallGroupNoticeEvent** | `group_id`, `user_id`, `operator_id`, `message_id` | 群消息撤回通知 | +| **FriendRequestEvent** | `user_id`, `comment`, `flag` | 加好友请求事件 | +| **GroupRequestEvent** | `group_id`, `user_id`, `sub_type`, `comment`, `flag` | 加群请求/邀请事件 | +| **HeartbeatEvent** | `status`, `interval` | 心跳事件,用于监控连接状态 | + +通过这套完整的事件模型,NEO 框架为开发者提供了强大而灵活的事件处理能力,同时保持了代码的类型安全和良好的开发体验。 diff --git a/core/admin_manager.py b/core/admin_manager.py new file mode 100644 index 0000000..864df52 --- /dev/null +++ b/core/admin_manager.py @@ -0,0 +1,166 @@ +""" +管理员管理器模块 + +该模块负责管理机器人的管理员列表。 +它实现了文件和 Redis 缓存之间的数据同步,并提供了一套清晰的 API +供其他模块调用。 +""" +import json +import os +from typing import Set + +from .logger import logger + + +class AdminManager: + """ + 管理员管理器类 + + 负责加载、缓存和管理管理员列表。 + 使用单例模式,确保全局只有一个实例。 + """ + _instance = None + _REDIS_KEY = "neobot:admins" # 用于存储管理员集合的 Redis 键 + + def __new__(cls): + """ + 单例模式实现 + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """ + 初始化 AdminManager + """ + if getattr(self, "_initialized", False): + return + + # 管理员数据文件路径 + self.data_file = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "..", + "data", + "admin.json" + ) + + self._admins: Set[int] = set() + self._initialized = True + logger.info("管理员管理器初始化完成") + + async def initialize(self): + """ + 异步初始化,加载数据并同步到 Redis + """ + await self._load_from_file() + await self._sync_to_redis() + logger.info("管理员数据加载并同步到 Redis 完成") + + async def _load_from_file(self): + """ + 从 admin.json 加载管理员列表 + """ + try: + if os.path.exists(self.data_file): + with open(self.data_file, "r", encoding="utf-8") as f: + data = json.load(f) + admins = data.get("admins", []) + self._admins = set(int(admin_id) for admin_id in admins) + logger.debug(f"从 {self.data_file} 加载了 {len(self._admins)} 位管理员") + else: + # 如果文件不存在,创建一个空的 + self._admins = set() + await self._save_to_file() + except (json.JSONDecodeError, ValueError) as e: + logger.error(f"加载或解析 admin.json 失败: {e}") + self._admins = set() + + async def _save_to_file(self): + """ + 将当前管理员列表保存回 admin.json + """ + try: + # 确保目录存在 + os.makedirs(os.path.dirname(self.data_file), exist_ok=True) + # 将 set 转换为 list 以便 JSON 序列化 + admin_list = [str(admin_id) for admin_id in self._admins] + with open(self.data_file, "w", encoding="utf-8") as f: + json.dump({"admins": admin_list}, f, indent=2, ensure_ascii=False) + logger.debug(f"管理员列表已保存到 {self.data_file}") + except Exception as e: + logger.error(f"保存 admin.json 失败: {e}") + + async def _sync_to_redis(self): + """ + 将内存中的管理员集合同步到 Redis + """ + from .redis_manager import redis_manager + try: + # 首先清空旧的集合 + await redis_manager.redis.delete(self._REDIS_KEY) + if self._admins: + # 将所有管理员ID添加到集合中 + await redis_manager.redis.sadd(self._REDIS_KEY, *self._admins) + logger.debug(f"已将 {len(self._admins)} 位管理员同步到 Redis") + except Exception as e: + logger.error(f"同步管理员到 Redis 失败: {e}") + + async def is_admin(self, user_id: int) -> bool: + """ + 检查用户是否为管理员(从 Redis 缓存读取) + """ + from .redis_manager import redis_manager + try: + return await redis_manager.redis.sismember(self._REDIS_KEY, user_id) + except Exception as e: + logger.error(f"从 Redis 检查管理员权限失败: {e}") + # Redis 失败时,回退到内存检查 + return user_id in self._admins + + async def add_admin(self, user_id: int) -> bool: + """ + 添加管理员,并同步到文件和 Redis + """ + from .redis_manager import redis_manager + if user_id in self._admins: + return False # 用户已经是管理员 + + self._admins.add(user_id) + await self._save_to_file() + try: + await redis_manager.redis.sadd(self._REDIS_KEY, user_id) + logger.info(f"已添加新管理员 {user_id} 并更新缓存") + return True + except Exception as e: + logger.error(f"添加管理员 {user_id} 到 Redis 失败: {e}") + return False + + async def remove_admin(self, user_id: int) -> bool: + """ + 移除管理员,并同步到文件和 Redis + """ + from .redis_manager import redis_manager + if user_id not in self._admins: + return False # 用户不是管理员 + + self._admins.remove(user_id) + await self._save_to_file() + try: + await redis_manager.redis.srem(self._REDIS_KEY, user_id) + logger.info(f"已移除管理员 {user_id} 并更新缓存") + return True + except Exception as e: + logger.error(f"从 Redis 移除管理员 {user_id} 失败: {e}") + return False + + async def get_all_admins(self) -> Set[int]: + """ + 获取所有管理员的集合 + """ + return self._admins.copy() + + +# 全局 AdminManager 实例 +admin_manager = AdminManager() diff --git a/core/api/account.py b/core/api/account.py index 0a5ef0d..07bab8d 100644 --- a/core/api/account.py +++ b/core/api/account.py @@ -8,7 +8,7 @@ import json from typing import Dict, Any from .base import BaseAPI from models.objects import LoginInfo, VersionInfo, Status -from core.redis_manager import redis_client as redis_manager +from core.redis_manager import redis_manager class AccountAPI(BaseAPI): diff --git a/core/api/friend.py b/core/api/friend.py index 76e696b..823fa06 100644 --- a/core/api/friend.py +++ b/core/api/friend.py @@ -8,7 +8,7 @@ import json from typing import List, Dict, Any from .base import BaseAPI from models.objects import FriendInfo, StrangerInfo -from core.redis_manager import redis_client as redis_manager +from core.redis_manager import redis_manager class FriendAPI(BaseAPI): @@ -42,12 +42,12 @@ class FriendAPI(BaseAPI): """ cache_key = f"neobot:cache:get_stranger_info:{user_id}" if not no_cache: - cached_data = await redis_manager.get(cache_key) + cached_data = await redis_manager.redis.get(cache_key) if cached_data: return StrangerInfo(**json.loads(cached_data)) res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return StrangerInfo(**res) async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]: @@ -62,12 +62,12 @@ class FriendAPI(BaseAPI): """ cache_key = f"neobot:cache:get_friend_list:{self.self_id}" if not no_cache: - cached_data = await redis_manager.get(cache_key) + cached_data = await redis_manager.redis.get(cache_key) if cached_data: return [FriendInfo(**item) for item in json.loads(cached_data)] res = await self.call_api("get_friend_list") - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return [FriendInfo(**item) for item in res] async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]: diff --git a/core/api/group.py b/core/api/group.py index d4eb5ae..224d148 100644 --- a/core/api/group.py +++ b/core/api/group.py @@ -6,7 +6,7 @@ """ from typing import List, Dict, Any, Optional import json -from core.redis_manager import redis_client as redis_manager +from core.redis_manager import redis_manager from .base import BaseAPI from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo @@ -178,12 +178,12 @@ class GroupAPI(BaseAPI): """ cache_key = f"neobot:cache:get_group_info:{group_id}" if not no_cache: - cached_data = await redis_manager.get(cache_key) + cached_data = await redis_manager.redis.get(cache_key) if cached_data: return GroupInfo(**json.loads(cached_data)) res = await self.call_api("get_group_info", {"group_id": group_id}) - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return GroupInfo(**res) async def get_group_list(self) -> List[GroupInfo]: @@ -210,12 +210,12 @@ class GroupAPI(BaseAPI): """ cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}" if not no_cache: - cached_data = await redis_manager.get(cache_key) + cached_data = await redis_manager.redis.get(cache_key) if cached_data: return GroupMemberInfo(**json.loads(cached_data)) res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id}) - await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 + await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return GroupMemberInfo(**res) async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]: diff --git a/core/command_manager.py b/core/command_manager.py index ec875b2..c51794a 100644 --- a/core/command_manager.py +++ b/core/command_manager.py @@ -4,19 +4,12 @@ 该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心。 它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和 请求事件处理器的能力。 - -主要职责: -- 提供 `@matcher.command()` 装饰器来注册命令。 -- 提供 `@matcher.on_notice()` 装饰器来注册通知处理器。 -- 提供 `@matcher.on_request()` 装饰器来注册请求处理器。 -- 负责解析收到的消息,匹配命令前缀并分发给对应的处理器。 -- 统一处理所有类型的事件,并将其分发给所有已注册的处理器。 -- 内置一个 `/help` 命令,用于展示所有已加载插件的帮助信息。 """ -import inspect -from typing import Any, Callable, Dict, List, Tuple +from typing import Any, Callable, Dict, Optional, Tuple from .config_loader import global_config +from .event_handler import MessageHandler, NoticeHandler, RequestHandler + # 从配置中获取命令前缀 comm_prefixes = global_config.bot.get("command", ("/",)) @@ -27,6 +20,7 @@ class CommandManager: 命令管理器,负责注册和分发所有类型的事件。 这是一个单例对象(`matcher`),在整个应用中共享。 + 它将不同类型的事件处理委托给专门的处理器类。 """ def __init__(self, prefixes: Tuple[str, ...]): @@ -36,51 +30,91 @@ class CommandManager: Args: prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。 """ - # --- 初始化所有处理器列表 --- - self.prefixes = prefixes - self.commands: Dict[str, Callable] = {} - self.message_handlers: List[Callable] = [] - self.notice_handlers: List[Dict] = [] - self.request_handlers: List[Dict] = [] self.plugins: Dict[str, Dict[str, Any]] = {} + + # 初始化专门的事件处理器 + self.message_handler = MessageHandler(prefixes) + self.notice_handler = NoticeHandler() + self.request_handler = RequestHandler() - # --- 注册内置指令 --- - self.commands["help"] = self._help_command + # 将处理器映射到事件类型 + self.handler_map = { + "message": self.message_handler, + "notice": self.notice_handler, + "request": self.request_handler, + } + + # 注册内置的 /help 命令 + self._register_internal_commands() + + def _register_internal_commands(self): + """ + 注册框架内置的命令 + """ + # Help 命令 + self.message_handler.command("help")(self._help_command) self.plugins["core.help"] = { "name": "帮助", "description": "显示所有可用指令的帮助信息", "usage": "/help", } + # --- 装饰器代理 --- + def on_message(self) -> Callable: """ - 装饰器:用于注册一个通用的消息处理器。 - - 被此装饰器注册的函数,会在每次收到消息时(在指令匹配前)被调用。 - 如果函数返回 True,则表示该消息已被“消费”,后续的指令匹配将不会进行。 - - Example: - @matcher.on_message() - async def code_input_handler(bot, event): - if is_waiting_for_code(event.user_id): - await process_code(event.raw_message) - return True # 消费事件 + 装饰器:注册一个通用的消息处理器。 """ - def decorator(func: Callable) -> Callable: - self.message_handlers.append(func) - return func - return decorator + return self.message_handler.on_message() + def command( + self, + name: str, + permission: Optional[Any] = None, + override_permission_check: bool = False + ) -> Callable: + """ + 装饰器:注册一个消息指令处理器。 + """ + return self.message_handler.command( + name, + permission=permission, + override_permission_check=override_permission_check + ) + + def on_notice(self, notice_type: Optional[str] = None) -> Callable: + """ + 装饰器:注册一个通知事件处理器。 + """ + return self.notice_handler.register(notice_type=notice_type) + + def on_request(self, request_type: Optional[str] = None) -> Callable: + """ + 装饰器:注册一个请求事件处理器。 + """ + return self.request_handler.register(request_type=request_type) + + # --- 事件处理 --- + + async def handle_event(self, bot, event): + """ + 统一的事件分发入口。 + + 根据事件的 `post_type` 将其分发给对应的处理器。 + """ + if event.post_type == 'message' and global_config.bot.get('ignore_self_message', False): + if hasattr(event, 'user_id') and hasattr(event, 'self_id') and event.user_id == event.self_id: + return + + handler = self.handler_map.get(event.post_type) + if handler: + await handler.handle(bot, event) + + # --- 内置命令实现 --- async def _help_command(self, bot, event): """ 内置的 `/help` 命令的实现。 - - 该命令会遍历所有已加载插件的元数据,并生成一段格式化的帮助文本。 - - Args: - bot: Bot 实例。 - event: 消息事件对象。 """ help_text = "--- 可用指令列表 ---\n" @@ -95,187 +129,6 @@ class CommandManager: await bot.send(event, help_text.strip()) - def command(self, name: str) -> Callable: - """ - 装饰器:用于注册一个消息指令处理器。 - - Example: - @matcher.command("echo") - async def handle_echo(bot, event, args): - await bot.send(event, " ".join(args)) - - Args: - name (str): 指令的名称(不包含命令前缀)。 - - Returns: - Callable: 原函数,使其可以继续被调用。 - """ - - def decorator(func: Callable) -> Callable: - self.commands[name] = func - return func - - return decorator - - def on_notice(self, notice_type: str = None) -> Callable: - """ - 装饰器:用于注册一个通知事件处理器。 - - 如果 `notice_type` 未指定,则该处理器会接收所有类型的通知事件。 - - Args: - notice_type (str, optional): 要处理的通知类型 (e.g., "group_increase")。 - Defaults to None. - - Returns: - Callable: 原函数。 - """ - - def decorator(func: Callable) -> Callable: - self.notice_handlers.append({"type": notice_type, "func": func}) - return func - - return decorator - - def on_request(self, request_type: str = None) -> Callable: - """ - 装饰器:用于注册一个请求事件处理器。 - - 如果 `request_type` 未指定,则该处理器会接收所有类型的请求事件。 - - Args: - request_type (str, optional): 要处理的请求类型 (e.g., "friend", "group")。 - Defaults to None. - - Returns: - Callable: 原函数。 - """ - - def decorator(func: Callable) -> Callable: - self.request_handlers.append({"type": request_type, "func": func}) - return func - - return decorator - - async def handle_event(self, bot, event): - """ - 统一的事件分发入口。 - - 由 `WS` 客户端在接收到事件后调用。该方法会根据事件的 `post_type` - 将其分发给对应的具体处理方法。 - - Args: - bot: Bot 实例。 - event: 已解析的事件对象。 - """ - # --- 全局过滤机器人自身消息 --- - # 仅对消息事件生效 - if event.post_type == 'message' and global_config.bot.get('ignore_self_message', False): - - if hasattr(event, 'user_id') and hasattr(event, 'self_id') and event.user_id == event.self_id: - return - - post_type = event.post_type - - if post_type == 'message': - await self.handle_message(bot, event) - elif post_type == 'notice': - await self.handle_notice(bot, event) - elif post_type == 'request': - await self.handle_request(bot, event) - - async def handle_message(self, bot, event): - """ - 处理消息事件,优先执行通用处理器,然后解析并分发指令。 - """ - # --- 1. 执行通用消息处理器 --- - for handler in self.message_handlers: - # 如果任何一个处理器返回 True,则中断后续处理 - consumed = await self._run_handler(handler, bot, event) - if consumed: - return - - # --- 2. 检查并执行指令 --- - if not event.raw_message: - return - - raw_text = event.raw_message.strip() - - prefix_found = None - for p in self.prefixes: - if raw_text.startswith(p): - prefix_found = p - break - - if not prefix_found: - return - - full_cmd = raw_text[len(prefix_found) :].split() - if not full_cmd: - return - - cmd_name = full_cmd[0] - args = full_cmd[1:] - - if cmd_name in self.commands: - func = self.commands[cmd_name] - await self._run_handler(func, bot, event, args) - - async def handle_notice(self, bot, event): - """ - 分发通知事件给所有匹配的处理器。 - - Args: - bot: Bot 实例。 - event: 通知事件对象。 - """ - for handler in self.notice_handlers: - if handler["type"] is None or handler["type"] == event.notice_type: - await self._run_handler(handler["func"], bot, event) - - async def handle_request(self, bot, event): - """ - 分发请求事件给所有匹配的处理器。 - - Args: - bot: Bot 实例。 - event: 请求事件对象。 - """ - for handler in self.request_handlers: - if handler["type"] is None or handler["type"] == event.request_type: - await self._run_handler(handler["func"], bot, event) - - async def _run_handler(self, func: Callable, bot, event, args: List[str] = None): - """ - 智能执行事件处理器,并返回事件是否被消费。 - - 该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参数 - (如 `bot`, `event`, `args`),实现了依赖注入。 - - Args: - func (Callable): 目标处理器函数。 - bot: Bot 实例。 - event: 事件对象。 - args (List[str], optional): 指令参数列表(仅对消息事件有效)。 - - Returns: - bool: 如果处理器函数返回 True,则返回 True,否则返回 False。 - """ - sig = inspect.signature(func) - params = sig.parameters - kwargs = {} - - if "bot" in params: - kwargs["bot"] = bot - if "event" in params: - kwargs["event"] = event - if "args" in params and args is not None: - kwargs["args"] = args - - # 执行函数并获取返回值 - result = await func(**kwargs) - return result is True - # --- 全局单例 --- diff --git a/core/event_handler.py b/core/event_handler.py new file mode 100644 index 0000000..b355718 --- /dev/null +++ b/core/event_handler.py @@ -0,0 +1,197 @@ +""" +事件处理器模块 + +该模块定义了用于处理不同类型事件的处理器类。 +每个处理器都负责注册和分发特定类型的事件。 +""" +import inspect +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, List, Optional, Tuple + +from .bot import Bot +from .permission_manager import Permission, permission_manager +from .exceptions import SyncHandlerError +from .executor import run_in_thread_pool + + +class BaseHandler(ABC): + """ + 事件处理器抽象基类 + """ + def __init__(self): + self.handlers: List[Dict[str, Any]] = [] + + @abstractmethod + async def handle(self, bot: Bot, event: Any): + """ + 处理事件 + """ + raise NotImplementedError + + async def _run_handler( + self, + func: Callable, + bot: Bot, + event: Any, + args: Optional[List[str]] = None, + permission_granted: Optional[bool] = None + ): + """ + 智能执行事件处理器,并注入所需参数 + """ + sig = inspect.signature(func) + params = sig.parameters + kwargs = {} + + if "bot" in params: + kwargs["bot"] = bot + if "event" in params: + kwargs["event"] = event + if "args" in params and args is not None: + kwargs["args"] = args + if "permission_granted" in params and permission_granted is not None: + kwargs["permission_granted"] = permission_granted + + if inspect.iscoroutinefunction(func): + result = await func(**kwargs) + else: + # 如果是同步函数,则放入线程池执行 + result = await run_in_thread_pool(func, **kwargs) + return result is True + + +class MessageHandler(BaseHandler): + """ + 消息事件处理器 + """ + def __init__(self, prefixes: Tuple[str, ...]): + super().__init__() + self.prefixes = prefixes + self.commands: Dict[str, Dict] = {} + self.message_handlers: List[Callable] = [] + + def on_message(self) -> Callable: + """ + 注册通用消息处理器 + """ + def decorator(func: Callable) -> Callable: + if not inspect.iscoroutinefunction(func): + raise SyncHandlerError(f"消息处理器 {func.__name__} 必须是异步函数 (async def).") + self.message_handlers.append(func) + return func + return decorator + + def command( + self, + name: str, + permission: Optional[Permission] = None, + override_permission_check: bool = False + ) -> Callable: + """ + 注册命令处理器 + """ + def decorator(func: Callable) -> Callable: + if not inspect.iscoroutinefunction(func): + raise SyncHandlerError(f"命令处理器 {func.__name__} 必须是异步函数 (async def).") + self.commands[name] = { + "func": func, + "permission": permission, + "override_permission_check": override_permission_check, + } + return func + return decorator + + async def handle(self, bot: Bot, event: Any): + """ + 处理消息事件,包括通用消息和命令 + """ + for handler in self.message_handlers: + consumed = await self._run_handler(handler, bot, event) + if consumed: + return + + if not event.raw_message: + return + + raw_text = event.raw_message.strip() + prefix_found = next((p for p in self.prefixes if raw_text.startswith(p)), None) + + if not prefix_found: + return + + full_cmd = raw_text[len(prefix_found):].split() + if not full_cmd: + return + + cmd_name = full_cmd[0] + args = full_cmd[1:] + + if cmd_name in self.commands: + command_info = self.commands[cmd_name] + func = command_info["func"] + permission = command_info.get("permission") + override_check = command_info.get("override_permission_check", False) + + permission_granted = True + if permission: + permission_granted = await permission_manager.check_permission(event.user_id, permission) + + if not permission_granted and not override_check: + await bot.send(event, f"权限不足,需要 {permission.name} 权限") + return + + await self._run_handler( + func, + bot, + event, + args=args, + permission_granted=permission_granted + ) + + +class NoticeHandler(BaseHandler): + """ + 通知事件处理器 + """ + def register(self, notice_type: Optional[str] = None) -> Callable: + """ + 注册通知处理器 + """ + def decorator(func: Callable) -> Callable: + if not inspect.iscoroutinefunction(func): + raise SyncHandlerError(f"通知处理器 {func.__name__} 必须是异步函数 (async def).") + self.handlers.append({"type": notice_type, "func": func}) + return func + return decorator + + async def handle(self, bot: Bot, event: Any): + """ + 处理通知事件 + """ + for handler in self.handlers: + if handler["type"] is None or handler["type"] == event.notice_type: + await self._run_handler(handler["func"], bot, event) + + +class RequestHandler(BaseHandler): + """ + 请求事件处理器 + """ + def register(self, request_type: Optional[str] = None) -> Callable: + """ + 注册请求处理器 + """ + def decorator(func: Callable) -> Callable: + if not inspect.iscoroutinefunction(func): + raise SyncHandlerError(f"请求处理器 {func.__name__} 必须是异步函数 (async def).") + self.handlers.append({"type": request_type, "func": func}) + return func + return decorator + + async def handle(self, bot: Bot, event: Any): + """ + 处理请求事件 + """ + for handler in self.handlers: + if handler["type"] is None or handler["type"] == event.request_type: + await self._run_handler(handler["func"], bot, event) diff --git a/core/exceptions.py b/core/exceptions.py new file mode 100644 index 0000000..9b8cd18 --- /dev/null +++ b/core/exceptions.py @@ -0,0 +1,9 @@ +""" +自定义异常模块 +""" + +class SyncHandlerError(Exception): + """ + 当尝试注册同步函数作为异步事件处理器时抛出此异常。 + """ + pass diff --git a/core/executor.py b/core/executor.py new file mode 100644 index 0000000..6d691bd --- /dev/null +++ b/core/executor.py @@ -0,0 +1,27 @@ +""" +线程池执行器 + +提供一个全局的线程池和异步接口,用于在事件循环中安全地运行同步函数。 +""" +import asyncio +from concurrent.futures import ThreadPoolExecutor +from functools import partial +from typing import Any, Callable + +# 创建一个全局的线程池,可以根据需要调整 max_workers +executor = ThreadPoolExecutor(max_workers=10) + +async def run_in_thread_pool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + """ + 在线程池中异步运行同步函数 + + :param func: 要运行的同步函数 + :param args: 函数的位置参数 + :param kwargs: 函数的关键字参数 + :return: 函数的返回值 + """ + loop = asyncio.get_running_loop() + # 使用 functools.partial 绑定函数和参数,以便传递给 run_in_executor + func_to_run = partial(func, *args, **kwargs) + # loop.run_in_executor 会返回一个 awaitable 对象 + return await loop.run_in_executor(executor, func_to_run) diff --git a/core/permission_manager.py b/core/permission_manager.py new file mode 100644 index 0000000..c79a18d --- /dev/null +++ b/core/permission_manager.py @@ -0,0 +1,252 @@ +""" +权限管理器模块 + +该模块负责管理用户权限,支持 admin、op、user 三个权限级别。 +权限数据存储在 `permissions.json` 文件中,格式为: +{ + "users": { + "123456": "admin", + "789012": "op", + "345678": "user" + } +} +""" +import json +import os +from functools import total_ordering +from typing import Dict + +from .logger import logger +from .admin_manager import admin_manager # 导入 AdminManager + + +@total_ordering +class Permission: + """ + 权限封装类 + + 封装了权限的名称和等级,并提供了比较方法。 + 使用 @total_ordering 装饰器可以自动生成所有的比较运算符。 + """ + def __init__(self, name: str, level: int): + """ + 初始化权限对象 + + Args: + name (str): 权限名称 (e.g., "admin", "op") + level (int): 权限等级,数字越大权限越高 + """ + self.name = name + self.level = level + + def __eq__(self, other): + """ + 判断权限是否相等 + """ + if not isinstance(other, Permission): + return NotImplemented + return self.level == other.level + + def __lt__(self, other): + """ + 判断权限是否小于另一个权限 + """ + if not isinstance(other, Permission): + return NotImplemented + return self.level < other.level + + def __str__(self) -> str: + """ + 返回权限的字符串表示(即权限名称) + """ + return self.name + + +# 定义全局权限常量 +ADMIN = Permission("admin", 3) +OP = Permission("op", 2) +USER = Permission("user", 1) + +# 用于从字符串名称查找权限对象的字典 +_PERMISSIONS: Dict[str, Permission] = { + p.name: p for p in [ADMIN, OP, USER] +} + + +class PermissionManager: + """ + 权限管理器类 + + 负责加载、保存和查询用户权限数据。 + 使用单例模式,确保全局只有一个权限管理器实例。 + """ + + _instance = None + + def __new__(cls): + """ + 单例模式实现 + + Returns: + PermissionManager: 全局唯一的权限管理器实例 + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """ + 初始化权限管理器 + + 如果已经初始化过,则直接返回。 + """ + if getattr(self, "_initialized", False): + return + + # 权限数据文件路径 + self.data_file = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "..", + "data", + "permissions.json" + ) + + # 确保数据目录存在 + data_dir = os.path.dirname(self.data_file) + os.makedirs(data_dir, exist_ok=True) + + # 权限数据存储结构:{"users": {"user_id": "level_name"}} + self._data: Dict[str, Dict[str, str]] = {"users": {}} + + # 加载现有数据 + self.load() + + self._initialized = True + logger.info("权限管理器初始化完成") + + def load(self) -> None: + """ + 从文件加载权限数据 + + 如果文件不存在,则创建空文件并初始化默认数据结构。 + """ + try: + if os.path.exists(self.data_file): + with open(self.data_file, "r", encoding="utf-8") as f: + data = json.load(f) + # 兼容旧格式 + if "users" in data: + self._data["users"] = data["users"] + else: + self._data["users"] = {} + logger.debug(f"权限数据已从 {self.data_file} 加载") + else: + # 文件不存在,创建空文件 + self.save() + logger.debug(f"创建空的权限数据文件: {self.data_file}") + except json.JSONDecodeError as e: + logger.error(f"权限数据文件格式错误: {e}") + # 文件损坏,重置为空数据 + self._data["users"] = {} + self.save() + except Exception as e: + logger.error(f"加载权限数据失败: {e}") + self._data["users"] = {} + + def save(self) -> None: + """ + 将权限数据保存到文件 + """ + try: + with open(self.data_file, "w", encoding="utf-8") as f: + json.dump(self._data, f, indent=2, ensure_ascii=False) + logger.debug(f"权限数据已保存到 {self.data_file}") + except Exception as e: + logger.error(f"保存权限数据失败: {e}") + + async def get_user_permission(self, user_id: int) -> Permission: + """ + 获取指定用户的权限对象 + + Args: + user_id (int): 用户 QQ 号 + + Returns: + Permission: 用户的权限对象,如果用户不存在则返回默认级别 USER + """ + # 首先,通过 AdminManager 检查是否为管理员 + if await admin_manager.is_admin(user_id): + return ADMIN + + # 如果不是管理员,则从 permissions.json 中查找 + user_id_str = str(user_id) + level_name = self._data["users"].get(user_id_str, USER.name) + return _PERMISSIONS.get(level_name, USER) + + def set_user_permission(self, user_id: int, permission: Permission) -> None: + """ + 设置指定用户的权限级别 + + Args: + user_id (int): 用户 QQ 号 + permission (Permission): 权限对象 + + Raises: + ValueError: 如果权限对象无效 + """ + if not isinstance(permission, Permission) or permission.name not in _PERMISSIONS: + raise ValueError(f"无效的权限对象: {permission}") + + user_id_str = str(user_id) + self._data["users"][user_id_str] = permission.name + self.save() + logger.info(f"设置用户 {user_id} 的权限级别为 {permission.name}") + + def remove_user(self, user_id: int) -> None: + """ + 移除指定用户的权限设置,恢复为默认级别 + + Args: + user_id (int): 用户 QQ 号 + """ + user_id_str = str(user_id) + if user_id_str in self._data["users"]: + del self._data["users"][user_id_str] + self.save() + logger.info(f"移除用户 {user_id} 的权限设置") + + async def check_permission(self, user_id: int, required_permission: Permission) -> bool: + """ + 检查用户是否具有指定权限级别 + + Args: + user_id (int): 用户 QQ 号 + required_permission (Permission): 所需的权限对象 + + Returns: + bool: 如果用户权限 >= 所需权限,返回 True,否则返回 False + """ + user_permission = await self.get_user_permission(user_id) + return user_permission >= required_permission + + def get_all_users(self) -> Dict[str, str]: + """ + 获取所有设置了权限的用户及其级别名称 + + Returns: + Dict[str, str]: 用户ID到权限级别名称的映射 + """ + return self._data["users"].copy() + + def clear_all(self) -> None: + """ + 清空所有权限设置 + """ + self._data["users"].clear() + self.save() + logger.info("已清空所有权限设置") + + +# 全局权限管理器实例 +permission_manager = PermissionManager() \ No newline at end of file diff --git a/core/plugin_manager.py b/core/plugin_manager.py index a80a197..8b0e7f1 100644 --- a/core/plugin_manager.py +++ b/core/plugin_manager.py @@ -11,7 +11,9 @@ import pkgutil import sys from core.command_manager import matcher +from core.exceptions import SyncHandlerError from .logger import logger +from .executor import run_in_thread_pool def load_all_plugins(): @@ -49,6 +51,8 @@ def load_all_plugins(): type_str = "包" if is_pkg else "文件" logger.success(f" [{type_str}] 成功{action}: {module_name}") + except SyncHandlerError as e: + logger.error(f" 插件 {module_name} 加载失败: {e} (跳过此插件)") except Exception as e: print( f" {action if 'action' in locals() else '加载'}插件 {module_name} 失败: {e}" @@ -75,50 +79,48 @@ class PluginDataManager: self.plugin_name + ".json", ) self.data = {} - self.load() - def load(self): + async def load(self): """读取配置文件""" if not os.path.exists(self.data_file): - with open(self.data_file, "w", encoding="utf-8") as f: - self.set(self.plugin_name, []) + await self.set(self.plugin_name, []) try: with open(self.data_file, "r", encoding="utf-8") as f: - self.data = json.load(f) + self.data = await run_in_thread_pool(json.load, f) except json.JSONDecodeError: self.data = {} - def save(self): + async def save(self): """保存配置到文件""" with open(self.data_file, "w", encoding="utf-8") as f: - json.dump(self.data, f, indent=2, ensure_ascii=False) + await run_in_thread_pool(json.dump, self.data, f, indent=2, ensure_ascii=False) def get(self, key, default=None): """获取配置项""" return self.data.get(key, default) - def set(self, key, value): + async def set(self, key, value): """设置配置项""" self.data[key] = value - self.save() + await self.save() - def add(self, key, value): + async def add(self, key, value): """添加配置项""" if key not in self.data: self.data[key] = [] self.data[key].append(value) - self.save() + await self.save() - def remove(self, key): + async def remove(self, key): """删除配置项""" if key in self.data: del self.data[key] - self.save() + await self.save() - def clear(self): + async def clear(self): """清空所有配置""" self.data.clear() - self.save() + await self.save() def get_all(self): return self.data.copy() diff --git a/core/redis_manager.py b/core/redis_manager.py index 3786d9c..7440a22 100644 --- a/core/redis_manager.py +++ b/core/redis_manager.py @@ -1,20 +1,24 @@ -import redis +import redis.asyncio as redis from .config_loader import global_config as config from .logger import logger class RedisManager: """ - Redis 连接管理器 + Redis 连接管理器(异步单例) """ - _pool = None - _client = None + _instance = None + _redis = None - @classmethod - def initialize(cls): + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + async def initialize(self): """ - 初始化 Redis 连接并进行健康检查 + 异步初始化 Redis 连接并进行健康检查 """ - if cls._pool is None: + if self._redis is None: try: host = config.redis['host'] port = config.redis['port'] @@ -23,39 +27,32 @@ class RedisManager: logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}") - cls._pool = redis.ConnectionPool( + self._redis = redis.Redis( host=host, port=port, db=db, password=password, decode_responses=True ) - cls._client = redis.Redis(connection_pool=cls._pool) - if cls._client.ping(): + if await self._redis.ping(): logger.success("Redis 连接成功!") else: logger.error("Redis 连接失败: PING 命令无响应") except redis.exceptions.ConnectionError as e: logger.error(f"Redis 连接失败: {e}") - cls._pool = None - cls._client = None + self._redis = None except Exception as e: logger.exception(f"Redis 初始化时发生未知错误: {e}") - cls._pool = None - cls._client = None + self._redis = None - @classmethod - def get_redis(cls): + @property + def redis(self): """ - 获取 Redis 连接 - - :return: Redis 连接实例 + 获取 Redis 连接实例 """ - if cls._client is None: - # 理论上 initialize 应该在程序启动时被调用,这里作为备用 - cls.initialize() - return cls._client + if self._redis is None: + raise ConnectionError("Redis 未初始化或连接失败,请先调用 initialize()") + return self._redis -# 在模块加载时直接初始化 -RedisManager.initialize() -redis_client = RedisManager.get_redis() +# 全局 Redis 管理器实例 +redis_manager = RedisManager() diff --git a/data/admin.json b/data/admin.json new file mode 100644 index 0000000..577c240 --- /dev/null +++ b/data/admin.json @@ -0,0 +1,3 @@ +{ + "admins": [2221577113] +} \ No newline at end of file diff --git a/data/permissions.json b/data/permissions.json new file mode 100644 index 0000000..864ddb4 --- /dev/null +++ b/data/permissions.json @@ -0,0 +1,3 @@ +{ + "users": {} +} \ No newline at end of file diff --git a/html/404.html b/html/404.html new file mode 100644 index 0000000..be035f3 --- /dev/null +++ b/html/404.html @@ -0,0 +1,288 @@ + +404 - Signal Lost + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + +
+
+
+
+ + +
+ + +
+ + +
+

+ 404 +

+
+ Sys.Malfunction + 0x00_DEAD +
+
+ + +
+ +
+
+
+
+
+
+
root@neobot:~/system/logs
+
+ + +
+
+ +
> initiating_handshake...
> resolving_host: calglaubot.internal
> connection_established (port: 443)
> GET /requested_resource HTTP/1.1
> waiting_for_response...
> FATAL: endpoint_not_found
> stack_trace_dump:
> at Router.resolve (core.js:204)
> at Neobot.Handler (main.py:404)
> error: signal_lost_in_void
+
+ + +
+
+
+ + + + +
+ + +
+
+ CPU: 98% + | + MEM: OVERFLOW +
+
+ NEOBOT FRAMEWORK +
+
+
\ No newline at end of file diff --git a/html/index.html b/html/index.html new file mode 100644 index 0000000..4eba739 --- /dev/null +++ b/html/index.html @@ -0,0 +1,387 @@ + + + + + + NEOBOT | F.O.S FRAMEWORK + + + + + + + + + + + + + +
+
+ + +
+
+
+ 🚀 HIGH PERFORMANCE ASYNC FRAMEWORK +
+ +

+ 为现代开发而生
+ NEO 机器人框架 +

+ +

+ 基于 Python 异步生态构建的 OneBot 11 解决方案。内置 Redis 缓存、插件热重载与类型安全检查。这是我的第一个 Python 作品,致力于极致的开发体验。 +

+ + +
+
Core Team
+
+ + 镀铬酸钾 + + baby2016 +
+ Fairy-Oracle-Sanctuary +
+ +
+ +
+ $ git clone ... + +
+
+
+ + +
+
+
+
+
+
+
+
+
+ main.py +
+
+
"""
+NEO Bot 主程序入口
+负责启动 WebSocket 连接,初始化插件系统,并提供热重载功能。
+"""
+import asyncio
+from watchdog.observers import Observer
+from watchdog.events import FileSystemEventHandler
+
+from core.logger import logger
+from core.ws import WS
+from core.plugin_manager import load_all_plugins
+
+class PluginReloadHandler(FileSystemEventHandler):
+    """监听文件变更,触发热重载"""
+    def on_any_event(self, event):
+        if not event.src_path.endswith(".py"):
+            return
+        
+        logger.info(f"检测到文件变更: {event.src_path}")
+        try:
+            run_in_thread_pool(load_all_plugins)
+            logger.success("插件重载完成")
+        except Exception as e:
+            logger.exception(f"重载失败: {e}")
+
+@logger.catch
+async def main():
+    # 1. 初始化核心组件
+    await run_in_thread_pool(load_all_plugins)
+    await redis_manager.initialize()
+    await admin_manager.initialize()
+
+    # 2. 启动 Watchdog 热重载
+    observer = Observer()
+    observer.schedule(PluginReloadHandler(), plugin_path, recursive=True)
+    observer.start()
+
+    # 3. 启动 WebSocket 客户端
+    try:
+        bot = WS()
+        await bot.connect()
+    finally:
+        observer.stop()
+
+if __name__ == "__main__":
+    asyncio.run(main())
+
+
+
+
+ + +
+
+

为什么选择 NEO?

+

不仅仅是一个框架,更是一套完整的现代化开发解决方案。

+
+ +
+ +
+
+ +
+

高性能异步 IO

+

+ 基于 Python 原生 asynciowebsockets 构建。完全非阻塞设计,单进程即可轻松处理海量并发消息,拒绝卡顿。 +

+
+ + +
+
+ +
+

智能插件热重载

+

+ 基于 watchdog 实现文件监控。修改代码后自动重载插件逻辑,无需重启机器人进程。让调试和开发效率提升 200%。 +

+
+ + +
+
+ +
+

Redis 深度集成

+

+ 内置 Redis 连接池。自动缓存群信息、好友列表等高频数据,减少 API 调用延迟,让响应速度快人一步。 +

+
+ + +
+
+ +
+

类型安全

+

+ 全面采用 Pydantic 和 Dataclasses。为所有事件和数据模型提供完整的类型注解,IDE 智能补全,减少运行时错误。 +

+
+ + +
+
+ +
+

精细权限管理

+

+ 内置 Admin/Op/User 三级权限体系。支持动态添加管理员,通过装饰器即可轻松控制每个指令的访问权限。 +

+
+ + +
+
+ +
+

标准 OneBot 11

+

+ 完美兼容 OneBot v11 协议标准。支持 NapCatQQ、LLOneBot 等主流实现端,无缝对接,开箱即用。 +

+
+
+
+ + +
+
+ +

TERMINAL OUTPUT

+
+
+
+ +
+
+
+ + +
+
+ +
+
+

性能建议:使用 PyPy

+

+ 为了获得最佳性能,我们强烈推荐使用 PyPy JIT 编译器 来运行 NEO 框架。在处理高并发消息时,PyPy 相比标准 CPython 能提供显著的性能提升。 +

+
+
+ +
+
+ + + + + + diff --git a/main.py b/main.py index d4c723c..5b15394 100644 --- a/main.py +++ b/main.py @@ -13,8 +13,11 @@ from watchdog.events import FileSystemEventHandler # 初始化日志系统,必须在其他 core 模块导入之前执行 from core.logger import logger +from core.admin_manager import admin_manager from core.ws import WS from core.plugin_manager import load_all_plugins +from core.redis_manager import redis_manager +from core.executor import run_in_thread_pool class PluginReloadHandler(FileSystemEventHandler): @@ -62,7 +65,7 @@ class PluginReloadHandler(FileSystemEventHandler): try: # 重新扫描并加载插件 - load_all_plugins() + run_in_thread_pool(load_all_plugins) logger.success("插件重载完成") except Exception as e: logger.exception(f"重载失败: {e}") @@ -78,7 +81,13 @@ async def main(): 3. 建立连接并保持运行 """ # 首次加载插件 - load_all_plugins() + await run_in_thread_pool(load_all_plugins) + + # 初始化 Redis 连接 + await redis_manager.initialize() + + # 初始化管理员管理器 + await admin_manager.initialize() # 启动文件监控 # 监控 plugins 目录 diff --git a/models/events/message.py b/models/events/message.py index 3ede724..febb1d9 100644 --- a/models/events/message.py +++ b/models/events/message.py @@ -6,6 +6,7 @@ from dataclasses import dataclass, field from typing import List, Optional +from core.permission_manager import ADMIN, OP, USER from models.message import MessageSegment from models.sender import Sender from .base import OneBotEvent, EventType @@ -32,6 +33,11 @@ class MessageEvent(OneBotEvent): 消息事件基类 """ + # 权限级别常量,用于装饰器参数 + ADMIN = ADMIN + OP = OP + USER = USER + message_type: str """消息类型: private (私聊), group (群聊)""" diff --git a/plugins/admin.py b/plugins/admin.py index 8e1b0e6..2d4854d 100644 --- a/plugins/admin.py +++ b/plugins/admin.py @@ -1,115 +1,74 @@ -from core import PluginDataManager +""" +管理员管理插件 + +提供通过聊天指令动态添加或移除机器人管理员的功能。 +""" from core.bot import Bot from core.command_manager import matcher -from models import GroupMessageEvent +from core.admin_manager import admin_manager +from models.events.message import MessageEvent __plugin_meta__ = { - "name": "admin", - "description": "机器人权限管理插件", - "usage": "/admin", + "name": "管理员管理", + "description": "管理机器人的全局管理员", + "usage": ( + "/admin list - 列出所有管理员\n" + "/admin add - 添加管理员\n" + "/admin remove - 移除管理员" + ), } -data = PluginDataManager("admin") +@matcher.command("admin", permission=MessageEvent.ADMIN) +async def handle_admin_command(bot: Bot, event: MessageEvent, args: list[str]): + """ + 处理 /admin 指令 -@matcher.command("admin") -async def handle_permission(bot: Bot, event: GroupMessageEvent, args: list[str]): + :param bot: Bot 实例 + :param event: 消息事件实例 + :param args: 指令参数列表 + """ if not args: - await event.reply( - "机器人权限管理插件指令:\n/admin list 列出所有权限\n/admin add member 添加群成员权限\n/admin remove member 删除群成员权限\n/admin add group <群号> 添加群权限\n/admin remove group <群号> 删除群权限\n/admin clear member 清空群成员权限\n/admin clear group 清空群权限\n/admin clear all 清空所有权限" - ) - return - - if str(event.user_id) not in data.get("members", []): - await event.reply("你没有权限使用此命令。") - return - if str(event.group_id) not in data.get("groups", []): - await event.reply("群聊不在权限中") + await event.reply(__plugin_meta__["usage"]) return action = args[0].lower() - # ensure storage keys exist - members = data.get("members", []) or [] - groups = data.get("groups", []) or [] - if action == "list": - msg_lines = ["当前权限列表:"] - msg_lines.append( - f"群成员权限 ({len(members)}): {', '.join(members) if members else '无'}" - ) - msg_lines.append( - f"群权限 ({len(groups)}): {', '.join(groups) if groups else '无'}" - ) - await event.reply("\n".join(msg_lines)) + admins = await admin_manager.get_all_admins() + if not admins: + await event.reply("当前没有设置任何管理员。") + return + + admin_list_str = "\n".join(str(admin_id) for admin_id in admins) + await event.reply(f"当前管理员列表 ({len(admins)}):\n{admin_list_str}") return if action in ("add", "remove"): - if len(args) < 3: - await event.reply("参数错误,示例:/admin add member 123456") + if len(args) < 2 or not args[1].isdigit(): + await event.reply("参数错误,请提供一个有效的 QQ 号。\n示例: /admin add 123456") return - target = args[1].lower() - value = args[2] - - if target == "member": - # operate on members list - if action == "add": - if str(value) in members: - await event.reply(f"成员 {value} 已存在,无需重复添加。") - return - members.append(str(value)) - data.set("members", members) - await event.reply(f"已添加群成员权限:{value}") - return - else: # remove - if str(value) not in members: - await event.reply(f"成员 {value} 不在权限列表中。") - return - members = [m for m in members if m != str(value)] - data.set("members", members) - await event.reply(f"已移除群成员权限:{value}") - return - - if target == "group": - if action == "add": - if str(value) in groups: - await event.reply(f"群 {value} 已存在,无需重复添加。") - return - groups.append(str(value)) - data.set("groups", groups) - await event.reply(f"已添加群权限:{value}") - return - else: # remove - if str(value) not in groups: - await event.reply(f"群 {value} 不在权限列表中。") - return - groups = [g for g in groups if g != str(value)] - data.set("groups", groups) - await event.reply(f"已移除群权限:{value}") - return - - await event.reply("未知目标类型,请使用 member 或 group") - return - - if action == "clear": - if len(args) < 2: - await event.reply("参数错误,示例:/admin clear member") + try: + user_id = int(args[1]) + except ValueError: + await event.reply("无效的 QQ 号,请输入纯数字。") return - target = args[1].lower() - if target == "member": - data.set("members", []) - await event.reply("已清空群成员权限。") - return - if target == "group": - data.set("groups", []) - await event.reply("已清空群权限。") - return - if target == "all": - data.clear() - await event.reply("已清空所有权限。") - return - await event.reply("未知清空目标,请使用 member/group/all") - return - await event.reply("未知指令,使用 /admin 查看帮助") + if action == "add": + success = await admin_manager.add_admin(user_id) + if success: + await event.reply(f"成功添加管理员: {user_id}") + else: + await event.reply(f"管理员 {user_id} 已存在,无需重复添加。") + return + + elif action == "remove": + success = await admin_manager.remove_admin(user_id) + if success: + await event.reply(f"成功移除管理员: {user_id}") + else: + await event.reply(f"管理员 {user_id} 不存在。") + return + + await event.reply(f"未知的指令: {action}\n\n{__plugin_meta__['usage']}") diff --git a/plugins/code_py.py b/plugins/code_py.py index cb53f2b..b595354 100644 --- a/plugins/code_py.py +++ b/plugins/code_py.py @@ -13,6 +13,7 @@ from typing import Tuple, Set from core.bot import Bot from core.command_manager import matcher +from core.executor import run_in_thread_pool from models import MessageEvent __plugin_meta__ = { @@ -38,9 +39,11 @@ def is_code_safe(code: str) -> Tuple[bool, str]: statements = STATEMENT_SPLIT_PATTERN.split(code) for statement in statements: statement = statement.strip() - if not statement: continue + if not statement: + continue parts = statement.split() - if not parts: continue + if not parts: + continue if parts[0] == 'from' and len(parts) > 1: module_name = parts[1].strip() if module_name in DANGEROUS_MODULES: @@ -83,7 +86,7 @@ async def process_and_reply(bot: Bot, event: MessageEvent, code: str): """ 核心处理逻辑:安全检查、执行代码并回复结果。 """ - safe, message = is_code_safe(code) + safe, message = await run_in_thread_pool(is_code_safe, code) if not safe: await event.reply(f"代码安全检查未通过:\n{message}") return @@ -150,11 +153,11 @@ async def handle_code_input(bot: Bot, event: MessageEvent): # 处理取消操作 if event.raw_message.strip() == "取消": await event.reply("已取消输入。") - return True # 消费事件 + return True # 消费事件 # 执行代码 await process_and_reply(bot, event, event.raw_message) - return True # 消费事件,防止被其他指令匹配 + return True # 消费事件,防止被其他指令匹配 # 如果用户不在等待状态,则不处理 return False diff --git a/plugins/data/admin.json b/plugins/data/admin.json deleted file mode 100644 index 9e26dfe..0000000 --- a/plugins/data/admin.json +++ /dev/null @@ -1 +0,0 @@ -{} \ No newline at end of file diff --git a/plugins/echo.py b/plugins/echo.py index 24a997d..34a7f22 100644 --- a/plugins/echo.py +++ b/plugins/echo.py @@ -29,18 +29,26 @@ async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): await event.reply(reply_msg) -@matcher.command("赞我") -async def handle_poke(bot: Bot, event: MessageEvent, args: list[str]): +@matcher.command( + "赞我", + permission=MessageEvent.ADMIN, + override_permission_check=True +) +async def handle_poke(bot: Bot, event: MessageEvent, permission_granted: bool): """ 处理 赞我 指令,发送点赞 :param bot: Bot 实例 :param event: 消息事件对象 - :param args: 指令参数列表(本指令不使用参数) + :param permission_granted: 权限检查结果 """ + if not permission_granted: + await event.reply("只有我的操作员才能让我点赞哦!(。•ˇ‸ˇ•。)") + return + try: # 尝试发送赞 await bot.send_like(event.user_id, times=10) - await event.reply("戳一戳发送成功!") + await event.reply("好感度+10!(〃'▽'〃)") except Exception as e: - await event.reply(f"戳一戳发送失败:{str(e)}") \ No newline at end of file + await event.reply(f"点赞失败了 >_<: {str(e)}") \ No newline at end of file diff --git a/plugins/jrcd.py b/plugins/jrcd.py index d4df757..f1d4767 100644 --- a/plugins/jrcd.py +++ b/plugins/jrcd.py @@ -9,6 +9,7 @@ from datetime import datetime from core.bot import Bot from core.command_manager import matcher +from core.executor import run_in_thread_pool from models import MessageEvent, MessageSegment __plugin_meta__ = { @@ -77,7 +78,7 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]): :param args: 指令参数列表(未使用)。 """ user_id = event.user_id - jrcd = get_jrcd(user_id) + jrcd = await run_in_thread_pool(get_jrcd, user_id) msg = [MessageSegment.at(user_id)] if jrcd <= 9: msg.append(MessageSegment.text(random.choice(JRCDMSG_1) % jrcd)) @@ -112,8 +113,8 @@ async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]): await event.reply("不能和自己比!") return - jrcd1 = get_jrcd(user_id1) - jrcd2 = get_jrcd(user_id2) + jrcd1 = await run_in_thread_pool(get_jrcd, user_id1) + jrcd2 = await run_in_thread_pool(get_jrcd, user_id2) jrcz = jrcd1 - jrcd2 diff --git a/plugins/sync_async_test_plugin.py b/plugins/sync_async_test_plugin.py new file mode 100644 index 0000000..b863959 --- /dev/null +++ b/plugins/sync_async_test_plugin.py @@ -0,0 +1,88 @@ +""" +同步/异步函数测试插件 + +用于演示 SyncHandlerError 异常以及如何将同步函数放入线程池执行。 +""" +import time +from typing import Any +from core.command_manager import matcher +from core.executor import run_in_thread_pool +from core.bot import Bot +from core.logger import logger + +# 插件元数据 +__plugin_meta__ = { + "name": "SyncAsyncTestPlugin", + "description": "用于测试同步/异步函数处理的插件。", + "usage": ( + "/test_sync_error - 尝试注册一个同步函数作为异步处理器,会触发错误。\n" + "/test_blocking_task - 演示将同步阻塞任务放入线程池执行。" + ), +} + +# --- 示例 1: 触发 SyncHandlerError (此函数不会被成功注册) --- + +# 这是一个同步函数,如果直接用 @matcher.message_handler 装饰, +# 并且 event_handler 检查到它是同步的,就会抛出 SyncHandlerError。 +# 注意:为了演示错误,我们不会真正注册它,因为注册会失败。 +def _sync_function_that_should_fail(bot: Bot, event: Any): + """ + 一个同步函数,如果直接作为异步事件处理器注册,会触发 SyncHandlerError。 + """ + logger.info("这个同步函数不应该被直接调用。") + return "这是一个同步函数的结果。" + +# --- 示例 2: 将同步阻塞任务放入线程池运行 --- + +def _blocking_task(duration: int) -> str: + """ + 一个模拟耗时操作的同步函数。 + Args: + duration (int): 模拟阻塞的秒数。 + Returns: + str: 任务完成消息。 + """ + logger.info(f"同步阻塞任务开始,持续 {duration} 秒...") + time.sleep(duration) + logger.info("同步阻塞任务结束。") + return f"阻塞任务完成,耗时 {duration} 秒。" + +@matcher.message_handler.command("test_blocking_task") +async def test_blocking_task_handler(bot: Bot, event: Any, args: list): + """ + 处理 /test_blocking_task 命令,将同步阻塞任务放入线程池执行。 + Args: + bot (Bot): 机器人实例。 + event (Any): 接收到的事件对象。 + args (list): 命令参数列表。 + """ + if not args: + await bot.send(event, "请提供阻塞时长,例如:/test_blocking_task 5") + return + + try: + duration = int(args[0]) + if duration <= 0: + raise ValueError("时长必须是正整数。") + except ValueError: + await bot.send(event, "无效的时长,请提供一个正整数。") + return + + await bot.send(event, f"开始执行同步阻塞任务,预计耗时 {duration} 秒...") + + # 将同步函数放入线程池执行 + result = await run_in_thread_pool(_blocking_task, duration) + + await bot.send(event, f"同步阻塞任务已完成:{result}") + +# --- 示例 3: 尝试注册一个同步函数作为异步处理器 (会失败) --- +# 这个函数不会被成功注册,因为 event_handler 会检测到它是同步的并抛出 SyncHandlerError。 +# 插件管理器会捕获这个错误并跳过加载此插件。 +# 为了演示,我们故意尝试注册它。 +# @matcher.message_handler.command("test_sync_error") +# def test_sync_error_handler(bot: Bot, event: Any): +# """ +# 这个同步函数尝试作为异步处理器注册,会触发 SyncHandlerError。 +# """ +# logger.error("这个同步函数不应该被直接注册为异步处理器。") +# return "这个消息不应该被看到。"