From e23607a7dcaf3424f218d8e9264dfa28d745b162 Mon Sep 17 00:00:00 2001
From: K2cr2O1 <2221577113@qq.com>
Date: Fri, 2 Jan 2026 15:55:20 +0800
Subject: [PATCH 1/4] 123
---
core/__init__.py | 2 +-
core/bot.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/__init__.py b/core/__init__.py
index dc3ae5d..41e8a92 100644
--- a/core/__init__.py
+++ b/core/__init__.py
@@ -1,5 +1,5 @@
from .command_manager import matcher
from .config_loader import global_config
-from .ws import WS
+from .WS import WS
__all__ = ["WS", "matcher", "global_config"]
diff --git a/core/bot.py b/core/bot.py
index e329cf7..fb41a51 100644
--- a/core/bot.py
+++ b/core/bot.py
@@ -6,7 +6,7 @@ Bot 抽象模块
from typing import TYPE_CHECKING, Dict, Any
if TYPE_CHECKING:
- from .ws import WS
+ from .WS import WS
from .api import MessageAPI, GroupAPI, FriendAPI, AccountAPI
From 3163bbf8c1b7f9ef5902b8063d6066961c2f1317 Mon Sep 17 00:00:00 2001
From: K2cr2O1 <2221577113@qq.com>
Date: Fri, 2 Jan 2026 16:05:00 +0800
Subject: [PATCH 2/4] =?UTF-8?q?=E5=B0=91=E5=AF=BC=E5=85=A5=E4=BA=86Message?=
=?UTF-8?q?Segment=E3=80=82=E3=80=82=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
models/__init__.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/models/__init__.py b/models/__init__.py
index 7103dd1..a25a0af 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -1,5 +1,5 @@
from .events.base import OneBotEvent
-from .events.message import MessageEvent, PrivateMessageEvent, GroupMessageEvent
+from .events.message import MessageEvent, PrivateMessageEvent, GroupMessageEvent, MessageSegment
from .events.notice import (
NoticeEvent, FriendAddNoticeEvent, FriendRecallNoticeEvent,
GroupRecallNoticeEvent, GroupIncreaseNoticeEvent,
From 01b83803c14ded6b95c00d0b2bab6ee1c8fb3e5f Mon Sep 17 00:00:00 2001
From: K2cr2O1 <2221577113@qq.com>
Date: Fri, 2 Jan 2026 17:10:42 +0800
Subject: [PATCH 3/4] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B3=A8=E9=87=8A?=
=?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0redis=E6=94=AF=E6=8C=81=EF=BC=8C?=
=?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E8=81=8A=E5=A4=A9=E8=AE=B0=E5=BD=95?=
=?UTF-8?q?=E6=9E=84=E5=BB=BA=E6=94=AF=E6=8C=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
README.md | 109 ++++++++++++++++++-
config.toml | 8 +-
core/api/account.py | 112 ++++++++++++-------
core/api/friend.py | 69 ++++++++----
core/api/group.py | 228 ++++++++++++++++++++++++++-------------
core/api/message.py | 136 +++++++++++++++++------
core/bot.py | 102 +++++++++++++++---
core/command_manager.py | 162 ++++++++++++++++++----------
core/config_loader.py | 9 ++
core/redis_manager.py | 60 +++++++++++
core/ws.py | 58 +++++++---
main.py | 1 +
models/events/base.py | 59 ++++++----
models/events/message.py | 2 +-
models/events/meta.py | 2 +-
models/events/notice.py | 2 +-
models/events/request.py | 2 +-
models/message.py | 74 ++++++++-----
models/objects.py | 2 +-
models/sender.py | 2 +-
plugins/forward_test.py | 42 ++++++++
plugins/jrcd.py | 29 +++--
plugins/thpic.py | 7 ++
requirements.txt | 1 +
24 files changed, 965 insertions(+), 313 deletions(-)
create mode 100644 core/redis_manager.py
create mode 100644 plugins/forward_test.py
diff --git a/README.md b/README.md
index f74b384..f8ee6ff 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,42 @@ NEO 框架的设计遵循以下核心理念:
* **异步核心**:基于 `asyncio` 和 `websockets` 的高性能异步核心。
* **自动重连**:内置 WebSocket 断线重连机制。
+## ⚡️ 性能优化
+
+### Redis 缓存机制
+为了提升响应速度并减少对 OneBot API 的重复调用,框架核心集成了一套基于 Redis 的缓存系统。对于一些不频繁变更的数据(如群信息、好友列表等),首次查询后会将其缓存至 Redis,在缓存有效期内(默认为 1 小时),后续请求将直接从 Redis 读取,极大提升了性能。
+
+#### 工作原理
+- **自动缓存**:框架会自动缓存特定 API 的调用结果。
+- **缓存键**:缓存键根据 API 名称和关键参数(如 `group_id`, `user_id`)生成,确保唯一性。
+- **过期时间**:默认缓存 1 小时,之后会自动失效,下次调用时将重新从 OneBot 实现端获取最新数据。
+
+#### 受影响的 API
+以下核心 API 已默认启用缓存:
+- `get_group_info`
+- `get_group_member_info`
+- `get_friend_list`
+- `get_stranger_info`
+- `get_login_info`
+
+#### 如何绕过缓存
+在某些场景下,你可能需要获取实时数据而非缓存数据。为此,所有受缓存影响的 API 方法都增加了一个 `no_cache: bool = False` 的可选参数。
+
+当你需要强制从服务器获取最新信息时,只需在调用时传入 `no_cache=True` 即可。
+
+**示例:**
+```python
+# 正常调用,会使用缓存
+group_info = await bot.get_group_info(group_id=12345)
+
+# 强制获取最新信息,不使用缓存
+latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True)
+```
+
+### `__slots__` 内存优化
+框架内的所有数据模型(包括事件、消息段、API 返回对象等)均已启用 `__slots__ = True` 优化。这可以显著减少每个对象实例的内存占用,特别是在处理大量事件和数据时,能够有效降低机器人的整体内存消耗。
+
+
## 📝 待办事项 (TODO)
### API 封装
@@ -90,10 +126,11 @@ NEO 框架的设计遵循以下核心理念:
- [ ] **系统控制**
- `set_restart`
- [ ] **扩展功能**
- - `send_forward_msg`: 发送合并转发消息
+ - [x] `send_forward_msg`: 发送合并转发消息
### 其他改进
- [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。
+- [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。
- [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制。
- [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。
- [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。
@@ -104,7 +141,10 @@ NEO 框架的设计遵循以下核心理念:
```
NEO/
├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载)
-│ └── echo.py # 示例插件:实现 /echo 和 /赞我 指令
+│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令
+│ ├── forward_test.py # 示例插件:演示合并转发消息的构建和发送
+│ ├── jrcd.py # 娱乐插件:提供 /jrcd 和 /bbcd 指令
+│ └── thpic.py # 图片插件:提供 /thpic 指令,发送随机东方图片
├── core/ # 核心框架代码
│ ├── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI)
│ │ ├── __init__.py
@@ -117,6 +157,7 @@ NEO/
│ ├── command_manager.py # 命令与事件分发器
│ ├── config_loader.py # 配置加载器
│ ├── plugin_manager.py # 插件加载与管理
+│ ├── redis_manager.py # Redis 连接管理器
│ └── ws.py # WebSocket 客户端核心
├── models/ # 数据模型
│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta)
@@ -378,6 +419,70 @@ async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]):
5. **性能考虑**:避免在插件中执行耗时同步操作
6. **资源清理**:必要时使用 `try...finally` 确保资源释放
+### 🚀 高性能插件开发规范 (避坑指南)
+为了保证整个机器人框架的响应速度和稳定性,所有插件都必须遵循异步、非阻塞的开发原则。任何一个插件中的阻塞操作都可能导致整个机器人卡顿或无响应。
+
+以下是必须遵守的核心规范:
+
+#### 1. 禁止任何形式的同步网络请求
+- **错误示范**: 使用 `requests` 库发起网络请求。
+ ```python
+ import requests
+ # 错误!这会阻塞整个程序
+ response = requests.get("https://api.example.com")
+ ```
+- **正确做法**: 必须使用异步 HTTP 客户端,如 `aiohttp` 或 `httpx`。
+ ```python
+ import httpx
+ # 正确,使用 async with 和 await
+ async with httpx.AsyncClient() as client:
+ response = await client.get("https://api.example.com")
+ ```
+
+#### 2. 禁止使用 `time.sleep()`
+- **错误示范**: 使用 `time.sleep()` 进行等待。
+ ```python
+ import time
+ # 错误!这会阻塞事件循环
+ time.sleep(5)
+ ```
+- **正确做法**: 必须使用 `asyncio.sleep()`。
+ ```python
+ import asyncio
+ # 正确,这会将控制权交还给事件循环
+ await asyncio.sleep(5)
+ ```
+
+#### 3. 谨慎处理文件 I/O
+- 对于读写小型、本地文件,直接使用 `with open(...)` 通常是可接受的。
+- 但对于大型文件或网络文件系统(NFS)上的文件,同步 I/O 可能会导致明显的阻塞。
+- **推荐做法**: 对于可能耗时较长的文件操作,使用 `aiofiles` 库。
+ ```python
+ import aiofiles
+ async with aiofiles.open('large_file.dat', mode='rb') as f:
+ contents = await f.read()
+ ```
+
+#### 4. 将 CPU 密集型任务移出事件循环
+- 如果插件需要执行复杂的计算(例如,图像处理、视频转码、数据分析),这些任务会长时间占用 CPU,同样会阻塞事件循环。
+- **正确做法**: 使用 `loop.run_in_executor()` 将这类任务抛到独立的线程池或进程池中执行。
+ ```python
+ import asyncio
+
+ def cpu_bound_task(data):
+ # 这是一个耗时的同步函数
+ # ... 进行大量计算 ...
+ return "计算结果"
+
+ # 在异步的事件处理器中调用
+ loop = asyncio.get_running_loop()
+ # `None` 表示使用默认的线程池
+ result = await loop.run_in_executor(None, cpu_bound_task, "一些数据")
+ await event.reply(result)
+ ```
+
+遵循以上规范,可以确保您开发的插件不会成为整个机器人应用的性能瓶颈。
+
### 示例:完整插件模板
```python
"""
diff --git a/config.toml b/config.toml
index ff7cf3e..1c9df7e 100644
--- a/config.toml
+++ b/config.toml
@@ -4,4 +4,10 @@ token = "&d_VTfksE%}ul?_Y"
reconnect_interval = 5
[bot]
-command = ["/"]
\ No newline at end of file
+command = ["/"]
+
+[redis]
+host = "114.66.58.203"
+port = 1931
+db = 0
+password = "redis_5dxyJG"
diff --git a/core/api/account.py b/core/api/account.py
index 8845451..0a5ef0d 100644
--- a/core/api/account.py
+++ b/core/api/account.py
@@ -1,78 +1,106 @@
"""
-账号相关 API 模块
+账号与状态相关 API 模块
+
+该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、
+状态设置等相关的 OneBot v11 API 封装。
"""
+import json
from typing import Dict, Any
from .base import BaseAPI
from models.objects import LoginInfo, VersionInfo, Status
+from core.redis_manager import redis_client as redis_manager
class AccountAPI(BaseAPI):
"""
- 账号相关 API Mixin
+ `AccountAPI` Mixin 类,提供了所有与机器人账号、状态相关的 API 方法。
"""
- async def get_login_info(self) -> LoginInfo:
+ async def get_login_info(self, no_cache: bool = False) -> LoginInfo:
"""
- 获取登录号信息
+ 获取当前登录的机器人账号信息。
- :return: 登录信息对象
+ Args:
+ no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False.
+
+ Returns:
+ LoginInfo: 包含登录号 QQ 和昵称的 `LoginInfo` 数据对象。
"""
+ cache_key = f"neobot:cache:get_login_info:{self.self_id}"
+ if not no_cache:
+ cached_data = await redis_manager.get(cache_key)
+ if cached_data:
+ return LoginInfo(**json.loads(cached_data))
+
res = await self.call_api("get_login_info")
+ await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return LoginInfo(**res)
async def get_version_info(self) -> VersionInfo:
"""
- 获取版本信息
+ 获取 OneBot v11 实现的版本信息。
- :return: 版本信息对象
+ Returns:
+ VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。
"""
res = await self.call_api("get_version_info")
return VersionInfo(**res)
async def get_status(self) -> Status:
"""
- 获取状态
+ 获取 OneBot v11 实现的状态信息。
- :return: 状态对象
+ Returns:
+ Status: 包含 OneBot 状态信息的 `Status` 数据对象。
"""
res = await self.call_api("get_status")
return Status(**res)
async def bot_exit(self) -> Dict[str, Any]:
"""
- 退出机器人
+ 让机器人进程退出(需要实现端支持)。
- :return: API 响应结果
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("bot_exit")
async def set_self_longnick(self, long_nick: str) -> Dict[str, Any]:
"""
- 设置个性签名
+ 设置机器人账号的个性签名。
- :param long_nick: 个性签名内容
- :return: API 响应结果
+ Args:
+ long_nick (str): 要设置的个性签名内容。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_self_longnick", {"longNick": long_nick})
async def set_input_status(self, user_id: int, event_type: int) -> Dict[str, Any]:
"""
- 设置输入状态
+ 设置 "对方正在输入..." 状态提示。
- :param user_id: 用户 ID
- :param event_type: 事件类型
- :return: API 响应结果
+ Args:
+ user_id (int): 目标用户的 QQ 号。
+ event_type (int): 事件类型。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_input_status", {"user_id": user_id, "event_type": event_type})
async def set_diy_online_status(self, face_id: int, face_type: int, wording: str) -> Dict[str, Any]:
"""
- 设置自定义在线状态
+ 设置自定义的 "在线状态"。
- :param face_id: 状态 ID
- :param face_type: 状态类型
- :param wording: 状态描述
- :return: API 响应结果
+ Args:
+ face_id (int): 状态的表情 ID。
+ face_type (int): 状态的表情类型。
+ wording (str): 状态的描述文本。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_diy_online_status", {
"face_id": face_id,
@@ -82,43 +110,55 @@ class AccountAPI(BaseAPI):
async def set_online_status(self, status_code: int) -> Dict[str, Any]:
"""
- 设置在线状态
+ 设置在线状态(如在线、离开、摸鱼等)。
- :param status_code: 状态码
- :return: API 响应结果
+ Args:
+ status_code (int): 目标在线状态的状态码。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_online_status", {"status_code": status_code})
async def set_qq_profile(self, **kwargs) -> Dict[str, Any]:
"""
- 设置 QQ 资料
+ 设置机器人账号的个人资料。
- :param kwargs: 个人资料相关参数
- :return: API 响应结果
+ Args:
+ **kwargs: 个人资料的相关参数,具体字段请参考 OneBot v11 规范。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_qq_profile", kwargs)
async def set_qq_avatar(self, **kwargs) -> Dict[str, Any]:
"""
- 设置 QQ 头像
+ 设置机器人账号的头像。
- :param kwargs: 头像相关参数
- :return: API 响应结果
+ Args:
+ **kwargs: 头像的相关参数,具体字段请参考 OneBot v11 规范。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_qq_avatar", kwargs)
async def get_clientkey(self) -> Dict[str, Any]:
"""
- 获取客户端密钥
+ 获取客户端密钥(通常用于 QQ 登录相关操作)。
- :return: API 响应结果
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_clientkey")
async def clean_cache(self) -> Dict[str, Any]:
"""
- 清理缓存
+ 清理 OneBot v11 实现端的缓存。
- :return: API 响应结果
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("clean_cache")
+
diff --git a/core/api/friend.py b/core/api/friend.py
index 8bb4453..76e696b 100644
--- a/core/api/friend.py
+++ b/core/api/friend.py
@@ -1,53 +1,86 @@
"""
-好友相关 API 模块
+好友与陌生人相关 API 模块
+
+该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息
+等相关的 OneBot v11 API 封装。
"""
+import json
from typing import List, Dict, Any
from .base import BaseAPI
from models.objects import FriendInfo, StrangerInfo
+from core.redis_manager import redis_client as redis_manager
class FriendAPI(BaseAPI):
"""
- 好友相关 API Mixin
+ `FriendAPI` Mixin 类,提供了所有与好友、陌生人操作相关的 API 方法。
"""
async def send_like(self, user_id: int, times: int = 1) -> Dict[str, Any]:
"""
- 发送点赞
+ 向指定用户发送 "戳一戳" (点赞)。
- :param user_id: 对方 QQ 号
- :param times: 点赞次数
- :return: API 响应结果
+ Args:
+ user_id (int): 目标用户的 QQ 号。
+ times (int, optional): 点赞次数,建议不超过 10 次。Defaults to 1.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("send_like", {"user_id": user_id, "times": times})
async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> StrangerInfo:
"""
- 获取陌生人信息
+ 获取陌生人的信息。
- :param user_id: QQ 号
- :param no_cache: 是否不使用缓存
- :return: 陌生人信息对象
+ Args:
+ user_id (int): 目标用户的 QQ 号。
+ no_cache (bool, optional): 是否不使用缓存,直接从服务器获取。Defaults to False.
+
+ Returns:
+ StrangerInfo: 包含陌生人信息的 `StrangerInfo` 数据对象。
"""
+ cache_key = f"neobot:cache:get_stranger_info:{user_id}"
+ if not no_cache:
+ cached_data = await redis_manager.get(cache_key)
+ if cached_data:
+ return StrangerInfo(**json.loads(cached_data))
+
res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache})
+ await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return StrangerInfo(**res)
- async def get_friend_list(self) -> List[FriendInfo]:
+ async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]:
"""
- 获取好友列表
+ 获取机器人账号的好友列表。
- :return: 好友信息对象列表
+ Args:
+ no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False.
+
+ Returns:
+ List[FriendInfo]: 包含所有好友信息的 `FriendInfo` 对象列表。
"""
+ cache_key = f"neobot:cache:get_friend_list:{self.self_id}"
+ if not no_cache:
+ cached_data = await redis_manager.get(cache_key)
+ if cached_data:
+ return [FriendInfo(**item) for item in json.loads(cached_data)]
+
res = await self.call_api("get_friend_list")
+ await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return [FriendInfo(**item) for item in res]
async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]:
"""
- 处理加好友请求
+ 处理收到的加好友请求。
- :param flag: 加好友请求的 flag(需从上报的数据中获取)
- :param approve: 是否同意请求
- :param remark: 添加后的好友备注(仅在同意时有效)
- :return: API 响应结果
+ Args:
+ flag (str): 请求的标识,需要从 `request` 事件中获取。
+ approve (bool, optional): 是否同意该好友请求。Defaults to True.
+ remark (str, optional): 在同意请求时,为该好友设置的备注。Defaults to "".
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark})
+
diff --git a/core/api/group.py b/core/api/group.py
index a3d2dd6..d4eb5ae 100644
--- a/core/api/group.py
+++ b/core/api/group.py
@@ -1,47 +1,64 @@
"""
群组相关 API 模块
+
+该模块定义了 `GroupAPI` Mixin 类,提供了所有与群组管理、成员操作
+等相关的 OneBot v11 API 封装。
"""
from typing import List, Dict, Any, Optional
+import json
+from core.redis_manager import redis_client as redis_manager
from .base import BaseAPI
from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo
class GroupAPI(BaseAPI):
"""
- 群组相关 API Mixin
+ `GroupAPI` Mixin 类,提供了所有与群组操作相关的 API 方法。
"""
async def set_group_kick(self, group_id: int, user_id: int, reject_add_request: bool = False) -> Dict[str, Any]:
"""
- 群组踢人
+ 将指定成员踢出群组。
- :param group_id: 群号
- :param user_id: 要踢的 QQ 号
- :param reject_add_request: 拒绝此人的加群请求
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ user_id (int): 要踢出的成员的 QQ 号。
+ reject_add_request (bool, optional): 是否拒绝该用户此后的加群请求。Defaults to False.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_kick", {"group_id": group_id, "user_id": user_id, "reject_add_request": reject_add_request})
- async def set_group_ban(self, group_id: int, user_id: int, duration: int = 30 * 60) -> Dict[str, Any]:
+ async def set_group_ban(self, group_id: int, user_id: int, duration: int = 1800) -> Dict[str, Any]:
"""
- 群组单人禁言
+ 禁言群组中的指定成员。
- :param group_id: 群号
- :param user_id: 要禁言的 QQ 号
- :param duration: 禁言时长(秒),0 表示解除禁言
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ user_id (int): 要禁言的成员的 QQ 号。
+ duration (int, optional): 禁言时长,单位为秒。设置为 0 表示解除禁言。
+ Defaults to 1800 (30 分钟).
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_ban", {"group_id": group_id, "user_id": user_id, "duration": duration})
- async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 30 * 60, flag: str = None) -> Dict[str, Any]:
+ async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 1800, flag: str = None) -> Dict[str, Any]:
"""
- 群组匿名禁言
+ 禁言群组中的匿名用户。
- :param group_id: 群号
- :param anonymous: 可选,要禁言的匿名用户对象(群消息事件的 anonymous 字段)
- :param duration: 禁言时长(秒)
- :param flag: 可选,要禁言的匿名用户的 flag(需从群消息事件的 anonymous 字段中获取)
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ anonymous (Dict[str, Any], optional): 要禁言的匿名用户对象,
+ 可从群消息事件的 `anonymous` 字段中获取。Defaults to None.
+ duration (int, optional): 禁言时长,单位为秒。Defaults to 1800.
+ flag (str, optional): 要禁言的匿名用户的 flag 标识,
+ 可从群消息事件的 `anonymous` 字段中获取。Defaults to None.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
params = {"group_id": group_id, "duration": duration}
if anonymous:
@@ -52,139 +69,196 @@ class GroupAPI(BaseAPI):
async def set_group_whole_ban(self, group_id: int, enable: bool = True) -> Dict[str, Any]:
"""
- 群组全员禁言
+ 开启或关闭群组全员禁言。
- :param group_id: 群号
- :param enable: 是否开启
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ enable (bool, optional): True 表示开启全员禁言,False 表示关闭。Defaults to True.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_whole_ban", {"group_id": group_id, "enable": enable})
async def set_group_admin(self, group_id: int, user_id: int, enable: bool = True) -> Dict[str, Any]:
"""
- 群组设置管理员
+ 设置或取消群组成员的管理员权限。
- :param group_id: 群号
- :param user_id: 要设置的 QQ 号
- :param enable: True 为设置,False 为取消
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ user_id (int): 目标成员的 QQ 号。
+ enable (bool, optional): True 表示设为管理员,False 表示取消管理员。Defaults to True.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_admin", {"group_id": group_id, "user_id": user_id, "enable": enable})
async def set_group_anonymous(self, group_id: int, enable: bool = True) -> Dict[str, Any]:
"""
- 群组匿名
+ 开启或关闭群组的匿名聊天功能。
- :param group_id: 群号
- :param enable: 是否开启
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ enable (bool, optional): True 表示开启匿名,False 表示关闭。Defaults to True.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_anonymous", {"group_id": group_id, "enable": enable})
async def set_group_card(self, group_id: int, user_id: int, card: str = "") -> Dict[str, Any]:
"""
- 设置群名片(群备注)
+ 设置群组成员的群名片。
- :param group_id: 群号
- :param user_id: 要设置的 QQ 号
- :param card: 群名片内容,不填或空字符串表示删除群名片
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ user_id (int): 目标成员的 QQ 号。
+ card (str, optional): 要设置的群名片内容。
+ 传入空字符串 `""` 或 `None` 表示删除该成员的群名片。Defaults to "".
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_card", {"group_id": group_id, "user_id": user_id, "card": card})
async def set_group_name(self, group_id: int, group_name: str) -> Dict[str, Any]:
"""
- 设置群名
+ 设置群组的名称。
- :param group_id: 群号
- :param group_name: 新群名
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ group_name (str): 新的群组名称。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_name", {"group_id": group_id, "group_name": group_name})
async def set_group_leave(self, group_id: int, is_dismiss: bool = False) -> Dict[str, Any]:
"""
- 退出群组
+ 退出或解散一个群组。
- :param group_id: 群号
- :param is_dismiss: 是否解散,如果登录号是群主,则仅在此项为 True 时能够解散
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ is_dismiss (bool, optional): 是否解散群组。
+ 仅当机器人是群主时,此项设为 True 才能解散群。Defaults to False.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_leave", {"group_id": group_id, "is_dismiss": is_dismiss})
async def set_group_special_title(self, group_id: int, user_id: int, special_title: str = "", duration: int = -1) -> Dict[str, Any]:
"""
- 设置群组专属头衔
+ 为群组成员设置专属头衔。
- :param group_id: 群号
- :param user_id: 要设置的 QQ 号
- :param special_title: 专属头衔,不填或空字符串表示删除
- :param duration: 有效期(秒),-1 表示永久
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ user_id (int): 目标成员的 QQ 号。
+ special_title (str, optional): 专属头衔内容。
+ 传入空字符串 `""` 或 `None` 表示删除头衔。Defaults to "".
+ duration (int, optional): 头衔有效期,单位为秒。-1 表示永久。Defaults to -1.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_special_title", {"group_id": group_id, "user_id": user_id, "special_title": special_title, "duration": duration})
async def get_group_info(self, group_id: int, no_cache: bool = False) -> GroupInfo:
"""
- 获取群信息
+ 获取群组的详细信息。
- :param group_id: 群号
- :param no_cache: 是否不使用缓存
- :return: 群信息对象
+ Args:
+ group_id (int): 目标群组的群号。
+ no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False.
+
+ Returns:
+ GroupInfo: 包含群组信息的 `GroupInfo` 数据对象。
"""
- res = await self.call_api("get_group_info", {"group_id": group_id, "no_cache": no_cache})
+ cache_key = f"neobot:cache:get_group_info:{group_id}"
+ if not no_cache:
+ cached_data = await redis_manager.get(cache_key)
+ if cached_data:
+ return GroupInfo(**json.loads(cached_data))
+
+ res = await self.call_api("get_group_info", {"group_id": group_id})
+ await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return GroupInfo(**res)
async def get_group_list(self) -> List[GroupInfo]:
"""
- 获取群列表
+ 获取机器人加入的所有群组的列表。
- :return: 群信息对象列表
+ Returns:
+ List[GroupInfo]: 包含所有群组信息的 `GroupInfo` 对象列表。
"""
res = await self.call_api("get_group_list")
return [GroupInfo(**item) for item in res]
async def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> GroupMemberInfo:
"""
- 获取群成员信息
+ 获取指定群组成员的详细信息。
- :param group_id: 群号
- :param user_id: QQ 号
- :param no_cache: 是否不使用缓存
- :return: 群成员信息对象
+ Args:
+ group_id (int): 目标群组的群号。
+ user_id (int): 目标成员的 QQ 号。
+ no_cache (bool, optional): 是否不使用缓存。Defaults to False.
+
+ Returns:
+ GroupMemberInfo: 包含群成员信息的 `GroupMemberInfo` 数据对象。
"""
- res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id, "no_cache": no_cache})
+ cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}"
+ if not no_cache:
+ cached_data = await redis_manager.get(cache_key)
+ if cached_data:
+ return GroupMemberInfo(**json.loads(cached_data))
+
+ res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id})
+ await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
return GroupMemberInfo(**res)
async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]:
"""
- 获取群成员列表
+ 获取一个群组的所有成员列表。
- :param group_id: 群号
- :return: 群成员信息对象列表
+ Args:
+ group_id (int): 目标群组的群号。
+
+ Returns:
+ List[GroupMemberInfo]: 包含所有群成员信息的 `GroupMemberInfo` 对象列表。
"""
res = await self.call_api("get_group_member_list", {"group_id": group_id})
return [GroupMemberInfo(**item) for item in res]
async def get_group_honor_info(self, group_id: int, type: str) -> GroupHonorInfo:
"""
- 获取群荣誉信息
+ 获取群组的荣誉信息(如龙王、群聊之火等)。
- :param group_id: 群号
- :param type: 要获取的群荣誉类型,可传入 talkative, performer, legend, strong_newbie, emotion 等
- :return: 群荣誉信息对象
+ Args:
+ group_id (int): 目标群组的群号。
+ type (str): 要获取的荣誉类型。
+ 可选值: "talkative", "performer", "legend", "strong_newbie", "emotion" 等。
+
+ Returns:
+ GroupHonorInfo: 包含群荣誉信息的 `GroupHonorInfo` 数据对象。
"""
res = await self.call_api("get_group_honor_info", {"group_id": group_id, "type": type})
return GroupHonorInfo(**res)
async def set_group_add_request(self, flag: str, sub_type: str, approve: bool = True, reason: str = "") -> Dict[str, Any]:
"""
- 处理加群请求/邀请
+ 处理加群请求或邀请。
- :param flag: 加群请求的 flag(需从上报的数据中获取)
- :param sub_type: add 或 invite,请求类型(需要与上报消息中的 sub_type 字段相符)
- :param approve: 是否同意请求/邀请
- :param reason: 拒绝理由(仅在拒绝时有效)
- :return: API 响应结果
+ Args:
+ flag (str): 请求的标识,需要从 `request` 事件中获取。
+ sub_type (str): 请求的子类型,`add` 或 `invite`,
+ 需要与 `request` 事件中的 `sub_type` 字段相符。
+ approve (bool, optional): 是否同意请求或邀请。Defaults to True.
+ reason (str, optional): 拒绝加群的理由(仅在 `approve` 为 False 时有效)。Defaults to "".
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason})
+
diff --git a/core/api/message.py b/core/api/message.py
index b712215..25458a4 100644
--- a/core/api/message.py
+++ b/core/api/message.py
@@ -1,5 +1,8 @@
"""
消息相关 API 模块
+
+该模块定义了 `MessageAPI` Mixin 类,提供了所有与消息发送、撤回、
+转发等相关的 OneBot v11 API 封装。
"""
from typing import Union, List, Dict, Any, TYPE_CHECKING
from .base import BaseAPI
@@ -10,17 +13,22 @@ if TYPE_CHECKING:
class MessageAPI(BaseAPI):
"""
- 消息相关 API Mixin
+ `MessageAPI` Mixin 类,提供了所有与消息操作相关的 API 方法。
"""
async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
- 发送群消息
+ 发送群消息。
- :param group_id: 群号
- :param message: 消息内容,可以是字符串、MessageSegment 对象或 MessageSegment 列表
- :param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
- :return: API 响应结果
+ Args:
+ group_id (int): 目标群组的群号。
+ message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
+ 可以是纯文本字符串、单个消息段对象或消息段列表。
+ auto_escape (bool, optional): 仅当 `message` 为字符串时有效,
+ 是否对消息内容进行 CQ 码转义。Defaults to False.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api(
"send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape}
@@ -28,12 +36,15 @@ class MessageAPI(BaseAPI):
async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
- 发送私聊消息
+ 发送私聊消息。
- :param user_id: 用户 QQ 号
- :param message: 消息内容,可以是字符串、MessageSegment 对象或 MessageSegment 列表
- :param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
- :return: API 响应结果
+ Args:
+ user_id (int): 目标用户的 QQ 号。
+ message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
+ auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api(
"send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape}
@@ -41,12 +52,18 @@ class MessageAPI(BaseAPI):
async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
- 智能发送消息,根据事件类型自动选择发送方式
+ 智能发送消息。
- :param event: 触发事件对象
- :param message: 消息内容
- :param auto_escape: 是否自动转义
- :return: API 响应结果
+ 该方法会根据传入的事件对象 `event` 自动判断是私聊还是群聊,
+ 并调用相应的发送函数。如果事件是消息事件,则优先使用 `reply` 方法。
+
+ Args:
+ event (OneBotEvent): 触发该发送行为的事件对象。
+ message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
+ auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False.
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
# 如果是消息事件,直接调用 reply
if hasattr(event, "reply"):
@@ -66,53 +83,98 @@ class MessageAPI(BaseAPI):
async def delete_msg(self, message_id: int) -> Dict[str, Any]:
"""
- 撤回消息
+ 撤回一条消息。
- :param message_id: 消息 ID
- :return: API 响应结果
+ Args:
+ message_id (int): 要撤回的消息的 ID。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("delete_msg", {"message_id": message_id})
async def get_msg(self, message_id: int) -> Dict[str, Any]:
"""
- 获取消息
+ 获取一条消息的详细信息。
- :param message_id: 消息 ID
- :return: API 响应结果
+ Args:
+ message_id (int): 要获取的消息的 ID。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据,包含消息详情。
"""
return await self.call_api("get_msg", {"message_id": message_id})
async def get_forward_msg(self, id: str) -> Dict[str, Any]:
"""
- 获取合并转发消息
+ 获取合并转发消息的内容。
- :param id: 合并转发 ID
- :return: API 响应结果
+ Args:
+ id (str): 合并转发消息的 ID。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据,包含转发消息的节点列表。
"""
return await self.call_api("get_forward_msg", {"id": id})
+ async def send_group_forward_msg(self, group_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """
+ 发送群聊合并转发消息。
+
+ Args:
+ group_id (int): 目标群组的群号。
+ messages (List[Dict[str, Any]]): 消息节点列表。
+ 推荐使用 `bot.build_forward_node` 来构建节点。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
+ """
+ return await self.call_api("send_group_forward_msg", {"group_id": group_id, "messages": messages})
+
+ async def send_private_forward_msg(self, user_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
+ """
+ 发送私聊合并转发消息。
+
+ Args:
+ user_id (int): 目标用户的 QQ 号。
+ messages (List[Dict[str, Any]]): 消息节点列表。
+
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
+ """
+ return await self.call_api("send_private_forward_msg", {"user_id": user_id, "messages": messages})
+
async def can_send_image(self) -> Dict[str, Any]:
"""
- 检查是否可以发送图片
+ 检查当前机器人账号是否可以发送图片。
- :return: API 响应结果
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("can_send_image")
async def can_send_record(self) -> Dict[str, Any]:
"""
- 检查是否可以发送语音
+ 检查当前机器人账号是否可以发送语音。
- :return: API 响应结果
+ Returns:
+ Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("can_send_record")
def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]:
"""
- 处理消息内容,将其转换为 API 可接受的格式
+ 内部方法:将消息内容处理成 OneBot API 可接受的格式。
- :param message: 原始消息内容
- :return: 处理后的消息内容
+ - `str` -> `str`
+ - `MessageSegment` -> `List[Dict]`
+ - `List[MessageSegment]` -> `List[Dict]`
+
+ Args:
+ message: 原始消息内容。
+
+ Returns:
+ 处理后的消息内容。
"""
if isinstance(message, str):
return message
@@ -130,12 +192,16 @@ class MessageAPI(BaseAPI):
def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]:
"""
- 将 MessageSegment 对象转换为字典
+ 内部方法:将 `MessageSegment` 对象转换为字典。
- :param segment: MessageSegment 对象
- :return: 字典格式的消息段
+ Args:
+ segment (MessageSegment): 消息段对象。
+
+ Returns:
+ Dict[str, Any]: 符合 OneBot 规范的消息段字典。
"""
return {
"type": segment.type,
"data": segment.data
}
+
diff --git a/core/bot.py b/core/bot.py
index fb41a51..2af4efc 100644
--- a/core/bot.py
+++ b/core/bot.py
@@ -1,9 +1,18 @@
"""
-Bot 抽象模块
+Bot 核心抽象模块
-定义了 Bot 类,封装了 OneBot API 的调用逻辑,提供了便捷的消息发送方法。
+该模块定义了 `Bot` 类,它是与 OneBot v11 API 进行交互的主要接口。
+`Bot` 类通过继承 `api` 目录下的各个 Mixin 类,将不同类别的 API 调用
+整合在一起,提供了一个统一、便捷的调用入口。
+
+主要职责包括:
+- 封装 WebSocket 通信,提供 `call_api` 方法。
+- 提供高级消息发送功能,如 `send_forwarded_messages`。
+- 整合所有细分的 API 调用(消息、群组、好友等)。
"""
-from typing import TYPE_CHECKING, Dict, Any
+from typing import TYPE_CHECKING, Dict, Any, List, Union
+from models.events.base import OneBotEvent
+from models.message import MessageSegment
if TYPE_CHECKING:
from .WS import WS
@@ -13,24 +22,93 @@ from .api import MessageAPI, GroupAPI, FriendAPI, AccountAPI
class Bot(MessageAPI, GroupAPI, FriendAPI, AccountAPI):
"""
- Bot 抽象类,封装 API 调用和常用操作
- 继承各个 API Mixin 以提高代码的可维护性
+ 机器人核心类,封装了所有与 OneBot API 的交互。
+
+ 通过 Mixin 模式继承了所有 API 功能,使得结构清晰且易于扩展。
+ 实例由 `WS` 客户端在连接成功后创建,并传递给所有事件处理器和插件。
"""
def __init__(self, ws_client: "WS"):
"""
- 初始化 Bot 实例
+ 初始化 Bot 实例。
- :param ws_client: WebSocket 客户端实例,用于底层通信
+ Args:
+ ws_client (WS): WebSocket 客户端实例,负责底层的 API 请求和响应处理。
"""
self.ws = ws_client
async def call_api(self, action: str, params: Dict[str, Any] = None) -> Any:
"""
- 调用 OneBot API
+ 底层 API 调用方法。
- :param action: API 动作名称
- :param params: API 参数
- :return: API 响应结果
+ 所有具体的 API 实现最终都会调用此方法,通过 WebSocket 发送请求。
+
+ Args:
+ action (str): API 的动作名称,例如 "send_group_msg"。
+ params (Dict[str, Any], optional): API 请求的参数字典。Defaults to None.
+
+ Returns:
+ Any: OneBot API 的响应数据。
"""
- return await self.ws.call_api(action, params)
\ No newline at end of file
+ return await self.ws.call_api(action, params)
+
+ def build_forward_node(self, user_id: int, nickname: str, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Dict[str, Any]:
+ """
+ 构建一个用于合并转发的消息节点 (Node)。
+
+ 这是一个辅助方法,用于方便地创建符合 OneBot v11 规范的消息节点,
+ 以便在 `send_forwarded_messages` 中使用。
+
+ Args:
+ user_id (int): 发送者的 QQ 号。
+ nickname (str): 发送者在消息中显示的昵称。
+ message (Union[str, MessageSegment, List[MessageSegment]]): 该节点的消息内容,
+ 可以是纯文本、单个消息段或消息段列表。
+
+ Returns:
+ Dict[str, Any]: 构造好的消息节点字典。
+ """
+ return {
+ "type": "node",
+ "data": {
+ "uin": user_id,
+ "name": nickname,
+ "content": self._process_message(message)
+ }
+ }
+
+ async def send_forwarded_messages(self, target: Union[int, "OneBotEvent"], nodes: List[Dict[str, Any]]):
+ """
+ 发送合并转发消息。
+
+ 该方法实现了智能判断,可以根据 `target` 的类型自动发送群聊合并转发
+ 或私聊合并转发消息。
+
+ Args:
+ target (Union[int, OneBotEvent]): 发送目标。
+ - 如果是 `OneBotEvent` 对象,则自动判断是群聊还是私聊。
+ - 如果是 `int`,则默认为群号,发送群聊合并转发。
+ nodes (List[Dict[str, Any]]): 消息节点列表。
+ 推荐使用 `build_forward_node` 方法来构建列表中的每个节点。
+
+ Raises:
+ ValueError: 如果事件对象中既没有 `group_id` 也没有 `user_id`。
+ """
+ if isinstance(target, OneBotEvent):
+ group_id = getattr(target, "group_id", None)
+ user_id = getattr(target, "user_id", None)
+
+ if group_id:
+ # 直接发送群聊合并转发
+ await self.send_group_forward_msg(group_id, nodes)
+ elif user_id:
+ # 发送私聊合并转发
+ await self.send_private_forward_msg(user_id, nodes)
+ else:
+ raise ValueError("Event has neither group_id nor user_id")
+
+ else:
+ # 默认行为是发送到群聊
+ group_id = target
+ await self.send_group_forward_msg(group_id, nodes)
+
diff --git a/core/command_manager.py b/core/command_manager.py
index e2bb131..5f29ff2 100644
--- a/core/command_manager.py
+++ b/core/command_manager.py
@@ -1,7 +1,17 @@
"""
-命令管理器模块
+命令与事件管理器模块
-提供装饰器用于注册消息指令、通知处理器和请求处理器,并负责事件的分发。
+该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心。
+它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和
+请求事件处理器的能力。
+
+主要职责:
+- 提供 `@matcher.command()` 装饰器来注册命令。
+- 提供 `@matcher.on_notice()` 装饰器来注册通知处理器。
+- 提供 `@matcher.on_request()` 装饰器来注册请求处理器。
+- 负责解析收到的消息,匹配命令前缀并分发给对应的处理器。
+- 统一处理所有类型的事件,并将其分发给所有已注册的处理器。
+- 内置一个 `/help` 命令,用于展示所有已加载插件的帮助信息。
"""
import inspect
from typing import Any, Callable, Dict, List, Tuple
@@ -14,22 +24,25 @@ comm_prefixes = global_config.bot.get("command", ("/",))
class CommandManager:
"""
- 命令管理器,负责注册和分发指令、通知和请求事件
+ 命令管理器,负责注册和分发所有类型的事件。
+
+ 这是一个单例对象(`matcher`),在整个应用中共享。
"""
- def __init__(self, prefixes: Tuple[str, ...] = ("/",)):
+ def __init__(self, prefixes: Tuple[str, ...]):
"""
- 初始化命令管理器
+ 初始化命令管理器。
- :param prefixes: 命令前缀元组
+ Args:
+ prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。
"""
self.prefixes = prefixes
- self.commands: Dict[str, Callable] = {} # 存储消息指令
- self.notice_handlers: List[Dict] = [] # 存储通知处理器
- self.request_handlers: List[Dict] = [] # 存储请求处理器
- self.plugins: Dict[str, Dict[str, Any]] = {} # 存储插件元数据
+ self.commands: Dict[str, Callable] = {} # 存储消息指令处理器
+ self.notice_handlers: List[Dict] = [] # 存储通知事件处理器
+ self.request_handlers: List[Dict] = [] # 存储请求事件处理器
+ self.plugins: Dict[str, Dict[str, Any]] = {} # 存储已加载插件的元数据
- # --- 内置 help 指令 ---
+ # --- 注册内置 help 指令 ---
self.commands["help"] = self._help_command
self.plugins["core.help"] = {
"name": "帮助",
@@ -39,10 +52,13 @@ class CommandManager:
async def _help_command(self, bot, event):
"""
- 内置的 /help 指令处理器
+ 内置的 `/help` 命令的实现。
- :param bot: Bot 实例
- :param event: 消息事件对象
+ 该命令会遍历所有已加载插件的元数据,并生成一段格式化的帮助文本。
+
+ Args:
+ bot: Bot 实例。
+ event: 消息事件对象。
"""
help_text = "--- 可用指令列表 ---\n"
@@ -57,58 +73,78 @@ class CommandManager:
await bot.send(event, help_text.strip())
- # --- 1. 消息指令装饰器 ---
- def command(self, name: str):
+ def command(self, name: str) -> Callable:
"""
- 装饰器:注册消息指令
+ 装饰器:用于注册一个消息指令处理器。
- :param name: 指令名称(不含前缀)
- :return: 装饰器函数
+ Example:
+ @matcher.command("echo")
+ async def handle_echo(bot, event, args):
+ await bot.send(event, " ".join(args))
+
+ Args:
+ name (str): 指令的名称(不包含命令前缀)。
+
+ Returns:
+ Callable: 原函数,使其可以继续被调用。
"""
- def decorator(func):
+ def decorator(func: Callable) -> Callable:
self.commands[name] = func
return func
return decorator
- # --- 2. 通知事件装饰器 ---
- def on_notice(self, notice_type: str = None):
+ def on_notice(self, notice_type: str = None) -> Callable:
"""
- 装饰器:注册通知处理器
+ 装饰器:用于注册一个通知事件处理器。
- :param notice_type: 通知类型,如果为 None 则处理所有通知
- :return: 装饰器函数
+ 如果 `notice_type` 未指定,则该处理器会接收所有类型的通知事件。
+
+ Args:
+ notice_type (str, optional): 要处理的通知类型 (e.g., "group_increase")。
+ Defaults to None.
+
+ Returns:
+ Callable: 原函数。
"""
- def decorator(func):
+ def decorator(func: Callable) -> Callable:
self.notice_handlers.append({"type": notice_type, "func": func})
return func
return decorator
- # --- 3. 请求事件装饰器 ---
- def on_request(self, request_type: str = None):
+ def on_request(self, request_type: str = None) -> Callable:
"""
- 装饰器:注册请求处理器
+ 装饰器:用于注册一个请求事件处理器。
- :param request_type: 请求类型,如果为 None 则处理所有请求
- :return: 装饰器函数
+ 如果 `request_type` 未指定,则该处理器会接收所有类型的请求事件。
+
+ Args:
+ request_type (str, optional): 要处理的请求类型 (e.g., "friend", "group")。
+ Defaults to None.
+
+ Returns:
+ Callable: 原函数。
"""
- def decorator(func):
+ def decorator(func: Callable) -> Callable:
self.request_handlers.append({"type": request_type, "func": func})
return func
return decorator
- # --- 统一事件分发入口 ---
async def handle_event(self, bot, event):
"""
- 统一事件分发入口
+ 统一的事件分发入口。
- :param bot: Bot 实例
- :param event: 事件对象
+ 由 `WS` 客户端在接收到事件后调用。该方法会根据事件的 `post_type`
+ 将其分发给对应的具体处理方法。
+
+ Args:
+ bot: Bot 实例。
+ event: 已解析的事件对象。
"""
post_type = event.post_type
@@ -119,13 +155,16 @@ class CommandManager:
elif post_type == 'request':
await self.handle_request(bot, event)
- # --- 消息分发逻辑 ---
async def handle_message(self, bot, event):
"""
- 解析并分发消息指令
+ 处理消息事件,解析并分发指令。
- :param bot: Bot 实例
- :param event: 消息事件对象
+ 该方法会检查消息是否以已配置的命令前缀开头,如果是,则解析出
+ 指令名称和参数,并调用对应的处理器。
+
+ Args:
+ bot: Bot 实例。
+ event: 消息事件对象。
"""
if not event.raw_message:
return
@@ -155,39 +194,43 @@ class CommandManager:
func = self.commands[cmd_name]
await self._run_handler(func, bot, event, args)
- # --- 通知分发逻辑 ---
async def handle_notice(self, bot, event):
"""
- 分发通知事件
+ 分发通知事件给所有匹配的处理器。
- :param bot: Bot 实例
- :param event: 通知事件对象
+ Args:
+ bot: Bot 实例。
+ event: 通知事件对象。
"""
for handler in self.notice_handlers:
if handler["type"] is None or handler["type"] == event.notice_type:
await self._run_handler(handler["func"], bot, event)
- # --- 请求分发逻辑 ---
async def handle_request(self, bot, event):
"""
- 分发请求事件
+ 分发请求事件给所有匹配的处理器。
- :param bot: Bot 实例
- :param event: 请求事件对象
+ Args:
+ bot: Bot 实例。
+ event: 请求事件对象。
"""
for handler in self.request_handlers:
if handler["type"] is None or handler["type"] == event.request_type:
await self._run_handler(handler["func"], bot, event)
- # --- 通用执行器:自动注入参数 ---
- async def _run_handler(self, func, bot, event, args=None):
+ async def _run_handler(self, func: Callable, bot, event, args: List[str] = None):
"""
- 根据函数签名自动注入 bot, event 或 args
+ 智能执行事件处理器。
- :param func: 目标处理函数
- :param bot: Bot 实例
- :param event: 事件对象
- :param args: 指令参数(仅消息指令有效)
+ 该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参数
+ (如 `bot`, `event`, `args`),实现了依赖注入。
+
+ Args:
+ func (Callable): 目标处理器函数。
+ bot: Bot 实例。
+ event: 事件对象。
+ args (List[str], optional): 指令参数列表(仅对消息事件有效)。
+ Defaults to None.
"""
sig = inspect.signature(func)
params = sig.parameters
@@ -204,11 +247,14 @@ class CommandManager:
await func(**kwargs)
-# 确保前缀是元组格式
+# --- 全局单例 ---
+
+# 确保前缀配置是元组格式
if isinstance(comm_prefixes, list):
- comm_prefixes = tuple[Any, ...](comm_prefixes)
+ comm_prefixes = tuple(comm_prefixes)
elif isinstance(comm_prefixes, str):
comm_prefixes = (comm_prefixes,)
-# 实例化全局管理器
+# 实例化全局唯一的命令管理器
matcher = CommandManager(prefixes=comm_prefixes)
+
diff --git a/core/config_loader.py b/core/config_loader.py
index 0882266..776ddcb 100644
--- a/core/config_loader.py
+++ b/core/config_loader.py
@@ -64,6 +64,15 @@ class Config:
"""
return self._data.get("features", {})
+ @property
+ def redis(self) -> dict:
+ """
+ 获取 Redis 配置
+
+ :return: 配置字典
+ """
+ return self._data.get("redis", {})
+
# 实例化全局配置对象
global_config = Config()
diff --git a/core/redis_manager.py b/core/redis_manager.py
new file mode 100644
index 0000000..2b232d2
--- /dev/null
+++ b/core/redis_manager.py
@@ -0,0 +1,60 @@
+import redis
+from .config_loader import global_config as config
+
+class RedisManager:
+ """
+ Redis 连接管理器
+ """
+ _pool = None
+ _client = None
+
+ @classmethod
+ def initialize(cls):
+ """
+ 初始化 Redis 连接并进行健康检查
+ """
+ if cls._pool is None:
+ try:
+ host = config.redis['host']
+ port = config.redis['port']
+ db = config.redis['db']
+ password = config.redis.get('password')
+
+ print(f" 正在尝试连接 Redis: {host}:{port}, DB: {db}")
+
+ cls._pool = redis.ConnectionPool(
+ host=host,
+ port=port,
+ db=db,
+ password=password,
+ decode_responses=True
+ )
+ cls._client = redis.Redis(connection_pool=cls._pool)
+ if cls._client.ping():
+ print(" Redis 连接成功!")
+ else:
+ print(" Redis 连接失败: PING 命令无响应")
+ except redis.exceptions.ConnectionError as e:
+ print(f" Redis 连接失败: {e}")
+ cls._pool = None
+ cls._client = None
+ except Exception as e:
+ print(f" Redis 初始化时发生未知错误: {e}")
+ cls._pool = None
+ cls._client = None
+
+ @classmethod
+ def get_redis(cls):
+ """
+ 获取 Redis 连接
+
+ :return: Redis 连接实例
+ """
+ if cls._client is None:
+ # 理论上 initialize 应该在程序启动时被调用,这里作为备用
+ cls.initialize()
+ return cls._client
+
+# 在模块加载时直接初始化
+RedisManager.initialize()
+redis_client = RedisManager.get_redis()
diff --git a/core/ws.py b/core/ws.py
index 7a6206d..e35317f 100644
--- a/core/ws.py
+++ b/core/ws.py
@@ -1,7 +1,15 @@
"""
-WebSocket 核心模块
+WebSocket 核心通信模块
-负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。
+该模块定义了 `WS` 类,负责与 OneBot v11 实现(如 NapCat)建立和管理
+WebSocket 连接。它是整个机器人框架的底层通信基础。
+
+主要职责包括:
+- 建立 WebSocket 连接并处理认证。
+- 实现断线自动重连机制。
+- 监听并接收来自 OneBot 的事件和 API 响应。
+- 分发事件给 `CommandManager` 进行处理。
+- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
"""
import asyncio
import json
@@ -20,12 +28,14 @@ from .config_loader import global_config
class WS:
"""
- WebSocket 客户端类,负责与 OneBot 实现端建立连接并处理通信
+ WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。
"""
def __init__(self):
"""
- 初始化 WebSocket 客户端
+ 初始化 WebSocket 客户端。
+
+ 从全局配置中读取 WebSocket URI、访问令牌(Token)和重连间隔。
"""
# 读取参数
cfg = global_config.napcat_ws
@@ -39,7 +49,10 @@ class WS:
async def connect(self):
"""
- 主连接循环,负责建立连接和自动重连
+ 启动并管理 WebSocket 连接。
+
+ 这是一个无限循环,负责建立连接。如果连接断开,它会根据配置的
+ `reconnect_interval` 时间间隔后自动尝试重新连接。
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
@@ -67,9 +80,13 @@ class WS:
async def _listen_loop(self, websocket):
"""
- 核心监听循环,处理接收到的 WebSocket 消息
+ 核心监听循环,处理所有接收到的 WebSocket 消息。
- :param websocket: WebSocket 连接对象
+ 此循环会持续从 WebSocket 连接中读取消息,并根据消息内容
+ 判断是 API 响应还是上报的事件,然后分发给相应的处理逻辑。
+
+ Args:
+ websocket: 当前活动的 WebSocket 连接对象。
"""
async for message in websocket:
try:
@@ -96,9 +113,16 @@ class WS:
async def on_event(self, raw_data: dict):
"""
- 事件分发层:根据 post_type 调用 matcher 对应的处理器
+ 事件处理和分发层。
- :param raw_data: 原始事件数据字典
+ 当接收到一个 OneBot 事件时,此方法负责:
+ 1. 使用 `EventFactory` 将原始 JSON 数据解析成对应的事件对象。
+ 2. 为事件对象注入 `Bot` 实例,以便在插件中可以调用 API。
+ 3. 打印格式化的事件日志。
+ 4. 将事件对象传递给 `CommandManager` (`matcher`) 进行后续处理。
+
+ Args:
+ raw_data (dict): 从 WebSocket 接收到的原始事件字典。
"""
try:
# 使用工厂创建事件对象
@@ -124,11 +148,18 @@ class WS:
async def call_api(self, action: str, params: dict = None):
"""
- 调用 OneBot API
+ 向 OneBot v11 实现端发送一个 API 请求。
- :param action: API 动作名称
- :param params: API 参数
- :return: API 响应结果
+ 该方法通过 WebSocket 发送请求,并使用 `echo` 字段来匹配对应的响应。
+ 它创建了一个 `Future` 对象来异步等待响应,并设置了超时机制。
+
+ Args:
+ action (str): API 的动作名称,例如 "send_group_msg"。
+ params (dict, optional): API 请求的参数字典。 Defaults to None.
+
+ Returns:
+ dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
+ 表示失败的字典。
"""
if not self.ws:
return {"status": "failed", "msg": "websocket not initialized"}
@@ -152,3 +183,4 @@ class WS:
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
return {"status": "failed", "retcode": -1, "msg": "api timeout"}
+
diff --git a/main.py b/main.py
index 99e9a8d..50b87cc 100644
--- a/main.py
+++ b/main.py
@@ -12,6 +12,7 @@ from watchdog.events import FileSystemEventHandler
from core import WS
from core.plugin_manager import load_all_plugins
+#from core.redis_manager import redis_client
class PluginReloadHandler(FileSystemEventHandler):
diff --git a/models/events/base.py b/models/events/base.py
index 5bbe8d6..2a83962 100644
--- a/models/events/base.py
+++ b/models/events/base.py
@@ -1,7 +1,8 @@
"""
基础事件模型模块
-定义了所有 OneBot 11 事件的基类和事件类型枚举。
+该模块定义了所有 OneBot v11 事件模型的基类 `OneBotEvent` 和
+事件类型常量 `EventType`。所有具体的事件模型都应继承自 `OneBotEvent`。
"""
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional
@@ -13,46 +14,60 @@ if TYPE_CHECKING:
class EventType:
"""
- 事件类型枚举
+ OneBot v11 事件类型常量。
+
+ 用于标识不同种类的事件上报。
"""
- META = 'meta_event' # 元事件
- REQUEST = 'request' # 请求事件
- NOTICE = 'notice' # 通知事件
- MESSAGE = 'message' # 消息事件
- MESSAGE_SENT = 'message_sent' # 消息发送事件
+ META = 'meta_event'
+ """元事件 (meta_event): 如心跳、生命周期等。"""
+ REQUEST = 'request'
+ """请求事件 (request): 如加好友请求、加群请求等。"""
+ NOTICE = 'notice'
+ """通知事件 (notice): 如群成员增加、文件上传等。"""
+ MESSAGE = 'message'
+ """消息事件 (message): 如私聊消息、群消息等。"""
+ MESSAGE_SENT = 'message_sent'
+ """消息发送事件 (message_sent): 机器人自己发送消息的上报。"""
-@dataclass
+@dataclass(slots=True)
class OneBotEvent(ABC):
"""
- OneBot 事件基类
- 所有具体的事件类型都应该继承自此类
+ OneBot v11 事件的抽象基类。
+
+ 所有具体的事件模型都必须继承此类,并实现 `post_type` 属性。
+
+ Attributes:
+ time (int): 事件发生的时间戳 (秒)。
+ self_id (int): 收到事件的机器人 QQ 号。
+ _bot (Optional[Bot]): 内部持有的 Bot 实例引用,用于快捷 API 调用。
"""
time: int
- """事件发生的时间戳"""
-
self_id: int
- """收到事件的机器人 QQ 号"""
-
_bot: Optional["Bot"] = field(default=None, init=False)
- """Bot 实例引用,用于快捷调用 API"""
@property
@abstractmethod
def post_type(self) -> str:
"""
- 上报类型
+ 抽象属性,代表事件的上报类型。
+
+ 子类必须重写此属性,并返回对应的 `EventType` 常量值。
+ 例如: `return EventType.MESSAGE`
"""
pass
@property
def bot(self) -> "Bot":
"""
- 获取 Bot 实例
+ 获取与此事件关联的 `Bot` 实例,以便快捷调用 API。
- :return: Bot 实例
- :raises ValueError: 如果 Bot 实例未设置
+ Returns:
+ Bot: 当前事件所对应的 `Bot` 实例。
+
+ Raises:
+ ValueError: 如果 `Bot` 实例尚未被设置到事件对象中。
"""
if self._bot is None:
raise ValueError("Bot instance not set for this event")
@@ -61,8 +76,10 @@ class OneBotEvent(ABC):
@bot.setter
def bot(self, value: "Bot"):
"""
- 设置 Bot 实例
+ 为事件对象设置关联的 `Bot` 实例。
- :param value: Bot 实例
+ Args:
+ value (Bot): 要设置的 `Bot` 实例。
"""
self._bot = value
+
diff --git a/models/events/message.py b/models/events/message.py
index e2e0bf1..3ede724 100644
--- a/models/events/message.py
+++ b/models/events/message.py
@@ -11,7 +11,7 @@ from models.sender import Sender
from .base import OneBotEvent, EventType
-@dataclass
+@dataclass(slots=True)
class Anonymous:
"""
匿名信息
diff --git a/models/events/meta.py b/models/events/meta.py
index 91b44d8..b2c720f 100644
--- a/models/events/meta.py
+++ b/models/events/meta.py
@@ -8,7 +8,7 @@ from typing import Optional
from .base import OneBotEvent, EventType
-@dataclass
+@dataclass(slots=True)
class HeartbeatStatus:
"""
心跳状态接口
diff --git a/models/events/notice.py b/models/events/notice.py
index 8dcf56c..82cbbfc 100644
--- a/models/events/notice.py
+++ b/models/events/notice.py
@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
from .base import OneBotEvent, EventType
-@dataclass
+@dataclass(slots=True)
class NoticeEvent(OneBotEvent):
"""
通知事件基类
diff --git a/models/events/request.py b/models/events/request.py
index 87930b6..6f7d82d 100644
--- a/models/events/request.py
+++ b/models/events/request.py
@@ -7,7 +7,7 @@ from dataclasses import dataclass
from .base import OneBotEvent, EventType
-@dataclass
+@dataclass(slots=True)
class RequestEvent(OneBotEvent):
"""
请求事件基类
diff --git a/models/message.py b/models/message.py
index 914a149..ee7f701 100644
--- a/models/message.py
+++ b/models/message.py
@@ -1,48 +1,56 @@
"""
消息段模型模块
-定义了 MessageSegment 类,用于封装 OneBot 11 的消息段。
+该模块定义了 `MessageSegment` 类,用于构建和表示 OneBot v11 协议中的消息段。
+通过此类,可以方便地创建文本、图片、At 等不同类型的消息内容,并支持链式操作。
"""
from dataclasses import dataclass
from typing import Any, Dict
-@dataclass
+@dataclass(slots=True)
class MessageSegment:
"""
- 消息段,对应 OneBot 11 标准中的消息段对象
+ 表示一个 OneBot v11 消息段。
+
+ Attributes:
+ type (str): 消息段的类型,例如 'text', 'image', 'at'。
+ data (Dict[str, Any]): 消息段的具体数据,是一个键值对字典。
"""
type: str
- """消息段类型,如 text, image, at 等"""
-
data: Dict[str, Any]
- """消息段数据"""
@property
def text(self) -> str:
"""
- 获取文本内容(仅当 type 为 text 时有效)
+ 当消息段类型为 'text' 时,快速获取其文本内容。
- :return: 文本内容
+ Returns:
+ str: 消息段的文本内容。如果类型不是 'text',则返回空字符串。
"""
return self.data.get("text", "") if self.type == "text" else ""
@property
def image_url(self) -> str:
"""
- 获取图片 URL(仅当 type 为 image 时有效)
+ 当消息段类型为 'image' 时,快速获取其图片 URL。
- :return: 图片 URL
+ Returns:
+ str: 图片的 URL。如果类型不是 'image' 或数据中不含 'url',则返回空字符串。
"""
return self.data.get("url", "") if self.type == "image" else ""
def is_at(self, user_id: int = None) -> bool:
"""
- 判断是否为 @某人
+ 检查当前消息段是否是一个 'at' (提及) 消息段。
- :param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型
- :return: 是否匹配
+ Args:
+ user_id (int, optional): 如果提供,则进一步检查被提及的 QQ 号是否匹配。
+ Defaults to None.
+
+ Returns:
+ bool: 如果消息段是 'at' 类型且 user_id 匹配 (如果提供),则返回 True。
"""
if self.type != "at":
return False
@@ -51,6 +59,9 @@ class MessageSegment:
return str(self.data.get("qq")) == str(user_id)
def __repr__(self):
+ """
+ 返回消息段对象的字符串表示形式,便于调试。
+ """
return f"[MS:{self.type}:{self.data}]"
# --- 快捷构造方法 ---
@@ -58,39 +69,52 @@ class MessageSegment:
@staticmethod
def text(text: str) -> "MessageSegment":
"""
- 构造文本消息段
+ 创建一个文本消息段。
- :param text: 文本内容
- :return: MessageSegment 对象
+ Args:
+ text (str): 文本内容。
+
+ Returns:
+ MessageSegment: 一个类型为 'text' 的消息段对象。
"""
return MessageSegment(type="text", data={"text": text})
@staticmethod
def at(user_id: int | str) -> "MessageSegment":
"""
- 构造 @某人 消息段
+ 创建一个 @某人 的消息段。
- :param user_id: 目标 QQ 号,"all" 表示 @全体成员
- :return: MessageSegment 对象
+ Args:
+ user_id (int | str): 要提及的 QQ 号。若为 "all",则表示 @全体成员。
+
+ Returns:
+ MessageSegment: 一个类型为 'at' 的消息段对象。
"""
return MessageSegment(type="at", data={"qq": str(user_id)})
@staticmethod
def image(file: str) -> "MessageSegment":
"""
- 构造图片消息段
+ 创建一个图片消息段。
- :param file: 图片文件名、URL 或 Base64
- :return: MessageSegment 对象
+ Args:
+ file (str): 图片的路径、URL 或 Base64 编码的字符串。
+
+ Returns:
+ MessageSegment: 一个类型为 'image' 的消息段对象。
"""
return MessageSegment(type="image", data={"file": file})
@staticmethod
def face(id: int) -> "MessageSegment":
"""
- 构造表情消息段
+ 创建一个 QQ 表情消息段。
- :param id: 表情 ID
- :return: MessageSegment 对象
+ Args:
+ id (int): QQ 表情的 ID。
+
+ Returns:
+ MessageSegment: 一个类型为 'face' 的消息段对象。
"""
return MessageSegment(type="face", data={"id": str(id)})
+
diff --git a/models/objects.py b/models/objects.py
index 49d1769..2f20c51 100644
--- a/models/objects.py
+++ b/models/objects.py
@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
from typing import List, Optional
-@dataclass
+@dataclass(slots=True)
class GroupInfo:
"""
群信息
diff --git a/models/sender.py b/models/sender.py
index 3d63821..df81dfb 100644
--- a/models/sender.py
+++ b/models/sender.py
@@ -7,7 +7,7 @@ from dataclasses import dataclass
from typing import Optional
-@dataclass
+@dataclass(slots=True)
class Sender:
"""
发送者信息类,对应 OneBot 11 标准中的 sender 字段
diff --git a/plugins/forward_test.py b/plugins/forward_test.py
new file mode 100644
index 0000000..429037f
--- /dev/null
+++ b/plugins/forward_test.py
@@ -0,0 +1,42 @@
+"""
+合并转发消息测试插件
+"""
+from core.command_manager import matcher
+from core.bot import Bot
+from models import MessageEvent
+from models.message import MessageSegment
+
+__plugin_meta__ = {
+ "name": "furry",
+ "description": "处理 /furry 指令,发送furry图片,同时也是bot.build_forward_node演示",
+ "usage": "/furry - 发送一条furry图",
+}
+
+@matcher.command("furry")
+async def handle_forward_test(bot: Bot, event: MessageEvent, args: list[str]):
+ """
+ 处理 /furry 指令,发送furry图片,同时也是bot.build_forward_node实例
+
+ :param bot: Bot 实例
+ :param event: 消息事件对象
+ :param args: 指令参数
+ """
+ # 1. 构建消息节点列表
+ nodes = [
+ bot.build_forward_node(user_id=event.self_id, nickname="机器人", message="你要的furry来了"),
+ bot.build_forward_node(user_id=event.user_id, nickname=event.sender.nickname, message="让我看看"),
+ bot.build_forward_node(
+ user_id=event.self_id,
+ nickname="机器人",
+ message=[
+ MessageSegment.text("你要的福瑞图"),
+ MessageSegment.image("https://api.furry.ist/furry-img/")
+ ]
+ )
+ ]
+
+ try:
+ # 2. 发送合并转发消息
+ await bot.send_forwarded_messages(event, nodes)
+ except Exception as e:
+ await event.reply(f"发送失败: {e}")
diff --git a/plugins/jrcd.py b/plugins/jrcd.py
index 2528643..4b7e318 100644
--- a/plugins/jrcd.py
+++ b/plugins/jrcd.py
@@ -1,3 +1,8 @@
+"""
+今日人品插件
+
+提供 /jrcd 和 /bbcd 指令,用于娱乐。
+"""
import random
from datetime import datetime
@@ -19,7 +24,7 @@ JRCDMSG_2 = [
JRCDMSG_3 = [
"今天的长度是%scm,哦豁?听说你很勇哦?(✧◡✧)",
"今天的长度是%scm,嘶哈嘶哈(((o(*°▽°*)o)))...",
- "今天的长度是%scm,我靠,让哥哥爽一爽吧!(((o(*°▽°*)o)))...",
+ "今天的长度是%scm,我靠,让哥哥爽一-爽吧!(((o(*°▽°*)o)))...",
"今天的长度是%scm,单是看到哥哥的长度就....(〃w〃)",
]
@@ -38,6 +43,12 @@ BBCDMSG7 = ["试试刺刀看看谁能赢吧!"]
def get_jrcd(user_id: int) -> int:
+ """
+ 根据用户ID和当前日期生成一个伪随机的“长度”值。
+
+ :param user_id: 用户QQ号。
+ :return: 返回一个1到30之间的整数。
+ """
current_time = (
datetime.now().year * 100 + datetime.now().month * 100 + datetime.now().day
)
@@ -52,11 +63,11 @@ def get_jrcd(user_id: int) -> int:
@matcher.command("jrcd")
async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
"""
- 处理 jrcd 指令,来看看你的长度吧!
+ 处理 jrcd 指令,回复用户的“今日长度”。
- :param bot: Bot 实例
- :param event: 消息事件对象
- :param args: 指令参数列表
+ :param bot: Bot 实例。
+ :param event: 消息事件对象。
+ :param args: 指令参数列表(未使用)。
"""
user_id = event.user_id
jrcd = get_jrcd(user_id)
@@ -73,11 +84,11 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
@matcher.command("bbcd")
async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]):
"""
- 处理 bbcd 指令,和别人比比长度吧!
+ 处理 bbcd 指令,比较两位用户的“长度”。
- :param bot: Bot 实例
- :param event: 消息事件对象
- :param args: 指令参数列表
+ :param bot: Bot 实例。
+ :param event: 消息事件对象。
+ :param args: 指令参数列表(未使用)。
"""
message = event.message
print(message)
diff --git a/plugins/thpic.py b/plugins/thpic.py
index caa752d..a586533 100644
--- a/plugins/thpic.py
+++ b/plugins/thpic.py
@@ -12,4 +12,11 @@ from models import MessageEvent, MessageSegment
@matcher.command("thpic")
async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
+ """
+ 处理 thpic 指令,发送一张随机的东方Project图片。
+
+ :param bot: Bot 实例(未使用)。
+ :param event: 消息事件对象。
+ :param args: 指令参数列表(未使用)。
+ """
await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random"))
diff --git a/requirements.txt b/requirements.txt
index 5fff61d..a0a58de 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,3 +11,4 @@ urllib3==2.6.2
websockets==15.0.1
yarg==0.1.10
watchdog==6.0.0
+redis==5.0.7
From 093a47ea5046ad80a1743624428e8d62f60c81b0 Mon Sep 17 00:00:00 2001
From: K2cr2O1 <2221577113@qq.com>
Date: Fri, 2 Jan 2026 17:23:13 +0800
Subject: [PATCH 4/4] =?UTF-8?q?=E6=9B=B4=E6=8D=A2print=E5=88=B0logger?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
core/logger.py | 50 ++++++++++++++++++++++++++++++++++++++++++
core/plugin_manager.py | 7 +++---
core/redis_manager.py | 11 +++++-----
core/ws.py | 31 ++++++++++++++------------
main.py | 17 ++++++++------
requirements.txt | 1 +
6 files changed, 88 insertions(+), 29 deletions(-)
create mode 100644 core/logger.py
diff --git a/core/logger.py b/core/logger.py
new file mode 100644
index 0000000..76ec223
--- /dev/null
+++ b/core/logger.py
@@ -0,0 +1,50 @@
+"""
+日志模块
+
+该模块负责初始化和配置 loguru 日志记录器,为整个应用程序提供统一的日志记录接口。
+"""
+import sys
+from pathlib import Path
+from loguru import logger
+
+# 定义日志格式
+LOG_FORMAT = (
+ "{time:YYYY-MM-DD HH:mm:ss.SSS} | "
+ "{level: <8} | "
+ "{name}:{function}:{line} - "
+ "{message}"
+)
+
+# 移除 loguru 默认的处理器
+logger.remove()
+
+# 添加控制台输出处理器
+logger.add(
+ sys.stderr,
+ level="INFO",
+ format=LOG_FORMAT,
+ colorize=True,
+ enqueue=True # 异步写入
+)
+
+# 定义日志文件路径
+log_dir = Path("logs")
+log_dir.mkdir(exist_ok=True)
+log_file_path = log_dir / "{time:YYYY-MM-DD}.log"
+
+# 添加文件输出处理器
+logger.add(
+ log_file_path,
+ level="DEBUG",
+ format=LOG_FORMAT,
+ colorize=False,
+ rotation="00:00", # 每天午夜创建新文件
+ retention="7 days", # 保留最近 7 天的日志
+ encoding="utf-8",
+ enqueue=True, # 异步写入
+ backtrace=True, # 记录完整的异常堆栈
+ diagnose=True # 添加异常诊断信息
+)
+
+# 导出配置好的 logger
+__all__ = ["logger"]
diff --git a/core/plugin_manager.py b/core/plugin_manager.py
index df15828..aa16f52 100644
--- a/core/plugin_manager.py
+++ b/core/plugin_manager.py
@@ -9,6 +9,7 @@ import pkgutil
import sys
from core.command_manager import matcher
+from .logger import logger
def load_all_plugins():
@@ -24,7 +25,7 @@ def load_all_plugins():
plugin_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "plugins")
package_name = "plugins"
- print(f" 正在从 {package_name} 加载插件...")
+ logger.info(f"正在从 {package_name} 加载插件...")
for loader, module_name, is_pkg in pkgutil.iter_modules([plugin_dir]):
full_module_name = f"{package_name}.{module_name}"
@@ -43,6 +44,6 @@ def load_all_plugins():
matcher.plugins[full_module_name] = meta
type_str = "包" if is_pkg else "文件"
- print(f" [{type_str}] 成功{action}: {module_name}")
+ logger.success(f" [{type_str}] 成功{action}: {module_name}")
except Exception as e:
- print(f" {action if 'action' in locals() else '加载'}插件 {module_name} 失败: {e}")
+ logger.error(f" {action if 'action' in locals() else '加载'}插件 {module_name} 失败: {e}")
diff --git a/core/redis_manager.py b/core/redis_manager.py
index 2b232d2..3786d9c 100644
--- a/core/redis_manager.py
+++ b/core/redis_manager.py
@@ -1,5 +1,6 @@
import redis
from .config_loader import global_config as config
+from .logger import logger
class RedisManager:
"""
@@ -20,7 +21,7 @@ class RedisManager:
db = config.redis['db']
password = config.redis.get('password')
- print(f" 正在尝试连接 Redis: {host}:{port}, DB: {db}")
+ logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")
cls._pool = redis.ConnectionPool(
host=host,
@@ -31,15 +32,15 @@ class RedisManager:
)
cls._client = redis.Redis(connection_pool=cls._pool)
if cls._client.ping():
- print(" Redis 连接成功!")
+ logger.success("Redis 连接成功!")
else:
- print(" Redis 连接失败: PING 命令无响应")
+ logger.error("Redis 连接失败: PING 命令无响应")
except redis.exceptions.ConnectionError as e:
- print(f" Redis 连接失败: {e}")
+ logger.error(f"Redis 连接失败: {e}")
cls._pool = None
cls._client = None
except Exception as e:
- print(f" Redis 初始化时发生未知错误: {e}")
+ logger.exception(f"Redis 初始化时发生未知错误: {e}")
cls._pool = None
cls._client = None
diff --git a/core/ws.py b/core/ws.py
index e35317f..c9d2b77 100644
--- a/core/ws.py
+++ b/core/ws.py
@@ -24,6 +24,7 @@ from models import EventFactory
from .bot import Bot
from .command_manager import matcher
from .config_loader import global_config
+from .logger import logger
class WS:
@@ -58,24 +59,23 @@ class WS:
while True:
try:
- print(f" 正在尝试连接至 NapCat: {self.url}")
+ logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket:
self.ws = websocket
- print(" 连接成功!")
+ logger.success("连接成功!")
await self._listen_loop(websocket)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
- print(f" 连接断开或服务器拒绝访问: {e}")
+ logger.warning(f"连接断开或服务器拒绝访问: {e}")
except Exception as e:
- print(f" 运行异常: {e}")
- traceback.print_exc()
+ logger.exception(f"运行异常: {e}")
- print(f" {self.reconnect_interval}秒后尝试重连...")
+ logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket):
@@ -108,8 +108,7 @@ class WS:
asyncio.create_task(self.on_event(data))
except Exception as e:
- print(f" 解析消息异常: {e}")
- traceback.print_exc()
+ logger.exception(f"解析消息异常: {e}")
async def on_event(self, raw_data: dict):
"""
@@ -130,21 +129,22 @@ class WS:
event.bot = self.bot # 注入 Bot 实例
# 打印日志
- t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")
if event.post_type == "message":
sender_name = event.sender.nickname if event.sender else "Unknown"
- print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
+ logger.info(f"[消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
elif event.post_type == "notice":
- print(f" [{t}] [通知] {event.notice_type}")
+ logger.info(f"[通知] {event.notice_type}")
elif event.post_type == "request":
- print(f" [{t}] [请求] {event.request_type}")
+ logger.info(f"[请求] {event.request_type}")
+ elif event.post_type == "meta_event":
+ logger.debug(f"[元事件] {event.meta_event_type}")
+
# 分发事件
await matcher.handle_event(self.bot, event)
except Exception as e:
- print(f" 事件处理异常: {e}")
- traceback.print_exc()
+ logger.exception(f"事件处理异常: {e}")
async def call_api(self, action: str, params: dict = None):
"""
@@ -162,11 +162,13 @@ class WS:
表示失败的字典。
"""
if not self.ws:
+ logger.error("调用 API 失败: WebSocket 未初始化")
return {"status": "failed", "msg": "websocket not initialized"}
from websockets.protocol import State
if getattr(self.ws, "state", None) is not State.OPEN:
+ logger.error("调用 API 失败: WebSocket 连接未打开")
return {"status": "failed", "msg": "websocket is not open"}
echo_id = str(uuid.uuid4())
@@ -182,5 +184,6 @@ class WS:
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
+ logger.warning(f"API 调用超时: action={action}, params={params}")
return {"status": "failed", "retcode": -1, "msg": "api timeout"}
diff --git a/main.py b/main.py
index 50b87cc..1d9d6cc 100644
--- a/main.py
+++ b/main.py
@@ -10,9 +10,11 @@ import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
+# 初始化日志系统,必须在其他 core 模块导入之前执行
+from core.logger import logger
+
from core import WS
from core.plugin_manager import load_all_plugins
-#from core.redis_manager import redis_client
class PluginReloadHandler(FileSystemEventHandler):
@@ -55,17 +57,18 @@ class PluginReloadHandler(FileSystemEventHandler):
self.last_reload_time = current_time
- print(f"\n[HotReload] 检测到文件变更: {event.src_path}")
- print("[HotReload] 正在重载插件...")
+ logger.info(f"检测到文件变更: {event.src_path}")
+ logger.info("正在重载插件...")
try:
# 重新扫描并加载插件
load_all_plugins()
- print("[HotReload] 插件重载完成")
+ logger.success("插件重载完成")
except Exception as e:
- print(f"[HotReload] 重载失败: {e}")
+ logger.exception(f"重载失败: {e}")
+@logger.catch
async def main():
"""
主函数
@@ -87,9 +90,9 @@ async def main():
if os.path.exists(plugin_path):
observer.schedule(event_handler, plugin_path, recursive=True)
observer.start()
- print(f"[HotReload] 已启动插件热重载监控: {plugin_path}")
+ logger.info(f"已启动插件热重载监控: {plugin_path}")
else:
- print(f"[HotReload] 警告: 插件目录不存在 {plugin_path}")
+ logger.warning(f"插件目录不存在 {plugin_path}")
try:
bot = WS()
diff --git a/requirements.txt b/requirements.txt
index a0a58de..f630282 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,3 +12,4 @@ websockets==15.0.1
yarg==0.1.10
watchdog==6.0.0
redis==5.0.7
+loguru