添加注释,增加redis支持,添加了聊天记录构建支持

This commit is contained in:
2026-01-02 17:10:42 +08:00
parent 3163bbf8c1
commit 01b83803c1
24 changed files with 965 additions and 313 deletions

109
README.md
View File

@@ -39,6 +39,42 @@ NEO 框架的设计遵循以下核心理念:
* **异步核心**:基于 `asyncio``websockets` 的高性能异步核心。 * **异步核心**:基于 `asyncio``websockets` 的高性能异步核心。
* **自动重连**:内置 WebSocket 断线重连机制。 * **自动重连**:内置 WebSocket 断线重连机制。
## ⚡️ 性能优化
### Redis 缓存机制
为了提升响应速度并减少对 OneBot API 的重复调用,框架核心集成了一套基于 Redis 的缓存系统。对于一些不频繁变更的数据(如群信息、好友列表等),首次查询后会将其缓存至 Redis在缓存有效期内默认为 1 小时),后续请求将直接从 Redis 读取,极大提升了性能。
#### 工作原理
- **自动缓存**:框架会自动缓存特定 API 的调用结果。
- **缓存键**:缓存键根据 API 名称和关键参数(如 `group_id`, `user_id`)生成,确保唯一性。
- **过期时间**:默认缓存 1 小时,之后会自动失效,下次调用时将重新从 OneBot 实现端获取最新数据。
#### 受影响的 API
以下核心 API 已默认启用缓存:
- `get_group_info`
- `get_group_member_info`
- `get_friend_list`
- `get_stranger_info`
- `get_login_info`
#### 如何绕过缓存
在某些场景下,你可能需要获取实时数据而非缓存数据。为此,所有受缓存影响的 API 方法都增加了一个 `no_cache: bool = False` 的可选参数。
当你需要强制从服务器获取最新信息时,只需在调用时传入 `no_cache=True` 即可。
**示例:**
```python
# 正常调用,会使用缓存
group_info = await bot.get_group_info(group_id=12345)
# 强制获取最新信息,不使用缓存
latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True)
```
### `__slots__` 内存优化
框架内的所有数据模型包括事件、消息段、API 返回对象等)均已启用 `__slots__ = True` 优化。这可以显著减少每个对象实例的内存占用,特别是在处理大量事件和数据时,能够有效降低机器人的整体内存消耗。
## 📝 待办事项 (TODO) ## 📝 待办事项 (TODO)
### API 封装 ### API 封装
@@ -90,10 +126,11 @@ NEO 框架的设计遵循以下核心理念:
- [ ] **系统控制** - [ ] **系统控制**
- `set_restart` - `set_restart`
- [ ] **扩展功能** - [ ] **扩展功能**
- `send_forward_msg`: 发送合并转发消息 - [x] `send_forward_msg`: 发送合并转发消息
### 其他改进 ### 其他改进
- [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。 - [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。
- [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。
- [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制。 - [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制。
- [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。 - [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。
- [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。 - [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。
@@ -104,7 +141,10 @@ NEO 框架的设计遵循以下核心理念:
``` ```
NEO/ NEO/
├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载) ├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载)
── echo.py # 示例插件:实现 /echo 和 /赞我 指令 ── echo.py # 示例插件:实现 /echo 和 /赞我 指令
│ ├── forward_test.py # 示例插件:演示合并转发消息的构建和发送
│ ├── jrcd.py # 娱乐插件:提供 /jrcd 和 /bbcd 指令
│ └── thpic.py # 图片插件:提供 /thpic 指令,发送随机东方图片
├── core/ # 核心框架代码 ├── core/ # 核心框架代码
│ ├── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI) │ ├── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI)
│ │ ├── __init__.py │ │ ├── __init__.py
@@ -117,6 +157,7 @@ NEO/
│ ├── command_manager.py # 命令与事件分发器 │ ├── command_manager.py # 命令与事件分发器
│ ├── config_loader.py # 配置加载器 │ ├── config_loader.py # 配置加载器
│ ├── plugin_manager.py # 插件加载与管理 │ ├── plugin_manager.py # 插件加载与管理
│ ├── redis_manager.py # Redis 连接管理器
│ └── ws.py # WebSocket 客户端核心 │ └── ws.py # WebSocket 客户端核心
├── models/ # 数据模型 ├── models/ # 数据模型
│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta) │ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta)
@@ -378,6 +419,70 @@ async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]):
5. **性能考虑**:避免在插件中执行耗时同步操作 5. **性能考虑**:避免在插件中执行耗时同步操作
6. **资源清理**:必要时使用 `try...finally` 确保资源释放 6. **资源清理**:必要时使用 `try...finally` 确保资源释放
### 🚀 高性能插件开发规范 (避坑指南)
为了保证整个机器人框架的响应速度和稳定性,所有插件都必须遵循异步、非阻塞的开发原则。任何一个插件中的阻塞操作都可能导致整个机器人卡顿或无响应。
以下是必须遵守的核心规范:
#### 1. 禁止任何形式的同步网络请求
- **错误示范**: 使用 `requests` 库发起网络请求。
```python
import requests
# 错误!这会阻塞整个程序
response = requests.get("https://api.example.com")
```
- **正确做法**: 必须使用异步 HTTP 客户端,如 `aiohttp` 或 `httpx`。
```python
import httpx
# 正确,使用 async with 和 await
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
```
#### 2. 禁止使用 `time.sleep()`
- **错误示范**: 使用 `time.sleep()` 进行等待。
```python
import time
# 错误!这会阻塞事件循环
time.sleep(5)
```
- **正确做法**: 必须使用 `asyncio.sleep()`。
```python
import asyncio
# 正确,这会将控制权交还给事件循环
await asyncio.sleep(5)
```
#### 3. 谨慎处理文件 I/O
- 对于读写小型、本地文件,直接使用 `with open(...)` 通常是可接受的。
- 但对于大型文件或网络文件系统NFS上的文件同步 I/O 可能会导致明显的阻塞。
- **推荐做法**: 对于可能耗时较长的文件操作,使用 `aiofiles` 库。
```python
import aiofiles
async with aiofiles.open('large_file.dat', mode='rb') as f:
contents = await f.read()
```
#### 4. 将 CPU 密集型任务移出事件循环
- 如果插件需要执行复杂的计算(例如,图像处理、视频转码、数据分析),这些任务会长时间占用 CPU同样会阻塞事件循环。
- **正确做法**: 使用 `loop.run_in_executor()` 将这类任务抛到独立的线程池或进程池中执行。
```python
import asyncio
def cpu_bound_task(data):
# 这是一个耗时的同步函数
# ... 进行大量计算 ...
return "计算结果"
# 在异步的事件处理器中调用
loop = asyncio.get_running_loop()
# `None` 表示使用默认的线程池
result = await loop.run_in_executor(None, cpu_bound_task, "一些数据")
await event.reply(result)
```
遵循以上规范,可以确保您开发的插件不会成为整个机器人应用的性能瓶颈。
### 示例:完整插件模板 ### 示例:完整插件模板
```python ```python
""" """

View File

@@ -5,3 +5,9 @@ reconnect_interval = 5
[bot] [bot]
command = ["/"] command = ["/"]
[redis]
host = "114.66.58.203"
port = 1931
db = 0
password = "redis_5dxyJG"

View File

@@ -1,78 +1,106 @@
""" """
账号相关 API 模块 账号与状态相关 API 模块
该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、
状态设置等相关的 OneBot v11 API 封装。
""" """
import json
from typing import Dict, Any from typing import Dict, Any
from .base import BaseAPI from .base import BaseAPI
from models.objects import LoginInfo, VersionInfo, Status from models.objects import LoginInfo, VersionInfo, Status
from core.redis_manager import redis_client as redis_manager
class AccountAPI(BaseAPI): class AccountAPI(BaseAPI):
""" """
账号相关 API Mixin `AccountAPI` Mixin 类,提供了所有与机器人账号、状态相关的 API 方法。
""" """
async def get_login_info(self) -> LoginInfo: async def get_login_info(self, no_cache: bool = False) -> LoginInfo:
""" """
获取登录号信息 获取当前登录的机器人账号信息
:return: 登录信息对象 Args:
no_cache (bool, optional): 是否不使用缓存直接从服务器获取最新信息。Defaults to False.
Returns:
LoginInfo: 包含登录号 QQ 和昵称的 `LoginInfo` 数据对象。
""" """
cache_key = f"neobot:cache:get_login_info:{self.self_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return LoginInfo(**json.loads(cached_data))
res = await self.call_api("get_login_info") res = await self.call_api("get_login_info")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return LoginInfo(**res) return LoginInfo(**res)
async def get_version_info(self) -> VersionInfo: async def get_version_info(self) -> VersionInfo:
""" """
获取版本信息 获取 OneBot v11 实现的版本信息
:return: 版本信息对象 Returns:
VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。
""" """
res = await self.call_api("get_version_info") res = await self.call_api("get_version_info")
return VersionInfo(**res) return VersionInfo(**res)
async def get_status(self) -> Status: async def get_status(self) -> Status:
""" """
获取状态 获取 OneBot v11 实现的状态信息。
:return: 状态对象 Returns:
Status: 包含 OneBot 状态信息的 `Status` 数据对象。
""" """
res = await self.call_api("get_status") res = await self.call_api("get_status")
return Status(**res) return Status(**res)
async def bot_exit(self) -> Dict[str, Any]: async def bot_exit(self) -> Dict[str, Any]:
""" """
退出机器人 机器人进程退出(需要实现端支持)。
:return: API 响应结果 Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("bot_exit") return await self.call_api("bot_exit")
async def set_self_longnick(self, long_nick: str) -> Dict[str, Any]: async def set_self_longnick(self, long_nick: str) -> Dict[str, Any]:
""" """
设置个性签名 设置机器人账号的个性签名
:param long_nick: 个性签名内容 Args:
:return: API 响应结果 long_nick (str): 要设置的个性签名内容。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_self_longnick", {"longNick": long_nick}) return await self.call_api("set_self_longnick", {"longNick": long_nick})
async def set_input_status(self, user_id: int, event_type: int) -> Dict[str, Any]: async def set_input_status(self, user_id: int, event_type: int) -> Dict[str, Any]:
""" """
设置输入状态 设置 "对方正在输入..." 状态提示。
:param user_id: 用户 ID Args:
:param event_type: 事件类型 user_id (int): 目标用户的 QQ 号。
:return: API 响应结果 event_type (int): 事件类型。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_input_status", {"user_id": user_id, "event_type": event_type}) return await self.call_api("set_input_status", {"user_id": user_id, "event_type": event_type})
async def set_diy_online_status(self, face_id: int, face_type: int, wording: str) -> Dict[str, Any]: async def set_diy_online_status(self, face_id: int, face_type: int, wording: str) -> Dict[str, Any]:
""" """
设置自定义在线状态 设置自定义"在线状态"
:param face_id: 状态 ID Args:
:param face_type: 状态类型 face_id (int): 状态的表情 ID。
:param wording: 状态描述 face_type (int): 状态的表情类型。
:return: API 响应结果 wording (str): 状态的描述文本。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_diy_online_status", { return await self.call_api("set_diy_online_status", {
"face_id": face_id, "face_id": face_id,
@@ -82,43 +110,55 @@ class AccountAPI(BaseAPI):
async def set_online_status(self, status_code: int) -> Dict[str, Any]: async def set_online_status(self, status_code: int) -> Dict[str, Any]:
""" """
设置在线状态 设置在线状态(如在线、离开、摸鱼等)。
:param status_code: 状态码 Args:
:return: API 响应结果 status_code (int): 目标在线状态的状态码。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_online_status", {"status_code": status_code}) return await self.call_api("set_online_status", {"status_code": status_code})
async def set_qq_profile(self, **kwargs) -> Dict[str, Any]: async def set_qq_profile(self, **kwargs) -> Dict[str, Any]:
""" """
设置 QQ 资料 设置机器人账号的个人资料
:param kwargs: 个人资料相关参数 Args:
:return: API 响应结果 **kwargs: 个人资料的相关参数,具体字段请参考 OneBot v11 规范。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_qq_profile", kwargs) return await self.call_api("set_qq_profile", kwargs)
async def set_qq_avatar(self, **kwargs) -> Dict[str, Any]: async def set_qq_avatar(self, **kwargs) -> Dict[str, Any]:
""" """
设置 QQ 头像 设置机器人账号的头像
:param kwargs: 头像相关参数 Args:
:return: API 响应结果 **kwargs: 头像的相关参数,具体字段请参考 OneBot v11 规范。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_qq_avatar", kwargs) return await self.call_api("set_qq_avatar", kwargs)
async def get_clientkey(self) -> Dict[str, Any]: async def get_clientkey(self) -> Dict[str, Any]:
""" """
获取客户端密钥 获取客户端密钥(通常用于 QQ 登录相关操作)。
:return: API 响应结果 Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("get_clientkey") return await self.call_api("get_clientkey")
async def clean_cache(self) -> Dict[str, Any]: async def clean_cache(self) -> Dict[str, Any]:
""" """
清理缓存 清理 OneBot v11 实现端的缓存
:return: API 响应结果 Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("clean_cache") return await self.call_api("clean_cache")

View File

@@ -1,53 +1,86 @@
""" """
好友相关 API 模块 好友与陌生人相关 API 模块
该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息
等相关的 OneBot v11 API 封装。
""" """
import json
from typing import List, Dict, Any from typing import List, Dict, Any
from .base import BaseAPI from .base import BaseAPI
from models.objects import FriendInfo, StrangerInfo from models.objects import FriendInfo, StrangerInfo
from core.redis_manager import redis_client as redis_manager
class FriendAPI(BaseAPI): class FriendAPI(BaseAPI):
""" """
好友相关 API Mixin `FriendAPI` Mixin 类,提供了所有与好友、陌生人操作相关 API 方法。
""" """
async def send_like(self, user_id: int, times: int = 1) -> Dict[str, Any]: async def send_like(self, user_id: int, times: int = 1) -> Dict[str, Any]:
""" """
发送点赞 向指定用户发送 "戳一戳" (点赞)。
:param user_id: 对方 QQ 号 Args:
:param times: 点赞次数 user_id (int): 目标用户的 QQ 号。
:return: API 响应结果 times (int, optional): 点赞次数,建议不超过 10 次。Defaults to 1.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("send_like", {"user_id": user_id, "times": times}) return await self.call_api("send_like", {"user_id": user_id, "times": times})
async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> StrangerInfo: async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> StrangerInfo:
""" """
获取陌生人信息 获取陌生人信息
:param user_id: QQ 号 Args:
:param no_cache: 是否不使用缓存 user_id (int): 目标用户的 QQ 号。
:return: 陌生人信息对象 no_cache (bool, optional): 是否不使用缓存直接从服务器获取。Defaults to False.
Returns:
StrangerInfo: 包含陌生人信息的 `StrangerInfo` 数据对象。
""" """
cache_key = f"neobot:cache:get_stranger_info:{user_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return StrangerInfo(**json.loads(cached_data))
res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache})
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return StrangerInfo(**res) return StrangerInfo(**res)
async def get_friend_list(self) -> List[FriendInfo]: async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]:
""" """
获取好友列表 获取机器人账号的好友列表
:return: 好友信息对象列表 Args:
no_cache (bool, optional): 是否不使用缓存直接从服务器获取最新信息。Defaults to False.
Returns:
List[FriendInfo]: 包含所有好友信息的 `FriendInfo` 对象列表。
""" """
cache_key = f"neobot:cache:get_friend_list:{self.self_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return [FriendInfo(**item) for item in json.loads(cached_data)]
res = await self.call_api("get_friend_list") res = await self.call_api("get_friend_list")
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return [FriendInfo(**item) for item in res] return [FriendInfo(**item) for item in res]
async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]: async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]:
""" """
处理加好友请求 处理收到的加好友请求
:param flag: 加好友请求的 flag需从上报的数据中获取 Args:
:param approve: 是否同意请求 flag (str): 请求的标识,需要从 `request` 事件中获取。
:param remark: 添加后的好友备注(仅在同意时有效) approve (bool, optional): 是否同意该好友请求。Defaults to True.
:return: API 响应结果 remark (str, optional): 在同意请求时为该好友设置的备注。Defaults to "".
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark}) return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark})

