From 01b83803c14ded6b95c00d0b2bab6ee1c8fb3e5f Mon Sep 17 00:00:00 2001 From: K2cr2O1 <2221577113@qq.com> Date: Fri, 2 Jan 2026 17:10:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B3=A8=E9=87=8A=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0redis=E6=94=AF=E6=8C=81=EF=BC=8C=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E4=BA=86=E8=81=8A=E5=A4=A9=E8=AE=B0=E5=BD=95=E6=9E=84?= =?UTF-8?q?=E5=BB=BA=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 109 ++++++++++++++++++- config.toml | 8 +- core/api/account.py | 112 ++++++++++++------- core/api/friend.py | 69 ++++++++---- core/api/group.py | 228 ++++++++++++++++++++++++++------------- core/api/message.py | 136 +++++++++++++++++------ core/bot.py | 102 +++++++++++++++--- core/command_manager.py | 162 ++++++++++++++++++---------- core/config_loader.py | 9 ++ core/redis_manager.py | 60 +++++++++++ core/ws.py | 58 +++++++--- main.py | 1 + models/events/base.py | 59 ++++++---- models/events/message.py | 2 +- models/events/meta.py | 2 +- models/events/notice.py | 2 +- models/events/request.py | 2 +- models/message.py | 74 ++++++++----- models/objects.py | 2 +- models/sender.py | 2 +- plugins/forward_test.py | 42 ++++++++ plugins/jrcd.py | 29 +++-- plugins/thpic.py | 7 ++ requirements.txt | 1 + 24 files changed, 965 insertions(+), 313 deletions(-) create mode 100644 core/redis_manager.py create mode 100644 plugins/forward_test.py diff --git a/README.md b/README.md index f74b384..f8ee6ff 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,42 @@ NEO 框架的设计遵循以下核心理念: * **异步核心**:基于 `asyncio` 和 `websockets` 的高性能异步核心。 * **自动重连**:内置 WebSocket 断线重连机制。 +## ⚡️ 性能优化 + +### Redis 缓存机制 +为了提升响应速度并减少对 OneBot API 的重复调用,框架核心集成了一套基于 Redis 的缓存系统。对于一些不频繁变更的数据(如群信息、好友列表等),首次查询后会将其缓存至 Redis,在缓存有效期内(默认为 1 小时),后续请求将直接从 Redis 读取,极大提升了性能。 + +#### 工作原理 +- **自动缓存**:框架会自动缓存特定 API 的调用结果。 +- **缓存键**:缓存键根据 API 名称和关键参数(如 `group_id`, `user_id`)生成,确保唯一性。 +- **过期时间**:默认缓存 1 小时,之后会自动失效,下次调用时将重新从 OneBot 实现端获取最新数据。 + +#### 受影响的 API +以下核心 API 已默认启用缓存: +- `get_group_info` +- `get_group_member_info` +- `get_friend_list` +- `get_stranger_info` +- `get_login_info` + +#### 如何绕过缓存 +在某些场景下,你可能需要获取实时数据而非缓存数据。为此,所有受缓存影响的 API 方法都增加了一个 `no_cache: bool = False` 的可选参数。 + +当你需要强制从服务器获取最新信息时,只需在调用时传入 `no_cache=True` 即可。 + +**示例:** +```python +# 正常调用,会使用缓存 +group_info = await bot.get_group_info(group_id=12345) + +# 强制获取最新信息,不使用缓存 +latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True) +``` + +### `__slots__` 内存优化 +框架内的所有数据模型(包括事件、消息段、API 返回对象等)均已启用 `__slots__ = True` 优化。这可以显著减少每个对象实例的内存占用,特别是在处理大量事件和数据时,能够有效降低机器人的整体内存消耗。 + + ## 📝 待办事项 (TODO) ### API 封装 @@ -90,10 +126,11 @@ NEO 框架的设计遵循以下核心理念: - [ ] **系统控制** - `set_restart` - [ ] **扩展功能** - - `send_forward_msg`: 发送合并转发消息 + - [x] `send_forward_msg`: 发送合并转发消息 ### 其他改进 - [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。 +- [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。 - [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制。 - [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。 - [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。 @@ -104,7 +141,10 @@ NEO 框架的设计遵循以下核心理念: ``` NEO/ ├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载) -│ └── echo.py # 示例插件:实现 /echo 和 /赞我 指令 +│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令 +│ ├── forward_test.py # 示例插件:演示合并转发消息的构建和发送 +│ ├── jrcd.py # 娱乐插件:提供 /jrcd 和 /bbcd 指令 +│ └── thpic.py # 图片插件:提供 /thpic 指令,发送随机东方图片 ├── core/ # 核心框架代码 │ ├── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI) │ │ ├── __init__.py @@ -117,6 +157,7 @@ NEO/ │ ├── command_manager.py # 命令与事件分发器 │ ├── config_loader.py # 配置加载器 │ ├── plugin_manager.py # 插件加载与管理 +│ ├── redis_manager.py # Redis 连接管理器 │ └── ws.py # WebSocket 客户端核心 ├── models/ # 数据模型 │ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta) @@ -378,6 +419,70 @@ async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]): 5. **性能考虑**:避免在插件中执行耗时同步操作 6. **资源清理**:必要时使用 `try...finally` 确保资源释放 +### 🚀 高性能插件开发规范 (避坑指南) +为了保证整个机器人框架的响应速度和稳定性,所有插件都必须遵循异步、非阻塞的开发原则。任何一个插件中的阻塞操作都可能导致整个机器人卡顿或无响应。 + +以下是必须遵守的核心规范: + +#### 1. 禁止任何形式的同步网络请求 +- **错误示范**: 使用 `requests` 库发起网络请求。 + ```python + import requests + # 错误!这会阻塞整个程序 + response = requests.get("https://api.example.com") + ``` +- **正确做法**: 必须使用异步 HTTP 客户端,如 `aiohttp` 或 `httpx`。 + ```python + import httpx + # 正确,使用 async with 和 await + async with httpx.AsyncClient() as client: + response = await client.get("https://api.example.com") + ``` + +#### 2. 禁止使用 `time.sleep()` +- **错误示范**: 使用 `time.sleep()` 进行等待。 + ```python + import time + # 错误!这会阻塞事件循环 + time.sleep(5) + ``` +- **正确做法**: 必须使用 `asyncio.sleep()`。 + ```python + import asyncio + # 正确,这会将控制权交还给事件循环 + await asyncio.sleep(5) + ``` + +#### 3. 谨慎处理文件 I/O +- 对于读写小型、本地文件,直接使用 `with open(...)` 通常是可接受的。 +- 但对于大型文件或网络文件系统(NFS)上的文件,同步 I/O 可能会导致明显的阻塞。 +- **推荐做法**: 对于可能耗时较长的文件操作,使用 `aiofiles` 库。 + ```python + import aiofiles + async with aiofiles.open('large_file.dat', mode='rb') as f: + contents = await f.read() + ``` + +#### 4. 将 CPU 密集型任务移出事件循环 +- 如果插件需要执行复杂的计算(例如,图像处理、视频转码、数据分析),这些任务会长时间占用 CPU,同样会阻塞事件循环。 +- **正确做法**: 使用 `loop.run_in_executor()` 将这类任务抛到独立的线程池或进程池中执行。 + ```python + import asyncio + + def cpu_bound_task(data): + # 这是一个耗时的同步函数 + # ... 进行大量计算 ... + return "计算结果" + + # 在异步的事件处理器中调用 + loop = asyncio.get_running_loop() + # `None` 表示使用默认的线程池 + result = await loop.run_in_executor(None, cpu_bound_task, "一些数据") + await event.reply(result) + ``` + +遵循以上规范,可以确保您开发的插件不会成为整个机器人应用的性能瓶颈。 + ### 示例:完整插件模板 ```python """ diff --git a/config.toml b/config.toml index ff7cf3e..1c9df7e 100644 --- a/config.toml +++ b/config.toml @@ -4,4 +4,10 @@ token = "&d_VTfksE%}ul?_Y" reconnect_interval = 5 [bot] -command = ["/"] \ No newline at end of file +command = ["/"] + +[redis] +host = "114.66.58.203" +port = 1931 +db = 0 +password = "redis_5dxyJG" diff --git a/core/api/account.py b/core/api/account.py index 8845451..0a5ef0d 100644 --- a/core/api/account.py +++ b/core/api/account.py @@ -1,78 +1,106 @@ """ -账号相关 API 模块 +账号与状态相关 API 模块 + +该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、 +状态设置等相关的 OneBot v11 API 封装。 """ +import json from typing import Dict, Any from .base import BaseAPI from models.objects import LoginInfo, VersionInfo, Status +from core.redis_manager import redis_client as redis_manager class AccountAPI(BaseAPI): """ - 账号相关 API Mixin + `AccountAPI` Mixin 类,提供了所有与机器人账号、状态相关的 API 方法。 """ - async def get_login_info(self) -> LoginInfo: + async def get_login_info(self, no_cache: bool = False) -> LoginInfo: """ - 获取登录号信息 + 获取当前登录的机器人账号信息。 - :return: 登录信息对象 + Args: + no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False. + + Returns: + LoginInfo: 包含登录号 QQ 和昵称的 `LoginInfo` 数据对象。 """ + cache_key = f"neobot:cache:get_login_info:{self.self_id}" + if not no_cache: + cached_data = await redis_manager.get(cache_key) + if cached_data: + return LoginInfo(**json.loads(cached_data)) + res = await self.call_api("get_login_info") + await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return LoginInfo(**res) async def get_version_info(self) -> VersionInfo: """ - 获取版本信息 + 获取 OneBot v11 实现的版本信息。 - :return: 版本信息对象 + Returns: + VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。 """ res = await self.call_api("get_version_info") return VersionInfo(**res) async def get_status(self) -> Status: """ - 获取状态 + 获取 OneBot v11 实现的状态信息。 - :return: 状态对象 + Returns: + Status: 包含 OneBot 状态信息的 `Status` 数据对象。 """ res = await self.call_api("get_status") return Status(**res) async def bot_exit(self) -> Dict[str, Any]: """ - 退出机器人 + 让机器人进程退出(需要实现端支持)。 - :return: API 响应结果 + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("bot_exit") async def set_self_longnick(self, long_nick: str) -> Dict[str, Any]: """ - 设置个性签名 + 设置机器人账号的个性签名。 - :param long_nick: 个性签名内容 - :return: API 响应结果 + Args: + long_nick (str): 要设置的个性签名内容。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_self_longnick", {"longNick": long_nick}) async def set_input_status(self, user_id: int, event_type: int) -> Dict[str, Any]: """ - 设置输入状态 + 设置 "对方正在输入..." 状态提示。 - :param user_id: 用户 ID - :param event_type: 事件类型 - :return: API 响应结果 + Args: + user_id (int): 目标用户的 QQ 号。 + event_type (int): 事件类型。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_input_status", {"user_id": user_id, "event_type": event_type}) async def set_diy_online_status(self, face_id: int, face_type: int, wording: str) -> Dict[str, Any]: """ - 设置自定义在线状态 + 设置自定义的 "在线状态"。 - :param face_id: 状态 ID - :param face_type: 状态类型 - :param wording: 状态描述 - :return: API 响应结果 + Args: + face_id (int): 状态的表情 ID。 + face_type (int): 状态的表情类型。 + wording (str): 状态的描述文本。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_diy_online_status", { "face_id": face_id, @@ -82,43 +110,55 @@ class AccountAPI(BaseAPI): async def set_online_status(self, status_code: int) -> Dict[str, Any]: """ - 设置在线状态 + 设置在线状态(如在线、离开、摸鱼等)。 - :param status_code: 状态码 - :return: API 响应结果 + Args: + status_code (int): 目标在线状态的状态码。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_online_status", {"status_code": status_code}) async def set_qq_profile(self, **kwargs) -> Dict[str, Any]: """ - 设置 QQ 资料 + 设置机器人账号的个人资料。 - :param kwargs: 个人资料相关参数 - :return: API 响应结果 + Args: + **kwargs: 个人资料的相关参数,具体字段请参考 OneBot v11 规范。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_qq_profile", kwargs) async def set_qq_avatar(self, **kwargs) -> Dict[str, Any]: """ - 设置 QQ 头像 + 设置机器人账号的头像。 - :param kwargs: 头像相关参数 - :return: API 响应结果 + Args: + **kwargs: 头像的相关参数,具体字段请参考 OneBot v11 规范。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_qq_avatar", kwargs) async def get_clientkey(self) -> Dict[str, Any]: """ - 获取客户端密钥 + 获取客户端密钥(通常用于 QQ 登录相关操作)。 - :return: API 响应结果 + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("get_clientkey") async def clean_cache(self) -> Dict[str, Any]: """ - 清理缓存 + 清理 OneBot v11 实现端的缓存。 - :return: API 响应结果 + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("clean_cache") + diff --git a/core/api/friend.py b/core/api/friend.py index 8bb4453..76e696b 100644 --- a/core/api/friend.py +++ b/core/api/friend.py @@ -1,53 +1,86 @@ """ -好友相关 API 模块 +好友与陌生人相关 API 模块 + +该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息 +等相关的 OneBot v11 API 封装。 """ +import json from typing import List, Dict, Any from .base import BaseAPI from models.objects import FriendInfo, StrangerInfo +from core.redis_manager import redis_client as redis_manager class FriendAPI(BaseAPI): """ - 好友相关 API Mixin + `FriendAPI` Mixin 类,提供了所有与好友、陌生人操作相关的 API 方法。 """ async def send_like(self, user_id: int, times: int = 1) -> Dict[str, Any]: """ - 发送点赞 + 向指定用户发送 "戳一戳" (点赞)。 - :param user_id: 对方 QQ 号 - :param times: 点赞次数 - :return: API 响应结果 + Args: + user_id (int): 目标用户的 QQ 号。 + times (int, optional): 点赞次数,建议不超过 10 次。Defaults to 1. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("send_like", {"user_id": user_id, "times": times}) async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> StrangerInfo: """ - 获取陌生人信息 + 获取陌生人的信息。 - :param user_id: QQ 号 - :param no_cache: 是否不使用缓存 - :return: 陌生人信息对象 + Args: + user_id (int): 目标用户的 QQ 号。 + no_cache (bool, optional): 是否不使用缓存,直接从服务器获取。Defaults to False. + + Returns: + StrangerInfo: 包含陌生人信息的 `StrangerInfo` 数据对象。 """ + cache_key = f"neobot:cache:get_stranger_info:{user_id}" + if not no_cache: + cached_data = await redis_manager.get(cache_key) + if cached_data: + return StrangerInfo(**json.loads(cached_data)) + res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) + await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return StrangerInfo(**res) - async def get_friend_list(self) -> List[FriendInfo]: + async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]: """ - 获取好友列表 + 获取机器人账号的好友列表。 - :return: 好友信息对象列表 + Args: + no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False. + + Returns: + List[FriendInfo]: 包含所有好友信息的 `FriendInfo` 对象列表。 """ + cache_key = f"neobot:cache:get_friend_list:{self.self_id}" + if not no_cache: + cached_data = await redis_manager.get(cache_key) + if cached_data: + return [FriendInfo(**item) for item in json.loads(cached_data)] + res = await self.call_api("get_friend_list") + await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return [FriendInfo(**item) for item in res] async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]: """ - 处理加好友请求 + 处理收到的加好友请求。 - :param flag: 加好友请求的 flag(需从上报的数据中获取) - :param approve: 是否同意请求 - :param remark: 添加后的好友备注(仅在同意时有效) - :return: API 响应结果 + Args: + flag (str): 请求的标识,需要从 `request` 事件中获取。 + approve (bool, optional): 是否同意该好友请求。Defaults to True. + remark (str, optional): 在同意请求时,为该好友设置的备注。Defaults to "". + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark}) + diff --git a/core/api/group.py b/core/api/group.py index a3d2dd6..d4eb5ae 100644 --- a/core/api/group.py +++ b/core/api/group.py @@ -1,47 +1,64 @@ """ 群组相关 API 模块 + +该模块定义了 `GroupAPI` Mixin 类,提供了所有与群组管理、成员操作 +等相关的 OneBot v11 API 封装。 """ from typing import List, Dict, Any, Optional +import json +from core.redis_manager import redis_client as redis_manager from .base import BaseAPI from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo class GroupAPI(BaseAPI): """ - 群组相关 API Mixin + `GroupAPI` Mixin 类,提供了所有与群组操作相关的 API 方法。 """ async def set_group_kick(self, group_id: int, user_id: int, reject_add_request: bool = False) -> Dict[str, Any]: """ - 群组踢人 + 将指定成员踢出群组。 - :param group_id: 群号 - :param user_id: 要踢的 QQ 号 - :param reject_add_request: 拒绝此人的加群请求 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + user_id (int): 要踢出的成员的 QQ 号。 + reject_add_request (bool, optional): 是否拒绝该用户此后的加群请求。Defaults to False. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_kick", {"group_id": group_id, "user_id": user_id, "reject_add_request": reject_add_request}) - async def set_group_ban(self, group_id: int, user_id: int, duration: int = 30 * 60) -> Dict[str, Any]: + async def set_group_ban(self, group_id: int, user_id: int, duration: int = 1800) -> Dict[str, Any]: """ - 群组单人禁言 + 禁言群组中的指定成员。 - :param group_id: 群号 - :param user_id: 要禁言的 QQ 号 - :param duration: 禁言时长(秒),0 表示解除禁言 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + user_id (int): 要禁言的成员的 QQ 号。 + duration (int, optional): 禁言时长,单位为秒。设置为 0 表示解除禁言。 + Defaults to 1800 (30 分钟). + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_ban", {"group_id": group_id, "user_id": user_id, "duration": duration}) - async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 30 * 60, flag: str = None) -> Dict[str, Any]: + async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 1800, flag: str = None) -> Dict[str, Any]: """ - 群组匿名禁言 + 禁言群组中的匿名用户。 - :param group_id: 群号 - :param anonymous: 可选,要禁言的匿名用户对象(群消息事件的 anonymous 字段) - :param duration: 禁言时长(秒) - :param flag: 可选,要禁言的匿名用户的 flag(需从群消息事件的 anonymous 字段中获取) - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + anonymous (Dict[str, Any], optional): 要禁言的匿名用户对象, + 可从群消息事件的 `anonymous` 字段中获取。Defaults to None. + duration (int, optional): 禁言时长,单位为秒。Defaults to 1800. + flag (str, optional): 要禁言的匿名用户的 flag 标识, + 可从群消息事件的 `anonymous` 字段中获取。Defaults to None. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ params = {"group_id": group_id, "duration": duration} if anonymous: @@ -52,139 +69,196 @@ class GroupAPI(BaseAPI): async def set_group_whole_ban(self, group_id: int, enable: bool = True) -> Dict[str, Any]: """ - 群组全员禁言 + 开启或关闭群组全员禁言。 - :param group_id: 群号 - :param enable: 是否开启 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + enable (bool, optional): True 表示开启全员禁言,False 表示关闭。Defaults to True. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_whole_ban", {"group_id": group_id, "enable": enable}) async def set_group_admin(self, group_id: int, user_id: int, enable: bool = True) -> Dict[str, Any]: """ - 群组设置管理员 + 设置或取消群组成员的管理员权限。 - :param group_id: 群号 - :param user_id: 要设置的 QQ 号 - :param enable: True 为设置,False 为取消 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + user_id (int): 目标成员的 QQ 号。 + enable (bool, optional): True 表示设为管理员,False 表示取消管理员。Defaults to True. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_admin", {"group_id": group_id, "user_id": user_id, "enable": enable}) async def set_group_anonymous(self, group_id: int, enable: bool = True) -> Dict[str, Any]: """ - 群组匿名 + 开启或关闭群组的匿名聊天功能。 - :param group_id: 群号 - :param enable: 是否开启 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + enable (bool, optional): True 表示开启匿名,False 表示关闭。Defaults to True. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_anonymous", {"group_id": group_id, "enable": enable}) async def set_group_card(self, group_id: int, user_id: int, card: str = "") -> Dict[str, Any]: """ - 设置群名片(群备注) + 设置群组成员的群名片。 - :param group_id: 群号 - :param user_id: 要设置的 QQ 号 - :param card: 群名片内容,不填或空字符串表示删除群名片 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + user_id (int): 目标成员的 QQ 号。 + card (str, optional): 要设置的群名片内容。 + 传入空字符串 `""` 或 `None` 表示删除该成员的群名片。Defaults to "". + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_card", {"group_id": group_id, "user_id": user_id, "card": card}) async def set_group_name(self, group_id: int, group_name: str) -> Dict[str, Any]: """ - 设置群名 + 设置群组的名称。 - :param group_id: 群号 - :param group_name: 新群名 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + group_name (str): 新的群组名称。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_name", {"group_id": group_id, "group_name": group_name}) async def set_group_leave(self, group_id: int, is_dismiss: bool = False) -> Dict[str, Any]: """ - 退出群组 + 退出或解散一个群组。 - :param group_id: 群号 - :param is_dismiss: 是否解散,如果登录号是群主,则仅在此项为 True 时能够解散 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + is_dismiss (bool, optional): 是否解散群组。 + 仅当机器人是群主时,此项设为 True 才能解散群。Defaults to False. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_leave", {"group_id": group_id, "is_dismiss": is_dismiss}) async def set_group_special_title(self, group_id: int, user_id: int, special_title: str = "", duration: int = -1) -> Dict[str, Any]: """ - 设置群组专属头衔 + 为群组成员设置专属头衔。 - :param group_id: 群号 - :param user_id: 要设置的 QQ 号 - :param special_title: 专属头衔,不填或空字符串表示删除 - :param duration: 有效期(秒),-1 表示永久 - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + user_id (int): 目标成员的 QQ 号。 + special_title (str, optional): 专属头衔内容。 + 传入空字符串 `""` 或 `None` 表示删除头衔。Defaults to "". + duration (int, optional): 头衔有效期,单位为秒。-1 表示永久。Defaults to -1. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_special_title", {"group_id": group_id, "user_id": user_id, "special_title": special_title, "duration": duration}) async def get_group_info(self, group_id: int, no_cache: bool = False) -> GroupInfo: """ - 获取群信息 + 获取群组的详细信息。 - :param group_id: 群号 - :param no_cache: 是否不使用缓存 - :return: 群信息对象 + Args: + group_id (int): 目标群组的群号。 + no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False. + + Returns: + GroupInfo: 包含群组信息的 `GroupInfo` 数据对象。 """ - res = await self.call_api("get_group_info", {"group_id": group_id, "no_cache": no_cache}) + cache_key = f"neobot:cache:get_group_info:{group_id}" + if not no_cache: + cached_data = await redis_manager.get(cache_key) + if cached_data: + return GroupInfo(**json.loads(cached_data)) + + res = await self.call_api("get_group_info", {"group_id": group_id}) + await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return GroupInfo(**res) async def get_group_list(self) -> List[GroupInfo]: """ - 获取群列表 + 获取机器人加入的所有群组的列表。 - :return: 群信息对象列表 + Returns: + List[GroupInfo]: 包含所有群组信息的 `GroupInfo` 对象列表。 """ res = await self.call_api("get_group_list") return [GroupInfo(**item) for item in res] async def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> GroupMemberInfo: """ - 获取群成员信息 + 获取指定群组成员的详细信息。 - :param group_id: 群号 - :param user_id: QQ 号 - :param no_cache: 是否不使用缓存 - :return: 群成员信息对象 + Args: + group_id (int): 目标群组的群号。 + user_id (int): 目标成员的 QQ 号。 + no_cache (bool, optional): 是否不使用缓存。Defaults to False. + + Returns: + GroupMemberInfo: 包含群成员信息的 `GroupMemberInfo` 数据对象。 """ - res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id, "no_cache": no_cache}) + cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}" + if not no_cache: + cached_data = await redis_manager.get(cache_key) + if cached_data: + return GroupMemberInfo(**json.loads(cached_data)) + + res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id}) + await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时 return GroupMemberInfo(**res) async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]: """ - 获取群成员列表 + 获取一个群组的所有成员列表。 - :param group_id: 群号 - :return: 群成员信息对象列表 + Args: + group_id (int): 目标群组的群号。 + + Returns: + List[GroupMemberInfo]: 包含所有群成员信息的 `GroupMemberInfo` 对象列表。 """ res = await self.call_api("get_group_member_list", {"group_id": group_id}) return [GroupMemberInfo(**item) for item in res] async def get_group_honor_info(self, group_id: int, type: str) -> GroupHonorInfo: """ - 获取群荣誉信息 + 获取群组的荣誉信息(如龙王、群聊之火等)。 - :param group_id: 群号 - :param type: 要获取的群荣誉类型,可传入 talkative, performer, legend, strong_newbie, emotion 等 - :return: 群荣誉信息对象 + Args: + group_id (int): 目标群组的群号。 + type (str): 要获取的荣誉类型。 + 可选值: "talkative", "performer", "legend", "strong_newbie", "emotion" 等。 + + Returns: + GroupHonorInfo: 包含群荣誉信息的 `GroupHonorInfo` 数据对象。 """ res = await self.call_api("get_group_honor_info", {"group_id": group_id, "type": type}) return GroupHonorInfo(**res) async def set_group_add_request(self, flag: str, sub_type: str, approve: bool = True, reason: str = "") -> Dict[str, Any]: """ - 处理加群请求/邀请 + 处理加群请求或邀请。 - :param flag: 加群请求的 flag(需从上报的数据中获取) - :param sub_type: add 或 invite,请求类型(需要与上报消息中的 sub_type 字段相符) - :param approve: 是否同意请求/邀请 - :param reason: 拒绝理由(仅在拒绝时有效) - :return: API 响应结果 + Args: + flag (str): 请求的标识,需要从 `request` 事件中获取。 + sub_type (str): 请求的子类型,`add` 或 `invite`, + 需要与 `request` 事件中的 `sub_type` 字段相符。 + approve (bool, optional): 是否同意请求或邀请。Defaults to True. + reason (str, optional): 拒绝加群的理由(仅在 `approve` 为 False 时有效)。Defaults to "". + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason}) + diff --git a/core/api/message.py b/core/api/message.py index b712215..25458a4 100644 --- a/core/api/message.py +++ b/core/api/message.py @@ -1,5 +1,8 @@ """ 消息相关 API 模块 + +该模块定义了 `MessageAPI` Mixin 类,提供了所有与消息发送、撤回、 +转发等相关的 OneBot v11 API 封装。 """ from typing import Union, List, Dict, Any, TYPE_CHECKING from .base import BaseAPI @@ -10,17 +13,22 @@ if TYPE_CHECKING: class MessageAPI(BaseAPI): """ - 消息相关 API Mixin + `MessageAPI` Mixin 类,提供了所有与消息操作相关的 API 方法。 """ async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]: """ - 发送群消息 + 发送群消息。 - :param group_id: 群号 - :param message: 消息内容,可以是字符串、MessageSegment 对象或 MessageSegment 列表 - :param auto_escape: 是否自动转义(仅当 message 为字符串时有效) - :return: API 响应结果 + Args: + group_id (int): 目标群组的群号。 + message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。 + 可以是纯文本字符串、单个消息段对象或消息段列表。 + auto_escape (bool, optional): 仅当 `message` 为字符串时有效, + 是否对消息内容进行 CQ 码转义。Defaults to False. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api( "send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape} @@ -28,12 +36,15 @@ class MessageAPI(BaseAPI): async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]: """ - 发送私聊消息 + 发送私聊消息。 - :param user_id: 用户 QQ 号 - :param message: 消息内容,可以是字符串、MessageSegment 对象或 MessageSegment 列表 - :param auto_escape: 是否自动转义(仅当 message 为字符串时有效) - :return: API 响应结果 + Args: + user_id (int): 目标用户的 QQ 号。 + message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。 + auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api( "send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape} @@ -41,12 +52,18 @@ class MessageAPI(BaseAPI): async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]: """ - 智能发送消息,根据事件类型自动选择发送方式 + 智能发送消息。 - :param event: 触发事件对象 - :param message: 消息内容 - :param auto_escape: 是否自动转义 - :return: API 响应结果 + 该方法会根据传入的事件对象 `event` 自动判断是私聊还是群聊, + 并调用相应的发送函数。如果事件是消息事件,则优先使用 `reply` 方法。 + + Args: + event (OneBotEvent): 触发该发送行为的事件对象。 + message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。 + auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ # 如果是消息事件,直接调用 reply if hasattr(event, "reply"): @@ -66,53 +83,98 @@ class MessageAPI(BaseAPI): async def delete_msg(self, message_id: int) -> Dict[str, Any]: """ - 撤回消息 + 撤回一条消息。 - :param message_id: 消息 ID - :return: API 响应结果 + Args: + message_id (int): 要撤回的消息的 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("delete_msg", {"message_id": message_id}) async def get_msg(self, message_id: int) -> Dict[str, Any]: """ - 获取消息 + 获取一条消息的详细信息。 - :param message_id: 消息 ID - :return: API 响应结果 + Args: + message_id (int): 要获取的消息的 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据,包含消息详情。 """ return await self.call_api("get_msg", {"message_id": message_id}) async def get_forward_msg(self, id: str) -> Dict[str, Any]: """ - 获取合并转发消息 + 获取合并转发消息的内容。 - :param id: 合并转发 ID - :return: API 响应结果 + Args: + id (str): 合并转发消息的 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据,包含转发消息的节点列表。 """ return await self.call_api("get_forward_msg", {"id": id}) + async def send_group_forward_msg(self, group_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + 发送群聊合并转发消息。 + + Args: + group_id (int): 目标群组的群号。 + messages (List[Dict[str, Any]]): 消息节点列表。 + 推荐使用 `bot.build_forward_node` 来构建节点。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("send_group_forward_msg", {"group_id": group_id, "messages": messages}) + + async def send_private_forward_msg(self, user_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + 发送私聊合并转发消息。 + + Args: + user_id (int): 目标用户的 QQ 号。 + messages (List[Dict[str, Any]]): 消息节点列表。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("send_private_forward_msg", {"user_id": user_id, "messages": messages}) + async def can_send_image(self) -> Dict[str, Any]: """ - 检查是否可以发送图片 + 检查当前机器人账号是否可以发送图片。 - :return: API 响应结果 + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("can_send_image") async def can_send_record(self) -> Dict[str, Any]: """ - 检查是否可以发送语音 + 检查当前机器人账号是否可以发送语音。 - :return: API 响应结果 + Returns: + Dict[str, Any]: OneBot API 的响应数据。 """ return await self.call_api("can_send_record") def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]: """ - 处理消息内容,将其转换为 API 可接受的格式 + 内部方法:将消息内容处理成 OneBot API 可接受的格式。 - :param message: 原始消息内容 - :return: 处理后的消息内容 + - `str` -> `str` + - `MessageSegment` -> `List[Dict]` + - `List[MessageSegment]` -> `List[Dict]` + + Args: + message: 原始消息内容。 + + Returns: + 处理后的消息内容。 """ if isinstance(message, str): return message @@ -130,12 +192,16 @@ class MessageAPI(BaseAPI): def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]: """ - 将 MessageSegment 对象转换为字典 + 内部方法:将 `MessageSegment` 对象转换为字典。 - :param segment: MessageSegment 对象 - :return: 字典格式的消息段 + Args: + segment (MessageSegment): 消息段对象。 + + Returns: + Dict[str, Any]: 符合 OneBot 规范的消息段字典。 """ return { "type": segment.type, "data": segment.data } + diff --git a/core/bot.py b/core/bot.py index fb41a51..2af4efc 100644 --- a/core/bot.py +++ b/core/bot.py @@ -1,9 +1,18 @@ """ -Bot 抽象模块 +Bot 核心抽象模块 -定义了 Bot 类,封装了 OneBot API 的调用逻辑,提供了便捷的消息发送方法。 +该模块定义了 `Bot` 类,它是与 OneBot v11 API 进行交互的主要接口。 +`Bot` 类通过继承 `api` 目录下的各个 Mixin 类,将不同类别的 API 调用 +整合在一起,提供了一个统一、便捷的调用入口。 + +主要职责包括: +- 封装 WebSocket 通信,提供 `call_api` 方法。 +- 提供高级消息发送功能,如 `send_forwarded_messages`。 +- 整合所有细分的 API 调用(消息、群组、好友等)。 """ -from typing import TYPE_CHECKING, Dict, Any +from typing import TYPE_CHECKING, Dict, Any, List, Union +from models.events.base import OneBotEvent +from models.message import MessageSegment if TYPE_CHECKING: from .WS import WS @@ -13,24 +22,93 @@ from .api import MessageAPI, GroupAPI, FriendAPI, AccountAPI class Bot(MessageAPI, GroupAPI, FriendAPI, AccountAPI): """ - Bot 抽象类,封装 API 调用和常用操作 - 继承各个 API Mixin 以提高代码的可维护性 + 机器人核心类,封装了所有与 OneBot API 的交互。 + + 通过 Mixin 模式继承了所有 API 功能,使得结构清晰且易于扩展。 + 实例由 `WS` 客户端在连接成功后创建,并传递给所有事件处理器和插件。 """ def __init__(self, ws_client: "WS"): """ - 初始化 Bot 实例 + 初始化 Bot 实例。 - :param ws_client: WebSocket 客户端实例,用于底层通信 + Args: + ws_client (WS): WebSocket 客户端实例,负责底层的 API 请求和响应处理。 """ self.ws = ws_client async def call_api(self, action: str, params: Dict[str, Any] = None) -> Any: """ - 调用 OneBot API + 底层 API 调用方法。 - :param action: API 动作名称 - :param params: API 参数 - :return: API 响应结果 + 所有具体的 API 实现最终都会调用此方法,通过 WebSocket 发送请求。 + + Args: + action (str): API 的动作名称,例如 "send_group_msg"。 + params (Dict[str, Any], optional): API 请求的参数字典。Defaults to None. + + Returns: + Any: OneBot API 的响应数据。 """ - return await self.ws.call_api(action, params) \ No newline at end of file + return await self.ws.call_api(action, params) + + def build_forward_node(self, user_id: int, nickname: str, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Dict[str, Any]: + """ + 构建一个用于合并转发的消息节点 (Node)。 + + 这是一个辅助方法,用于方便地创建符合 OneBot v11 规范的消息节点, + 以便在 `send_forwarded_messages` 中使用。 + + Args: + user_id (int): 发送者的 QQ 号。 + nickname (str): 发送者在消息中显示的昵称。 + message (Union[str, MessageSegment, List[MessageSegment]]): 该节点的消息内容, + 可以是纯文本、单个消息段或消息段列表。 + + Returns: + Dict[str, Any]: 构造好的消息节点字典。 + """ + return { + "type": "node", + "data": { + "uin": user_id, + "name": nickname, + "content": self._process_message(message) + } + } + + async def send_forwarded_messages(self, target: Union[int, "OneBotEvent"], nodes: List[Dict[str, Any]]): + """ + 发送合并转发消息。 + + 该方法实现了智能判断,可以根据 `target` 的类型自动发送群聊合并转发 + 或私聊合并转发消息。 + + Args: + target (Union[int, OneBotEvent]): 发送目标。 + - 如果是 `OneBotEvent` 对象,则自动判断是群聊还是私聊。 + - 如果是 `int`,则默认为群号,发送群聊合并转发。 + nodes (List[Dict[str, Any]]): 消息节点列表。 + 推荐使用 `build_forward_node` 方法来构建列表中的每个节点。 + + Raises: + ValueError: 如果事件对象中既没有 `group_id` 也没有 `user_id`。 + """ + if isinstance(target, OneBotEvent): + group_id = getattr(target, "group_id", None) + user_id = getattr(target, "user_id", None) + + if group_id: + # 直接发送群聊合并转发 + await self.send_group_forward_msg(group_id, nodes) + elif user_id: + # 发送私聊合并转发 + await self.send_private_forward_msg(user_id, nodes) + else: + raise ValueError("Event has neither group_id nor user_id") + + else: + # 默认行为是发送到群聊 + group_id = target + await self.send_group_forward_msg(group_id, nodes) + diff --git a/core/command_manager.py b/core/command_manager.py index e2bb131..5f29ff2 100644 --- a/core/command_manager.py +++ b/core/command_manager.py @@ -1,7 +1,17 @@ """ -命令管理器模块 +命令与事件管理器模块 -提供装饰器用于注册消息指令、通知处理器和请求处理器,并负责事件的分发。 +该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心。 +它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和 +请求事件处理器的能力。 + +主要职责: +- 提供 `@matcher.command()` 装饰器来注册命令。 +- 提供 `@matcher.on_notice()` 装饰器来注册通知处理器。 +- 提供 `@matcher.on_request()` 装饰器来注册请求处理器。 +- 负责解析收到的消息,匹配命令前缀并分发给对应的处理器。 +- 统一处理所有类型的事件,并将其分发给所有已注册的处理器。 +- 内置一个 `/help` 命令,用于展示所有已加载插件的帮助信息。 """ import inspect from typing import Any, Callable, Dict, List, Tuple @@ -14,22 +24,25 @@ comm_prefixes = global_config.bot.get("command", ("/",)) class CommandManager: """ - 命令管理器,负责注册和分发指令、通知和请求事件 + 命令管理器,负责注册和分发所有类型的事件。 + + 这是一个单例对象(`matcher`),在整个应用中共享。 """ - def __init__(self, prefixes: Tuple[str, ...] = ("/",)): + def __init__(self, prefixes: Tuple[str, ...]): """ - 初始化命令管理器 + 初始化命令管理器。 - :param prefixes: 命令前缀元组 + Args: + prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。 """ self.prefixes = prefixes - self.commands: Dict[str, Callable] = {} # 存储消息指令 - self.notice_handlers: List[Dict] = [] # 存储通知处理器 - self.request_handlers: List[Dict] = [] # 存储请求处理器 - self.plugins: Dict[str, Dict[str, Any]] = {} # 存储插件元数据 + self.commands: Dict[str, Callable] = {} # 存储消息指令处理器 + self.notice_handlers: List[Dict] = [] # 存储通知事件处理器 + self.request_handlers: List[Dict] = [] # 存储请求事件处理器 + self.plugins: Dict[str, Dict[str, Any]] = {} # 存储已加载插件的元数据 - # --- 内置 help 指令 --- + # --- 注册内置 help 指令 --- self.commands["help"] = self._help_command self.plugins["core.help"] = { "name": "帮助", @@ -39,10 +52,13 @@ class CommandManager: async def _help_command(self, bot, event): """ - 内置的 /help 指令处理器 + 内置的 `/help` 命令的实现。 - :param bot: Bot 实例 - :param event: 消息事件对象 + 该命令会遍历所有已加载插件的元数据,并生成一段格式化的帮助文本。 + + Args: + bot: Bot 实例。 + event: 消息事件对象。 """ help_text = "--- 可用指令列表 ---\n" @@ -57,58 +73,78 @@ class CommandManager: await bot.send(event, help_text.strip()) - # --- 1. 消息指令装饰器 --- - def command(self, name: str): + def command(self, name: str) -> Callable: """ - 装饰器:注册消息指令 + 装饰器:用于注册一个消息指令处理器。 - :param name: 指令名称(不含前缀) - :return: 装饰器函数 + Example: + @matcher.command("echo") + async def handle_echo(bot, event, args): + await bot.send(event, " ".join(args)) + + Args: + name (str): 指令的名称(不包含命令前缀)。 + + Returns: + Callable: 原函数,使其可以继续被调用。 """ - def decorator(func): + def decorator(func: Callable) -> Callable: self.commands[name] = func return func return decorator - # --- 2. 通知事件装饰器 --- - def on_notice(self, notice_type: str = None): + def on_notice(self, notice_type: str = None) -> Callable: """ - 装饰器:注册通知处理器 + 装饰器:用于注册一个通知事件处理器。 - :param notice_type: 通知类型,如果为 None 则处理所有通知 - :return: 装饰器函数 + 如果 `notice_type` 未指定,则该处理器会接收所有类型的通知事件。 + + Args: + notice_type (str, optional): 要处理的通知类型 (e.g., "group_increase")。 + Defaults to None. + + Returns: + Callable: 原函数。 """ - def decorator(func): + def decorator(func: Callable) -> Callable: self.notice_handlers.append({"type": notice_type, "func": func}) return func return decorator - # --- 3. 请求事件装饰器 --- - def on_request(self, request_type: str = None): + def on_request(self, request_type: str = None) -> Callable: """ - 装饰器:注册请求处理器 + 装饰器:用于注册一个请求事件处理器。 - :param request_type: 请求类型,如果为 None 则处理所有请求 - :return: 装饰器函数 + 如果 `request_type` 未指定,则该处理器会接收所有类型的请求事件。 + + Args: + request_type (str, optional): 要处理的请求类型 (e.g., "friend", "group")。 + Defaults to None. + + Returns: + Callable: 原函数。 """ - def decorator(func): + def decorator(func: Callable) -> Callable: self.request_handlers.append({"type": request_type, "func": func}) return func return decorator - # --- 统一事件分发入口 --- async def handle_event(self, bot, event): """ - 统一事件分发入口 + 统一的事件分发入口。 - :param bot: Bot 实例 - :param event: 事件对象 + 由 `WS` 客户端在接收到事件后调用。该方法会根据事件的 `post_type` + 将其分发给对应的具体处理方法。 + + Args: + bot: Bot 实例。 + event: 已解析的事件对象。 """ post_type = event.post_type @@ -119,13 +155,16 @@ class CommandManager: elif post_type == 'request': await self.handle_request(bot, event) - # --- 消息分发逻辑 --- async def handle_message(self, bot, event): """ - 解析并分发消息指令 + 处理消息事件,解析并分发指令。 - :param bot: Bot 实例 - :param event: 消息事件对象 + 该方法会检查消息是否以已配置的命令前缀开头,如果是,则解析出 + 指令名称和参数,并调用对应的处理器。 + + Args: + bot: Bot 实例。 + event: 消息事件对象。 """ if not event.raw_message: return @@ -155,39 +194,43 @@ class CommandManager: func = self.commands[cmd_name] await self._run_handler(func, bot, event, args) - # --- 通知分发逻辑 --- async def handle_notice(self, bot, event): """ - 分发通知事件 + 分发通知事件给所有匹配的处理器。 - :param bot: Bot 实例 - :param event: 通知事件对象 + Args: + bot: Bot 实例。 + event: 通知事件对象。 """ for handler in self.notice_handlers: if handler["type"] is None or handler["type"] == event.notice_type: await self._run_handler(handler["func"], bot, event) - # --- 请求分发逻辑 --- async def handle_request(self, bot, event): """ - 分发请求事件 + 分发请求事件给所有匹配的处理器。 - :param bot: Bot 实例 - :param event: 请求事件对象 + Args: + bot: Bot 实例。 + event: 请求事件对象。 """ for handler in self.request_handlers: if handler["type"] is None or handler["type"] == event.request_type: await self._run_handler(handler["func"], bot, event) - # --- 通用执行器:自动注入参数 --- - async def _run_handler(self, func, bot, event, args=None): + async def _run_handler(self, func: Callable, bot, event, args: List[str] = None): """ - 根据函数签名自动注入 bot, event 或 args + 智能执行事件处理器。 - :param func: 目标处理函数 - :param bot: Bot 实例 - :param event: 事件对象 - :param args: 指令参数(仅消息指令有效) + 该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参数 + (如 `bot`, `event`, `args`),实现了依赖注入。 + + Args: + func (Callable): 目标处理器函数。 + bot: Bot 实例。 + event: 事件对象。 + args (List[str], optional): 指令参数列表(仅对消息事件有效)。 + Defaults to None. """ sig = inspect.signature(func) params = sig.parameters @@ -204,11 +247,14 @@ class CommandManager: await func(**kwargs) -# 确保前缀是元组格式 +# --- 全局单例 --- + +# 确保前缀配置是元组格式 if isinstance(comm_prefixes, list): - comm_prefixes = tuple[Any, ...](comm_prefixes) + comm_prefixes = tuple(comm_prefixes) elif isinstance(comm_prefixes, str): comm_prefixes = (comm_prefixes,) -# 实例化全局管理器 +# 实例化全局唯一的命令管理器 matcher = CommandManager(prefixes=comm_prefixes) + diff --git a/core/config_loader.py b/core/config_loader.py index 0882266..776ddcb 100644 --- a/core/config_loader.py +++ b/core/config_loader.py @@ -64,6 +64,15 @@ class Config: """ return self._data.get("features", {}) + @property + def redis(self) -> dict: + """ + 获取 Redis 配置 + + :return: 配置字典 + """ + return self._data.get("redis", {}) + # 实例化全局配置对象 global_config = Config() diff --git a/core/redis_manager.py b/core/redis_manager.py new file mode 100644 index 0000000..2b232d2 --- /dev/null +++ b/core/redis_manager.py @@ -0,0 +1,60 @@ +import redis +from .config_loader import global_config as config + +class RedisManager: + """ + Redis 连接管理器 + """ + _pool = None + _client = None + + @classmethod + def initialize(cls): + """ + 初始化 Redis 连接并进行健康检查 + """ + if cls._pool is None: + try: + host = config.redis['host'] + port = config.redis['port'] + db = config.redis['db'] + password = config.redis.get('password') + + print(f" 正在尝试连接 Redis: {host}:{port}, DB: {db}") + + cls._pool = redis.ConnectionPool( + host=host, + port=port, + db=db, + password=password, + decode_responses=True + ) + cls._client = redis.Redis(connection_pool=cls._pool) + if cls._client.ping(): + print(" Redis 连接成功!") + else: + print(" Redis 连接失败: PING 命令无响应") + except redis.exceptions.ConnectionError as e: + print(f" Redis 连接失败: {e}") + cls._pool = None + cls._client = None + except Exception as e: + print(f" Redis 初始化时发生未知错误: {e}") + cls._pool = None + cls._client = None + + @classmethod + def get_redis(cls): + """ + 获取 Redis 连接 + + :return: Redis 连接实例 + """ + if cls._client is None: + # 理论上 initialize 应该在程序启动时被调用,这里作为备用 + cls.initialize() + return cls._client + +# 在模块加载时直接初始化 +RedisManager.initialize() +redis_client = RedisManager.get_redis() diff --git a/core/ws.py b/core/ws.py index 7a6206d..e35317f 100644 --- a/core/ws.py +++ b/core/ws.py @@ -1,7 +1,15 @@ """ -WebSocket 核心模块 +WebSocket 核心通信模块 -负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。 +该模块定义了 `WS` 类,负责与 OneBot v11 实现(如 NapCat)建立和管理 +WebSocket 连接。它是整个机器人框架的底层通信基础。 + +主要职责包括: +- 建立 WebSocket 连接并处理认证。 +- 实现断线自动重连机制。 +- 监听并接收来自 OneBot 的事件和 API 响应。 +- 分发事件给 `CommandManager` 进行处理。 +- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。 """ import asyncio import json @@ -20,12 +28,14 @@ from .config_loader import global_config class WS: """ - WebSocket 客户端类,负责与 OneBot 实现端建立连接并处理通信 + WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。 """ def __init__(self): """ - 初始化 WebSocket 客户端 + 初始化 WebSocket 客户端。 + + 从全局配置中读取 WebSocket URI、访问令牌(Token)和重连间隔。 """ # 读取参数 cfg = global_config.napcat_ws @@ -39,7 +49,10 @@ class WS: async def connect(self): """ - 主连接循环,负责建立连接和自动重连 + 启动并管理 WebSocket 连接。 + + 这是一个无限循环,负责建立连接。如果连接断开,它会根据配置的 + `reconnect_interval` 时间间隔后自动尝试重新连接。 """ headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} @@ -67,9 +80,13 @@ class WS: async def _listen_loop(self, websocket): """ - 核心监听循环,处理接收到的 WebSocket 消息 + 核心监听循环,处理所有接收到的 WebSocket 消息。 - :param websocket: WebSocket 连接对象 + 此循环会持续从 WebSocket 连接中读取消息,并根据消息内容 + 判断是 API 响应还是上报的事件,然后分发给相应的处理逻辑。 + + Args: + websocket: 当前活动的 WebSocket 连接对象。 """ async for message in websocket: try: @@ -96,9 +113,16 @@ class WS: async def on_event(self, raw_data: dict): """ - 事件分发层:根据 post_type 调用 matcher 对应的处理器 + 事件处理和分发层。 - :param raw_data: 原始事件数据字典 + 当接收到一个 OneBot 事件时,此方法负责: + 1. 使用 `EventFactory` 将原始 JSON 数据解析成对应的事件对象。 + 2. 为事件对象注入 `Bot` 实例,以便在插件中可以调用 API。 + 3. 打印格式化的事件日志。 + 4. 将事件对象传递给 `CommandManager` (`matcher`) 进行后续处理。 + + Args: + raw_data (dict): 从 WebSocket 接收到的原始事件字典。 """ try: # 使用工厂创建事件对象 @@ -124,11 +148,18 @@ class WS: async def call_api(self, action: str, params: dict = None): """ - 调用 OneBot API + 向 OneBot v11 实现端发送一个 API 请求。 - :param action: API 动作名称 - :param params: API 参数 - :return: API 响应结果 + 该方法通过 WebSocket 发送请求,并使用 `echo` 字段来匹配对应的响应。 + 它创建了一个 `Future` 对象来异步等待响应,并设置了超时机制。 + + Args: + action (str): API 的动作名称,例如 "send_group_msg"。 + params (dict, optional): API 请求的参数字典。 Defaults to None. + + Returns: + dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个 + 表示失败的字典。 """ if not self.ws: return {"status": "failed", "msg": "websocket not initialized"} @@ -152,3 +183,4 @@ class WS: except asyncio.TimeoutError: self._pending_requests.pop(echo_id, None) return {"status": "failed", "retcode": -1, "msg": "api timeout"} + diff --git a/main.py b/main.py index 99e9a8d..50b87cc 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,7 @@ from watchdog.events import FileSystemEventHandler from core import WS from core.plugin_manager import load_all_plugins +#from core.redis_manager import redis_client class PluginReloadHandler(FileSystemEventHandler): diff --git a/models/events/base.py b/models/events/base.py index 5bbe8d6..2a83962 100644 --- a/models/events/base.py +++ b/models/events/base.py @@ -1,7 +1,8 @@ """ 基础事件模型模块 -定义了所有 OneBot 11 事件的基类和事件类型枚举。 +该模块定义了所有 OneBot v11 事件模型的基类 `OneBotEvent` 和 +事件类型常量 `EventType`。所有具体的事件模型都应继承自 `OneBotEvent`。 """ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Optional @@ -13,46 +14,60 @@ if TYPE_CHECKING: class EventType: """ - 事件类型枚举 + OneBot v11 事件类型常量。 + + 用于标识不同种类的事件上报。 """ - META = 'meta_event' # 元事件 - REQUEST = 'request' # 请求事件 - NOTICE = 'notice' # 通知事件 - MESSAGE = 'message' # 消息事件 - MESSAGE_SENT = 'message_sent' # 消息发送事件 + META = 'meta_event' + """元事件 (meta_event): 如心跳、生命周期等。""" + REQUEST = 'request' + """请求事件 (request): 如加好友请求、加群请求等。""" + NOTICE = 'notice' + """通知事件 (notice): 如群成员增加、文件上传等。""" + MESSAGE = 'message' + """消息事件 (message): 如私聊消息、群消息等。""" + MESSAGE_SENT = 'message_sent' + """消息发送事件 (message_sent): 机器人自己发送消息的上报。""" -@dataclass +@dataclass(slots=True) class OneBotEvent(ABC): """ - OneBot 事件基类 - 所有具体的事件类型都应该继承自此类 + OneBot v11 事件的抽象基类。 + + 所有具体的事件模型都必须继承此类,并实现 `post_type` 属性。 + + Attributes: + time (int): 事件发生的时间戳 (秒)。 + self_id (int): 收到事件的机器人 QQ 号。 + _bot (Optional[Bot]): 内部持有的 Bot 实例引用,用于快捷 API 调用。 """ time: int - """事件发生的时间戳""" - self_id: int - """收到事件的机器人 QQ 号""" - _bot: Optional["Bot"] = field(default=None, init=False) - """Bot 实例引用,用于快捷调用 API""" @property @abstractmethod def post_type(self) -> str: """ - 上报类型 + 抽象属性,代表事件的上报类型。 + + 子类必须重写此属性,并返回对应的 `EventType` 常量值。 + 例如: `return EventType.MESSAGE` """ pass @property def bot(self) -> "Bot": """ - 获取 Bot 实例 + 获取与此事件关联的 `Bot` 实例,以便快捷调用 API。 - :return: Bot 实例 - :raises ValueError: 如果 Bot 实例未设置 + Returns: + Bot: 当前事件所对应的 `Bot` 实例。 + + Raises: + ValueError: 如果 `Bot` 实例尚未被设置到事件对象中。 """ if self._bot is None: raise ValueError("Bot instance not set for this event") @@ -61,8 +76,10 @@ class OneBotEvent(ABC): @bot.setter def bot(self, value: "Bot"): """ - 设置 Bot 实例 + 为事件对象设置关联的 `Bot` 实例。 - :param value: Bot 实例 + Args: + value (Bot): 要设置的 `Bot` 实例。 """ self._bot = value + diff --git a/models/events/message.py b/models/events/message.py index e2e0bf1..3ede724 100644 --- a/models/events/message.py +++ b/models/events/message.py @@ -11,7 +11,7 @@ from models.sender import Sender from .base import OneBotEvent, EventType -@dataclass +@dataclass(slots=True) class Anonymous: """ 匿名信息 diff --git a/models/events/meta.py b/models/events/meta.py index 91b44d8..b2c720f 100644 --- a/models/events/meta.py +++ b/models/events/meta.py @@ -8,7 +8,7 @@ from typing import Optional from .base import OneBotEvent, EventType -@dataclass +@dataclass(slots=True) class HeartbeatStatus: """ 心跳状态接口 diff --git a/models/events/notice.py b/models/events/notice.py index 8dcf56c..82cbbfc 100644 --- a/models/events/notice.py +++ b/models/events/notice.py @@ -7,7 +7,7 @@ from dataclasses import dataclass, field from .base import OneBotEvent, EventType -@dataclass +@dataclass(slots=True) class NoticeEvent(OneBotEvent): """ 通知事件基类 diff --git a/models/events/request.py b/models/events/request.py index 87930b6..6f7d82d 100644 --- a/models/events/request.py +++ b/models/events/request.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from .base import OneBotEvent, EventType -@dataclass +@dataclass(slots=True) class RequestEvent(OneBotEvent): """ 请求事件基类 diff --git a/models/message.py b/models/message.py index 914a149..ee7f701 100644 --- a/models/message.py +++ b/models/message.py @@ -1,48 +1,56 @@ """ 消息段模型模块 -定义了 MessageSegment 类,用于封装 OneBot 11 的消息段。 +该模块定义了 `MessageSegment` 类,用于构建和表示 OneBot v11 协议中的消息段。 +通过此类,可以方便地创建文本、图片、At 等不同类型的消息内容,并支持链式操作。 """ from dataclasses import dataclass from typing import Any, Dict -@dataclass +@dataclass(slots=True) class MessageSegment: """ - 消息段,对应 OneBot 11 标准中的消息段对象 + 表示一个 OneBot v11 消息段。 + + Attributes: + type (str): 消息段的类型,例如 'text', 'image', 'at'。 + data (Dict[str, Any]): 消息段的具体数据,是一个键值对字典。 """ type: str - """消息段类型,如 text, image, at 等""" - data: Dict[str, Any] - """消息段数据""" @property def text(self) -> str: """ - 获取文本内容(仅当 type 为 text 时有效) + 当消息段类型为 'text' 时,快速获取其文本内容。 - :return: 文本内容 + Returns: + str: 消息段的文本内容。如果类型不是 'text',则返回空字符串。 """ return self.data.get("text", "") if self.type == "text" else "" @property def image_url(self) -> str: """ - 获取图片 URL(仅当 type 为 image 时有效) + 当消息段类型为 'image' 时,快速获取其图片 URL。 - :return: 图片 URL + Returns: + str: 图片的 URL。如果类型不是 'image' 或数据中不含 'url',则返回空字符串。 """ return self.data.get("url", "") if self.type == "image" else "" def is_at(self, user_id: int = None) -> bool: """ - 判断是否为 @某人 + 检查当前消息段是否是一个 'at' (提及) 消息段。 - :param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型 - :return: 是否匹配 + Args: + user_id (int, optional): 如果提供,则进一步检查被提及的 QQ 号是否匹配。 + Defaults to None. + + Returns: + bool: 如果消息段是 'at' 类型且 user_id 匹配 (如果提供),则返回 True。 """ if self.type != "at": return False @@ -51,6 +59,9 @@ class MessageSegment: return str(self.data.get("qq")) == str(user_id) def __repr__(self): + """ + 返回消息段对象的字符串表示形式,便于调试。 + """ return f"[MS:{self.type}:{self.data}]" # --- 快捷构造方法 --- @@ -58,39 +69,52 @@ class MessageSegment: @staticmethod def text(text: str) -> "MessageSegment": """ - 构造文本消息段 + 创建一个文本消息段。 - :param text: 文本内容 - :return: MessageSegment 对象 + Args: + text (str): 文本内容。 + + Returns: + MessageSegment: 一个类型为 'text' 的消息段对象。 """ return MessageSegment(type="text", data={"text": text}) @staticmethod def at(user_id: int | str) -> "MessageSegment": """ - 构造 @某人 消息段 + 创建一个 @某人 的消息段。 - :param user_id: 目标 QQ 号,"all" 表示 @全体成员 - :return: MessageSegment 对象 + Args: + user_id (int | str): 要提及的 QQ 号。若为 "all",则表示 @全体成员。 + + Returns: + MessageSegment: 一个类型为 'at' 的消息段对象。 """ return MessageSegment(type="at", data={"qq": str(user_id)}) @staticmethod def image(file: str) -> "MessageSegment": """ - 构造图片消息段 + 创建一个图片消息段。 - :param file: 图片文件名、URL 或 Base64 - :return: MessageSegment 对象 + Args: + file (str): 图片的路径、URL 或 Base64 编码的字符串。 + + Returns: + MessageSegment: 一个类型为 'image' 的消息段对象。 """ return MessageSegment(type="image", data={"file": file}) @staticmethod def face(id: int) -> "MessageSegment": """ - 构造表情消息段 + 创建一个 QQ 表情消息段。 - :param id: 表情 ID - :return: MessageSegment 对象 + Args: + id (int): QQ 表情的 ID。 + + Returns: + MessageSegment: 一个类型为 'face' 的消息段对象。 """ return MessageSegment(type="face", data={"id": str(id)}) + diff --git a/models/objects.py b/models/objects.py index 49d1769..2f20c51 100644 --- a/models/objects.py +++ b/models/objects.py @@ -7,7 +7,7 @@ from dataclasses import dataclass, field from typing import List, Optional -@dataclass +@dataclass(slots=True) class GroupInfo: """ 群信息 diff --git a/models/sender.py b/models/sender.py index 3d63821..df81dfb 100644 --- a/models/sender.py +++ b/models/sender.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from typing import Optional -@dataclass +@dataclass(slots=True) class Sender: """ 发送者信息类,对应 OneBot 11 标准中的 sender 字段 diff --git a/plugins/forward_test.py b/plugins/forward_test.py new file mode 100644 index 0000000..429037f --- /dev/null +++ b/plugins/forward_test.py @@ -0,0 +1,42 @@ +""" +合并转发消息测试插件 +""" +from core.command_manager import matcher +from core.bot import Bot +from models import MessageEvent +from models.message import MessageSegment + +__plugin_meta__ = { + "name": "furry", + "description": "处理 /furry 指令,发送furry图片,同时也是bot.build_forward_node演示", + "usage": "/furry - 发送一条furry图", +} + +@matcher.command("furry") +async def handle_forward_test(bot: Bot, event: MessageEvent, args: list[str]): + """ + 处理 /furry 指令,发送furry图片,同时也是bot.build_forward_node实例 + + :param bot: Bot 实例 + :param event: 消息事件对象 + :param args: 指令参数 + """ + # 1. 构建消息节点列表 + nodes = [ + bot.build_forward_node(user_id=event.self_id, nickname="机器人", message="你要的furry来了"), + bot.build_forward_node(user_id=event.user_id, nickname=event.sender.nickname, message="让我看看"), + bot.build_forward_node( + user_id=event.self_id, + nickname="机器人", + message=[ + MessageSegment.text("你要的福瑞图"), + MessageSegment.image("https://api.furry.ist/furry-img/") + ] + ) + ] + + try: + # 2. 发送合并转发消息 + await bot.send_forwarded_messages(event, nodes) + except Exception as e: + await event.reply(f"发送失败: {e}") diff --git a/plugins/jrcd.py b/plugins/jrcd.py index 2528643..4b7e318 100644 --- a/plugins/jrcd.py +++ b/plugins/jrcd.py @@ -1,3 +1,8 @@ +""" +今日人品插件 + +提供 /jrcd 和 /bbcd 指令,用于娱乐。 +""" import random from datetime import datetime @@ -19,7 +24,7 @@ JRCDMSG_2 = [ JRCDMSG_3 = [ "今天的长度是%scm,哦豁?听说你很勇哦?(✧◡✧)", "今天的长度是%scm,嘶哈嘶哈(((o(*°▽°*)o)))...", - "今天的长度是%scm,我靠,让哥哥爽一爽吧!(((o(*°▽°*)o)))...", + "今天的长度是%scm,我靠,让哥哥爽一-爽吧!(((o(*°▽°*)o)))...", "今天的长度是%scm,单是看到哥哥的长度就....(〃w〃)", ] @@ -38,6 +43,12 @@ BBCDMSG7 = ["试试刺刀看看谁能赢吧!"] def get_jrcd(user_id: int) -> int: + """ + 根据用户ID和当前日期生成一个伪随机的“长度”值。 + + :param user_id: 用户QQ号。 + :return: 返回一个1到30之间的整数。 + """ current_time = ( datetime.now().year * 100 + datetime.now().month * 100 + datetime.now().day ) @@ -52,11 +63,11 @@ def get_jrcd(user_id: int) -> int: @matcher.command("jrcd") async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]): """ - 处理 jrcd 指令,来看看你的长度吧! + 处理 jrcd 指令,回复用户的“今日长度”。 - :param bot: Bot 实例 - :param event: 消息事件对象 - :param args: 指令参数列表 + :param bot: Bot 实例。 + :param event: 消息事件对象。 + :param args: 指令参数列表(未使用)。 """ user_id = event.user_id jrcd = get_jrcd(user_id) @@ -73,11 +84,11 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]): @matcher.command("bbcd") async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]): """ - 处理 bbcd 指令,和别人比比长度吧! + 处理 bbcd 指令,比较两位用户的“长度”。 - :param bot: Bot 实例 - :param event: 消息事件对象 - :param args: 指令参数列表 + :param bot: Bot 实例。 + :param event: 消息事件对象。 + :param args: 指令参数列表(未使用)。 """ message = event.message print(message) diff --git a/plugins/thpic.py b/plugins/thpic.py index caa752d..a586533 100644 --- a/plugins/thpic.py +++ b/plugins/thpic.py @@ -12,4 +12,11 @@ from models import MessageEvent, MessageSegment @matcher.command("thpic") async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): + """ + 处理 thpic 指令,发送一张随机的东方Project图片。 + + :param bot: Bot 实例(未使用)。 + :param event: 消息事件对象。 + :param args: 指令参数列表(未使用)。 + """ await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random")) diff --git a/requirements.txt b/requirements.txt index 5fff61d..a0a58de 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ urllib3==2.6.2 websockets==15.0.1 yarg==0.1.10 watchdog==6.0.0 +redis==5.0.7