feat: 整合开发历史 (#20),大更新。。。

This commit is contained in:
镀铬酸钾
2026-01-04 22:21:35 +08:00
committed by GitHub
parent 0965123c1d
commit a733d3dc4b
25 changed files with 2199 additions and 506 deletions

2
.gitignore vendored
View File

@@ -138,4 +138,4 @@ dmypy.json
# pytype static type analyzer # pytype static type analyzer
.pytype/ .pytype/
# End of https://www.toptal.com/developers/gitignore/api/python # End of https://www.toptal.com/developers/gitignore/api/python

680
README.md
View File

@@ -35,7 +35,7 @@ NEO 框架的设计遵循以下核心理念:
* **类型安全**:基于 `dataclasses` 的强类型事件模型,开发体验更佳。 * **类型安全**:基于 `dataclasses` 的强类型事件模型,开发体验更佳。
* **插件系统**:轻量级的装饰器风格插件系统,支持指令 (`@matcher.command`) 和事件监听 (`@matcher.on_notice`, `@matcher.on_request`)。 * **插件系统**:轻量级的装饰器风格插件系统,支持指令 (`@matcher.command`) 和事件监听 (`@matcher.on_notice`, `@matcher.on_request`)。
* **插件元数据与内置帮助**:插件可通过 `__plugin_meta__` 变量进行自我描述。框架核心内置了 `/help` 指令,可自动收集并展示所有插件的帮助信息,无需手动维护。 * **插件元数据与内置帮助**:插件可通过 `__plugin_meta__` 变量进行自我描述。框架核心内置了 `/help` 指令,可自动收集并展示所有插件的帮助信息,无需手动维护。
* **🔥 热重载支持**:内置文件监控,修改 `base_plugins` 下的代码自动重载,无需重启,极大提升调试效率。 * **🔥 热重载支持**:内置文件监控,修改 `plugins` 下的代码自动重载,无需重启,极大提升调试效率。
* **异步核心**:基于 `asyncio``websockets` 的高性能异步核心。 * **异步核心**:基于 `asyncio``websockets` 的高性能异步核心。
* **自动重连**:内置 WebSocket 断线重连机制。 * **自动重连**:内置 WebSocket 断线重连机制。
@@ -131,43 +131,112 @@ latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True)
### 其他改进 ### 其他改进
- [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。 - [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。
- [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。 - [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。
- [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制 - [x] **权限系统**: 实现基础的权限管理(超级管理员、群管理员等)
- [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot - [x] **日志系统优化**: 引入 `loguru` 进行日志记录,支持文件输出和日志级别控制
- [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理 - [x] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot
- [ ] **权限系统**: 实现基础的权限管理(如超级管理员、群管理员等) - [x] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理
## 📂 项目结构 ## 📂 项目结构
``` ```
NEO/ .
├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载) ├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载)
│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令 │ ├── admin.py # 管理员插件
│ ├── forward_test.py # 示例插件:演示合并转发消息的构建和发送 │ ├── code_py.py # Python 代码执行插件
│ ├── jrcd.py # 娱乐插件:提供 /jrcd 和 /bbcd 指令 │ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令
── thpic.py # 图片插件:提供 /thpic 指令,发送随机东方图片 ── forward_test.py # 示例插件:演示合并转发消息
├── core/ # 核心框架代码 │ ├── jrcd.py # 娱乐插件:/jrcd 和 /bbcd
── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI) ── thpic.py # 图片插件:/thpic
├── __init__.py ├── core/ # 核心框架代码
│ ├── account.py │ ├── api/ # API 模块抽象层
│ ├── base.py │ ├── bot.py # Bot 实例与 API 封装
│ ├── friend.py │ ├── admin_manager.py # 管理员管理模块
│ ├── group.py │ ├── command_manager.py # 命令与事件分发器
│ └── message.py ├── config_loader.py # 配置加载器
│ ├── bot.py # Bot API 封装,提供 send_group_msg 等方法 │ ├── event_handler.py # 事件处理器
│ ├── command_manager.py # 命令与事件分发 │ ├── executor.py # 插件执行
│ ├── config_loader.py # 配置加载器 │ ├── logger.py # 日志系统
│ ├── plugin_manager.py # 插件加载与管理 │ ├── permission_manager.py # 权限管理
│ ├── redis_manager.py # Redis 连接管理 │ ├── plugin_manager.py # 插件加载与管理
── ws.py # WebSocket 客户端核心 ── redis_manager.py # Redis 连接管理器
├── models/ # 数据模型 │ └── ws.py # WebSocket 客户端核心
│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta) ├── data/ # 数据存储目录
│ ├── message.py # 消息段定义 (MessageSegment) │ ├── admin.json # 管理员配置文件
│ └── sender.py # 发送者定义 (Sender) │ └── permissions.json # 权限数据
├── config.toml # 配置文件 ├── html/ # HTML 静态文件
├── main.py # 启动入口(包含热重载监控) │ ├── 404.html
└── requirements.txt # 项目依赖 │ └── index.html
├── models/ # 数据模型
│ ├── events/ # OneBot 事件定义
│ ├── message.py # 消息段定义
│ ├── objects.py # API 返回对象定义
│ └── sender.py # 发送者定义
├── .gitignore
├── config.toml # 配置文件
├── main.py # 启动入口(包含热重载监控)
└── requirements.txt # 项目依赖
``` ```
### 目录结构详细说明
#### `plugins/` - 插件目录
- **功能**存放:所有机器人插件,支持热重载机制
- **加载机制**:框架会自动扫描此目录下的所有 `.py` 文件,并作为插件加载
- **插件约定**:每个插件文件应包含 `__plugin_meta__` 字典用于插件元数据定义
- **热重载**:开发过程中修改插件文件会自动触发重载,无需重启机器人
- **内置插件**
- `admin.py` - 管理员管理插件,支持动态添加/移除管理员
- `code_py.py` - Python 代码执行插件,支持安全的代码执行环境
- `echo.py` - 示例插件,演示基本指令处理
- `forward_test.py` - 合并转发消息演示插件
- `jrcd.py` - 娱乐插件,提供 `/jrcd``/bbcd` 指令
- `thpic.py` - 图片插件,提供 `/thpic` 指令返回东方Project图片
#### `core/` - 核心框架代码
- `api/` - API 模块抽象层
- `base.py` - API 基类定义
- `message.py` - 消息相关 API 封装
- `group.py` - 群组管理 API 封装
- `friend.py` - 好友相关 API 封装
- `account.py` - 账号相关 API 封装
- `bot.py` - Bot 核心类,通过 Mixin 模式继承所有 API 功能,提供统一的调用接口
- `admin_manager.py` - 管理员管理模块,负责管理员的添加、移除和权限验证
- `command_manager.py` - 命令与事件分发器,负责注册和处理所有指令和事件
- `config_loader.py` - 配置加载器,读取和解析 `config.toml` 配置文件
- `event_handler.py` - 事件处理器,负责将原始事件转换为类型化事件对象
- `executor.py` - 插件执行器,提供线程池执行环境用于执行同步任务
- `logger.py` - 日志系统,基于 `loguru` 提供高性能日志记录
- `permission_manager.py` - 权限管理器管理用户权限级别admin、op、user
- `plugin_manager.py` - 插件加载与管理,负责插件的扫描、加载和热重载
- `redis_manager.py` - Redis 连接管理器,提供异步 Redis 客户端连接池
- `ws.py` - WebSocket 客户端核心,负责与 OneBot 实现端建立和管理连接
#### `data/` - 数据存储目录
- `admin.json` - 管理员配置文件,存储全局管理员列表
- `permissions.json` - 权限数据文件,存储用户权限映射关系
#### `html/` - HTML 静态文件
- `404.html` - 404 错误页面
- `index.html` - 项目主页 HTML 文件,展示项目信息和特性
#### `models/` - 数据模型定义
- `events/` - OneBot 事件定义
- `base.py` - 事件基类定义
- `message.py` - 消息事件定义
- `notice.py` - 通知事件定义
- `request.py` - 请求事件定义
- `meta.py` - 元事件定义
- `factory.py` - 事件工厂类,用于根据 JSON 数据创建对应事件对象
- `message.py` - 消息段定义,支持文本、图片、表情等多种消息类型
- `objects.py` - API 返回对象定义,提供强类型化的 API 响应数据模型
- `sender.py` - 发送者定义,包含用户、群成员等信息
#### 根目录文件
- `.gitignore` - Git 忽略文件配置
- `config.toml` - 主配置文件,包含 WebSocket 连接、机器人指令前缀、Redis 连接等配置
- `main.py` - 程序入口文件,负责初始化插件、启动热重载监控和建立 WebSocket 连接
- `requirements.txt` - Python 依赖包列表
## 🚀 快速开始 ## 🚀 快速开始
### 1. 环境准备 ### 1. 环境准备
@@ -207,13 +276,13 @@ python main.py
项目集成了 `watchdog` 文件监控。在开发过程中,你只需要: 项目集成了 `watchdog` 文件监控。在开发过程中,你只需要:
1. 保持 `main.py` 运行。 1. 保持 `main.py` 运行。
2. 修改或新建 `base_plugins` 目录下的 `.py` 插件文件。 2. 修改或新建 `plugins` 目录下的 `.py` 插件文件。
3. 保存文件。 3. 保存文件。
4. 控制台会自动提示 `[HotReload] 插件重载完成`,新的逻辑立即生效。 4. 控制台会自动提示 `[HotReload] 插件重载完成`,新的逻辑立即生效。
### 创建新插件 ### 创建新插件
`base_plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。 `plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。
### 示例代码 ### 示例代码
@@ -333,7 +402,7 @@ async def get_group_info_legacy(bot: Bot, event: MessageEvent, args: list[str]):
**示例:** **示例:**
```python ```python
# base_plugins/echo.py # plugins/echo.py
__plugin_meta__ = { __plugin_meta__ = {
"name": "回声与交互", "name": "回声与交互",
@@ -407,10 +476,184 @@ async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]):
except Exception as e: except Exception as e:
await event.reply(f"执行失败:{str(e)}") await event.reply(f"执行失败:{str(e)}")
# 记录日志 # 记录日志
import logging from core.logger import logger
logging.error(f"插件执行错误:{e}", exc_info=True) logger.error(f"插件执行错误:{e}", exc_info=True)
``` ```
### 处理同步阻塞操作
为了保持机器人的响应性,所有可能导致长时间阻塞的同步操作都应该在单独的线程池中执行。框架提供了 `run_in_thread_pool` 函数来简化这一过程。
**示例:执行同步阻塞任务**
```python
from core.command_manager import matcher
from core.bot import Bot
from models import MessageEvent
from core.executor import run_in_thread_pool
import time
# 模拟一个耗时的同步操作
def blocking_task(duration: int):
time.sleep(duration)
return f"阻塞任务完成,耗时 {duration}"
@matcher.command("block_test")
async def handle_blocking_test(bot: Bot, event: MessageEvent, args: list[str]):
if not args or not args[0].isdigit():
await event.reply("请提供一个数字作为阻塞时间(秒)。例如:/block_test 5")
return
duration = int(args[0])
await event.reply(f"开始执行阻塞任务,耗时 {duration} 秒...")
# 将同步阻塞任务放入线程池执行
result = await run_in_thread_pool(blocking_task, duration)
await event.reply(result)
```
### 权限管理
框架内置了基于用户角色的权限管理系统,支持 `admin`(超级管理员)、`op`(操作员)、`user`(普通用户)三个权限级别。权限数据存储在 `data/permissions.json` 文件中。
#### 权限级别说明
- **admin**:最高权限,可以执行所有管理命令,包括添加/移除其他管理员
- **op**:操作员权限,可以执行大部分管理命令,但不能修改管理员列表
- **user**:普通用户权限,只能使用基础功能
#### 在插件中使用权限控制
注册命令时可以通过 `permission` 参数指定所需权限级别:
```python
from models import MessageEvent
# 只有管理员可以执行此命令
@matcher.command("admin_only", permission=MessageEvent.ADMIN)
async def admin_command(bot: Bot, event: MessageEvent, args: list[str]):
await event.reply("此命令仅限管理员使用")
# 操作员及以上权限可以执行
@matcher.command("op_only", permission=MessageEvent.OP)
async def op_command(bot: Bot, event: MessageEvent, args: list[str]):
await event.reply("此命令需要操作员权限")
# 所有用户都可以执行(默认)
@matcher.command("public")
async def public_command(bot: Bot, event: MessageEvent, args: list[str]):
await event.reply("所有用户都可以使用此命令")
```
#### 动态权限检查
如果需要更复杂的权限逻辑,可以使用 `override_permission_check=True` 参数,然后在函数中手动检查权限:
```python
@matcher.command(
"special",
permission=MessageEvent.OP,
override_permission_check=True
)
async def special_command(bot: Bot, event: MessageEvent, permission_granted: bool):
if not permission_granted:
await event.reply("权限不足!")
return
# 额外的权限逻辑
if event.user_id == 123456:
await event.reply("特殊用户,允许执行")
else:
await event.reply("普通用户,拒绝执行")
```
### 使用 Redis 进行数据缓存
框架集成了 Redis 客户端提供了便捷的异步接口用于数据缓存和持久化。Redis 连接管理器会自动管理连接池,你可以在插件中直接使用。
#### 基本用法
```python
from core.redis_manager import redis_manager
@matcher.command("cache")
async def cache_example(bot: Bot, event: MessageEvent, args: list[str]):
# 设置缓存
await redis_manager.set("user:123:name", "张三")
# 获取缓存
name = await redis_manager.get("user:123:name")
# 设置带过期时间的缓存(单位:秒)
await redis_manager.setex("temp:data", 3600, "临时数据")
# 删除缓存
await redis_manager.delete("user:123:name")
await event.reply(f"用户名:{name}")
```
#### 使用哈希表Hash
```python
# 设置哈希字段
await redis_manager.hset("user:123", "age", 20)
await redis_manager.hset("user:123", "city", "北京")
# 获取哈希字段
age = await redis_manager.hget("user:123", "age")
user_data = await redis_manager.hgetall("user:123")
# 删除哈希字段
await redis_manager.hdel("user:123", "city")
```
#### 使用列表List
```python
# 向列表添加元素
await redis_manager.lpush("recent:actions", "login")
await redis_manager.rpush("recent:actions", "logout")
# 获取列表范围
actions = await redis_manager.lrange("recent:actions", 0, 9)
# 获取列表长度
length = await redis_manager.llen("recent:actions")
```
### 插件数据管理
对于需要持久化存储配置或数据的插件,框架提供了 `PluginDataManager` 类,可以方便地管理 JSON 格式的数据文件。
#### 基本用法
```python
from core.plugin_manager import PluginDataManager
# 初始化数据管理器
data_manager = PluginDataManager("weather_plugin")
@matcher.command("weather_set")
async def set_weather_config(bot: Bot, event: MessageEvent, args: list[str]):
if len(args) < 2:
await event.reply("用法:/weather_set <城市> <温度>")
return
city = args[0]
temperature = args[1]
# 保存配置
await data_manager.set(city, temperature)
await event.reply(f"已设置 {city} 的温度为 {temperature}")
@matcher.command("weather_get")
async def get_weather_config(bot: Bot, event: MessageEvent, args: list[str]):
if not args:
await event.reply("用法:/weather_get <城市>")
return
city = args[0]
# 读取配置
temperature = data_manager.get(city)
if temperature:
await event.reply(f"{city} 的温度是 {temperature}")
else:
await event.reply(f"未找到 {city} 的温度配置")
```
#### 数据文件位置
插件数据文件保存在 `plugins/data/` 目录下,每个插件对应一个独立的 JSON 文件。例如 `weather_plugin` 插件的数据文件为 `plugins/data/weather_plugin.json`
### 插件开发最佳实践 ### 插件开发最佳实践
1. **单一职责**:每个插件专注于一个功能领域 1. **单一职责**:每个插件专注于一个功能领域
2. **错误处理**:妥善处理可能发生的异常 2. **错误处理**:妥善处理可能发生的异常
@@ -530,97 +773,290 @@ async def welcome_new_member(bot: Bot, event):
## 📚 事件模型说明 ## 📚 事件模型说明
项目采用了基于工厂模式的事件处理系统,所有事件定义在 `models/events/` 下: NEO 框架的事件模型是基于 OneBot v11 协议的强类型数据模型,采用 `dataclasses` 和类型注解构建。所有事件都继承自 `OneBotEvent` 基类,并通过事件工厂自动从 JSON 数据创建对应的事件对象。
* **MessageEvent**: 消息事件,包含 `PrivateMessageEvent` 和 `GroupMessageEvent`。支持 `await event.reply()` 快速回复。 ### 事件层次结构
* **NoticeEvent**: 通知事件,如 `FriendAddNoticeEvent`, `GroupRecallNoticeEvent` 等。
* **RequestEvent**: 请求事件,如 `FriendRequestEvent`, `GroupRequestEvent`。
* **MetaEvent**: 元事件,如心跳 `HeartbeatEvent`。
所有事件均继承自 `OneBotEvent`,并包含 `bot` 属性用于调用 API。
## 🏗️ 技术架构
NEO 框架采用分层架构设计,各层职责明确,便于维护和扩展:
### 架构层次
1. **通信层 (WebSocket Client)**
- 负责与 OneBot 实现端的 Web Socket连接
- 实现断线自动重连机制
- 处理原始消息的收发和协议解析
2. **API 抽象层 (API Mixins)**
- 提供类型安全的 API 封装
- 按功能领域划分:消息、群组、好友、账号
- 所有 API 返回强类型数据模型对象
3. **业务逻辑层 (Bot & Command Manager)**
- Bot 类组合所有 API 功能
- 指令和事件分发器
- 插件加载和管理
4. **插件层 (Plugins)**
- 支持热重载的插件系统
- 装饰器风格的注册方式
- 独立的业务逻辑模块
5. **数据模型层 (Models)**
- 基于 dataclasses 的事件模型
- API 响应数据模型
- 类型安全的序列化/反序列化
### 核心组件交互
``` ```
┌─────────────────────────────────────┐ OneBotEvent (抽象基类)
│ 插件层 (Plugins) │ ├── MetaEvent (元事件)
@matcher.command() │ ├── HeartbeatEvent (心跳事件)
@matcher.on_notice() │ └── LifeCycleEvent (生命周期事件)
└──────────────┬──────────────────────┘ ├── MessageEvent (消息事件)
├── PrivateMessageEvent (私聊消息事件)
┌──────────────▼──────────────────────┐ │ └── GroupMessageEvent (群聊消息事件)
│ 业务逻辑层 (Command Manager) │ ├── NoticeEvent (通知事件)
• 事件分发与路由 │ ├── FriendAddNoticeEvent (好友添加通知)
• 指令参数解析 │ ├── FriendRecallNoticeEvent (好友消息撤回通知)
└──────────────┬──────────────────────┘ │ ├── GroupRecallNoticeEvent (群消息撤回通知)
│ ├── GroupIncreaseNoticeEvent (群成员增加通知)
┌──────────────▼──────────────────────┐ │ ├── GroupDecreaseNoticeEvent (群成员减少通知)
Bot 组合类 │ ├── GroupAdminNoticeEvent (群管理员变动通知)
• 继承所有 API Mixin │ ├── GroupBanNoticeEvent (群禁言通知)
• 提供统一接口 │ ├── GroupUploadNoticeEvent (群文件上传通知)
└──────────────┬──────────────────────┘ │ ├── PokeNotifyEvent (戳一戳通知)
├── LuckyKingNotifyEvent (运气王通知)
┌──────────────▼──────────────────────┐ │ ├── HonorNotifyEvent (群荣誉变更通知)
API 抽象层 (Mixin) │ ├── GroupCardNoticeEvent (群成员名片更新通知)
• MessageAPI │ ├── OfflineFileNoticeEvent (离线文件通知)
• GroupAPI │ ├── ClientStatusNoticeEvent (客户端状态变更通知)
• FriendAPI │ └── EssenceNoticeEvent (精华消息变动通知)
│ • AccountAPI │ └── RequestEvent (请求事件)
└──────────────┬──────────────────────┘ ├── FriendRequestEvent (加好友请求)
└── GroupRequestEvent (加群请求/邀请)
┌──────────────▼──────────────────────┐
│ 通信层 (WebSocket) │
│ • 连接管理 │
│ • 消息编解码 │
│ • 断线重连 │
└─────────────────────────────────────┘
``` ```
### 设计模式应用 ### 事件基类OneBotEvent
- **工厂模式**:事件对象的创建和管理 所有事件的基类,定义了事件的通用属性和方法:
- **装饰器模式**:插件和指令的注册
- **组合模式**Bot 类通过继承组合 API 功能
- **观察者模式**:事件监听和处理
- **策略模式**:不同的消息处理策略
### 性能特点 ```python
@dataclass(slots=True)
class OneBotEvent(ABC):
"""
OneBot v11 事件的抽象基类。
Attributes:
time (int): 事件发生的时间戳 (秒)
self_id (int): 收到事件的机器人 QQ 号
_bot (Optional[Bot]): 内部持有的 Bot 实例引用
"""
time: int
self_id: int
_bot: Optional["Bot"] = field(default=None, init=False)
@property
@abstractmethod
def post_type(self) -> str:
"""事件的上报类型,子类必须重写此属性"""
pass
@property
def bot(self) -> "Bot":
"""获取与此事件关联的 Bot 实例"""
if self._bot is None:
raise ValueError("Bot instance not set for this event")
return self._bot
@bot.setter
def bot(self, value: "Bot"):
"""为事件对象设置关联的 Bot 实例"""
self._bot = value
```
- **异步非阻塞**:全面基于 asyncio支持高并发 ### 事件类型常量
- **内存高效**:事件和模型对象使用 dataclasses内存占用小
- **快速响应**:插件热重载和事件分发机制确保快速响应
- **可扩展性**:模块化设计便于功能扩展和定制
--- 框架定义了完整的事件类型常量,用于标识不同种类的事件:
*Internal Use Only - DOGSOHA ond baby2016 by Fairy-Oracle-Sanctuary*
```python
class EventType:
META = 'meta_event' # 元事件:心跳、生命周期等
REQUEST = 'request ' # 请求事件:加好友请求、加群请求等
NOTICE = 'notice' # 通知事件:群成员增加、文件上传等
MESSAGE = 'message' # 消息事件:私聊消息、群消息等
MESSAGE_SENT = 'message_sent' # 消息发送事件:机器人自己发送消息的上报
```
### 消息事件
消息事件是机器人最常处理的事件类型,框架提供了完整的消息段支持和便捷的回复方法:
#### MessageEvent (消息事件基类)
```python
@dataclass
class MessageEvent(OneBotEvent):
message_type: str # 消息类型: private (私聊), group (群聊)
sub_type: str # 消息子类型
message_id: int # 消息 ID
user_id: int # 发送者 QQ 号
message: List[MessageSegment] # 消息内容列表
raw_message: str # 原始消息内容
font: int # 字体
sender: Optional[Sender] # 发送者信息
@property
def post_type(self) -> str:
return EventType.MESSAGE
async def reply(self, message: str, auto_escape: bool = False):
"""回复消息(抽象方法,由子类实现)"""
raise NotImplementedError
```
#### PrivateMessageEvent (私聊消息事件)
```python
@dataclass
class PrivateMessageEvent(MessageEvent):
async def reply(self, message: str, auto_escape: bool = False):
"""回复私聊消息"""
await self.bot.send_private_msg(
user_id=self.user_id, message=message, auto_escape=auto_escape
)
```
#### GroupMessageEvent (群聊消息事件)
```python
@dataclass
class GroupMessageEvent(MessageEvent):
group_id: int = 0 # 群号
anonymous: Optional[Anonymous] = None # 匿名信息
async def reply(self, message: str, auto_escape: bool = False):
"""回复群聊消息"""
await self.bot.send_group_msg(
group_id=self.group_id, message=message, auto_escape=auto_escape
)
```
### 通知事件
通知事件用于处理各种系统通知,如群成员变动、文件上传等:
#### 常用通知事件示例
```python
@dataclass
class GroupIncreaseNoticeEvent(GroupNoticeEvent):
"""群成员增加通知"""
operator_id: int = 0 # 操作者 QQ 号
sub_type: str = "" # 子类型: approve (管理员同意入群), invite (管理员邀请入群)
@dataclass
class GroupRecallNoticeEvent(GroupNoticeEvent):
"""群消息撤回通知"""
operator_id: int = 0 # 操作者 QQ 号
message_id: int = 0 # 被撤回的消息 ID
@dataclass
class PokeNotifyEvent(NotifyNoticeEvent):
"""戳一戳通知"""
target_id: int = 0 # 被戳者 QQ 号
group_id: int = 0 # 群号 (如果是群内戳一戳)
```
### 请求事件
请求事件用于处理用户的主动请求,如加好友、加群等:
```python
@dataclass
class FriendRequestEvent(RequestEvent):
"""加好友请求事件"""
user_id: int = 0 # 发送请求的 QQ 号
comment: str = "" # 验证信息
flag: str = "" # 请求 flag用于 API 调用
@dataclass
class GroupRequestEvent(RequestEvent):
"""加群请求/邀请事件"""
sub_type: str = "" # 子类型: add (加群请求), invite (邀请登录号入群)
group_id: int = 0 # 群号
user_id: int = 0 # 发送请求的 QQ 号
comment: str = "" # 验证信息
flag: str = "" # 请求 flag用于 API 调用
```
### 元事件
元事件用于处理框架自身状态变化,如心跳、生命周期等:
```python
@dataclass
class HeartbeatEvent(MetaEvent):
"""心跳事件,用于确认连接状态"""
meta_event_type: str = 'heartbeat'
status: HeartbeatStatus = field(default_factory=HeartbeatStatus)
interval: int = 0 # 心跳间隔时间(ms)
@dataclass
class LifeCycleEvent(MetaEvent):
"""生命周期事件,用于通知框架生命周期变化"""
meta_event_type: str = 'lifecycle'
sub_type: LifeCycleSubType = LifeCycleSubType.ENABLE # 子类型: enable, disable, connect
```
### 事件工厂EventFactory
事件工厂是框架的核心组件之一,负责将原始 JSON 数据转换为强类型的事件对象:
```python
class EventFactory:
@staticmethod
def create_event(data: Dict[str, Any]) -> OneBotEvent:
"""根据数据创建事件对象"""
post_type = data.get("post_type")
if post_type == EventType.MESSAGE or post_type == EventType.MESSAGE_SENT:
return EventFactory._create_message_event(data, common_args)
elif post_type == EventType.NOTICE:
return EventFactory._create_notice_event(data, common_args)
elif post_type == EventType.REQUEST:
return EventFactory._create_request_event(data, common_args)
elif post_type == EventType.META:
return EventFactory._create_meta_event(data, common_args)
else:
raise ValueError(f"Unknown event type: {post_type}")
```
### 在插件中使用事件
插件可以直接使用这些事件类型来处理各种场景:
```python
from core.command_manager import matcher
from core.bot import Bot
from models import GroupMessageEvent, PrivateMessageEvent
from models.events.notice import GroupIncreaseNoticeEvent
from models.events.request import FriendRequestEvent
# 处理群消息事件
@matcher.command("hello")
async def handle_hello(bot: Bot, event: GroupMessageEvent, args: list[str]):
await event.reply(f"你好 {event.sender.nickname}")
# 处理私聊消息事件
@matcher.command("help", permission_level=MessageEvent.USER)
async def handle_help(bot: Bot, event: PrivateMessageEvent, args: list[str]):
await event.reply("这里是帮助信息...")
# 处理群成员增加通知
@matcher.on_notice("group_increase")
async def handle_group_increase(bot: Bot, event: GroupIncreaseNoticeEvent):
await bot.send_group_msg(
event.group_id,
f"欢迎新成员 {event.user_id} 加入!操作者:{event.operator_id}"
)
# 处理加好友请求
@matcher.on_request("friend")
async def handle_friend_request(bot: Bot, event: FriendRequestEvent):
# 自动同意所有好友请求
await bot.set_friend_add_request(flag=event.flag, approve=True)
await bot.send_private_msg(event.user_id, "已通过您的好友请求!")
```
### 事件处理的优势
1. **类型安全**所有事件都有明确的类型定义IDE 可以提供完整的代码提示和补全
2. **易于测试**:事件对象可以轻松构造,便于编写单元测试
3. **数据完整**:所有字段都有类型注解,确保数据的一致性和完整性
4. **性能优化**:使用 `@dataclass(slots=True)` 减少内存占用,提高属性访问速度
5. **可扩展性**:可以轻松定义自定义事件类型,扩展框架功能
### 常用事件属性速查
| 事件类型 | 关键属性 | 描述 |
|---------|---------|------|
| **MessageEvent** | `message_type`, `user_id`, `message`, `sender` | 所有消息事件的基类 |
| **PrivateMessageEvent** | 继承自 MessageEvent | 私聊消息事件 |
| **GroupMessageEvent** | `group_id`, `anonymous` | 群聊消息事件,包含群号和匿名信息 |
| **GroupIncreaseNoticeEvent** | `group_id`, `user_id`, `operator_id`, `sub_type` | 群成员增加通知 |
| **RecallGroupNoticeEvent** | `group_id`, `user_id`, `operator_id`, `message_id` | 群消息撤回通知 |
| **FriendRequestEvent** | `user_id`, `comment`, `flag` | 加好友请求事件 |
| **GroupRequestEvent** | `group_id`, `user_id`, `sub_type`, `comment`, `flag` | 加群请求/邀请事件 |
| **HeartbeatEvent** | `status`, `interval` | 心跳事件,用于监控连接状态 |
通过这套完整的事件模型NEO 框架为开发者提供了强大而灵活的事件处理能力,同时保持了代码的类型安全和良好的开发体验。

166
core/admin_manager.py Normal file
View File

@@ -0,0 +1,166 @@
"""
管理员管理器模块
该模块负责管理机器人的管理员列表。
它实现了文件和 Redis 缓存之间的数据同步,并提供了一套清晰的 API
供其他模块调用。
"""
import json
import os
from typing import Set
from .logger import logger
class AdminManager:
"""
管理员管理器类
负责加载、缓存和管理管理员列表。
使用单例模式,确保全局只有一个实例。
"""
_instance = None
_REDIS_KEY = "neobot:admins" # 用于存储管理员集合的 Redis 键
def __new__(cls):
"""
单例模式实现
"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""
初始化 AdminManager
"""
if getattr(self, "_initialized", False):
return
# 管理员数据文件路径
self.data_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"data",
"admin.json"
)
self._admins: Set[int] = set()
self._initialized = True
logger.info("管理员管理器初始化完成")
async def initialize(self):
"""
异步初始化,加载数据并同步到 Redis
"""
await self._load_from_file()
await self._sync_to_redis()
logger.info("管理员数据加载并同步到 Redis 完成")
async def _load_from_file(self):
"""
从 admin.json 加载管理员列表
"""
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = json.load(f)
admins = data.get("admins", [])
self._admins = set(int(admin_id) for admin_id in admins)
logger.debug(f"{self.data_file} 加载了 {len(self._admins)} 位管理员")
else:
# 如果文件不存在,创建一个空的
self._admins = set()
await self._save_to_file()
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"加载或解析 admin.json 失败: {e}")
self._admins = set()
async def _save_to_file(self):
"""
将当前管理员列表保存回 admin.json
"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
# 将 set 转换为 list 以便 JSON 序列化
admin_list = [str(admin_id) for admin_id in self._admins]
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump({"admins": admin_list}, f, indent=2, ensure_ascii=False)
logger.debug(f"管理员列表已保存到 {self.data_file}")
except Exception as e:
logger.error(f"保存 admin.json 失败: {e}")
async def _sync_to_redis(self):
"""
将内存中的管理员集合同步到 Redis
"""
from .redis_manager import redis_manager
try:
# 首先清空旧的集合
await redis_manager.redis.delete(self._REDIS_KEY)
if self._admins:
# 将所有管理员ID添加到集合中
await redis_manager.redis.sadd(self._REDIS_KEY, *self._admins)
logger.debug(f"已将 {len(self._admins)} 位管理员同步到 Redis")
except Exception as e:
logger.error(f"同步管理员到 Redis 失败: {e}")
async def is_admin(self, user_id: int) -> bool:
"""
检查用户是否为管理员(从 Redis 缓存读取)
"""
from .redis_manager import redis_manager
try:
return await redis_manager.redis.sismember(self._REDIS_KEY, user_id)
except Exception as e:
logger.error(f"从 Redis 检查管理员权限失败: {e}")
# Redis 失败时,回退到内存检查
return user_id in self._admins
async def add_admin(self, user_id: int) -> bool:
"""
添加管理员,并同步到文件和 Redis
"""
from .redis_manager import redis_manager
if user_id in self._admins:
return False # 用户已经是管理员
self._admins.add(user_id)
await self._save_to_file()
try:
await redis_manager.redis.sadd(self._REDIS_KEY, user_id)
logger.info(f"已添加新管理员 {user_id} 并更新缓存")
return True
except Exception as e:
logger.error(f"添加管理员 {user_id} 到 Redis 失败: {e}")
return False
async def remove_admin(self, user_id: int) -> bool:
"""
移除管理员,并同步到文件和 Redis
"""
from .redis_manager import redis_manager
if user_id not in self._admins:
return False # 用户不是管理员
self._admins.remove(user_id)
await self._save_to_file()
try:
await redis_manager.redis.srem(self._REDIS_KEY, user_id)
logger.info(f"已移除管理员 {user_id} 并更新缓存")
return True
except Exception as e:
logger.error(f"从 Redis 移除管理员 {user_id} 失败: {e}")
return False
async def get_all_admins(self) -> Set[int]:
"""
获取所有管理员的集合
"""
return self._admins.copy()
# 全局 AdminManager 实例
admin_manager = AdminManager()

View File

@@ -8,7 +8,7 @@ import json
from typing import Dict, Any from typing import Dict, Any
from .base import BaseAPI from .base import BaseAPI
from models.objects import LoginInfo, VersionInfo, Status from models.objects import LoginInfo, VersionInfo, Status
from core.redis_manager import redis_client as redis_manager from core.redis_manager import redis_manager
class AccountAPI(BaseAPI): class AccountAPI(BaseAPI):

View File

@@ -8,7 +8,7 @@ import json
from typing import List, Dict, Any from typing import List, Dict, Any
from .base import BaseAPI from .base import BaseAPI
from models.objects import FriendInfo, StrangerInfo from models.objects import FriendInfo, StrangerInfo
from core.redis_manager import redis_client as redis_manager from core.redis_manager import redis_manager
class FriendAPI(BaseAPI): class FriendAPI(BaseAPI):
@@ -42,12 +42,12 @@ class FriendAPI(BaseAPI):
""" """
cache_key = f"neobot:cache:get_stranger_info:{user_id}" cache_key = f"neobot:cache:get_stranger_info:{user_id}"
if not no_cache: if not no_cache:
cached_data = await redis_manager.get(cache_key) cached_data = await redis_manager.redis.get(cache_key)
if cached_data: if cached_data:
return StrangerInfo(**json.loads(cached_data)) return StrangerInfo(**json.loads(cached_data))
res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache})
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return StrangerInfo(**res) return StrangerInfo(**res)
async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]: async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]:
@@ -62,12 +62,12 @@ class FriendAPI(BaseAPI):
""" """
cache_key = f"neobot:cache:get_friend_list:{self.self_id}" cache_key = f"neobot:cache:get_friend_list:{self.self_id}"
if not no_cache: if not no_cache:
cached_data = await redis_manager.get(cache_key) cached_data = await redis_manager.redis.get(cache_key)
if cached_data: if cached_data:
return [FriendInfo(**item) for item in json.loads(cached_data)] return [FriendInfo(**item) for item in json.loads(cached_data)]
res = await self.call_api("get_friend_list") res = await self.call_api("get_friend_list")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return [FriendInfo(**item) for item in res] return [FriendInfo(**item) for item in res]
async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]: async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]:

View File

@@ -6,7 +6,7 @@
""" """
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
import json import json
from core.redis_manager import redis_client as redis_manager from core.redis_manager import redis_manager
from .base import BaseAPI from .base import BaseAPI
from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo
@@ -178,12 +178,12 @@ class GroupAPI(BaseAPI):
""" """
cache_key = f"neobot:cache:get_group_info:{group_id}" cache_key = f"neobot:cache:get_group_info:{group_id}"
if not no_cache: if not no_cache:
cached_data = await redis_manager.get(cache_key) cached_data = await redis_manager.redis.get(cache_key)
if cached_data: if cached_data:
return GroupInfo(**json.loads(cached_data)) return GroupInfo(**json.loads(cached_data))
res = await self.call_api("get_group_info", {"group_id": group_id}) res = await self.call_api("get_group_info", {"group_id": group_id})
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return GroupInfo(**res) return GroupInfo(**res)
async def get_group_list(self) -> List[GroupInfo]: async def get_group_list(self) -> List[GroupInfo]:
@@ -210,12 +210,12 @@ class GroupAPI(BaseAPI):
""" """
cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}" cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}"
if not no_cache: if not no_cache:
cached_data = await redis_manager.get(cache_key) cached_data = await redis_manager.redis.get(cache_key)
if cached_data: if cached_data:
return GroupMemberInfo(**json.loads(cached_data)) return GroupMemberInfo(**json.loads(cached_data))
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id}) res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id})
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 await redis_manager.redis.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return GroupMemberInfo(**res) return GroupMemberInfo(**res)
async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]: async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]:

View File

@@ -4,19 +4,12 @@
该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心。 该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心。
它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和 它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和
请求事件处理器的能力。 请求事件处理器的能力。
主要职责:
- 提供 `@matcher.command()` 装饰器来注册命令。
- 提供 `@matcher.on_notice()` 装饰器来注册通知处理器。
- 提供 `@matcher.on_request()` 装饰器来注册请求处理器。
- 负责解析收到的消息,匹配命令前缀并分发给对应的处理器。
- 统一处理所有类型的事件,并将其分发给所有已注册的处理器。
- 内置一个 `/help` 命令,用于展示所有已加载插件的帮助信息。
""" """
import inspect from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Dict, List, Tuple
from .config_loader import global_config from .config_loader import global_config
from .event_handler import MessageHandler, NoticeHandler, RequestHandler
# 从配置中获取命令前缀 # 从配置中获取命令前缀
comm_prefixes = global_config.bot.get("command", ("/",)) comm_prefixes = global_config.bot.get("command", ("/",))
@@ -27,6 +20,7 @@ class CommandManager:
命令管理器,负责注册和分发所有类型的事件。 命令管理器,负责注册和分发所有类型的事件。
这是一个单例对象(`matcher`),在整个应用中共享。 这是一个单例对象(`matcher`),在整个应用中共享。
它将不同类型的事件处理委托给专门的处理器类。
""" """
def __init__(self, prefixes: Tuple[str, ...]): def __init__(self, prefixes: Tuple[str, ...]):
@@ -36,51 +30,91 @@ class CommandManager:
Args: Args:
prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。 prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。
""" """
# --- 初始化所有处理器列表 ---
self.prefixes = prefixes
self.commands: Dict[str, Callable] = {}
self.message_handlers: List[Callable] = []
self.notice_handlers: List[Dict] = []
self.request_handlers: List[Dict] = []
self.plugins: Dict[str, Dict[str, Any]] = {} self.plugins: Dict[str, Dict[str, Any]] = {}
# 初始化专门的事件处理器
self.message_handler = MessageHandler(prefixes)
self.notice_handler = NoticeHandler()
self.request_handler = RequestHandler()
# --- 注册内置指令 --- # 将处理器映射到事件类型
self.commands["help"] = self._help_command self.handler_map = {
"message": self.message_handler,
"notice": self.notice_handler,
"request": self.request_handler,
}
# 注册内置的 /help 命令
self._register_internal_commands()
def _register_internal_commands(self):
"""
注册框架内置的命令
"""
# Help 命令
self.message_handler.command("help")(self._help_command)
self.plugins["core.help"] = { self.plugins["core.help"] = {
"name": "帮助", "name": "帮助",
"description": "显示所有可用指令的帮助信息", "description": "显示所有可用指令的帮助信息",
"usage": "/help", "usage": "/help",
} }
# --- 装饰器代理 ---
def on_message(self) -> Callable: def on_message(self) -> Callable:
""" """
装饰器:用于注册一个通用的消息处理器。 装饰器:注册一个通用的消息处理器。
被此装饰器注册的函数,会在每次收到消息时(在指令匹配前)被调用。
如果函数返回 True则表示该消息已被“消费”后续的指令匹配将不会进行。
Example:
@matcher.on_message()
async def code_input_handler(bot, event):
if is_waiting_for_code(event.user_id):
await process_code(event.raw_message)
return True # 消费事件
""" """
def decorator(func: Callable) -> Callable: return self.message_handler.on_message()
self.message_handlers.append(func)
return func
return decorator
def command(
self,
name: str,
permission: Optional[Any] = None,
override_permission_check: bool = False
) -> Callable:
"""
装饰器:注册一个消息指令处理器。
"""
return self.message_handler.command(
name,
permission=permission,
override_permission_check=override_permission_check
)
def on_notice(self, notice_type: Optional[str] = None) -> Callable:
"""
装饰器:注册一个通知事件处理器。
"""
return self.notice_handler.register(notice_type=notice_type)
def on_request(self, request_type: Optional[str] = None) -> Callable:
"""
装饰器:注册一个请求事件处理器。
"""
return self.request_handler.register(request_type=request_type)
# --- 事件处理 ---
async def handle_event(self, bot, event):
"""
统一的事件分发入口。
根据事件的 `post_type` 将其分发给对应的处理器。
"""
if event.post_type == 'message' and global_config.bot.get('ignore_self_message', False):
if hasattr(event, 'user_id') and hasattr(event, 'self_id') and event.user_id == event.self_id:
return
handler = self.handler_map.get(event.post_type)
if handler:
await handler.handle(bot, event)
# --- 内置命令实现 ---
async def _help_command(self, bot, event): async def _help_command(self, bot, event):
""" """
内置的 `/help` 命令的实现。 内置的 `/help` 命令的实现。
该命令会遍历所有已加载插件的元数据,并生成一段格式化的帮助文本。
Args:
bot: Bot 实例。
event: 消息事件对象。
""" """
help_text = "--- 可用指令列表 ---\n" help_text = "--- 可用指令列表 ---\n"
@@ -95,187 +129,6 @@ class CommandManager:
await bot.send(event, help_text.strip()) await bot.send(event, help_text.strip())
def command(self, name: str) -> Callable:
"""
装饰器:用于注册一个消息指令处理器。
Example:
@matcher.command("echo")
async def handle_echo(bot, event, args):
await bot.send(event, " ".join(args))
Args:
name (str): 指令的名称(不包含命令前缀)。
Returns:
Callable: 原函数,使其可以继续被调用。
"""
def decorator(func: Callable) -> Callable:
self.commands[name] = func
return func
return decorator
def on_notice(self, notice_type: str = None) -> Callable:
"""
装饰器:用于注册一个通知事件处理器。
如果 `notice_type` 未指定,则该处理器会接收所有类型的通知事件。
Args:
notice_type (str, optional): 要处理的通知类型 (e.g., "group_increase")。
Defaults to None.
Returns:
Callable: 原函数。
"""
def decorator(func: Callable) -> Callable:
self.notice_handlers.append({"type": notice_type, "func": func})
return func
return decorator
def on_request(self, request_type: str = None) -> Callable:
"""
装饰器:用于注册一个请求事件处理器。
如果 `request_type` 未指定,则该处理器会接收所有类型的请求事件。
Args:
request_type (str, optional): 要处理的请求类型 (e.g., "friend", "group")。
Defaults to None.
Returns:
Callable: 原函数。
"""
def decorator(func: Callable) -> Callable:
self.request_handlers.append({"type": request_type, "func": func})
return func
return decorator
async def handle_event(self, bot, event):
"""
统一的事件分发入口。
由 `WS` 客户端在接收到事件后调用。该方法会根据事件的 `post_type`
将其分发给对应的具体处理方法。
Args:
bot: Bot 实例。
event: 已解析的事件对象。
"""
# --- 全局过滤机器人自身消息 ---
# 仅对消息事件生效
if event.post_type == 'message' and global_config.bot.get('ignore_self_message', False):
if hasattr(event, 'user_id') and hasattr(event, 'self_id') and event.user_id == event.self_id:
return
post_type = event.post_type
if post_type == 'message':
await self.handle_message(bot, event)
elif post_type == 'notice':
await self.handle_notice(bot, event)
elif post_type == 'request':
await self.handle_request(bot, event)
async def handle_message(self, bot, event):
"""
处理消息事件,优先执行通用处理器,然后解析并分发指令。
"""
# --- 1. 执行通用消息处理器 ---
for handler in self.message_handlers:
# 如果任何一个处理器返回 True则中断后续处理
consumed = await self._run_handler(handler, bot, event)
if consumed:
return
# --- 2. 检查并执行指令 ---
if not event.raw_message:
return
raw_text = event.raw_message.strip()
prefix_found = None
for p in self.prefixes:
if raw_text.startswith(p):
prefix_found = p
break
if not prefix_found:
return
full_cmd = raw_text[len(prefix_found) :].split()
if not full_cmd:
return
cmd_name = full_cmd[0]
args = full_cmd[1:]
if cmd_name in self.commands:
func = self.commands[cmd_name]
await self._run_handler(func, bot, event, args)
async def handle_notice(self, bot, event):
"""
分发通知事件给所有匹配的处理器。
Args:
bot: Bot 实例。
event: 通知事件对象。
"""
for handler in self.notice_handlers:
if handler["type"] is None or handler["type"] == event.notice_type:
await self._run_handler(handler["func"], bot, event)
async def handle_request(self, bot, event):
"""
分发请求事件给所有匹配的处理器。
Args:
bot: Bot 实例。
event: 请求事件对象。
"""
for handler in self.request_handlers:
if handler["type"] is None or handler["type"] == event.request_type:
await self._run_handler(handler["func"], bot, event)
async def _run_handler(self, func: Callable, bot, event, args: List[str] = None):
"""
智能执行事件处理器,并返回事件是否被消费。
该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参数
(如 `bot`, `event`, `args`),实现了依赖注入。
Args:
func (Callable): 目标处理器函数。
bot: Bot 实例。
event: 事件对象。
args (List[str], optional): 指令参数列表(仅对消息事件有效)。
Returns:
bool: 如果处理器函数返回 True则返回 True否则返回 False。
"""
sig = inspect.signature(func)
params = sig.parameters
kwargs = {}
if "bot" in params:
kwargs["bot"] = bot
if "event" in params:
kwargs["event"] = event
if "args" in params and args is not None:
kwargs["args"] = args
# 执行函数并获取返回值
result = await func(**kwargs)
return result is True
# --- 全局单例 --- # --- 全局单例 ---

197
core/event_handler.py Normal file
View File

@@ -0,0 +1,197 @@
"""
事件处理器模块
该模块定义了用于处理不同类型事件的处理器类。
每个处理器都负责注册和分发特定类型的事件。
"""
import inspect
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Tuple
from .bot import Bot
from .permission_manager import Permission, permission_manager
from .exceptions import SyncHandlerError
from .executor import run_in_thread_pool
class BaseHandler(ABC):
"""
事件处理器抽象基类
"""
def __init__(self):
self.handlers: List[Dict[str, Any]] = []
@abstractmethod
async def handle(self, bot: Bot, event: Any):
"""
处理事件
"""
raise NotImplementedError
async def _run_handler(
self,
func: Callable,
bot: Bot,
event: Any,
args: Optional[List[str]] = None,
permission_granted: Optional[bool] = None
):
"""
智能执行事件处理器,并注入所需参数
"""
sig = inspect.signature(func)
params = sig.parameters
kwargs = {}
if "bot" in params:
kwargs["bot"] = bot
if "event" in params:
kwargs["event"] = event
if "args" in params and args is not None:
kwargs["args"] = args
if "permission_granted" in params and permission_granted is not None:
kwargs["permission_granted"] = permission_granted
if inspect.iscoroutinefunction(func):
result = await func(**kwargs)
else:
# 如果是同步函数,则放入线程池执行
result = await run_in_thread_pool(func, **kwargs)
return result is True
class MessageHandler(BaseHandler):
"""
消息事件处理器
"""
def __init__(self, prefixes: Tuple[str, ...]):
super().__init__()
self.prefixes = prefixes
self.commands: Dict[str, Dict] = {}
self.message_handlers: List[Callable] = []
def on_message(self) -> Callable:
"""
注册通用消息处理器
"""
def decorator(func: Callable) -> Callable:
if not inspect.iscoroutinefunction(func):
raise SyncHandlerError(f"消息处理器 {func.__name__} 必须是异步函数 (async def).")
self.message_handlers.append(func)
return func
return decorator
def command(
self,
name: str,
permission: Optional[Permission] = None,
override_permission_check: bool = False
) -> Callable:
"""
注册命令处理器
"""
def decorator(func: Callable) -> Callable:
if not inspect.iscoroutinefunction(func):
raise SyncHandlerError(f"命令处理器 {func.__name__} 必须是异步函数 (async def).")
self.commands[name] = {
"func": func,
"permission": permission,
"override_permission_check": override_permission_check,
}
return func
return decorator
async def handle(self, bot: Bot, event: Any):
"""
处理消息事件,包括通用消息和命令
"""
for handler in self.message_handlers:
consumed = await self._run_handler(handler, bot, event)
if consumed:
return
if not event.raw_message:
return
raw_text = event.raw_message.strip()
prefix_found = next((p for p in self.prefixes if raw_text.startswith(p)), None)
if not prefix_found:
return
full_cmd = raw_text[len(prefix_found):].split()
if not full_cmd:
return
cmd_name = full_cmd[0]
args = full_cmd[1:]
if cmd_name in self.commands:
command_info = self.commands[cmd_name]
func = command_info["func"]
permission = command_info.get("permission")
override_check = command_info.get("override_permission_check", False)
permission_granted = True
if permission:
permission_granted = await permission_manager.check_permission(event.user_id, permission)
if not permission_granted and not override_check:
await bot.send(event, f"权限不足,需要 {permission.name} 权限")
return
await self._run_handler(
func,
bot,
event,
args=args,
permission_granted=permission_granted
)
class NoticeHandler(BaseHandler):
"""
通知事件处理器
"""
def register(self, notice_type: Optional[str] = None) -> Callable:
"""
注册通知处理器
"""
def decorator(func: Callable) -> Callable:
if not inspect.iscoroutinefunction(func):
raise SyncHandlerError(f"通知处理器 {func.__name__} 必须是异步函数 (async def).")
self.handlers.append({"type": notice_type, "func": func})
return func
return decorator
async def handle(self, bot: Bot, event: Any):
"""
处理通知事件
"""
for handler in self.handlers:
if handler["type"] is None or handler["type"] == event.notice_type:
await self._run_handler(handler["func"], bot, event)
class RequestHandler(BaseHandler):
"""
请求事件处理器
"""
def register(self, request_type: Optional[str] = None) -> Callable:
"""
注册请求处理器
"""
def decorator(func: Callable) -> Callable:
if not inspect.iscoroutinefunction(func):
raise SyncHandlerError(f"请求处理器 {func.__name__} 必须是异步函数 (async def).")
self.handlers.append({"type": request_type, "func": func})
return func
return decorator
async def handle(self, bot: Bot, event: Any):
"""
处理请求事件
"""
for handler in self.handlers:
if handler["type"] is None or handler["type"] == event.request_type:
await self._run_handler(handler["func"], bot, event)

9
core/exceptions.py Normal file
View File

@@ -0,0 +1,9 @@
"""
自定义异常模块
"""
class SyncHandlerError(Exception):
"""
当尝试注册同步函数作为异步事件处理器时抛出此异常。
"""
pass

27
core/executor.py Normal file
View File

@@ -0,0 +1,27 @@
"""
线程池执行器
提供一个全局的线程池和异步接口,用于在事件循环中安全地运行同步函数。
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable
# 创建一个全局的线程池,可以根据需要调整 max_workers
executor = ThreadPoolExecutor(max_workers=10)
async def run_in_thread_pool(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
"""
在线程池中异步运行同步函数
:param func: 要运行的同步函数
:param args: 函数的位置参数
:param kwargs: 函数的关键字参数
:return: 函数的返回值
"""
loop = asyncio.get_running_loop()
# 使用 functools.partial 绑定函数和参数,以便传递给 run_in_executor
func_to_run = partial(func, *args, **kwargs)
# loop.run_in_executor 会返回一个 awaitable 对象
return await loop.run_in_executor(executor, func_to_run)

252
core/permission_manager.py Normal file
View File

@@ -0,0 +1,252 @@
"""
权限管理器模块
该模块负责管理用户权限,支持 admin、op、user 三个权限级别。
权限数据存储在 `permissions.json` 文件中,格式为:
{
"users": {
"123456": "admin",
"789012": "op",
"345678": "user"
}
}
"""
import json
import os
from functools import total_ordering
from typing import Dict
from .logger import logger
from .admin_manager import admin_manager # 导入 AdminManager
@total_ordering
class Permission:
"""
权限封装类
封装了权限的名称和等级,并提供了比较方法。
使用 @total_ordering 装饰器可以自动生成所有的比较运算符。
"""
def __init__(self, name: str, level: int):
"""
初始化权限对象
Args:
name (str): 权限名称 (e.g., "admin", "op")
level (int): 权限等级,数字越大权限越高
"""
self.name = name
self.level = level
def __eq__(self, other):
"""
判断权限是否相等
"""
if not isinstance(other, Permission):
return NotImplemented
return self.level == other.level
def __lt__(self, other):
"""
判断权限是否小于另一个权限
"""
if not isinstance(other, Permission):
return NotImplemented
return self.level < other.level
def __str__(self) -> str:
"""
返回权限的字符串表示(即权限名称)
"""
return self.name
# 定义全局权限常量
ADMIN = Permission("admin", 3)
OP = Permission("op", 2)
USER = Permission("user", 1)
# 用于从字符串名称查找权限对象的字典
_PERMISSIONS: Dict[str, Permission] = {
p.name: p for p in [ADMIN, OP, USER]
}
class PermissionManager:
"""
权限管理器类
负责加载、保存和查询用户权限数据。
使用单例模式,确保全局只有一个权限管理器实例。
"""
_instance = None
def __new__(cls):
"""
单例模式实现
Returns:
PermissionManager: 全局唯一的权限管理器实例
"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""
初始化权限管理器
如果已经初始化过,则直接返回。
"""
if getattr(self, "_initialized", False):
return
# 权限数据文件路径
self.data_file = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"data",
"permissions.json"
)
# 确保数据目录存在
data_dir = os.path.dirname(self.data_file)
os.makedirs(data_dir, exist_ok=True)
# 权限数据存储结构:{"users": {"user_id": "level_name"}}
self._data: Dict[str, Dict[str, str]] = {"users": {}}
# 加载现有数据
self.load()
self._initialized = True
logger.info("权限管理器初始化完成")
def load(self) -> None:
"""
从文件加载权限数据
如果文件不存在,则创建空文件并初始化默认数据结构。
"""
try:
if os.path.exists(self.data_file):
with open(self.data_file, "r", encoding="utf-8") as f:
data = json.load(f)
# 兼容旧格式
if "users" in data:
self._data["users"] = data["users"]
else:
self._data["users"] = {}
logger.debug(f"权限数据已从 {self.data_file} 加载")
else:
# 文件不存在,创建空文件
self.save()
logger.debug(f"创建空的权限数据文件: {self.data_file}")
except json.JSONDecodeError as e:
logger.error(f"权限数据文件格式错误: {e}")
# 文件损坏,重置为空数据
self._data["users"] = {}
self.save()
except Exception as e:
logger.error(f"加载权限数据失败: {e}")
self._data["users"] = {}
def save(self) -> None:
"""
将权限数据保存到文件
"""
try:
with open(self.data_file, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2, ensure_ascii=False)
logger.debug(f"权限数据已保存到 {self.data_file}")
except Exception as e:
logger.error(f"保存权限数据失败: {e}")
async def get_user_permission(self, user_id: int) -> Permission:
"""
获取指定用户的权限对象
Args:
user_id (int): 用户 QQ 号
Returns:
Permission: 用户的权限对象,如果用户不存在则返回默认级别 USER
"""
# 首先,通过 AdminManager 检查是否为管理员
if await admin_manager.is_admin(user_id):
return ADMIN
# 如果不是管理员,则从 permissions.json 中查找
user_id_str = str(user_id)
level_name = self._data["users"].get(user_id_str, USER.name)
return _PERMISSIONS.get(level_name, USER)
def set_user_permission(self, user_id: int, permission: Permission) -> None:
"""
设置指定用户的权限级别
Args:
user_id (int): 用户 QQ 号
permission (Permission): 权限对象
Raises:
ValueError: 如果权限对象无效
"""
if not isinstance(permission, Permission) or permission.name not in _PERMISSIONS:
raise ValueError(f"无效的权限对象: {permission}")
user_id_str = str(user_id)
self._data["users"][user_id_str] = permission.name
self.save()
logger.info(f"设置用户 {user_id} 的权限级别为 {permission.name}")
def remove_user(self, user_id: int) -> None:
"""
移除指定用户的权限设置,恢复为默认级别
Args:
user_id (int): 用户 QQ 号
"""
user_id_str = str(user_id)
if user_id_str in self._data["users"]:
del self._data["users"][user_id_str]
self.save()
logger.info(f"移除用户 {user_id} 的权限设置")
async def check_permission(self, user_id: int, required_permission: Permission) -> bool:
"""
检查用户是否具有指定权限级别
Args:
user_id (int): 用户 QQ 号
required_permission (Permission): 所需的权限对象
Returns:
bool: 如果用户权限 >= 所需权限,返回 True否则返回 False
"""
user_permission = await self.get_user_permission(user_id)
return user_permission >= required_permission
def get_all_users(self) -> Dict[str, str]:
"""
获取所有设置了权限的用户及其级别名称
Returns:
Dict[str, str]: 用户ID到权限级别名称的映射
"""
return self._data["users"].copy()
def clear_all(self) -> None:
"""
清空所有权限设置
"""
self._data["users"].clear()
self.save()
logger.info("已清空所有权限设置")
# 全局权限管理器实例
permission_manager = PermissionManager()

View File

@@ -11,7 +11,9 @@ import pkgutil
import sys import sys
from core.command_manager import matcher from core.command_manager import matcher
from core.exceptions import SyncHandlerError
from .logger import logger from .logger import logger
from .executor import run_in_thread_pool
def load_all_plugins(): def load_all_plugins():
@@ -49,6 +51,8 @@ def load_all_plugins():
type_str = "" if is_pkg else "文件" type_str = "" if is_pkg else "文件"
logger.success(f" [{type_str}] 成功{action}: {module_name}") logger.success(f" [{type_str}] 成功{action}: {module_name}")
except SyncHandlerError as e:
logger.error(f" 插件 {module_name} 加载失败: {e} (跳过此插件)")
except Exception as e: except Exception as e:
print( print(
f" {action if 'action' in locals() else '加载'}插件 {module_name} 失败: {e}" f" {action if 'action' in locals() else '加载'}插件 {module_name} 失败: {e}"
@@ -75,50 +79,48 @@ class PluginDataManager:
self.plugin_name + ".json", self.plugin_name + ".json",
) )
self.data = {} self.data = {}
self.load()
def load(self): async def load(self):
"""读取配置文件""" """读取配置文件"""
if not os.path.exists(self.data_file): if not os.path.exists(self.data_file):
with open(self.data_file, "w", encoding="utf-8") as f: await self.set(self.plugin_name, [])
self.set(self.plugin_name, [])
try: try:
with open(self.data_file, "r", encoding="utf-8") as f: with open(self.data_file, "r", encoding="utf-8") as f:
self.data = json.load(f) self.data = await run_in_thread_pool(json.load, f)
except json.JSONDecodeError: except json.JSONDecodeError:
self.data = {} self.data = {}
def save(self): async def save(self):
"""保存配置到文件""" """保存配置到文件"""
with open(self.data_file, "w", encoding="utf-8") as f: with open(self.data_file, "w", encoding="utf-8") as f:
json.dump(self.data, f, indent=2, ensure_ascii=False) await run_in_thread_pool(json.dump, self.data, f, indent=2, ensure_ascii=False)
def get(self, key, default=None): def get(self, key, default=None):
"""获取配置项""" """获取配置项"""
return self.data.get(key, default) return self.data.get(key, default)
def set(self, key, value): async def set(self, key, value):
"""设置配置项""" """设置配置项"""
self.data[key] = value self.data[key] = value
self.save() await self.save()
def add(self, key, value): async def add(self, key, value):
"""添加配置项""" """添加配置项"""
if key not in self.data: if key not in self.data:
self.data[key] = [] self.data[key] = []
self.data[key].append(value) self.data[key].append(value)
self.save() await self.save()
def remove(self, key): async def remove(self, key):
"""删除配置项""" """删除配置项"""
if key in self.data: if key in self.data:
del self.data[key] del self.data[key]
self.save() await self.save()
def clear(self): async def clear(self):
"""清空所有配置""" """清空所有配置"""
self.data.clear() self.data.clear()
self.save() await self.save()
def get_all(self): def get_all(self):
return self.data.copy() return self.data.copy()

View File

@@ -1,20 +1,24 @@
import redis import redis.asyncio as redis
from .config_loader import global_config as config from .config_loader import global_config as config
from .logger import logger from .logger import logger
class RedisManager: class RedisManager:
""" """
Redis 连接管理器 Redis 连接管理器(异步单例)
""" """
_pool = None _instance = None
_client = None _redis = None
@classmethod def __new__(cls):
def initialize(cls): if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
async def initialize(self):
""" """
初始化 Redis 连接并进行健康检查 异步初始化 Redis 连接并进行健康检查
""" """
if cls._pool is None: if self._redis is None:
try: try:
host = config.redis['host'] host = config.redis['host']
port = config.redis['port'] port = config.redis['port']
@@ -23,39 +27,32 @@ class RedisManager:
logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}") logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")
cls._pool = redis.ConnectionPool( self._redis = redis.Redis(
host=host, host=host,
port=port, port=port,
db=db, db=db,
password=password, password=password,
decode_responses=True decode_responses=True
) )
cls._client = redis.Redis(connection_pool=cls._pool) if await self._redis.ping():
if cls._client.ping():
logger.success("Redis 连接成功!") logger.success("Redis 连接成功!")
else: else:
logger.error("Redis 连接失败: PING 命令无响应") logger.error("Redis 连接失败: PING 命令无响应")
except redis.exceptions.ConnectionError as e: except redis.exceptions.ConnectionError as e:
logger.error(f"Redis 连接失败: {e}") logger.error(f"Redis 连接失败: {e}")
cls._pool = None self._redis = None
cls._client = None
except Exception as e: except Exception as e:
logger.exception(f"Redis 初始化时发生未知错误: {e}") logger.exception(f"Redis 初始化时发生未知错误: {e}")
cls._pool = None self._redis = None
cls._client = None
@classmethod @property
def get_redis(cls): def redis(self):
""" """
获取 Redis 连接 获取 Redis 连接实例
:return: Redis 连接实例
""" """
if cls._client is None: if self._redis is None:
# 理论上 initialize 应该在程序启动时被调用,这里作为备用 raise ConnectionError("Redis 未初始化或连接失败,请先调用 initialize()")
cls.initialize() return self._redis
return cls._client
# 在模块加载时直接初始化 # 全局 Redis 管理器实例
RedisManager.initialize() redis_manager = RedisManager()
redis_client = RedisManager.get_redis()

3
data/admin.json Normal file
View File

@@ -0,0 +1,3 @@
{
"admins": [2221577113]
}

3
data/permissions.json Normal file
View File

@@ -0,0 +1,3 @@
{
"users": {}
}

288
html/404.html Normal file
View File

@@ -0,0 +1,288 @@
<!DOCTYPE html>
<html><head><title></title><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>404 - Signal Lost</title><script src="./tailwindcss.js"></script><script src="./iconify-icon.min.js"></script></head><body>
<style id="style-404">
/* 核心背景:动态噪点与 CRT 效果 */
@keyframes noise {
0%, 100% { transform: translate(0, 0); }
10% { transform: translate(-5%, -5%); }
20% { transform: translate(-10%, 5%); }
30% { transform: translate(5%, -10%); }
40% { transform: translate(-5%, 15%); }
50% { transform: translate(-10%, 5%); }
60% { transform: translate(15%, 0); }
70% { transform: translate(0, 10%); }
80% { transform: translate(-15%, 0); }
90% { transform: translate(10%, 5%); }
}
.bg-noise {
position: fixed;
top: -50%;
left: -50%;
right: -50%;
bottom: -50%;
width: 200%;
height: 200vh;
background: transparent url('http://assets.iceable.com/img/noise-transparent.png') repeat 0 0;
background-repeat: repeat;
animation: noise .2s infinite;
opacity: .05;
visibility: visible;
pointer-events: none;
z-index: 1;
}
.crt-overlay {
background: linear-gradient(rgba(18, 16, 16, 0) 50%, rgba(0, 0, 0, 0.25) 50%), linear-gradient(90deg, rgba(255, 0, 0, 0.06), rgba(0, 255, 0, 0.02), rgba(0, 0, 255, 0.06));
background-size: 100% 2px, 3px 100%;
pointer-events: none;
}
.vignette {
background: radial-gradient(circle, rgba(0,0,0,0) 60%, rgba(0,0,0,1) 100%);
pointer-events: none;
}
/* 高级 Glitch 文本效果 */
.cyber-glitch {
position: relative;
color: #fff;
mix-blend-mode: lighten;
}
.cyber-glitch::before,
.cyber-glitch::after {
content: attr(data-text);
position: absolute;
top: 0;
width: 100%;
background: #050505;
clip: rect(0, 0, 0, 0);
}
.cyber-glitch::before {
left: -2px;
text-shadow: 2px 0 #ff00c1;
animation: glitch-anim-1 2s infinite linear alternate-reverse;
}
.cyber-glitch::after {
left: 2px;
text-shadow: -2px 0 #00fff9;
animation: glitch-anim-2 3s infinite linear alternate-reverse;
}
@keyframes glitch-anim-1 {
0% { clip: rect(20px, 9999px, 10px, 0); }
20% { clip: rect(50px, 9999px, 80px, 0); }
40% { clip: rect(10px, 9999px, 40px, 0); }
60% { clip: rect(80px, 9999px, 20px, 0); }
80% { clip: rect(30px, 9999px, 60px, 0); }
100% { clip: rect(60px, 9999px, 30px, 0); }
}
@keyframes glitch-anim-2 {
0% { clip: rect(60px, 9999px, 30px, 0); }
20% { clip: rect(10px, 9999px, 50px, 0); }
40% { clip: rect(70px, 9999px, 10px, 0); }
60% { clip: rect(30px, 9999px, 90px, 0); }
80% { clip: rect(90px, 9999px, 20px, 0); }
100% { clip: rect(20px, 9999px, 60px, 0); }
}
/* 装饰性扫描线 */
.scanline-bar {
width: 100%;
height: 5px;
background: rgba(0, 255, 249, 0.3);
position: absolute;
z-index: 10;
animation: scan 3s linear infinite;
opacity: 0.3;
box-shadow: 0 0 10px rgba(0, 255, 249, 0.5);
}
@keyframes scan {
0% { top: -10%; }
100% { top: 110%; }
}
/* 终端光标闪烁 */
.cursor-blink {
animation: blink 1s step-end infinite;
}
@keyframes blink {
0%, 100% { opacity: 1; }
50% { opacity: 0; }
}
</style>
<div id="content-404" class="relative min-h-screen bg-[#050505] flex flex-col items-center justify-center overflow-hidden font-mono text-gray-300 selection:bg-electric/30 selection:text-white">
<!-- Environmental Effects -->
<div class="bg-noise"></div>
<div class="crt-overlay absolute inset-0 z-50 pointer-events-none"></div>
<div class="vignette absolute inset-0 z-40 pointer-events-none"></div>
<div class="scanline-bar pointer-events-none"></div>
<!-- Background Grid -->
<div class="absolute inset-0 bg-[linear-gradient(rgba(30,30,30,0.5)_1px,transparent_1px),linear-gradient(90deg,rgba(30,30,30,0.5)_1px,transparent_1px)] bg-[size:40px_40px] opacity-10 pointer-events-none z-0" style="perspective: 500px; transform: rotateX(20deg) scale(1.5);"></div>
<!-- Main Content -->
<div class="relative z-30 flex flex-col items-center w-full max-w-4xl px-6">
<!-- Glitch Title -->
<div class="relative mb-6 group cursor-default">
<h1 class="cyber-glitch text-[120px] md:text-[180px] font-black leading-none tracking-tighter opacity-90 select-none" data-text="404">
404
</h1>
<div class="absolute -bottom-4 left-0 w-full flex justify-between text-[10px] md:text-xs text-electric/40 uppercase tracking-[0.5em] font-bold">
<span>Sys.Malfunction</span>
<span>0x00_DEAD</span>
</div>
</div>
<!-- Terminal Window -->
<div class="w-full max-w-2xl mt-8 mb-12 backdrop-blur-md bg-black/40 border border-white/5 rounded-sm shadow-[0_0_30px_rgba(0,0,0,0.8)] overflow-hidden">
<!-- Terminal Header -->
<div class="flex items-center justify-between px-4 py-2 bg-white/5 border-b border-white/5">
<div class="flex gap-2">
<div class="w-3 h-3 rounded-full bg-red-900/50 border border-red-500/30"></div>
<div class="w-3 h-3 rounded-full bg-yellow-900/50 border border-yellow-500/30"></div>
<div class="w-3 h-3 rounded-full bg-green-900/50 border border-green-500/30"></div>
</div>
<div class="text-[10px] text-gray-600 font-mono">root@neobot:~/system/logs</div>
</div>
<!-- Terminal Body -->
<div class="p-6 font-mono text-sm md:text-base leading-relaxed h-48 overflow-y-auto custom-scrollbar">
<div id="terminal-content" class="space-y-1">
<!-- Content will be injected by JS -->
<div class="text-gray-500 transition-opacity duration-100">&gt; initiating_handshake...</div><div class="text-gray-400 transition-opacity duration-100">&gt; resolving_host: calglaubot.internal</div><div class="text-green-500/50 transition-opacity duration-100">&gt; connection_established (port: 443)</div><div class="text-blue-400/60 transition-opacity duration-100">&gt; GET /requested_resource HTTP/1.1</div><div class="text-gray-500 transition-opacity duration-100">&gt; waiting_for_response...</div><div class="text-red-500 font-bold transition-opacity duration-100">&gt; FATAL: endpoint_not_found</div><div class="text-gray-600 mt-2 transition-opacity duration-100">&gt; stack_trace_dump:</div><div class="text-gray-600 pl-4 transition-opacity duration-100">&gt; at Router.resolve (core.js:204)</div><div class="text-gray-600 pl-4 transition-opacity duration-100">&gt; at Neobot.Handler (main.py:404)</div><div class="text-electric/80 mt-2 transition-opacity duration-100">&gt; error: signal_lost_in_void</div></div>
<div class="flex items-center mt-2 text-electric">
<span class="mr-2"></span>
<span class="cursor-blink w-2 h-4 bg-electric block"></span>
</div>
</div>
</div>
<!-- Navigation -->
<div class="flex flex-col md:flex-row gap-6 items-center">
<a href="index.html" class="group relative px-8 py-3 bg-transparent overflow-hidden">
<!-- Button Borders -->
<div class="absolute top-0 left-0 w-2 h-2 border-t border-l border-electric/50 transition-all group-hover:w-full group-hover:h-full group-hover:border-electric"></div>
<div class="absolute bottom-0 right-0 w-2 h-2 border-b border-r border-electric/50 transition-all group-hover:w-full group-hover:h-full group-hover:border-electric"></div>
<!-- Button Content -->
<div class="relative flex items-center gap-3">
<iconify-icon icon="mdi:console-network" class="text-xl text-electric/70 group-hover:text-electric transition-colors"></iconify-icon>
<span class="font-bold tracking-widest text-sm text-gray-400 group-hover:text-white transition-colors">REBOOT_SYSTEM</span>
</div>
<!-- Hover Background -->
<div class="absolute inset-0 bg-electric/5 translate-y-full group-hover:translate-y-0 transition-transform duration-300"></div>
</a>
<a href="#" class="text-xs text-gray-600 hover:text-electric/60 transition-colors uppercase tracking-widest border-b border-transparent hover:border-electric/30 pb-0.5">
Report_Incident
</a>
</div>
</div>
<!-- Footer Stats -->
<div class="absolute bottom-6 left-6 right-6 flex justify-between text-[10px] text-gray-700 font-mono uppercase z-30">
<div>
<span>CPU: <span class="text-gray-500">98%</span></span>
<span class="mx-2">|</span>
<span>MEM: <span class="text-red-900 animate-pulse">OVERFLOW</span></span>
</div>
<div>
<span>NEOBOT FRAMEWORK</span>
</div>
</div>
</div><script id="script-404">(function() {
document.addEventListener('DOMContentLoaded', () => {
const terminalContent = document.getElementById('terminal-content');
const logs = [
{ text: 'initiating_handshake...', delay: 100, class: 'text-gray-500' },
{ text: 'resolving_host: calglaubot.internal', delay: 300, class: 'text-gray-400' },
{ text: 'connection_established (port: 443)', delay: 600, class: 'text-green-500/50' },
{ text: 'GET /requested_resource HTTP/1.1', delay: 900, class: 'text-blue-400/60' },
{ text: 'waiting_for_response...', delay: 1200, class: 'text-gray-500' },
{ text: 'FATAL: endpoint_not_found', delay: 2000, class: 'text-red-500 font-bold' },
{ text: 'stack_trace_dump:', delay: 2200, class: 'text-gray-600 mt-2' },
{ text: ' at Router.resolve (core.js:204)', delay: 2300, class: 'text-gray-600 pl-4' },
{ text: ' at Neobot.Handler (main.py:404)', delay: 2400, class: 'text-gray-600 pl-4' },
{ text: 'error: signal_lost_in_void', delay: 2800, class: 'text-electric/80 mt-2' }
];
let currentLine = 0;
function typeWriter() {
if (currentLine < logs.length) {
const line = logs[currentLine];
const p = document.createElement('div');
p.className = `${line.class} opacity-0 transition-opacity duration-100`;
p.textContent = `> ${line.text}`;
terminalContent.appendChild(p);
// Trigger reflow
void p.offsetWidth;
p.classList.remove('opacity-0');
// Auto scroll to bottom
const container = terminalContent.parentElement;
container.scrollTop = container.scrollHeight;
currentLine++;
setTimeout(typeWriter, Math.random() * 200 + 100); // Random typing speed variation
}
}
setTimeout(typeWriter, 500);
});
})();</script></body></html>

387
html/index.html Normal file
View File

@@ -0,0 +1,387 @@
<!DOCTYPE html>
<html lang="zh-CN" class="scroll-smooth">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>NEOBOT | F.O.S FRAMEWORK</title>
<script src="https://cdn.tailwindcss.com"></script>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600&family=Inter:wght@300;400;600;800&display=swap" rel="stylesheet">
<script src="https://code.iconify.design/iconify-icon/1.0.7/iconify-icon.min.js"></script>
<script>
tailwind.config = {
theme: {
extend: {
fontFamily: {
sans: ['"Inter"', 'sans-serif'],
mono: ['"JetBrains Mono"', 'monospace'],
},
colors: {
brand: {
dark: '#0B0C10',
gray: '#1F2833',
light: '#C5C6C7',
accent: '#66FCF1', // 经典的青色高亮
accentDim: '#45A29E',
}
},
animation: {
'float': 'float 6s ease-in-out infinite',
'pulse-slow': 'pulse 4s cubic-bezier(0.4, 0, 0.6, 1) infinite',
},
keyframes: {
float: {
'0%, 100%': { transform: 'translateY(0)' },
'50%': { transform: 'translateY(-10px)' },
}
}
}
}
}
</script>
<style>
body {
background-color: #0B0C10;
color: #C5C6C7;
background-image:
radial-gradient(circle at 15% 50%, rgba(69, 162, 158, 0.08) 0%, transparent 25%),
radial-gradient(circle at 85% 30%, rgba(102, 252, 241, 0.05) 0%, transparent 25%);
}
/* 现代卡片样式 */
.modern-card {
background: rgba(31, 40, 51, 0.6);
backdrop-filter: blur(12px);
border: 1px solid rgba(255, 255, 255, 0.05);
border-radius: 12px;
transition: all 0.3s ease;
}
.modern-card:hover {
border-color: rgba(102, 252, 241, 0.3);
transform: translateY(-4px);
box-shadow: 0 10px 30px -10px rgba(102, 252, 241, 0.1);
}
/* 代码块样式 */
.code-window {
background: #151515;
border-radius: 8px;
border: 1px solid #333;
box-shadow: 0 20px 50px rgba(0,0,0,0.5);
}
/* 自定义滚动条 */
::-webkit-scrollbar { width: 6px; }
::-webkit-scrollbar-track { background: #0B0C10; }
::-webkit-scrollbar-thumb { background: #333; border-radius: 3px; }
::-webkit-scrollbar-thumb:hover { background: #66FCF1; }
.text-glow {
text-shadow: 0 0 20px rgba(102, 252, 241, 0.3);
}
</style>
</head>
<body class="antialiased selection:bg-brand-accent selection:text-brand-dark">
<!-- 顶部导航 -->
<nav class="fixed top-0 w-full z-50 border-b border-white/5 bg-brand-dark/80 backdrop-blur-md">
<div class="max-w-7xl mx-auto px-6 h-16 flex items-center justify-between">
<div class="flex items-center gap-3">
<div class="w-8 h-8 rounded-lg bg-brand-accent flex items-center justify-center text-brand-dark">
<iconify-icon icon="mdi:robot" class="text-xl"></iconify-icon>
</div>
<span class="font-bold text-white tracking-wide text-lg">NEO<span class="text-brand-accent">BOT</span></span>
</div>
<div class="hidden md:flex items-center gap-6 text-sm font-medium text-gray-400">
<span class="flex items-center gap-2"><span class="w-2 h-2 rounded-full bg-green-500 animate-pulse"></span>OneBot 11</span>
<span>v1.0.0</span>
</div>
<a href="https://github.com/Fairy-Oracle-Sanctuary/NEO-Bot-Framework" target="_blank"
class="flex items-center gap-2 px-4 py-2 bg-white/5 hover:bg-white/10 rounded-full transition-colors text-sm font-medium text-white border border-white/10">
<iconify-icon icon="mdi:github" class="text-lg"></iconify-icon>
GitHub
</a>
</div>
</nav>
<main class="pt-32 pb-20 px-6">
<div class="max-w-7xl mx-auto space-y-32">
<!-- Hero 区域 -->
<section class="grid lg:grid-cols-2 gap-16 items-center">
<div class="space-y-8 animate-float">
<div class="inline-flex items-center gap-2 px-3 py-1 rounded-full bg-brand-accent/10 border border-brand-accent/20 text-brand-accent text-xs font-mono font-bold tracking-wider">
🚀 HIGH PERFORMANCE ASYNC FRAMEWORK
</div>
<h1 class="text-5xl md:text-7xl font-extrabold text-white leading-tight">
为现代开发而生<br>
<span class="text-transparent bg-clip-text bg-gradient-to-r from-brand-accent to-blue-500 text-glow">NEO 机器人框架</span>
</h1>
<p class="text-lg text-gray-400 max-w-xl leading-relaxed">
基于 Python 异步生态构建的 OneBot 11 解决方案。内置 Redis 缓存、插件热重载与类型安全检查。这是我的第一个 Python 作品,致力于极致的开发体验。
</p>
<!-- 团队信息 (头像已更新) -->
<div class="flex items-center gap-4 py-4 border-t border-white/10 border-b border-white/10">
<div class="text-xs font-mono text-gray-500 uppercase tracking-widest">Core Team</div>
<div class="flex -space-x-3 hover:space-x-1 transition-all duration-300">
<!-- 镀铬酸钾 -->
<img src="https://q1.qlogo.cn/g?b=qq&nk=2221577113&s=640" alt="镀铬酸钾" title="镀铬酸钾 (Lead Developer)"
class="w-10 h-10 rounded-full border-2 border-brand-dark cursor-help hover:scale-110 transition-transform">
<!-- baby2016 -->
<img src="https://q1.qlogo.cn/g?b=qq&nk=2185823427&s=640" alt="baby2016" title="baby2016 (Co-Founder)"
class="w-10 h-10 rounded-full border-2 border-brand-dark cursor-help hover:scale-110 transition-transform">
</div>
<span class="text-sm font-medium text-white pl-2">Fairy-Oracle-Sanctuary</span>
</div>
<div class="flex flex-wrap gap-4">
<button onclick="copyInstall()" class="group relative px-8 py-4 bg-brand-accent text-brand-dark font-bold rounded-xl transition-all hover:bg-white hover:scale-105 active:scale-95 flex items-center gap-2">
<span>快速开始</span>
<iconify-icon icon="mdi:arrow-right" class="group-hover:translate-x-1 transition-transform"></iconify-icon>
</button>
<div class="flex items-center gap-2 px-6 py-4 rounded-xl bg-brand-gray border border-white/5 text-gray-400 font-mono text-sm">
<span class="text-brand-accent">$</span> git clone ...
<button onclick="copyClone()" class="ml-2 hover:text-white transition-colors" title="复制指令">
<iconify-icon icon="mdi:content-copy"></iconify-icon>
</button>
</div>
</div>
</div>
<!-- 代码演示 (真实 main.py) -->
<div class="relative group">
<div class="absolute -inset-1 bg-gradient-to-r from-brand-accent to-blue-600 rounded-lg blur opacity-20 group-hover:opacity-40 transition duration-1000"></div>
<div class="code-window relative overflow-hidden flex flex-col h-[500px]">
<div class="flex items-center justify-between px-4 py-3 bg-[#1a1a1a] border-b border-white/5 shrink-0">
<div class="flex gap-2">
<div class="w-3 h-3 rounded-full bg-red-500"></div>
<div class="w-3 h-3 rounded-full bg-yellow-500"></div>
<div class="w-3 h-3 rounded-full bg-green-500"></div>
</div>
<span class="text-xs font-mono text-gray-500">main.py</span>
</div>
<div class="p-6 overflow-y-auto custom-scrollbar grow">
<pre class="font-mono text-xs md:text-sm leading-6"><code><span class="text-gray-500">"""
NEO Bot 主程序入口
负责启动 WebSocket 连接,初始化插件系统,并提供热重载功能。
"""</span>
<span class="text-pink-400">import</span> asyncio
<span class="text-pink-400">from</span> watchdog.observers <span class="text-pink-400">import</span> Observer
<span class="text-pink-400">from</span> watchdog.events <span class="text-pink-400">import</span> FileSystemEventHandler
<span class="text-pink-400">from</span> core.logger <span class="text-pink-400">import</span> logger
<span class="text-pink-400">from</span> core.ws <span class="text-pink-400">import</span> WS
<span class="text-pink-400">from</span> core.plugin_manager <span class="text-pink-400">import</span> load_all_plugins
<span class="text-pink-400">class</span> <span class="text-yellow-300">PluginReloadHandler</span>(FileSystemEventHandler):
<span class="text-gray-500">"""监听文件变更,触发热重载"""</span>
<span class="text-pink-400">def</span> <span class="text-blue-400">on_any_event</span>(self, event):
<span class="text-pink-400">if not</span> event.src_path.endswith(<span class="text-green-400">".py"</span>):
<span class="text-pink-400">return</span>
logger.info(<span class="text-green-400">f"检测到文件变更: {event.src_path}"</span>)
<span class="text-pink-400">try</span>:
<span class="text-purple-400">run_in_thread_pool</span>(load_all_plugins)
logger.success(<span class="text-green-400">"插件重载完成"</span>)
<span class="text-pink-400">except</span> Exception <span class="text-pink-400">as</span> e:
logger.exception(<span class="text-green-400">f"重载失败: {e}"</span>)
<span class="text-cyan-400">@logger.catch</span>
<span class="text-pink-400">async def</span> <span class="text-yellow-300">main</span>():
<span class="text-gray-500"># 1. 初始化核心组件</span>
<span class="text-pink-400">await</span> run_in_thread_pool(load_all_plugins)
<span class="text-pink-400">await</span> redis_manager.initialize()
<span class="text-pink-400">await</span> admin_manager.initialize()
<span class="text-gray-500"># 2. 启动 Watchdog 热重载</span>
observer = Observer()
observer.schedule(PluginReloadHandler(), plugin_path, recursive=<span class="text-pink-400">True</span>)
observer.start()
<span class="text-gray-500"># 3. 启动 WebSocket 客户端</span>
<span class="text-pink-400">try</span>:
bot = WS()
<span class="text-pink-400">await</span> bot.connect()
<span class="text-pink-400">finally</span>:
observer.stop()
<span class="text-pink-400">if</span> __name__ == <span class="text-green-400">"__main__"</span>:
asyncio.run(main())</code></pre>
</div>
</div>
</div>
</section>
<!-- 特性介绍 -->
<section>
<div class="text-center mb-16 space-y-4">
<h2 class="text-3xl md:text-4xl font-bold text-white">为什么选择 NEO?</h2>
<p class="text-gray-400 max-w-2xl mx-auto">不仅仅是一个框架,更是一套完整的现代化开发解决方案。</p>
</div>
<div class="grid md:grid-cols-2 lg:grid-cols-3 gap-6">
<!-- Feature 1 -->
<div class="modern-card p-8 group">
<div class="w-12 h-12 rounded-xl bg-blue-500/10 flex items-center justify-center text-blue-400 mb-6 group-hover:scale-110 transition-transform">
<iconify-icon icon="mdi:lightning-bolt" class="text-3xl"></iconify-icon>
</div>
<h3 class="text-xl font-bold text-white mb-3">高性能异步 IO</h3>
<p class="text-gray-400 text-sm leading-relaxed">
基于 Python 原生 <code>asyncio</code><code>websockets</code> 构建。完全非阻塞设计,单进程即可轻松处理海量并发消息,拒绝卡顿。
</p>
</div>
<!-- Feature 2 -->
<div class="modern-card p-8 group">
<div class="w-12 h-12 rounded-xl bg-green-500/10 flex items-center justify-center text-green-400 mb-6 group-hover:scale-110 transition-transform">
<iconify-icon icon="mdi:refresh-auto" class="text-3xl"></iconify-icon>
</div>
<h3 class="text-xl font-bold text-white mb-3">智能插件热重载</h3>
<p class="text-gray-400 text-sm leading-relaxed">
基于 <code>watchdog</code> 实现文件监控。修改代码后自动重载插件逻辑,无需重启机器人进程。让调试和开发效率提升 200%。
</p>
</div>
<!-- Feature 3 -->
<div class="modern-card p-8 group">
<div class="w-12 h-12 rounded-xl bg-red-500/10 flex items-center justify-center text-red-400 mb-6 group-hover:scale-110 transition-transform">
<iconify-icon icon="mdi:database" class="text-3xl"></iconify-icon>
</div>
<h3 class="text-xl font-bold text-white mb-3">Redis 深度集成</h3>
<p class="text-gray-400 text-sm leading-relaxed">
内置 Redis 连接池。自动缓存群信息、好友列表等高频数据,减少 API 调用延迟,让响应速度快人一步。
</p>
</div>
<!-- Feature 4 -->
<div class="modern-card p-8 group">
<div class="w-12 h-12 rounded-xl bg-yellow-500/10 flex items-center justify-center text-yellow-400 mb-6 group-hover:scale-110 transition-transform">
<iconify-icon icon="mdi:shield-check" class="text-3xl"></iconify-icon>
</div>
<h3 class="text-xl font-bold text-white mb-3">类型安全</h3>
<p class="text-gray-400 text-sm leading-relaxed">
全面采用 Pydantic 和 Dataclasses。为所有事件和数据模型提供完整的类型注解IDE 智能补全,减少运行时错误。
</p>
</div>
<!-- Feature 5 -->
<div class="modern-card p-8 group">
<div class="w-12 h-12 rounded-xl bg-purple-500/10 flex items-center justify-center text-purple-400 mb-6 group-hover:scale-110 transition-transform">
<iconify-icon icon="mdi:account-key" class="text-3xl"></iconify-icon>
</div>
<h3 class="text-xl font-bold text-white mb-3">精细权限管理</h3>
<p class="text-gray-400 text-sm leading-relaxed">
内置 Admin/Op/User 三级权限体系。支持动态添加管理员,通过装饰器即可轻松控制每个指令的访问权限。
</p>
</div>
<!-- Feature 6 -->
<div class="modern-card p-8 group">
<div class="w-12 h-12 rounded-xl bg-cyan-500/10 flex items-center justify-center text-cyan-400 mb-6 group-hover:scale-110 transition-transform">
<iconify-icon icon="mdi:api" class="text-3xl"></iconify-icon>
</div>
<h3 class="text-xl font-bold text-white mb-3">标准 OneBot 11</h3>
<p class="text-gray-400 text-sm leading-relaxed">
完美兼容 OneBot v11 协议标准。支持 NapCatQQ、LLOneBot 等主流实现端,无缝对接,开箱即用。
</p>
</div>
</div>
</section>
<!-- 终端展示 -->
<section class="max-w-4xl mx-auto">
<div class="flex items-center gap-2 mb-4">
<iconify-icon icon="mdi:console" class="text-brand-accent text-xl"></iconify-icon>
<h3 class="font-mono text-sm text-gray-400">TERMINAL OUTPUT</h3>
</div>
<div class="bg-black rounded-lg border border-white/10 p-1 shadow-2xl">
<div class="bg-[#0a0a0a] rounded p-6 h-64 overflow-y-auto font-mono text-sm leading-relaxed" id="terminal">
<!-- JS 注入 -->
</div>
</div>
</section>
<!-- 性能建议 Banner -->
<section class="rounded-2xl bg-gradient-to-r from-yellow-500/10 to-orange-500/10 border border-yellow-500/20 p-8 flex flex-col md:flex-row items-start md:items-center gap-6">
<div class="w-12 h-12 rounded-full bg-yellow-500/20 flex items-center justify-center text-yellow-500 shrink-0">
<iconify-icon icon="mdi:speedometer" class="text-2xl"></iconify-icon>
</div>
<div>
<h3 class="text-lg font-bold text-white mb-1">性能建议:使用 PyPy</h3>
<p class="text-gray-400 text-sm">
为了获得最佳性能,我们强烈推荐使用 <span class="text-yellow-400">PyPy JIT 编译器</span> 来运行 NEO 框架。在处理高并发消息时PyPy 相比标准 CPython 能提供显著的性能提升。
</p>
</div>
</section>
</div>
</main>
<footer class="border-t border-white/5 bg-brand-dark py-12">
<div class="max-w-7xl mx-auto px-6 flex flex-col items-center justify-center gap-4 text-center">
<div class="flex items-center gap-2 text-brand-light font-mono text-sm">
<iconify-icon icon="mdi:code-tags"></iconify-icon>
<span>Designed by 镀铬酸钾 & baby2016</span>
</div>
<p class="text-xs text-gray-600">
&copy; 2026 FAIRY-ORACLE-SANCTUARY. All Rights Reserved.
</p>
</div>
</footer>
<script>
// 终端打字机效果
const terminalLines = [
{ text: "$ git clone https://github.com/Fairy-Oracle-Sanctuary/NEO-Bot-Framework.git", color: "text-white" },
{ text: "Cloning into 'NEO-Bot-Framework'...", color: "text-gray-500", delay: 300 },
{ text: "Unpacking objects: 100% (402/402), done.", color: "text-gray-500", delay: 300 },
{ text: "$ cd NEO-Bot-Framework", color: "text-white", delay: 100 },
{ text: "$ pip install -r requirements.txt", color: "text-white", delay: 100 },
{ text: "Successfully installed: watchdog loguru websockets redis pydantic", color: "text-green-400", delay: 500 },
{ text: "$ python main.py", color: "text-white", delay: 200 },
{ text: "[INFO] Initializing AdminManager...", color: "text-gray-400", delay: 200 },
{ text: "[INFO] Redis Connection Pool Created (Size: 10)", color: "text-gray-400", delay: 200 },
{ text: "[INFO] Watchdog Monitoring: /plugins (recursive=True)", color: "text-yellow-400", delay: 300 },
{ text: "[SUCCESS] WebSocket Connected to ws://127.0.0.1:30004", color: "text-green-500", delay: 200 },
{ text: "NEOBOT System is ready. Waiting for events...", color: "text-white animate-pulse" }
];
const term = document.getElementById('terminal');
let lineIndex = 0;
function runTerminal() {
if (lineIndex >= terminalLines.length) return;
const line = terminalLines[lineIndex];
const div = document.createElement('div');
div.className = `mb-1 ${line.color}`;
div.textContent = line.text;
term.appendChild(div);
term.scrollTop = term.scrollHeight;
lineIndex++;
setTimeout(runTerminal, line.delay || 100);
}
// 剪贴板功能
function copyClone() {
navigator.clipboard.writeText('git clone https://github.com/Fairy-Oracle-Sanctuary/NEO-Bot-Framework.git');
alert('Git Clone 指令已复制到剪贴板');
}
function copyInstall() {
navigator.clipboard.writeText('pip install -r requirements.txt');
alert('安装依赖指令已复制');
}
// 启动
window.onload = runTerminal;
</script>
</body>
</html>

13
main.py
View File

@@ -13,8 +13,11 @@ from watchdog.events import FileSystemEventHandler
# 初始化日志系统,必须在其他 core 模块导入之前执行 # 初始化日志系统,必须在其他 core 模块导入之前执行
from core.logger import logger from core.logger import logger
from core.admin_manager import admin_manager
from core.ws import WS from core.ws import WS
from core.plugin_manager import load_all_plugins from core.plugin_manager import load_all_plugins
from core.redis_manager import redis_manager
from core.executor import run_in_thread_pool
class PluginReloadHandler(FileSystemEventHandler): class PluginReloadHandler(FileSystemEventHandler):
@@ -62,7 +65,7 @@ class PluginReloadHandler(FileSystemEventHandler):
try: try:
# 重新扫描并加载插件 # 重新扫描并加载插件
load_all_plugins() run_in_thread_pool(load_all_plugins)
logger.success("插件重载完成") logger.success("插件重载完成")
except Exception as e: except Exception as e:
logger.exception(f"重载失败: {e}") logger.exception(f"重载失败: {e}")
@@ -78,7 +81,13 @@ async def main():
3. 建立连接并保持运行 3. 建立连接并保持运行
""" """
# 首次加载插件 # 首次加载插件
load_all_plugins() await run_in_thread_pool(load_all_plugins)
# 初始化 Redis 连接
await redis_manager.initialize()
# 初始化管理员管理器
await admin_manager.initialize()
# 启动文件监控 # 启动文件监控
# 监控 plugins 目录 # 监控 plugins 目录

View File

@@ -6,6 +6,7 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Optional from typing import List, Optional
from core.permission_manager import ADMIN, OP, USER
from models.message import MessageSegment from models.message import MessageSegment
from models.sender import Sender from models.sender import Sender
from .base import OneBotEvent, EventType from .base import OneBotEvent, EventType
@@ -32,6 +33,11 @@ class MessageEvent(OneBotEvent):
消息事件基类 消息事件基类
""" """
# 权限级别常量,用于装饰器参数
ADMIN = ADMIN
OP = OP
USER = USER
message_type: str message_type: str
"""消息类型: private (私聊), group (群聊)""" """消息类型: private (私聊), group (群聊)"""

View File

@@ -1,115 +1,74 @@
from core import PluginDataManager """
管理员管理插件
提供通过聊天指令动态添加或移除机器人管理员的功能。
"""
from core.bot import Bot from core.bot import Bot
from core.command_manager import matcher from core.command_manager import matcher
from models import GroupMessageEvent from core.admin_manager import admin_manager
from models.events.message import MessageEvent
__plugin_meta__ = { __plugin_meta__ = {
"name": "admin", "name": "管理员管理",
"description": "机器人权限管理插件", "description": "管理机器人的全局管理员",
"usage": "/admin", "usage": (
"/admin list - 列出所有管理员\n"
"/admin add <QQ号> - 添加管理员\n"
"/admin remove <QQ号> - 移除管理员"
),
} }
data = PluginDataManager("admin")
@matcher.command("admin", permission=MessageEvent.ADMIN)
async def handle_admin_command(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理 /admin 指令
@matcher.command("admin") :param bot: Bot 实例
async def handle_permission(bot: Bot, event: GroupMessageEvent, args: list[str]): :param event: 消息事件实例
:param args: 指令参数列表
"""
if not args: if not args:
await event.reply( await event.reply(__plugin_meta__["usage"])
"机器人权限管理插件指令:\n/admin list 列出所有权限\n/admin add member <QQ号> 添加群成员权限\n/admin remove member <QQ号> 删除群成员权限\n/admin add group <群号> 添加群权限\n/admin remove group <群号> 删除群权限\n/admin clear member 清空群成员权限\n/admin clear group 清空群权限\n/admin clear all 清空所有权限"
)
return
if str(event.user_id) not in data.get("members", []):
await event.reply("你没有权限使用此命令。")
return
if str(event.group_id) not in data.get("groups", []):
await event.reply("群聊不在权限中")
return return
action = args[0].lower() action = args[0].lower()
# ensure storage keys exist
members = data.get("members", []) or []
groups = data.get("groups", []) or []
if action == "list": if action == "list":
msg_lines = ["当前权限列表:"] admins = await admin_manager.get_all_admins()
msg_lines.append( if not admins:
f"群成员权限 ({len(members)}): {', '.join(members) if members else ''}" await event.reply("当前没有设置任何管理员。")
) return
msg_lines.append(
f"群权限 ({len(groups)}): {', '.join(groups) if groups else ''}" admin_list_str = "\n".join(str(admin_id) for admin_id in admins)
) await event.reply(f"当前管理员列表 ({len(admins)}):\n{admin_list_str}")
await event.reply("\n".join(msg_lines))
return return
if action in ("add", "remove"): if action in ("add", "remove"):
if len(args) < 3: if len(args) < 2 or not args[1].isdigit():
await event.reply("参数错误,示例:/admin add member 123456") await event.reply("参数错误,请提供一个有效的 QQ 号。\n示例: /admin add 123456")
return return
target = args[1].lower() try:
value = args[2] user_id = int(args[1])
except ValueError:
if target == "member": await event.reply("无效的 QQ 号,请输入纯数字。")
# operate on members list
if action == "add":
if str(value) in members:
await event.reply(f"成员 {value} 已存在,无需重复添加。")
return
members.append(str(value))
data.set("members", members)
await event.reply(f"已添加群成员权限:{value}")
return
else: # remove
if str(value) not in members:
await event.reply(f"成员 {value} 不在权限列表中。")
return
members = [m for m in members if m != str(value)]
data.set("members", members)
await event.reply(f"已移除群成员权限:{value}")
return
if target == "group":
if action == "add":
if str(value) in groups:
await event.reply(f"{value} 已存在,无需重复添加。")
return
groups.append(str(value))
data.set("groups", groups)
await event.reply(f"已添加群权限:{value}")
return
else: # remove
if str(value) not in groups:
await event.reply(f"{value} 不在权限列表中。")
return
groups = [g for g in groups if g != str(value)]
data.set("groups", groups)
await event.reply(f"已移除群权限:{value}")
return
await event.reply("未知目标类型,请使用 member 或 group")
return
if action == "clear":
if len(args) < 2:
await event.reply("参数错误,示例:/admin clear member")
return return
target = args[1].lower()
if target == "member":
data.set("members", [])
await event.reply("已清空群成员权限。")
return
if target == "group":
data.set("groups", [])
await event.reply("已清空群权限。")
return
if target == "all":
data.clear()
await event.reply("已清空所有权限。")
return
await event.reply("未知清空目标,请使用 member/group/all")
return
await event.reply("未知指令,使用 /admin 查看帮助") if action == "add":
success = await admin_manager.add_admin(user_id)
if success:
await event.reply(f"成功添加管理员: {user_id}")
else:
await event.reply(f"管理员 {user_id} 已存在,无需重复添加。")
return
elif action == "remove":
success = await admin_manager.remove_admin(user_id)
if success:
await event.reply(f"成功移除管理员: {user_id}")
else:
await event.reply(f"管理员 {user_id} 不存在。")
return
await event.reply(f"未知的指令: {action}\n\n{__plugin_meta__['usage']}")

View File

@@ -13,6 +13,7 @@ from typing import Tuple, Set
from core.bot import Bot from core.bot import Bot
from core.command_manager import matcher from core.command_manager import matcher
from core.executor import run_in_thread_pool
from models import MessageEvent from models import MessageEvent
__plugin_meta__ = { __plugin_meta__ = {
@@ -38,9 +39,11 @@ def is_code_safe(code: str) -> Tuple[bool, str]:
statements = STATEMENT_SPLIT_PATTERN.split(code) statements = STATEMENT_SPLIT_PATTERN.split(code)
for statement in statements: for statement in statements:
statement = statement.strip() statement = statement.strip()
if not statement: continue if not statement:
continue
parts = statement.split() parts = statement.split()
if not parts: continue if not parts:
continue
if parts[0] == 'from' and len(parts) > 1: if parts[0] == 'from' and len(parts) > 1:
module_name = parts[1].strip() module_name = parts[1].strip()
if module_name in DANGEROUS_MODULES: if module_name in DANGEROUS_MODULES:
@@ -83,7 +86,7 @@ async def process_and_reply(bot: Bot, event: MessageEvent, code: str):
""" """
核心处理逻辑:安全检查、执行代码并回复结果。 核心处理逻辑:安全检查、执行代码并回复结果。
""" """
safe, message = is_code_safe(code) safe, message = await run_in_thread_pool(is_code_safe, code)
if not safe: if not safe:
await event.reply(f"代码安全检查未通过:\n{message}") await event.reply(f"代码安全检查未通过:\n{message}")
return return
@@ -150,11 +153,11 @@ async def handle_code_input(bot: Bot, event: MessageEvent):
# 处理取消操作 # 处理取消操作
if event.raw_message.strip() == "取消": if event.raw_message.strip() == "取消":
await event.reply("已取消输入。") await event.reply("已取消输入。")
return True # 消费事件 return True # 消费事件
# 执行代码 # 执行代码
await process_and_reply(bot, event, event.raw_message) await process_and_reply(bot, event, event.raw_message)
return True # 消费事件,防止被其他指令匹配 return True # 消费事件,防止被其他指令匹配
# 如果用户不在等待状态,则不处理 # 如果用户不在等待状态,则不处理
return False return False

View File

@@ -1 +0,0 @@
{}

View File

@@ -29,18 +29,26 @@ async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
await event.reply(reply_msg) await event.reply(reply_msg)
@matcher.command("赞我") @matcher.command(
async def handle_poke(bot: Bot, event: MessageEvent, args: list[str]): "赞我",
permission=MessageEvent.ADMIN,
override_permission_check=True
)
async def handle_poke(bot: Bot, event: MessageEvent, permission_granted: bool):
""" """
处理 赞我 指令,发送点赞 处理 赞我 指令,发送点赞
:param bot: Bot 实例 :param bot: Bot 实例
:param event: 消息事件对象 :param event: 消息事件对象
:param args: 指令参数列表(本指令不使用参数) :param permission_granted: 权限检查结果
""" """
if not permission_granted:
await event.reply("只有我的操作员才能让我点赞哦!(。•ˇ‸ˇ•。)")
return
try: try:
# 尝试发送赞 # 尝试发送赞
await bot.send_like(event.user_id, times=10) await bot.send_like(event.user_id, times=10)
await event.reply("戳一戳发送成功!") await event.reply("好感度+10(〃''〃)")
except Exception as e: except Exception as e:
await event.reply(f"戳一戳发送失败:{str(e)}") await event.reply(f"点赞失败了 >_<: {str(e)}")

View File

@@ -9,6 +9,7 @@ from datetime import datetime
from core.bot import Bot from core.bot import Bot
from core.command_manager import matcher from core.command_manager import matcher
from core.executor import run_in_thread_pool
from models import MessageEvent, MessageSegment from models import MessageEvent, MessageSegment
__plugin_meta__ = { __plugin_meta__ = {
@@ -77,7 +78,7 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
:param args: 指令参数列表(未使用)。 :param args: 指令参数列表(未使用)。
""" """
user_id = event.user_id user_id = event.user_id
jrcd = get_jrcd(user_id) jrcd = await run_in_thread_pool(get_jrcd, user_id)
msg = [MessageSegment.at(user_id)] msg = [MessageSegment.at(user_id)]
if jrcd <= 9: if jrcd <= 9:
msg.append(MessageSegment.text(random.choice(JRCDMSG_1) % jrcd)) msg.append(MessageSegment.text(random.choice(JRCDMSG_1) % jrcd))
@@ -112,8 +113,8 @@ async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]):
await event.reply("不能和自己比!") await event.reply("不能和自己比!")
return return
jrcd1 = get_jrcd(user_id1) jrcd1 = await run_in_thread_pool(get_jrcd, user_id1)
jrcd2 = get_jrcd(user_id2) jrcd2 = await run_in_thread_pool(get_jrcd, user_id2)
jrcz = jrcd1 - jrcd2 jrcz = jrcd1 - jrcd2

View File

@@ -0,0 +1,88 @@
"""
同步/异步函数测试插件
用于演示 SyncHandlerError 异常以及如何将同步函数放入线程池执行。
"""
import time
from typing import Any
from core.command_manager import matcher
from core.executor import run_in_thread_pool
from core.bot import Bot
from core.logger import logger
# 插件元数据
__plugin_meta__ = {
"name": "SyncAsyncTestPlugin",
"description": "用于测试同步/异步函数处理的插件。",
"usage": (
"/test_sync_error - 尝试注册一个同步函数作为异步处理器,会触发错误。\n"
"/test_blocking_task <duration> - 演示将同步阻塞任务放入线程池执行。"
),
}
# --- 示例 1: 触发 SyncHandlerError (此函数不会被成功注册) ---
# 这是一个同步函数,如果直接用 @matcher.message_handler 装饰,
# 并且 event_handler 检查到它是同步的,就会抛出 SyncHandlerError。
# 注意:为了演示错误,我们不会真正注册它,因为注册会失败。
def _sync_function_that_should_fail(bot: Bot, event: Any):
"""
一个同步函数,如果直接作为异步事件处理器注册,会触发 SyncHandlerError。
"""
logger.info("这个同步函数不应该被直接调用。")
return "这是一个同步函数的结果。"
# --- 示例 2: 将同步阻塞任务放入线程池运行 ---
def _blocking_task(duration: int) -> str:
"""
一个模拟耗时操作的同步函数。
Args:
duration (int): 模拟阻塞的秒数。
Returns:
str: 任务完成消息。
"""
logger.info(f"同步阻塞任务开始,持续 {duration} 秒...")
time.sleep(duration)
logger.info("同步阻塞任务结束。")
return f"阻塞任务完成,耗时 {duration} 秒。"
@matcher.message_handler.command("test_blocking_task")
async def test_blocking_task_handler(bot: Bot, event: Any, args: list):
"""
处理 /test_blocking_task 命令,将同步阻塞任务放入线程池执行。
Args:
bot (Bot): 机器人实例。
event (Any): 接收到的事件对象。
args (list): 命令参数列表。
"""
if not args:
await bot.send(event, "请提供阻塞时长,例如:/test_blocking_task 5")
return
try:
duration = int(args[0])
if duration <= 0:
raise ValueError("时长必须是正整数。")
except ValueError:
await bot.send(event, "无效的时长,请提供一个正整数。")
return
await bot.send(event, f"开始执行同步阻塞任务,预计耗时 {duration} 秒...")
# 将同步函数放入线程池执行
result = await run_in_thread_pool(_blocking_task, duration)
await bot.send(event, f"同步阻塞任务已完成:{result}")
# --- 示例 3: 尝试注册一个同步函数作为异步处理器 (会失败) ---
# 这个函数不会被成功注册,因为 event_handler 会检测到它是同步的并抛出 SyncHandlerError。
# 插件管理器会捕获这个错误并跳过加载此插件。
# 为了演示,我们故意尝试注册它。
# @matcher.message_handler.command("test_sync_error")
# def test_sync_error_handler(bot: Bot, event: Any):
# """
# 这个同步函数尝试作为异步处理器注册,会触发 SyncHandlerError。
# """
# logger.error("这个同步函数不应该被直接注册为异步处理器。")
# return "这个消息不应该被看到。"