View File

@@ -1,47 +1,64 @@
""" """
群组相关 API 模块 群组相关 API 模块
该模块定义了 `GroupAPI` Mixin 类,提供了所有与群组管理、成员操作
等相关的 OneBot v11 API 封装。
""" """
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
import json
from core.redis_manager import redis_client as redis_manager
from .base import BaseAPI from .base import BaseAPI
from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo
class GroupAPI(BaseAPI): class GroupAPI(BaseAPI):
""" """
群组相关 API Mixin `GroupAPI` Mixin 类,提供了所有与群组操作相关的 API 方法。
""" """
async def set_group_kick(self, group_id: int, user_id: int, reject_add_request: bool = False) -> Dict[str, Any]: async def set_group_kick(self, group_id: int, user_id: int, reject_add_request: bool = False) -> Dict[str, Any]:
""" """
群组踢人 将指定成员踢出群组。
:param group_id: 群号 Args:
:param user_id: 要踢的 QQ 号 group_id (int): 目标群组的群号。
:param reject_add_request: 拒绝此人的加群请求 user_id (int): 要踢出的成员的 QQ 号。
:return: API 响应结果 reject_add_request (bool, optional): 是否拒绝该用户此后的加群请求。Defaults to False.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_kick", {"group_id": group_id, "user_id": user_id, "reject_add_request": reject_add_request}) return await self.call_api("set_group_kick", {"group_id": group_id, "user_id": user_id, "reject_add_request": reject_add_request})
async def set_group_ban(self, group_id: int, user_id: int, duration: int = 30 * 60) -> Dict[str, Any]: async def set_group_ban(self, group_id: int, user_id: int, duration: int = 1800) -> Dict[str, Any]:
""" """
群组单人禁言 禁言群组中的指定成员。
:param group_id: 群号 Args:
:param user_id: 要禁言的 QQ 号 group_id (int): 目标群组的群号。
:param duration: 禁言时长0 表示解除禁言 user_id (int): 禁言的成员的 QQ 号。
:return: API 响应结果 duration (int, optional): 禁言时长,单位为秒。设置为 0 表示解除禁言。
Defaults to 1800 (30 分钟).
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_ban", {"group_id": group_id, "user_id": user_id, "duration": duration}) return await self.call_api("set_group_ban", {"group_id": group_id, "user_id": user_id, "duration": duration})
async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 30 * 60, flag: str = None) -> Dict[str, Any]: async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 1800, flag: str = None) -> Dict[str, Any]:
""" """
群组匿名禁言 禁言群组中的匿名用户。
:param group_id: 群号 Args:
:param anonymous: 可选,要禁言的匿名用户对象(群消息事件的 anonymous 字段) group_id (int): 目标群组的群号。
:param duration: 禁言时长(秒) anonymous (Dict[str, Any], optional): 禁言的匿名用户对象,
:param flag: 可选,要禁言的匿名用户的 flag从群消息事件的 anonymous 字段中获取 从群消息事件的 `anonymous` 字段中获取。Defaults to None.
:return: API 响应结果 duration (int, optional): 禁言时长单位为秒。Defaults to 1800.
flag (str, optional): 要禁言的匿名用户的 flag 标识,
可从群消息事件的 `anonymous` 字段中获取。Defaults to None.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
params = {"group_id": group_id, "duration": duration} params = {"group_id": group_id, "duration": duration}
if anonymous: if anonymous:
@@ -52,139 +69,196 @@ class GroupAPI(BaseAPI):
async def set_group_whole_ban(self, group_id: int, enable: bool = True) -> Dict[str, Any]: async def set_group_whole_ban(self, group_id: int, enable: bool = True) -> Dict[str, Any]:
""" """
群组全员禁言 开启或关闭群组全员禁言
:param group_id: 群号 Args:
:param enable: 是否开启 group_id (int): 目标群组的群号。
:return: API 响应结果 enable (bool, optional): True 表示开启全员禁言False 表示关闭。Defaults to True.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_whole_ban", {"group_id": group_id, "enable": enable}) return await self.call_api("set_group_whole_ban", {"group_id": group_id, "enable": enable})
async def set_group_admin(self, group_id: int, user_id: int, enable: bool = True) -> Dict[str, Any]: async def set_group_admin(self, group_id: int, user_id: int, enable: bool = True) -> Dict[str, Any]:
""" """
群组设置管理员 设置或取消群组成员的管理员权限。
:param group_id: 群号 Args:
:param user_id: 要设置的 QQ 号 group_id (int): 目标群组的群号。
:param enable: True 为设置False 为取消 user_id (int): 目标成员的 QQ 号。
:return: API 响应结果 enable (bool, optional): True 表示设为管理员False 表示取消管理员。Defaults to True.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_admin", {"group_id": group_id, "user_id": user_id, "enable": enable}) return await self.call_api("set_group_admin", {"group_id": group_id, "user_id": user_id, "enable": enable})
async def set_group_anonymous(self, group_id: int, enable: bool = True) -> Dict[str, Any]: async def set_group_anonymous(self, group_id: int, enable: bool = True) -> Dict[str, Any]:
""" """
群组匿名 开启或关闭群组匿名聊天功能。
:param group_id: 群号 Args:
:param enable: 是否开启 group_id (int): 目标群组的群号。
:return: API 响应结果 enable (bool, optional): True 表示开启匿名False 表示关闭。Defaults to True.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_anonymous", {"group_id": group_id, "enable": enable}) return await self.call_api("set_group_anonymous", {"group_id": group_id, "enable": enable})
async def set_group_card(self, group_id: int, user_id: int, card: str = "") -> Dict[str, Any]: async def set_group_card(self, group_id: int, user_id: int, card: str = "") -> Dict[str, Any]:
""" """
设置群名片(群备注) 设置群组成员的群名片。
:param group_id: 群号 Args:
:param user_id: 要设置的 QQ 号 group_id (int): 目标群组的群号。
:param card: 群名片内容,不填或空字符串表示删除群名片 user_id (int): 目标成员的 QQ 号。
:return: API 响应结果 card (str, optional): 要设置的群名片内容。
传入空字符串 `""` 或 `None` 表示删除该成员的群名片。Defaults to "".
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_card", {"group_id": group_id, "user_id": user_id, "card": card}) return await self.call_api("set_group_card", {"group_id": group_id, "user_id": user_id, "card": card})
async def set_group_name(self, group_id: int, group_name: str) -> Dict[str, Any]: async def set_group_name(self, group_id: int, group_name: str) -> Dict[str, Any]:
""" """
设置群 设置群组的名称。
:param group_id: 群号 Args:
:param group_name: 新群名 group_id (int): 目标群组的群号。
:return: API 响应结果 group_name (str): 新的群组名称。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_name", {"group_id": group_id, "group_name": group_name}) return await self.call_api("set_group_name", {"group_id": group_id, "group_name": group_name})
async def set_group_leave(self, group_id: int, is_dismiss: bool = False) -> Dict[str, Any]: async def set_group_leave(self, group_id: int, is_dismiss: bool = False) -> Dict[str, Any]:
""" """
退出群组 退出或解散一个群组
:param group_id: 群号 Args:
:param is_dismiss: 是否解散,如果登录号是群主,则仅在此项为 True 时能够解散 group_id (int): 目标群组的群号。
:return: API 响应结果 is_dismiss (bool, optional): 是否解散群组。
仅当机器人是群主时,此项设为 True 才能解散群。Defaults to False.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_leave", {"group_id": group_id, "is_dismiss": is_dismiss}) return await self.call_api("set_group_leave", {"group_id": group_id, "is_dismiss": is_dismiss})
async def set_group_special_title(self, group_id: int, user_id: int, special_title: str = "", duration: int = -1) -> Dict[str, Any]: async def set_group_special_title(self, group_id: int, user_id: int, special_title: str = "", duration: int = -1) -> Dict[str, Any]:
""" """
设置群组专属头衔 为群组成员设置专属头衔
:param group_id: 群号 Args:
:param user_id: 要设置的 QQ 号 group_id (int): 目标群组的群号。
:param special_title: 专属头衔,不填或空字符串表示删除 user_id (int): 目标成员的 QQ 号。
:param duration: 有效期(秒),-1 表示永久 special_title (str, optional): 专属头衔内容。
:return: API 响应结果 传入空字符串 `""` 或 `None` 表示删除头衔。Defaults to "".
duration (int, optional): 头衔有效期,单位为秒。-1 表示永久。Defaults to -1.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_special_title", {"group_id": group_id, "user_id": user_id, "special_title": special_title, "duration": duration}) return await self.call_api("set_group_special_title", {"group_id": group_id, "user_id": user_id, "special_title": special_title, "duration": duration})
async def get_group_info(self, group_id: int, no_cache: bool = False) -> GroupInfo: async def get_group_info(self, group_id: int, no_cache: bool = False) -> GroupInfo:
""" """
获取群信息 获取群组的详细信息
:param group_id: 群号 Args:
:param no_cache: 是否不使用缓存 group_id (int): 目标群组的群号。
:return: 群信息对象 no_cache (bool, optional): 是否不使用缓存直接从服务器获取最新信息。Defaults to False.
Returns:
GroupInfo: 包含群组信息的 `GroupInfo` 数据对象。
""" """
res = await self.call_api("get_group_info", {"group_id": group_id, "no_cache": no_cache}) cache_key = f"neobot:cache:get_group_info:{group_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return GroupInfo(**json.loads(cached_data))
res = await self.call_api("get_group_info", {"group_id": group_id})
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return GroupInfo(**res) return GroupInfo(**res)
async def get_group_list(self) -> List[GroupInfo]: async def get_group_list(self) -> List[GroupInfo]:
""" """
获取列表 获取机器人加入的所有群组的列表
:return: 群信息对象列表 Returns:
List[GroupInfo]: 包含所有群组信息的 `GroupInfo` 对象列表。
""" """
res = await self.call_api("get_group_list") res = await self.call_api("get_group_list")
return [GroupInfo(**item) for item in res] return [GroupInfo(**item) for item in res]
async def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> GroupMemberInfo: async def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> GroupMemberInfo:
""" """
获取群成员信息 获取指定群组成员的详细信息
:param group_id: 群号 Args:
:param user_id: QQ 号 group_id (int): 目标群组的群号。
:param no_cache: 是否不使用缓存 user_id (int): 目标成员的 QQ 号。
:return: 群成员信息对象 no_cache (bool, optional): 是否不使用缓存。Defaults to False.
Returns:
GroupMemberInfo: 包含群成员信息的 `GroupMemberInfo` 数据对象。
""" """
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id, "no_cache": no_cache}) cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return GroupMemberInfo(**json.loads(cached_data))
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id})
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return GroupMemberInfo(**res) return GroupMemberInfo(**res)
async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]: async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]:
""" """
获取成员列表 获取一个群组的所有成员列表
:param group_id: 群号 Args:
:return: 群成员信息对象列表 group_id (int): 目标群组的群号。
Returns:
List[GroupMemberInfo]: 包含所有群成员信息的 `GroupMemberInfo` 对象列表。
""" """
res = await self.call_api("get_group_member_list", {"group_id": group_id}) res = await self.call_api("get_group_member_list", {"group_id": group_id})
return [GroupMemberInfo(**item) for item in res] return [GroupMemberInfo(**item) for item in res]
async def get_group_honor_info(self, group_id: int, type: str) -> GroupHonorInfo: async def get_group_honor_info(self, group_id: int, type: str) -> GroupHonorInfo:
""" """
获取群荣誉信息 获取群组的荣誉信息(如龙王、群聊之火等)。
:param group_id: 群号 Args:
:param type: 要获取的群荣誉类型,可传入 talkative, performer, legend, strong_newbie, emotion 等 group_id (int): 目标群组的群号。
:return: 群荣誉信息对象 type (str): 要获取的荣誉类型。
可选值: "talkative", "performer", "legend", "strong_newbie", "emotion" 等。
Returns:
GroupHonorInfo: 包含群荣誉信息的 `GroupHonorInfo` 数据对象。
""" """
res = await self.call_api("get_group_honor_info", {"group_id": group_id, "type": type}) res = await self.call_api("get_group_honor_info", {"group_id": group_id, "type": type})
return GroupHonorInfo(**res) return GroupHonorInfo(**res)
async def set_group_add_request(self, flag: str, sub_type: str, approve: bool = True, reason: str = "") -> Dict[str, Any]: async def set_group_add_request(self, flag: str, sub_type: str, approve: bool = True, reason: str = "") -> Dict[str, Any]:
""" """
处理加群请求/邀请 处理加群请求邀请
:param flag: 加群请求的 flag需从上报的数据中获取 Args:
:param sub_type: add 或 invite请求类型需要与上报消息中的 sub_type 字段相符) flag (str): 请求的标识,需要从 `request` 事件中获取。
:param approve: 是否同意请求/邀请 sub_type (str): 请求的子类型,`add` 或 `invite`
:param reason: 拒绝理由(仅在拒绝时有效) 需要与 `request` 事件中的 `sub_type` 字段相符。
:return: API 响应结果 approve (bool, optional): 是否同意请求或邀请。Defaults to True.
reason (str, optional): 拒绝加群的理由(仅在 `approve` 为 False 时有效。Defaults to "".
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason}) return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason})

View File

@@ -1,5 +1,8 @@
""" """
消息相关 API 模块 消息相关 API 模块
该模块定义了 `MessageAPI` Mixin 类,提供了所有与消息发送、撤回、
转发等相关的 OneBot v11 API 封装。
""" """
from typing import Union, List, Dict, Any, TYPE_CHECKING from typing import Union, List, Dict, Any, TYPE_CHECKING
from .base import BaseAPI from .base import BaseAPI
@@ -10,17 +13,22 @@ if TYPE_CHECKING:
class MessageAPI(BaseAPI): class MessageAPI(BaseAPI):
""" """
消息相关 API Mixin `MessageAPI` Mixin 类,提供了所有与消息操作相关的 API 方法。
""" """
async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]: async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
""" """
发送群消息 发送群消息
:param group_id: 群号 Args:
:param message: 消息内容可以是字符串、MessageSegment 对象或 MessageSegment 列表 group_id (int): 目标群组的群号。
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效) message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
:return: API 响应结果 可以是纯文本字符串、单个消息段对象或消息段列表。
auto_escape (bool, optional): 仅当 `message` 为字符串时有效,
是否对消息内容进行 CQ 码转义。Defaults to False.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api( return await self.call_api(
"send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape} "send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape}
@@ -28,12 +36,15 @@ class MessageAPI(BaseAPI):
async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]: async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
""" """
发送私聊消息 发送私聊消息
:param user_id: 用户 QQ 号 Args:
:param message: 消息内容可以是字符串、MessageSegment 对象或 MessageSegment 列表 user_id (int): 目标用户的 QQ 号。
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效) message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
:return: API 响应结果 auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api( return await self.call_api(
"send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape} "send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape}
@@ -41,12 +52,18 @@ class MessageAPI(BaseAPI):
async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]: async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
""" """
智能发送消息,根据事件类型自动选择发送方式 智能发送消息
:param event: 触发事件对象 该方法会根据传入的事件对象 `event` 自动判断是私聊还是群聊,
:param message: 消息内容 并调用相应的发送函数。如果事件是消息事件,则优先使用 `reply` 方法。
:param auto_escape: 是否自动转义
:return: API 响应结果 Args:
event (OneBotEvent): 触发该发送行为的事件对象。
message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
# 如果是消息事件,直接调用 reply # 如果是消息事件,直接调用 reply
if hasattr(event, "reply"): if hasattr(event, "reply"):
@@ -66,53 +83,98 @@ class MessageAPI(BaseAPI):
async def delete_msg(self, message_id: int) -> Dict[str, Any]: async def delete_msg(self, message_id: int) -> Dict[str, Any]:
""" """
撤回消息 撤回一条消息
:param message_id: 消息 ID Args:
:return: API 响应结果 message_id (int): 要撤回的消息的 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("delete_msg", {"message_id": message_id}) return await self.call_api("delete_msg", {"message_id": message_id})
async def get_msg(self, message_id: int) -> Dict[str, Any]: async def get_msg(self, message_id: int) -> Dict[str, Any]:
""" """
获取消息 获取一条消息的详细信息。
:param message_id: 消息 ID Args:
:return: API 响应结果 message_id (int): 要获取的消息的 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据,包含消息详情。
""" """
return await self.call_api("get_msg", {"message_id": message_id}) return await self.call_api("get_msg", {"message_id": message_id})
async def get_forward_msg(self, id: str) -> Dict[str, Any]: async def get_forward_msg(self, id: str) -> Dict[str, Any]:
""" """
获取合并转发消息 获取合并转发消息的内容。
:param id: 合并转发 ID Args:
:return: API 响应结果 id (str): 合并转发消息的 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据,包含转发消息的节点列表。
""" """
return await self.call_api("get_forward_msg", {"id": id}) return await self.call_api("get_forward_msg", {"id": id})
async def send_group_forward_msg(self, group_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
发送群聊合并转发消息。
Args:
group_id (int): 目标群组的群号。
messages (List[Dict[str, Any]]): 消息节点列表。
推荐使用 `bot.build_forward_node` 来构建节点。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("send_group_forward_msg", {"group_id": group_id, "messages": messages})
async def send_private_forward_msg(self, user_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
发送私聊合并转发消息。
Args:
user_id (int): 目标用户的 QQ 号。
messages (List[Dict[str, Any]]): 消息节点列表。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("send_private_forward_msg", {"user_id": user_id, "messages": messages})
async def can_send_image(self) -> Dict[str, Any]: async def can_send_image(self) -> Dict[str, Any]:
""" """
检查是否可以发送图片 检查当前机器人账号是否可以发送图片
:return: API 响应结果 Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("can_send_image") return await self.call_api("can_send_image")
async def can_send_record(self) -> Dict[str, Any]: async def can_send_record(self) -> Dict[str, Any]:
""" """
检查是否可以发送语音 检查当前机器人账号是否可以发送语音
:return: API 响应结果 Returns:
Dict[str, Any]: OneBot API 的响应数据。
""" """
return await self.call_api("can_send_record") return await self.call_api("can_send_record")
def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]: def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]:
""" """
处理消息内容,将其转换为 API 可接受的格式 内部方法:将消息内容处理成 OneBot API 可接受的格式
:param message: 原始消息内容 - `str` -> `str`
:return: 处理后的消息内容 - `MessageSegment` -> `List[Dict]`
- `List[MessageSegment]` -> `List[Dict]`
Args:
message: 原始消息内容。
Returns:
处理后的消息内容。
""" """
if isinstance(message, str): if isinstance(message, str):
return message return message
@@ -130,12 +192,16 @@ class MessageAPI(BaseAPI):
def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]: def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]:
""" """
将 MessageSegment 对象转换为字典 内部方法:`MessageSegment` 对象转换为字典
:param segment: MessageSegment 对象 Args:
:return: 字典格式的消息段 segment (MessageSegment): 消息段对象。
Returns:
Dict[str, Any]: 符合 OneBot 规范的消息段字典。
""" """
return { return {
"type": segment.type, "type": segment.type,
"data": segment.data "data": segment.data
} }

View File

@@ -1,9 +1,18 @@
""" """
Bot 抽象模块 Bot 核心抽象模块
定义了 Bot 类,封装了 OneBot API 的调用逻辑,提供了便捷的消息发送方法 该模块定义了 `Bot` 类,它是与 OneBot v11 API 进行交互的主要接口
`Bot` 类通过继承 `api` 目录下的各个 Mixin 类,将不同类别的 API 调用
整合在一起,提供了一个统一、便捷的调用入口。
主要职责包括:
- 封装 WebSocket 通信,提供 `call_api` 方法。
- 提供高级消息发送功能,如 `send_forwarded_messages`。
- 整合所有细分的 API 调用(消息、群组、好友等)。
""" """
from typing import TYPE_CHECKING, Dict, Any from typing import TYPE_CHECKING, Dict, Any, List, Union
from models.events.base import OneBotEvent
from models.message import MessageSegment
if TYPE_CHECKING: if TYPE_CHECKING:
from .WS import WS from .WS import WS
@@ -13,24 +22,93 @@ from .api import MessageAPI, GroupAPI, FriendAPI, AccountAPI
class Bot(MessageAPI, GroupAPI, FriendAPI, AccountAPI): class Bot(MessageAPI, GroupAPI, FriendAPI, AccountAPI):
""" """
Bot 抽象类,封装 API 调用和常用操作 机器人核心类,封装了所有与 OneBot API 的交互。
继承各个 API Mixin 以提高代码的可维护性
通过 Mixin 模式继承了所有 API 功能,使得结构清晰且易于扩展。
实例由 `WS` 客户端在连接成功后创建,并传递给所有事件处理器和插件。
""" """
def __init__(self, ws_client: "WS"): def __init__(self, ws_client: "WS"):
""" """
初始化 Bot 实例 初始化 Bot 实例
:param ws_client: WebSocket 客户端实例,用于底层通信 Args:
ws_client (WS): WebSocket 客户端实例,负责底层的 API 请求和响应处理。
""" """
self.ws = ws_client self.ws = ws_client
async def call_api(self, action: str, params: Dict[str, Any] = None) -> Any: async def call_api(self, action: str, params: Dict[str, Any] = None) -> Any:
""" """
调用 OneBot API 底层 API 调用方法。
:param action: API 动作名称 所有具体的 API 实现最终都会调用此方法,通过 WebSocket 发送请求。
:param params: API 参数
:return: API 响应结果 Args:
action (str): API 的动作名称,例如 "send_group_msg"
params (Dict[str, Any], optional): API 请求的参数字典。Defaults to None.
Returns:
Any: OneBot API 的响应数据。
""" """
return await self.ws.call_api(action, params) return await self.ws.call_api(action, params)
def build_forward_node(self, user_id: int, nickname: str, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Dict[str, Any]:
"""
构建一个用于合并转发的消息节点 (Node)。
这是一个辅助方法,用于方便地创建符合 OneBot v11 规范的消息节点,
以便在 `send_forwarded_messages` 中使用。
Args:
user_id (int): 发送者的 QQ 号。
nickname (str): 发送者在消息中显示的昵称。
message (Union[str, MessageSegment, List[MessageSegment]]): 该节点的消息内容,
可以是纯文本、单个消息段或消息段列表。
Returns:
Dict[str, Any]: 构造好的消息节点字典。
"""
return {
"type": "node",
"data": {
"uin": user_id,
"name": nickname,
"content": self._process_message(message)
}
}
async def send_forwarded_messages(self, target: Union[int, "OneBotEvent"], nodes: List[Dict[str, Any]]):
"""
发送合并转发消息。
该方法实现了智能判断,可以根据 `target` 的类型自动发送群聊合并转发
或私聊合并转发消息。
Args:
target (Union[int, OneBotEvent]): 发送目标。
- 如果是 `OneBotEvent` 对象,则自动判断是群聊还是私聊。
- 如果是 `int`,则默认为群号,发送群聊合并转发。
nodes (List[Dict[str, Any]]): 消息节点列表。
推荐使用 `build_forward_node` 方法来构建列表中的每个节点。
Raises:
ValueError: 如果事件对象中既没有 `group_id` 也没有 `user_id`。
"""
if isinstance(target, OneBotEvent):
group_id = getattr(target, "group_id", None)
user_id = getattr(target, "user_id", None)
if group_id:
# 直接发送群聊合并转发
await self.send_group_forward_msg(group_id, nodes)
elif user_id:
# 发送私聊合并转发
await self.send_private_forward_msg(user_id, nodes)
else:
raise ValueError("Event has neither group_id nor user_id")
else:
# 默认行为是发送到群聊
group_id = target
await self.send_group_forward_msg(group_id, nodes)

View File

@@ -1,7 +1,17 @@
""" """
命令管理器模块 命令与事件管理器模块
提供装饰器用于注册消息指令、通知处理器和请求处理器,并负责事件的分发 该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心
它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和
请求事件处理器的能力。
主要职责:
- 提供 `@matcher.command()` 装饰器来注册命令。
- 提供 `@matcher.on_notice()` 装饰器来注册通知处理器。
- 提供 `@matcher.on_request()` 装饰器来注册请求处理器。
- 负责解析收到的消息,匹配命令前缀并分发给对应的处理器。
- 统一处理所有类型的事件,并将其分发给所有已注册的处理器。
- 内置一个 `/help` 命令,用于展示所有已加载插件的帮助信息。
""" """
import inspect import inspect
from typing import Any, Callable, Dict, List, Tuple from typing import Any, Callable, Dict, List, Tuple
@@ -14,22 +24,25 @@ comm_prefixes = global_config.bot.get("command", ("/",))
class CommandManager: class CommandManager:
""" """
命令管理器,负责注册和分发指令、通知和请求事件 命令管理器,负责注册和分发所有类型的事件
这是一个单例对象(`matcher`),在整个应用中共享。
""" """
def __init__(self, prefixes: Tuple[str, ...] = ("/",)): def __init__(self, prefixes: Tuple[str, ...]):
""" """
初始化命令管理器 初始化命令管理器
:param prefixes: 命令前缀元组 Args:
prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。
""" """
self.prefixes = prefixes self.prefixes = prefixes
self.commands: Dict[str, Callable] = {} # 存储消息指令 self.commands: Dict[str, Callable] = {} # 存储消息指令处理器
self.notice_handlers: List[Dict] = [] # 存储通知处理器 self.notice_handlers: List[Dict] = [] # 存储通知事件处理器
self.request_handlers: List[Dict] = [] # 存储请求处理器 self.request_handlers: List[Dict] = [] # 存储请求事件处理器
self.plugins: Dict[str, Dict[str, Any]] = {} # 存储插件元数据 self.plugins: Dict[str, Dict[str, Any]] = {} # 存储已加载插件元数据
# --- 内置 help 指令 --- # --- 注册内置 help 指令 ---
self.commands["help"] = self._help_command self.commands["help"] = self._help_command
self.plugins["core.help"] = { self.plugins["core.help"] = {
"name": "帮助", "name": "帮助",
@@ -39,10 +52,13 @@ class CommandManager:
async def _help_command(self, bot, event): async def _help_command(self, bot, event):
""" """
内置的 /help 指令处理器 内置的 `/help` 命令的实现。
:param bot: Bot 实例 该命令会遍历所有已加载插件的元数据,并生成一段格式化的帮助文本。
:param event: 消息事件对象
Args:
bot: Bot 实例。
event: 消息事件对象。
""" """
help_text = "--- 可用指令列表 ---\n" help_text = "--- 可用指令列表 ---\n"
@@ -57,58 +73,78 @@ class CommandManager:
await bot.send(event, help_text.strip()) await bot.send(event, help_text.strip())
# --- 1. 消息指令装饰器 --- def command(self, name: str) -> Callable:
def command(self, name: str):
""" """
装饰器:注册消息指令 装饰器:用于注册一个消息指令处理器。
:param name: 指令名称(不含前缀) Example:
:return: 装饰器函数 @matcher.command("echo")
async def handle_echo(bot, event, args):
await bot.send(event, " ".join(args))
Args:
name (str): 指令的名称(不包含命令前缀)。
Returns:
Callable: 原函数,使其可以继续被调用。
""" """
def decorator(func): def decorator(func: Callable) -> Callable:
self.commands[name] = func self.commands[name] = func
return func return func
return decorator return decorator
# --- 2. 通知事件装饰器 --- def on_notice(self, notice_type: str = None) -> Callable:
def on_notice(self, notice_type: str = None):
""" """
装饰器:注册通知处理器 装饰器:用于注册一个通知事件处理器
:param notice_type: 通知类型,如果为 None 则处理所有通知 如果 `notice_type` 未指定,则该处理器会接收所有类型的通知事件。
:return: 装饰器函数
Args:
notice_type (str, optional): 要处理的通知类型 (e.g., "group_increase")。
Defaults to None.
Returns:
Callable: 原函数。
""" """
def decorator(func): def decorator(func: Callable) -> Callable:
self.notice_handlers.append({"type": notice_type, "func": func}) self.notice_handlers.append({"type": notice_type, "func": func})
return func return func
return decorator return decorator
# --- 3. 请求事件装饰器 --- def on_request(self, request_type: str = None) -> Callable:
def on_request(self, request_type: str = None):
""" """
装饰器:注册请求处理器 装饰器:用于注册一个请求事件处理器
:param request_type: 请求类型,如果为 None 则处理所有请求 如果 `request_type` 未指定,则该处理器会接收所有类型的请求事件。
:return: 装饰器函数
Args:
request_type (str, optional): 要处理的请求类型 (e.g., "friend", "group")。
Defaults to None.
Returns:
Callable: 原函数。
""" """
def decorator(func): def decorator(func: Callable) -> Callable:
self.request_handlers.append({"type": request_type, "func": func}) self.request_handlers.append({"type": request_type, "func": func})
return func return func
return decorator return decorator
# --- 统一事件分发入口 ---
async def handle_event(self, bot, event): async def handle_event(self, bot, event):
""" """
统一事件分发入口 统一事件分发入口
:param bot: Bot 实例 由 `WS` 客户端在接收到事件后调用。该方法会根据事件的 `post_type`
:param event: 事件对象 将其分发给对应的具体处理方法。
Args:
bot: Bot 实例。
event: 已解析的事件对象。
""" """
post_type = event.post_type post_type = event.post_type
@@ -119,13 +155,16 @@ class CommandManager:
elif post_type == 'request': elif post_type == 'request':
await self.handle_request(bot, event) await self.handle_request(bot, event)
# --- 消息分发逻辑 ---
async def handle_message(self, bot, event): async def handle_message(self, bot, event):
""" """
解析并分发消息指令 处理消息事件,解析并分发指令
:param bot: Bot 实例 该方法会检查消息是否以已配置的命令前缀开头,如果是,则解析出
:param event: 消息事件对象 指令名称和参数,并调用对应的处理器。
Args:
bot: Bot 实例。
event: 消息事件对象。
""" """
if not event.raw_message: if not event.raw_message:
return return
@@ -155,39 +194,43 @@ class CommandManager:
func = self.commands[cmd_name] func = self.commands[cmd_name]
await self._run_handler(func, bot, event, args) await self._run_handler(func, bot, event, args)
# --- 通知分发逻辑 ---
async def handle_notice(self, bot, event): async def handle_notice(self, bot, event):
""" """
分发通知事件 分发通知事件给所有匹配的处理器。
:param bot: Bot 实例 Args:
:param event: 通知事件对象 bot: Bot 实例。
event: 通知事件对象。
""" """
for handler in self.notice_handlers: for handler in self.notice_handlers:
if handler["type"] is None or handler["type"] == event.notice_type: if handler["type"] is None or handler["type"] == event.notice_type:
await self._run_handler(handler["func"], bot, event) await self._run_handler(handler["func"], bot, event)
# --- 请求分发逻辑 ---
async def handle_request(self, bot, event): async def handle_request(self, bot, event):
""" """
分发请求事件 分发请求事件给所有匹配的处理器。
:param bot: Bot 实例 Args:
:param event: 请求事件对象 bot: Bot 实例。
event: 请求事件对象。
""" """
for handler in self.request_handlers: for handler in self.request_handlers:
if handler["type"] is None or handler["type"] == event.request_type: if handler["type"] is None or handler["type"] == event.request_type:
await self._run_handler(handler["func"], bot, event) await self._run_handler(handler["func"], bot, event)
# --- 通用执行器:自动注入参数 --- async def _run_handler(self, func: Callable, bot, event, args: List[str] = None):
async def _run_handler(self, func, bot, event, args=None):
""" """
根据函数签名自动注入 bot, event 或 args 智能执行事件处理器。
:param func: 目标处理函 该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参
:param bot: Bot 实例 (如 `bot`, `event`, `args`),实现了依赖注入。
:param event: 事件对象
:param args: 指令参数(仅消息指令有效) Args:
func (Callable): 目标处理器函数。
bot: Bot 实例。
event: 事件对象。
args (List[str], optional): 指令参数列表(仅对消息事件有效)。
Defaults to None.
""" """
sig = inspect.signature(func) sig = inspect.signature(func)
params = sig.parameters params = sig.parameters
@@ -204,11 +247,14 @@ class CommandManager:
await func(**kwargs) await func(**kwargs)
# 确保前缀是元组格式 # --- 全局单例 ---
# 确保前缀配置是元组格式
if isinstance(comm_prefixes, list): if isinstance(comm_prefixes, list):
comm_prefixes = tuple[Any, ...](comm_prefixes) comm_prefixes = tuple(comm_prefixes)
elif isinstance(comm_prefixes, str): elif isinstance(comm_prefixes, str):
comm_prefixes = (comm_prefixes,) comm_prefixes = (comm_prefixes,)
# 实例化全局管理器 # 实例化全局唯一的命令管理器
matcher = CommandManager(prefixes=comm_prefixes) matcher = CommandManager(prefixes=comm_prefixes)

View File

@@ -64,6 +64,15 @@ class Config:
""" """
return self._data.get("features", {}) return self._data.get("features", {})
@property
def redis(self) -> dict:
"""
获取 Redis 配置
:return: 配置字典
"""
return self._data.get("redis", {})
# 实例化全局配置对象 # 实例化全局配置对象
global_config = Config() global_config = Config()

60
core/redis_manager.py Normal file
View File

@@ -0,0 +1,60 @@
import redis
from .config_loader import global_config as config
class RedisManager:
"""
Redis 连接管理器
"""
_pool = None
_client = None
@classmethod
def initialize(cls):
"""
初始化 Redis 连接并进行健康检查
"""
if cls._pool is None:
try:
host = config.redis['host']
port = config.redis['port']
db = config.redis['db']
password = config.redis.get('password')
print(f" 正在尝试连接 Redis: {host}:{port}, DB: {db}")
cls._pool = redis.ConnectionPool(
host=host,
port=port,
db=db,
password=password,
decode_responses=True
)
cls._client = redis.Redis(connection_pool=cls._pool)
if cls._client.ping():
print(" Redis 连接成功!")
else:
print(" Redis 连接失败: PING 命令无响应")
except redis.exceptions.ConnectionError as e:
print(f" Redis 连接失败: {e}")
cls._pool = None
cls._client = None
except Exception as e:
print(f" Redis 初始化时发生未知错误: {e}")
cls._pool = None
cls._client = None
@classmethod
def get_redis(cls):
"""
获取 Redis 连接
:return: Redis 连接实例
"""
if cls._client is None:
# 理论上 initialize 应该在程序启动时被调用,这里作为备用
cls.initialize()
return cls._client
# 在模块加载时直接初始化
RedisManager.initialize()
redis_client = RedisManager.get_redis()

View File

@@ -1,7 +1,15 @@
""" """
WebSocket 核心模块 WebSocket 核心通信模块
负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。 该模块定义了 `WS` 类,负责与 OneBot v11 实现(如 NapCat建立和管理
WebSocket 连接。它是整个机器人框架的底层通信基础。
主要职责包括:
- 建立 WebSocket 连接并处理认证。
- 实现断线自动重连机制。
- 监听并接收来自 OneBot 的事件和 API 响应。
- 分发事件给 `CommandManager` 进行处理。
- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
""" """
import asyncio import asyncio
import json import json
@@ -20,12 +28,14 @@ from .config_loader import global_config
class WS: class WS:
""" """
WebSocket 客户端,负责与 OneBot 实现端建立连接并处理通信 WebSocket 客户端,负责与 OneBot v11 实现进行底层通信
""" """
def __init__(self): def __init__(self):
""" """
初始化 WebSocket 客户端 初始化 WebSocket 客户端
从全局配置中读取 WebSocket URI、访问令牌Token和重连间隔。
""" """
# 读取参数 # 读取参数
cfg = global_config.napcat_ws cfg = global_config.napcat_ws
@@ -39,7 +49,10 @@ class WS:
async def connect(self): async def connect(self):
""" """
主连接循环,负责建立连接和自动重连 启动并管理 WebSocket 连接。
这是一个无限循环,负责建立连接。如果连接断开,它会根据配置的
`reconnect_interval` 时间间隔后自动尝试重新连接。
""" """
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
@@ -67,9 +80,13 @@ class WS:
async def _listen_loop(self, websocket): async def _listen_loop(self, websocket):
""" """
核心监听循环,处理接收到的 WebSocket 消息 核心监听循环,处理所有接收到的 WebSocket 消息
:param websocket: WebSocket 连接对象 此循环会持续从 WebSocket 连接中读取消息,并根据消息内容
判断是 API 响应还是上报的事件,然后分发给相应的处理逻辑。
Args:
websocket: 当前活动的 WebSocket 连接对象。
""" """
async for message in websocket: async for message in websocket:
try: try:
@@ -96,9 +113,16 @@ class WS:
async def on_event(self, raw_data: dict): async def on_event(self, raw_data: dict):
""" """
事件分发层:根据 post_type 调用 matcher 对应的处理器 事件处理和分发层。
:param raw_data: 原始事件数据字典 当接收到一个 OneBot 事件时,此方法负责:
1. 使用 `EventFactory` 将原始 JSON 数据解析成对应的事件对象。
2. 为事件对象注入 `Bot` 实例,以便在插件中可以调用 API。
3. 打印格式化的事件日志。
4. 将事件对象传递给 `CommandManager` (`matcher`) 进行后续处理。
Args:
raw_data (dict): 从 WebSocket 接收到的原始事件字典。
""" """
try: try:
# 使用工厂创建事件对象 # 使用工厂创建事件对象
@@ -124,11 +148,18 @@ class WS:
async def call_api(self, action: str, params: dict = None): async def call_api(self, action: str, params: dict = None):
""" """
调用 OneBot API OneBot v11 实现端发送一个 API 请求。
:param action: API 动作名称 该方法通过 WebSocket 发送请求,并使用 `echo` 字段来匹配对应的响应。
:param params: API 参数 它创建了一个 `Future` 对象来异步等待响应,并设置了超时机制。
:return: API 响应结果
Args:
action (str): API 的动作名称,例如 "send_group_msg"
params (dict, optional): API 请求的参数字典。 Defaults to None.
Returns:
dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
表示失败的字典。
""" """
if not self.ws: if not self.ws:
return {"status": "failed", "msg": "websocket not initialized"} return {"status": "failed", "msg": "websocket not initialized"}
@@ -152,3 +183,4 @@ class WS:
except asyncio.TimeoutError: except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None) self._pending_requests.pop(echo_id, None)
return {"status": "failed", "retcode": -1, "msg": "api timeout"} return {"status": "failed", "retcode": -1, "msg": "api timeout"}

View File

@@ -12,6 +12,7 @@ from watchdog.events import FileSystemEventHandler
from core import WS from core import WS
from core.plugin_manager import load_all_plugins from core.plugin_manager import load_all_plugins
#from core.redis_manager import redis_client
class PluginReloadHandler(FileSystemEventHandler): class PluginReloadHandler(FileSystemEventHandler):

View File

@@ -1,7 +1,8 @@
""" """
基础事件模型模块 基础事件模型模块
定义了所有 OneBot 11 事件的基类和事件类型枚举。 该模块定义了所有 OneBot v11 事件模型的基类 `OneBotEvent` 和
事件类型常量 `EventType`。所有具体的事件模型都应继承自 `OneBotEvent`。
""" """
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING, Optional
@@ -13,46 +14,60 @@ if TYPE_CHECKING:
class EventType: class EventType:
""" """
事件类型枚举 OneBot v11 事件类型常量。
用于标识不同种类的事件上报。
""" """
META = 'meta_event' # 元事件 META = 'meta_event'
REQUEST = 'request' # 请求事件 """元事件 (meta_event): 如心跳、生命周期等。"""
NOTICE = 'notice' # 通知事件 REQUEST = 'request'
MESSAGE = 'message' # 消息事件 """请求事件 (request): 如加好友请求、加群请求等。"""
MESSAGE_SENT = 'message_sent' # 消息发送事件 NOTICE = 'notice'
"""通知事件 (notice): 如群成员增加、文件上传等。"""
MESSAGE = 'message'
"""消息事件 (message): 如私聊消息、群消息等。"""
MESSAGE_SENT = 'message_sent'
"""消息发送事件 (message_sent): 机器人自己发送消息的上报。"""
@dataclass @dataclass(slots=True)
class OneBotEvent(ABC): class OneBotEvent(ABC):
""" """
OneBot 事件基类 OneBot v11 事件的抽象基类
所有具体的事件类型都应该继承自此类
所有具体的事件模型都必须继承此类,并实现 `post_type` 属性。
Attributes:
time (int): 事件发生的时间戳 (秒)。
self_id (int): 收到事件的机器人 QQ 号。
_bot (Optional[Bot]): 内部持有的 Bot 实例引用,用于快捷 API 调用。
""" """
time: int time: int
"""事件发生的时间戳"""
self_id: int self_id: int
"""收到事件的机器人 QQ 号"""
_bot: Optional["Bot"] = field(default=None, init=False) _bot: Optional["Bot"] = field(default=None, init=False)
"""Bot 实例引用,用于快捷调用 API"""
@property @property
@abstractmethod @abstractmethod
def post_type(self) -> str: def post_type(self) -> str:
""" """
上报类型 抽象属性,代表事件的上报类型
子类必须重写此属性,并返回对应的 `EventType` 常量值。
例如: `return EventType.MESSAGE`
""" """
pass pass
@property @property
def bot(self) -> "Bot": def bot(self) -> "Bot":
""" """
获取 Bot 实例 获取与此事件关联的 `Bot` 实例,以便快捷调用 API。
:return: Bot 实例 Returns:
:raises ValueError: 如果 Bot 实例未设置 Bot: 当前事件所对应的 `Bot` 实例
Raises:
ValueError: 如果 `Bot` 实例尚未被设置到事件对象中。
""" """
if self._bot is None: if self._bot is None:
raise ValueError("Bot instance not set for this event") raise ValueError("Bot instance not set for this event")
@@ -61,8 +76,10 @@ class OneBotEvent(ABC):
@bot.setter @bot.setter
def bot(self, value: "Bot"): def bot(self, value: "Bot"):
""" """
设置 Bot 实例 为事件对象设置关联的 `Bot` 实例
:param value: Bot 实例 Args:
value (Bot): 要设置的 `Bot` 实例。
""" """
self._bot = value self._bot = value

View File

@@ -11,7 +11,7 @@ from models.sender import Sender
from .base import OneBotEvent, EventType from .base import OneBotEvent, EventType
@dataclass @dataclass(slots=True)
class Anonymous: class Anonymous:
""" """
匿名信息 匿名信息

View File

@@ -8,7 +8,7 @@ from typing import Optional
from .base import OneBotEvent, EventType from .base import OneBotEvent, EventType
@dataclass @dataclass(slots=True)
class HeartbeatStatus: class HeartbeatStatus:
""" """
心跳状态接口 心跳状态接口

View File

@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
from .base import OneBotEvent, EventType from .base import OneBotEvent, EventType
@dataclass @dataclass(slots=True)
class NoticeEvent(OneBotEvent): class NoticeEvent(OneBotEvent):
""" """
通知事件基类 通知事件基类

View File

@@ -7,7 +7,7 @@ from dataclasses import dataclass
from .base import OneBotEvent, EventType from .base import OneBotEvent, EventType
@dataclass @dataclass(slots=True)
class RequestEvent(OneBotEvent): class RequestEvent(OneBotEvent):
""" """
请求事件基类 请求事件基类

View File

@@ -1,48 +1,56 @@
""" """
消息段模型模块 消息段模型模块
定义了 MessageSegment 类,用于封装 OneBot 11 的消息段。 该模块定义了 `MessageSegment` 类,用于构建和表示 OneBot v11 协议中的消息段。
通过此类可以方便地创建文本、图片、At 等不同类型的消息内容,并支持链式操作。
""" """
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Dict from typing import Any, Dict
@dataclass @dataclass(slots=True)
class MessageSegment: class MessageSegment:
""" """
消息段,对应 OneBot 11 标准中的消息段对象 表示一个 OneBot v11 消息段
Attributes:
type (str): 消息段的类型,例如 'text', 'image', 'at'
data (Dict[str, Any]): 消息段的具体数据,是一个键值对字典。
""" """
type: str type: str
"""消息段类型,如 text, image, at 等"""
data: Dict[str, Any] data: Dict[str, Any]
"""消息段数据"""
@property @property
def text(self) -> str: def text(self) -> str:
""" """
获取文本内容(仅当 type 为 text 时有效) 当消息段类型为 'text' 时,快速获取其文本内容。
:return: 文本内容 Returns:
str: 消息段的文本内容。如果类型不是 'text',则返回空字符串。
""" """
return self.data.get("text", "") if self.type == "text" else "" return self.data.get("text", "") if self.type == "text" else ""
@property @property
def image_url(self) -> str: def image_url(self) -> str:
""" """
获取图片 URL仅当 type 为 image 时有效) 当消息段类型为 'image' 时,快速获取其图片 URL。
:return: 图片 URL Returns:
str: 图片的 URL。如果类型不是 'image' 或数据中不含 'url',则返回空字符串。
""" """
return self.data.get("url", "") if self.type == "image" else "" return self.data.get("url", "") if self.type == "image" else ""
def is_at(self, user_id: int = None) -> bool: def is_at(self, user_id: int = None) -> bool:
""" """
判断是否为 @某人 检查当前消息段是否是一个 'at' (提及) 消息段。
:param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型 Args:
:return: 是否匹配 user_id (int, optional): 如果提供,则进一步检查被提及的 QQ 号是否匹配
Defaults to None.
Returns:
bool: 如果消息段是 'at' 类型且 user_id 匹配 (如果提供),则返回 True。
""" """
if self.type != "at": if self.type != "at":
return False return False
@@ -51,6 +59,9 @@ class MessageSegment:
return str(self.data.get("qq")) == str(user_id) return str(self.data.get("qq")) == str(user_id)
def __repr__(self): def __repr__(self):
"""
返回消息段对象的字符串表示形式,便于调试。
"""
return f"[MS:{self.type}:{self.data}]" return f"[MS:{self.type}:{self.data}]"
# --- 快捷构造方法 --- # --- 快捷构造方法 ---
@@ -58,39 +69,52 @@ class MessageSegment:
@staticmethod @staticmethod
def text(text: str) -> "MessageSegment": def text(text: str) -> "MessageSegment":
""" """
构造文本消息段 创建一个文本消息段
:param text: 文本内容 Args:
:return: MessageSegment 对象 text (str): 文本内容。
Returns:
MessageSegment: 一个类型为 'text' 的消息段对象。
""" """
return MessageSegment(type="text", data={"text": text}) return MessageSegment(type="text", data={"text": text})
@staticmethod @staticmethod
def at(user_id: int | str) -> "MessageSegment": def at(user_id: int | str) -> "MessageSegment":
""" """
构造 @某人 消息段 创建一个 @某人 消息段
:param user_id: 目标 QQ 号,"all" 表示 @全体成员 Args:
:return: MessageSegment 对象 user_id (int | str): 要提及的 QQ 号。若为 "all",则表示 @全体成员。
Returns:
MessageSegment: 一个类型为 'at' 的消息段对象。
""" """
return MessageSegment(type="at", data={"qq": str(user_id)}) return MessageSegment(type="at", data={"qq": str(user_id)})
@staticmethod @staticmethod
def image(file: str) -> "MessageSegment": def image(file: str) -> "MessageSegment":
""" """
构造图片消息段 创建一个图片消息段
:param file: 图片文件名、URL 或 Base64 Args:
:return: MessageSegment 对象 file (str): 图片的路径、URL 或 Base64 编码的字符串。
Returns:
MessageSegment: 一个类型为 'image' 的消息段对象。
""" """
return MessageSegment(type="image", data={"file": file}) return MessageSegment(type="image", data={"file": file})
@staticmethod @staticmethod
def face(id: int) -> "MessageSegment": def face(id: int) -> "MessageSegment":
""" """
构造表情消息段 创建一个 QQ 表情消息段
:param id: 表情 ID Args:
:return: MessageSegment 对象 id (int): QQ 表情的 ID。
Returns:
MessageSegment: 一个类型为 'face' 的消息段对象。
""" """
return MessageSegment(type="face", data={"id": str(id)}) return MessageSegment(type="face", data={"id": str(id)})

View File

@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
from typing import List, Optional from typing import List, Optional
@dataclass @dataclass(slots=True)
class GroupInfo: class GroupInfo:
""" """
群信息 群信息

View File

@@ -7,7 +7,7 @@ from dataclasses import dataclass
from typing import Optional from typing import Optional
@dataclass @dataclass(slots=True)
class Sender: class Sender:
""" """
发送者信息类,对应 OneBot 11 标准中的 sender 字段 发送者信息类,对应 OneBot 11 标准中的 sender 字段

42
plugins/forward_test.py Normal file
View File

@@ -0,0 +1,42 @@
"""
合并转发消息测试插件
"""
from core.command_manager import matcher
from core.bot import Bot
from models import MessageEvent
from models.message import MessageSegment
__plugin_meta__ = {
"name": "furry",
"description": "处理 /furry 指令发送furry图片同时也是bot.build_forward_node演示",
"usage": "/furry - 发送一条furry图",
}
@matcher.command("furry")
async def handle_forward_test(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理 /furry 指令发送furry图片同时也是bot.build_forward_node实例
:param bot: Bot 实例
:param event: 消息事件对象
:param args: 指令参数
"""
# 1. 构建消息节点列表
nodes = [
bot.build_forward_node(user_id=event.self_id, nickname="机器人", message="你要的furry来了"),
bot.build_forward_node(user_id=event.user_id, nickname=event.sender.nickname, message="让我看看"),
bot.build_forward_node(
user_id=event.self_id,
nickname="机器人",
message=[
MessageSegment.text("你要的福瑞图"),
MessageSegment.image("https://api.furry.ist/furry-img/")
]
)
]
try:
# 2. 发送合并转发消息
await bot.send_forwarded_messages(event, nodes)
except Exception as e:
await event.reply(f"发送失败: {e}")

View File

@@ -1,3 +1,8 @@
"""
今日人品插件
提供 /jrcd 和 /bbcd 指令,用于娱乐。
"""
import random import random
from datetime import datetime from datetime import datetime
@@ -19,7 +24,7 @@ JRCDMSG_2 = [
JRCDMSG_3 = [ JRCDMSG_3 = [
"今天的长度是%scm哦豁听说你很勇哦(✧◡✧)", "今天的长度是%scm哦豁听说你很勇哦(✧◡✧)",
"今天的长度是%scm嘶哈嘶哈(((o(*°▽°*)o)))...", "今天的长度是%scm嘶哈嘶哈(((o(*°▽°*)o)))...",
"今天的长度是%scm我靠让哥哥爽一爽吧(((o(*°▽°*)o)))...", "今天的长度是%scm我靠让哥哥爽一-爽吧!(((o(*°▽°*)o)))...",
"今天的长度是%scm单是看到哥哥的长度就....(〃w〃)", "今天的长度是%scm单是看到哥哥的长度就....(〃w〃)",
] ]
@@ -38,6 +43,12 @@ BBCDMSG7 = ["试试刺刀看看谁能赢吧!"]
def get_jrcd(user_id: int) -> int: def get_jrcd(user_id: int) -> int:
"""
根据用户ID和当前日期生成一个伪随机的“长度”值。
:param user_id: 用户QQ号。
:return: 返回一个1到30之间的整数。
"""
current_time = ( current_time = (
datetime.now().year * 100 + datetime.now().month * 100 + datetime.now().day datetime.now().year * 100 + datetime.now().month * 100 + datetime.now().day
) )
@@ -52,11 +63,11 @@ def get_jrcd(user_id: int) -> int:
@matcher.command("jrcd") @matcher.command("jrcd")
async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]): async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
""" """
处理 jrcd 指令,来看看你的长度吧! 处理 jrcd 指令,回复用户的“今日长度”。
:param bot: Bot 实例 :param bot: Bot 实例
:param event: 消息事件对象 :param event: 消息事件对象
:param args: 指令参数列表 :param args: 指令参数列表(未使用)。
""" """
user_id = event.user_id user_id = event.user_id
jrcd = get_jrcd(user_id) jrcd = get_jrcd(user_id)
@@ -73,11 +84,11 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
@matcher.command("bbcd") @matcher.command("bbcd")
async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]): async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]):
""" """
处理 bbcd 指令,和别人比比长度吧! 处理 bbcd 指令,比较两位用户的“长度”。
:param bot: Bot 实例 :param bot: Bot 实例
:param event: 消息事件对象 :param event: 消息事件对象
:param args: 指令参数列表 :param args: 指令参数列表(未使用)。
""" """
message = event.message message = event.message
print(message) print(message)

View File

@@ -12,4 +12,11 @@ from models import MessageEvent, MessageSegment
@matcher.command("thpic") @matcher.command("thpic")
async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理 thpic 指令发送一张随机的东方Project图片。
:param bot: Bot 实例(未使用)。
:param event: 消息事件对象。
:param args: 指令参数列表(未使用)。
"""
await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random")) await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random"))

View File

@@ -11,3 +11,4 @@ urllib3==2.6.2
websockets==15.0.1 websockets==15.0.1
yarg==0.1.10 yarg==0.1.10
watchdog==6.0.0 watchdog==6.0.0
redis==5.0.7