Merge branch 'dev' into plugins
This commit is contained in:
109
README.md
109
README.md
@@ -39,6 +39,42 @@ NEO 框架的设计遵循以下核心理念:
|
||||
* **异步核心**:基于 `asyncio` 和 `websockets` 的高性能异步核心。
|
||||
* **自动重连**:内置 WebSocket 断线重连机制。
|
||||
|
||||
## ⚡️ 性能优化
|
||||
|
||||
### Redis 缓存机制
|
||||
为了提升响应速度并减少对 OneBot API 的重复调用,框架核心集成了一套基于 Redis 的缓存系统。对于一些不频繁变更的数据(如群信息、好友列表等),首次查询后会将其缓存至 Redis,在缓存有效期内(默认为 1 小时),后续请求将直接从 Redis 读取,极大提升了性能。
|
||||
|
||||
#### 工作原理
|
||||
- **自动缓存**:框架会自动缓存特定 API 的调用结果。
|
||||
- **缓存键**:缓存键根据 API 名称和关键参数(如 `group_id`, `user_id`)生成,确保唯一性。
|
||||
- **过期时间**:默认缓存 1 小时,之后会自动失效,下次调用时将重新从 OneBot 实现端获取最新数据。
|
||||
|
||||
#### 受影响的 API
|
||||
以下核心 API 已默认启用缓存:
|
||||
- `get_group_info`
|
||||
- `get_group_member_info`
|
||||
- `get_friend_list`
|
||||
- `get_stranger_info`
|
||||
- `get_login_info`
|
||||
|
||||
#### 如何绕过缓存
|
||||
在某些场景下,你可能需要获取实时数据而非缓存数据。为此,所有受缓存影响的 API 方法都增加了一个 `no_cache: bool = False` 的可选参数。
|
||||
|
||||
当你需要强制从服务器获取最新信息时,只需在调用时传入 `no_cache=True` 即可。
|
||||
|
||||
**示例:**
|
||||
```python
|
||||
# 正常调用,会使用缓存
|
||||
group_info = await bot.get_group_info(group_id=12345)
|
||||
|
||||
# 强制获取最新信息,不使用缓存
|
||||
latest_group_info = await bot.get_group_info(group_id=12345, no_cache=True)
|
||||
```
|
||||
|
||||
### `__slots__` 内存优化
|
||||
框架内的所有数据模型(包括事件、消息段、API 返回对象等)均已启用 `__slots__ = True` 优化。这可以显著减少每个对象实例的内存占用,特别是在处理大量事件和数据时,能够有效降低机器人的整体内存消耗。
|
||||
|
||||
|
||||
## 📝 待办事项 (TODO)
|
||||
|
||||
### API 封装
|
||||
@@ -90,10 +126,11 @@ NEO 框架的设计遵循以下核心理念:
|
||||
- [ ] **系统控制**
|
||||
- `set_restart`
|
||||
- [ ] **扩展功能**
|
||||
- `send_forward_msg`: 发送合并转发消息
|
||||
- [x] `send_forward_msg`: 发送合并转发消息
|
||||
|
||||
### 其他改进
|
||||
- [x] **API 强类型封装**: 将 API 返回值从 `dict` 转换为数据模型对象。
|
||||
- [x] **Redis 支持**: 集成 Redis 连接池,便于插件复用连接。
|
||||
- [ ] **日志系统优化**: 引入更完善的日志记录机制,支持文件输出和日志级别控制。
|
||||
- [ ] **异常处理增强**: 增强插件执行过程中的异常捕获,防止单个插件崩溃影响整个 Bot。
|
||||
- [ ] **中间件支持**: 添加消息处理中间件,支持在指令执行前/后进行拦截和处理。
|
||||
@@ -104,7 +141,10 @@ NEO 框架的设计遵循以下核心理念:
|
||||
```
|
||||
NEO/
|
||||
├── plugins/ # 插件目录,新建插件文件即可自动加载(支持热重载)
|
||||
│ └── echo.py # 示例插件:实现 /echo 和 /赞我 指令
|
||||
│ ├── echo.py # 示例插件:实现 /echo 和 /赞我 指令
|
||||
│ ├── forward_test.py # 示例插件:演示合并转发消息的构建和发送
|
||||
│ ├── jrcd.py # 娱乐插件:提供 /jrcd 和 /bbcd 指令
|
||||
│ └── thpic.py # 图片插件:提供 /thpic 指令,发送随机东方图片
|
||||
├── core/ # 核心框架代码
|
||||
│ ├── api/ # API 模块抽象层 (MessageAPI, GroupAPI, FriendAPI, AccountAPI)
|
||||
│ │ ├── __init__.py
|
||||
@@ -117,6 +157,7 @@ NEO/
|
||||
│ ├── command_manager.py # 命令与事件分发器
|
||||
│ ├── config_loader.py # 配置加载器
|
||||
│ ├── plugin_manager.py # 插件加载与管理
|
||||
│ ├── redis_manager.py # Redis 连接管理器
|
||||
│ └── ws.py # WebSocket 客户端核心
|
||||
├── models/ # 数据模型
|
||||
│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta)
|
||||
@@ -378,6 +419,70 @@ async def dangerous_command(bot: Bot, event: MessageEvent, args: list[str]):
|
||||
5. **性能考虑**:避免在插件中执行耗时同步操作
|
||||
6. **资源清理**:必要时使用 `try...finally` 确保资源释放
|
||||
|
||||
### 🚀 高性能插件开发规范 (避坑指南)
|
||||
为了保证整个机器人框架的响应速度和稳定性,所有插件都必须遵循异步、非阻塞的开发原则。任何一个插件中的阻塞操作都可能导致整个机器人卡顿或无响应。
|
||||
|
||||
以下是必须遵守的核心规范:
|
||||
|
||||
#### 1. 禁止任何形式的同步网络请求
|
||||
- **错误示范**: 使用 `requests` 库发起网络请求。
|
||||
```python
|
||||
import requests
|
||||
# 错误!这会阻塞整个程序
|
||||
response = requests.get("https://api.example.com")
|
||||
```
|
||||
- **正确做法**: 必须使用异步 HTTP 客户端,如 `aiohttp` 或 `httpx`。
|
||||
```python
|
||||
import httpx
|
||||
# 正确,使用 async with 和 await
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get("https://api.example.com")
|
||||
```
|
||||
|
||||
#### 2. 禁止使用 `time.sleep()`
|
||||
- **错误示范**: 使用 `time.sleep()` 进行等待。
|
||||
```python
|
||||
import time
|
||||
# 错误!这会阻塞事件循环
|
||||
time.sleep(5)
|
||||
```
|
||||
- **正确做法**: 必须使用 `asyncio.sleep()`。
|
||||
```python
|
||||
import asyncio
|
||||
# 正确,这会将控制权交还给事件循环
|
||||
await asyncio.sleep(5)
|
||||
```
|
||||
|
||||
#### 3. 谨慎处理文件 I/O
|
||||
- 对于读写小型、本地文件,直接使用 `with open(...)` 通常是可接受的。
|
||||
- 但对于大型文件或网络文件系统(NFS)上的文件,同步 I/O 可能会导致明显的阻塞。
|
||||
- **推荐做法**: 对于可能耗时较长的文件操作,使用 `aiofiles` 库。
|
||||
```python
|
||||
import aiofiles
|
||||
async with aiofiles.open('large_file.dat', mode='rb') as f:
|
||||
contents = await f.read()
|
||||
```
|
||||
|
||||
#### 4. 将 CPU 密集型任务移出事件循环
|
||||
- 如果插件需要执行复杂的计算(例如,图像处理、视频转码、数据分析),这些任务会长时间占用 CPU,同样会阻塞事件循环。
|
||||
- **正确做法**: 使用 `loop.run_in_executor()` 将这类任务抛到独立的线程池或进程池中执行。
|
||||
```python
|
||||
import asyncio
|
||||
|
||||
def cpu_bound_task(data):
|
||||
# 这是一个耗时的同步函数
|
||||
# ... 进行大量计算 ...
|
||||
return "计算结果"
|
||||
|
||||
# 在异步的事件处理器中调用
|
||||
loop = asyncio.get_running_loop()
|
||||
# `None` 表示使用默认的线程池
|
||||
result = await loop.run_in_executor(None, cpu_bound_task, "一些数据")
|
||||
await event.reply(result)
|
||||
```
|
||||
|
||||
遵循以上规范,可以确保您开发的插件不会成为整个机器人应用的性能瓶颈。
|
||||
|
||||
### 示例:完整插件模板
|
||||
```python
|
||||
"""
|
||||
|
||||
@@ -5,3 +5,9 @@ reconnect_interval = 5
|
||||
|
||||
[bot]
|
||||
command = ["/"]
|
||||
|
||||
[redis]
|
||||
host = "114.66.58.203"
|
||||
port = 1931
|
||||
db = 0
|
||||
password = "redis_5dxyJG"
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from .command_manager import matcher
|
||||
from .config_loader import global_config
|
||||
from .plugin_manager import PluginDataManager
|
||||
from .ws import WS
|
||||
from .WS import WS
|
||||
|
||||
__all__ = ["WS", "matcher", "global_config", "PluginDataManager"]
|
||||
|
||||
@@ -1,78 +1,106 @@
|
||||
"""
|
||||
账号相关 API 模块
|
||||
账号与状态相关 API 模块
|
||||
|
||||
该模块定义了 `AccountAPI` Mixin 类,提供了所有与机器人自身账号信息、
|
||||
状态设置等相关的 OneBot v11 API 封装。
|
||||
"""
|
||||
import json
|
||||
from typing import Dict, Any
|
||||
from .base import BaseAPI
|
||||
from models.objects import LoginInfo, VersionInfo, Status
|
||||
from core.redis_manager import redis_client as redis_manager
|
||||
|
||||
|
||||
class AccountAPI(BaseAPI):
|
||||
"""
|
||||
账号相关 API Mixin
|
||||
`AccountAPI` Mixin 类,提供了所有与机器人账号、状态相关的 API 方法。
|
||||
"""
|
||||
|
||||
async def get_login_info(self) -> LoginInfo:
|
||||
async def get_login_info(self, no_cache: bool = False) -> LoginInfo:
|
||||
"""
|
||||
获取登录号信息
|
||||
获取当前登录的机器人账号信息。
|
||||
|
||||
:return: 登录信息对象
|
||||
Args:
|
||||
no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False.
|
||||
|
||||
Returns:
|
||||
LoginInfo: 包含登录号 QQ 和昵称的 `LoginInfo` 数据对象。
|
||||
"""
|
||||
cache_key = f"neobot:cache:get_login_info:{self.self_id}"
|
||||
if not no_cache:
|
||||
cached_data = await redis_manager.get(cache_key)
|
||||
if cached_data:
|
||||
return LoginInfo(**json.loads(cached_data))
|
||||
|
||||
res = await self.call_api("get_login_info")
|
||||
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
|
||||
return LoginInfo(**res)
|
||||
|
||||
async def get_version_info(self) -> VersionInfo:
|
||||
"""
|
||||
获取版本信息
|
||||
获取 OneBot v11 实现的版本信息。
|
||||
|
||||
:return: 版本信息对象
|
||||
Returns:
|
||||
VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。
|
||||
"""
|
||||
res = await self.call_api("get_version_info")
|
||||
return VersionInfo(**res)
|
||||
|
||||
async def get_status(self) -> Status:
|
||||
"""
|
||||
获取状态
|
||||
获取 OneBot v11 实现的状态信息。
|
||||
|
||||
:return: 状态对象
|
||||
Returns:
|
||||
Status: 包含 OneBot 状态信息的 `Status` 数据对象。
|
||||
"""
|
||||
res = await self.call_api("get_status")
|
||||
return Status(**res)
|
||||
|
||||
async def bot_exit(self) -> Dict[str, Any]:
|
||||
"""
|
||||
退出机器人
|
||||
让机器人进程退出(需要实现端支持)。
|
||||
|
||||
:return: API 响应结果
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("bot_exit")
|
||||
|
||||
async def set_self_longnick(self, long_nick: str) -> Dict[str, Any]:
|
||||
"""
|
||||
设置个性签名
|
||||
设置机器人账号的个性签名。
|
||||
|
||||
:param long_nick: 个性签名内容
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
long_nick (str): 要设置的个性签名内容。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_self_longnick", {"longNick": long_nick})
|
||||
|
||||
async def set_input_status(self, user_id: int, event_type: int) -> Dict[str, Any]:
|
||||
"""
|
||||
设置输入状态
|
||||
设置 "对方正在输入..." 状态提示。
|
||||
|
||||
:param user_id: 用户 ID
|
||||
:param event_type: 事件类型
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
user_id (int): 目标用户的 QQ 号。
|
||||
event_type (int): 事件类型。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_input_status", {"user_id": user_id, "event_type": event_type})
|
||||
|
||||
async def set_diy_online_status(self, face_id: int, face_type: int, wording: str) -> Dict[str, Any]:
|
||||
"""
|
||||
设置自定义在线状态
|
||||
设置自定义的 "在线状态"。
|
||||
|
||||
:param face_id: 状态 ID
|
||||
:param face_type: 状态类型
|
||||
:param wording: 状态描述
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
face_id (int): 状态的表情 ID。
|
||||
face_type (int): 状态的表情类型。
|
||||
wording (str): 状态的描述文本。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_diy_online_status", {
|
||||
"face_id": face_id,
|
||||
@@ -82,43 +110,55 @@ class AccountAPI(BaseAPI):
|
||||
|
||||
async def set_online_status(self, status_code: int) -> Dict[str, Any]:
|
||||
"""
|
||||
设置在线状态
|
||||
设置在线状态(如在线、离开、摸鱼等)。
|
||||
|
||||
:param status_code: 状态码
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
status_code (int): 目标在线状态的状态码。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_online_status", {"status_code": status_code})
|
||||
|
||||
async def set_qq_profile(self, **kwargs) -> Dict[str, Any]:
|
||||
"""
|
||||
设置 QQ 资料
|
||||
设置机器人账号的个人资料。
|
||||
|
||||
:param kwargs: 个人资料相关参数
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
**kwargs: 个人资料的相关参数,具体字段请参考 OneBot v11 规范。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_qq_profile", kwargs)
|
||||
|
||||
async def set_qq_avatar(self, **kwargs) -> Dict[str, Any]:
|
||||
"""
|
||||
设置 QQ 头像
|
||||
设置机器人账号的头像。
|
||||
|
||||
:param kwargs: 头像相关参数
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
**kwargs: 头像的相关参数,具体字段请参考 OneBot v11 规范。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_qq_avatar", kwargs)
|
||||
|
||||
async def get_clientkey(self) -> Dict[str, Any]:
|
||||
"""
|
||||
获取客户端密钥
|
||||
获取客户端密钥(通常用于 QQ 登录相关操作)。
|
||||
|
||||
:return: API 响应结果
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("get_clientkey")
|
||||
|
||||
async def clean_cache(self) -> Dict[str, Any]:
|
||||
"""
|
||||
清理缓存
|
||||
清理 OneBot v11 实现端的缓存。
|
||||
|
||||
:return: API 响应结果
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("clean_cache")
|
||||
|
||||
|
||||
@@ -1,53 +1,86 @@
|
||||
"""
|
||||
好友相关 API 模块
|
||||
好友与陌生人相关 API 模块
|
||||
|
||||
该模块定义了 `FriendAPI` Mixin 类,提供了所有与好友、陌生人信息
|
||||
等相关的 OneBot v11 API 封装。
|
||||
"""
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
from .base import BaseAPI
|
||||
from models.objects import FriendInfo, StrangerInfo
|
||||
from core.redis_manager import redis_client as redis_manager
|
||||
|
||||
|
||||
class FriendAPI(BaseAPI):
|
||||
"""
|
||||
好友相关 API Mixin
|
||||
`FriendAPI` Mixin 类,提供了所有与好友、陌生人操作相关的 API 方法。
|
||||
"""
|
||||
|
||||
async def send_like(self, user_id: int, times: int = 1) -> Dict[str, Any]:
|
||||
"""
|
||||
发送点赞
|
||||
向指定用户发送 "戳一戳" (点赞)。
|
||||
|
||||
:param user_id: 对方 QQ 号
|
||||
:param times: 点赞次数
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
user_id (int): 目标用户的 QQ 号。
|
||||
times (int, optional): 点赞次数,建议不超过 10 次。Defaults to 1.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("send_like", {"user_id": user_id, "times": times})
|
||||
|
||||
async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> StrangerInfo:
|
||||
"""
|
||||
获取陌生人信息
|
||||
获取陌生人的信息。
|
||||
|
||||
:param user_id: QQ 号
|
||||
:param no_cache: 是否不使用缓存
|
||||
:return: 陌生人信息对象
|
||||
Args:
|
||||
user_id (int): 目标用户的 QQ 号。
|
||||
no_cache (bool, optional): 是否不使用缓存,直接从服务器获取。Defaults to False.
|
||||
|
||||
Returns:
|
||||
StrangerInfo: 包含陌生人信息的 `StrangerInfo` 数据对象。
|
||||
"""
|
||||
cache_key = f"neobot:cache:get_stranger_info:{user_id}"
|
||||
if not no_cache:
|
||||
cached_data = await redis_manager.get(cache_key)
|
||||
if cached_data:
|
||||
return StrangerInfo(**json.loads(cached_data))
|
||||
|
||||
res = await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache})
|
||||
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
|
||||
return StrangerInfo(**res)
|
||||
|
||||
async def get_friend_list(self) -> List[FriendInfo]:
|
||||
async def get_friend_list(self, no_cache: bool = False) -> List[FriendInfo]:
|
||||
"""
|
||||
获取好友列表
|
||||
获取机器人账号的好友列表。
|
||||
|
||||
:return: 好友信息对象列表
|
||||
Args:
|
||||
no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False.
|
||||
|
||||
Returns:
|
||||
List[FriendInfo]: 包含所有好友信息的 `FriendInfo` 对象列表。
|
||||
"""
|
||||
cache_key = f"neobot:cache:get_friend_list:{self.self_id}"
|
||||
if not no_cache:
|
||||
cached_data = await redis_manager.get(cache_key)
|
||||
if cached_data:
|
||||
return [FriendInfo(**item) for item in json.loads(cached_data)]
|
||||
|
||||
res = await self.call_api("get_friend_list")
|
||||
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
|
||||
return [FriendInfo(**item) for item in res]
|
||||
|
||||
async def set_friend_add_request(self, flag: str, approve: bool = True, remark: str = "") -> Dict[str, Any]:
|
||||
"""
|
||||
处理加好友请求
|
||||
处理收到的加好友请求。
|
||||
|
||||
:param flag: 加好友请求的 flag(需从上报的数据中获取)
|
||||
:param approve: 是否同意请求
|
||||
:param remark: 添加后的好友备注(仅在同意时有效)
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
flag (str): 请求的标识,需要从 `request` 事件中获取。
|
||||
approve (bool, optional): 是否同意该好友请求。Defaults to True.
|
||||
remark (str, optional): 在同意请求时,为该好友设置的备注。Defaults to "".
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark})
|
||||
|
||||
|
||||
@@ -1,47 +1,64 @@
|
||||
"""
|
||||
群组相关 API 模块
|
||||
|
||||
该模块定义了 `GroupAPI` Mixin 类,提供了所有与群组管理、成员操作
|
||||
等相关的 OneBot v11 API 封装。
|
||||
"""
|
||||
from typing import List, Dict, Any, Optional
|
||||
import json
|
||||
from core.redis_manager import redis_client as redis_manager
|
||||
from .base import BaseAPI
|
||||
from models.objects import GroupInfo, GroupMemberInfo, GroupHonorInfo
|
||||
|
||||
|
||||
class GroupAPI(BaseAPI):
|
||||
"""
|
||||
群组相关 API Mixin
|
||||
`GroupAPI` Mixin 类,提供了所有与群组操作相关的 API 方法。
|
||||
"""
|
||||
|
||||
async def set_group_kick(self, group_id: int, user_id: int, reject_add_request: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
群组踢人
|
||||
将指定成员踢出群组。
|
||||
|
||||
:param group_id: 群号
|
||||
:param user_id: 要踢的 QQ 号
|
||||
:param reject_add_request: 拒绝此人的加群请求
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
user_id (int): 要踢出的成员的 QQ 号。
|
||||
reject_add_request (bool, optional): 是否拒绝该用户此后的加群请求。Defaults to False.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_kick", {"group_id": group_id, "user_id": user_id, "reject_add_request": reject_add_request})
|
||||
|
||||
async def set_group_ban(self, group_id: int, user_id: int, duration: int = 30 * 60) -> Dict[str, Any]:
|
||||
async def set_group_ban(self, group_id: int, user_id: int, duration: int = 1800) -> Dict[str, Any]:
|
||||
"""
|
||||
群组单人禁言
|
||||
禁言群组中的指定成员。
|
||||
|
||||
:param group_id: 群号
|
||||
:param user_id: 要禁言的 QQ 号
|
||||
:param duration: 禁言时长(秒),0 表示解除禁言
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
user_id (int): 要禁言的成员的 QQ 号。
|
||||
duration (int, optional): 禁言时长,单位为秒。设置为 0 表示解除禁言。
|
||||
Defaults to 1800 (30 分钟).
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_ban", {"group_id": group_id, "user_id": user_id, "duration": duration})
|
||||
|
||||
async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 30 * 60, flag: str = None) -> Dict[str, Any]:
|
||||
async def set_group_anonymous_ban(self, group_id: int, anonymous: Dict[str, Any] = None, duration: int = 1800, flag: str = None) -> Dict[str, Any]:
|
||||
"""
|
||||
群组匿名禁言
|
||||
禁言群组中的匿名用户。
|
||||
|
||||
:param group_id: 群号
|
||||
:param anonymous: 可选,要禁言的匿名用户对象(群消息事件的 anonymous 字段)
|
||||
:param duration: 禁言时长(秒)
|
||||
:param flag: 可选,要禁言的匿名用户的 flag(需从群消息事件的 anonymous 字段中获取)
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
anonymous (Dict[str, Any], optional): 要禁言的匿名用户对象,
|
||||
可从群消息事件的 `anonymous` 字段中获取。Defaults to None.
|
||||
duration (int, optional): 禁言时长,单位为秒。Defaults to 1800.
|
||||
flag (str, optional): 要禁言的匿名用户的 flag 标识,
|
||||
可从群消息事件的 `anonymous` 字段中获取。Defaults to None.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
params = {"group_id": group_id, "duration": duration}
|
||||
if anonymous:
|
||||
@@ -52,139 +69,196 @@ class GroupAPI(BaseAPI):
|
||||
|
||||
async def set_group_whole_ban(self, group_id: int, enable: bool = True) -> Dict[str, Any]:
|
||||
"""
|
||||
群组全员禁言
|
||||
开启或关闭群组全员禁言。
|
||||
|
||||
:param group_id: 群号
|
||||
:param enable: 是否开启
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
enable (bool, optional): True 表示开启全员禁言,False 表示关闭。Defaults to True.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_whole_ban", {"group_id": group_id, "enable": enable})
|
||||
|
||||
async def set_group_admin(self, group_id: int, user_id: int, enable: bool = True) -> Dict[str, Any]:
|
||||
"""
|
||||
群组设置管理员
|
||||
设置或取消群组成员的管理员权限。
|
||||
|
||||
:param group_id: 群号
|
||||
:param user_id: 要设置的 QQ 号
|
||||
:param enable: True 为设置,False 为取消
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
user_id (int): 目标成员的 QQ 号。
|
||||
enable (bool, optional): True 表示设为管理员,False 表示取消管理员。Defaults to True.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_admin", {"group_id": group_id, "user_id": user_id, "enable": enable})
|
||||
|
||||
async def set_group_anonymous(self, group_id: int, enable: bool = True) -> Dict[str, Any]:
|
||||
"""
|
||||
群组匿名
|
||||
开启或关闭群组的匿名聊天功能。
|
||||
|
||||
:param group_id: 群号
|
||||
:param enable: 是否开启
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
enable (bool, optional): True 表示开启匿名,False 表示关闭。Defaults to True.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_anonymous", {"group_id": group_id, "enable": enable})
|
||||
|
||||
async def set_group_card(self, group_id: int, user_id: int, card: str = "") -> Dict[str, Any]:
|
||||
"""
|
||||
设置群名片(群备注)
|
||||
设置群组成员的群名片。
|
||||
|
||||
:param group_id: 群号
|
||||
:param user_id: 要设置的 QQ 号
|
||||
:param card: 群名片内容,不填或空字符串表示删除群名片
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
user_id (int): 目标成员的 QQ 号。
|
||||
card (str, optional): 要设置的群名片内容。
|
||||
传入空字符串 `""` 或 `None` 表示删除该成员的群名片。Defaults to "".
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_card", {"group_id": group_id, "user_id": user_id, "card": card})
|
||||
|
||||
async def set_group_name(self, group_id: int, group_name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
设置群名
|
||||
设置群组的名称。
|
||||
|
||||
:param group_id: 群号
|
||||
:param group_name: 新群名
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
group_name (str): 新的群组名称。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_name", {"group_id": group_id, "group_name": group_name})
|
||||
|
||||
async def set_group_leave(self, group_id: int, is_dismiss: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
退出群组
|
||||
退出或解散一个群组。
|
||||
|
||||
:param group_id: 群号
|
||||
:param is_dismiss: 是否解散,如果登录号是群主,则仅在此项为 True 时能够解散
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
is_dismiss (bool, optional): 是否解散群组。
|
||||
仅当机器人是群主时,此项设为 True 才能解散群。Defaults to False.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_leave", {"group_id": group_id, "is_dismiss": is_dismiss})
|
||||
|
||||
async def set_group_special_title(self, group_id: int, user_id: int, special_title: str = "", duration: int = -1) -> Dict[str, Any]:
|
||||
"""
|
||||
设置群组专属头衔
|
||||
为群组成员设置专属头衔。
|
||||
|
||||
:param group_id: 群号
|
||||
:param user_id: 要设置的 QQ 号
|
||||
:param special_title: 专属头衔,不填或空字符串表示删除
|
||||
:param duration: 有效期(秒),-1 表示永久
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
user_id (int): 目标成员的 QQ 号。
|
||||
special_title (str, optional): 专属头衔内容。
|
||||
传入空字符串 `""` 或 `None` 表示删除头衔。Defaults to "".
|
||||
duration (int, optional): 头衔有效期,单位为秒。-1 表示永久。Defaults to -1.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_special_title", {"group_id": group_id, "user_id": user_id, "special_title": special_title, "duration": duration})
|
||||
|
||||
async def get_group_info(self, group_id: int, no_cache: bool = False) -> GroupInfo:
|
||||
"""
|
||||
获取群信息
|
||||
获取群组的详细信息。
|
||||
|
||||
:param group_id: 群号
|
||||
:param no_cache: 是否不使用缓存
|
||||
:return: 群信息对象
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
no_cache (bool, optional): 是否不使用缓存,直接从服务器获取最新信息。Defaults to False.
|
||||
|
||||
Returns:
|
||||
GroupInfo: 包含群组信息的 `GroupInfo` 数据对象。
|
||||
"""
|
||||
res = await self.call_api("get_group_info", {"group_id": group_id, "no_cache": no_cache})
|
||||
cache_key = f"neobot:cache:get_group_info:{group_id}"
|
||||
if not no_cache:
|
||||
cached_data = await redis_manager.get(cache_key)
|
||||
if cached_data:
|
||||
return GroupInfo(**json.loads(cached_data))
|
||||
|
||||
res = await self.call_api("get_group_info", {"group_id": group_id})
|
||||
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
|
||||
return GroupInfo(**res)
|
||||
|
||||
async def get_group_list(self) -> List[GroupInfo]:
|
||||
"""
|
||||
获取群列表
|
||||
获取机器人加入的所有群组的列表。
|
||||
|
||||
:return: 群信息对象列表
|
||||
Returns:
|
||||
List[GroupInfo]: 包含所有群组信息的 `GroupInfo` 对象列表。
|
||||
"""
|
||||
res = await self.call_api("get_group_list")
|
||||
return [GroupInfo(**item) for item in res]
|
||||
|
||||
async def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> GroupMemberInfo:
|
||||
"""
|
||||
获取群成员信息
|
||||
获取指定群组成员的详细信息。
|
||||
|
||||
:param group_id: 群号
|
||||
:param user_id: QQ 号
|
||||
:param no_cache: 是否不使用缓存
|
||||
:return: 群成员信息对象
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
user_id (int): 目标成员的 QQ 号。
|
||||
no_cache (bool, optional): 是否不使用缓存。Defaults to False.
|
||||
|
||||
Returns:
|
||||
GroupMemberInfo: 包含群成员信息的 `GroupMemberInfo` 数据对象。
|
||||
"""
|
||||
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id, "no_cache": no_cache})
|
||||
cache_key = f"neobot:cache:get_group_member_info:{group_id}:{user_id}"
|
||||
if not no_cache:
|
||||
cached_data = await redis_manager.get(cache_key)
|
||||
if cached_data:
|
||||
return GroupMemberInfo(**json.loads(cached_data))
|
||||
|
||||
res = await self.call_api("get_group_member_info", {"group_id": group_id, "user_id": user_id})
|
||||
await redis_manager.set(cache_key, json.dumps(res), ex=3600) # 缓存 1 小时
|
||||
return GroupMemberInfo(**res)
|
||||
|
||||
async def get_group_member_list(self, group_id: int) -> List[GroupMemberInfo]:
|
||||
"""
|
||||
获取群成员列表
|
||||
获取一个群组的所有成员列表。
|
||||
|
||||
:param group_id: 群号
|
||||
:return: 群成员信息对象列表
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
|
||||
Returns:
|
||||
List[GroupMemberInfo]: 包含所有群成员信息的 `GroupMemberInfo` 对象列表。
|
||||
"""
|
||||
res = await self.call_api("get_group_member_list", {"group_id": group_id})
|
||||
return [GroupMemberInfo(**item) for item in res]
|
||||
|
||||
async def get_group_honor_info(self, group_id: int, type: str) -> GroupHonorInfo:
|
||||
"""
|
||||
获取群荣誉信息
|
||||
获取群组的荣誉信息(如龙王、群聊之火等)。
|
||||
|
||||
:param group_id: 群号
|
||||
:param type: 要获取的群荣誉类型,可传入 talkative, performer, legend, strong_newbie, emotion 等
|
||||
:return: 群荣誉信息对象
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
type (str): 要获取的荣誉类型。
|
||||
可选值: "talkative", "performer", "legend", "strong_newbie", "emotion" 等。
|
||||
|
||||
Returns:
|
||||
GroupHonorInfo: 包含群荣誉信息的 `GroupHonorInfo` 数据对象。
|
||||
"""
|
||||
res = await self.call_api("get_group_honor_info", {"group_id": group_id, "type": type})
|
||||
return GroupHonorInfo(**res)
|
||||
|
||||
async def set_group_add_request(self, flag: str, sub_type: str, approve: bool = True, reason: str = "") -> Dict[str, Any]:
|
||||
"""
|
||||
处理加群请求/邀请
|
||||
处理加群请求或邀请。
|
||||
|
||||
:param flag: 加群请求的 flag(需从上报的数据中获取)
|
||||
:param sub_type: add 或 invite,请求类型(需要与上报消息中的 sub_type 字段相符)
|
||||
:param approve: 是否同意请求/邀请
|
||||
:param reason: 拒绝理由(仅在拒绝时有效)
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
flag (str): 请求的标识,需要从 `request` 事件中获取。
|
||||
sub_type (str): 请求的子类型,`add` 或 `invite`,
|
||||
需要与 `request` 事件中的 `sub_type` 字段相符。
|
||||
approve (bool, optional): 是否同意请求或邀请。Defaults to True.
|
||||
reason (str, optional): 拒绝加群的理由(仅在 `approve` 为 False 时有效)。Defaults to "".
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason})
|
||||
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
"""
|
||||
消息相关 API 模块
|
||||
|
||||
该模块定义了 `MessageAPI` Mixin 类,提供了所有与消息发送、撤回、
|
||||
转发等相关的 OneBot v11 API 封装。
|
||||
"""
|
||||
from typing import Union, List, Dict, Any, TYPE_CHECKING
|
||||
from .base import BaseAPI
|
||||
@@ -10,17 +13,22 @@ if TYPE_CHECKING:
|
||||
|
||||
class MessageAPI(BaseAPI):
|
||||
"""
|
||||
消息相关 API Mixin
|
||||
`MessageAPI` Mixin 类,提供了所有与消息操作相关的 API 方法。
|
||||
"""
|
||||
|
||||
async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
发送群消息
|
||||
发送群消息。
|
||||
|
||||
:param group_id: 群号
|
||||
:param message: 消息内容,可以是字符串、MessageSegment 对象或 MessageSegment 列表
|
||||
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
|
||||
可以是纯文本字符串、单个消息段对象或消息段列表。
|
||||
auto_escape (bool, optional): 仅当 `message` 为字符串时有效,
|
||||
是否对消息内容进行 CQ 码转义。Defaults to False.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api(
|
||||
"send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape}
|
||||
@@ -28,12 +36,15 @@ class MessageAPI(BaseAPI):
|
||||
|
||||
async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
发送私聊消息
|
||||
发送私聊消息。
|
||||
|
||||
:param user_id: 用户 QQ 号
|
||||
:param message: 消息内容,可以是字符串、MessageSegment 对象或 MessageSegment 列表
|
||||
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
user_id (int): 目标用户的 QQ 号。
|
||||
message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
|
||||
auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api(
|
||||
"send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape}
|
||||
@@ -41,12 +52,18 @@ class MessageAPI(BaseAPI):
|
||||
|
||||
async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
智能发送消息,根据事件类型自动选择发送方式
|
||||
智能发送消息。
|
||||
|
||||
:param event: 触发事件对象
|
||||
:param message: 消息内容
|
||||
:param auto_escape: 是否自动转义
|
||||
:return: API 响应结果
|
||||
该方法会根据传入的事件对象 `event` 自动判断是私聊还是群聊,
|
||||
并调用相应的发送函数。如果事件是消息事件,则优先使用 `reply` 方法。
|
||||
|
||||
Args:
|
||||
event (OneBotEvent): 触发该发送行为的事件对象。
|
||||
message (Union[str, MessageSegment, List[MessageSegment]]): 要发送的消息内容。
|
||||
auto_escape (bool, optional): 是否对消息内容进行 CQ 码转义。Defaults to False.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
# 如果是消息事件,直接调用 reply
|
||||
if hasattr(event, "reply"):
|
||||
@@ -66,53 +83,98 @@ class MessageAPI(BaseAPI):
|
||||
|
||||
async def delete_msg(self, message_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
撤回消息
|
||||
撤回一条消息。
|
||||
|
||||
:param message_id: 消息 ID
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
message_id (int): 要撤回的消息的 ID。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("delete_msg", {"message_id": message_id})
|
||||
|
||||
async def get_msg(self, message_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
获取消息
|
||||
获取一条消息的详细信息。
|
||||
|
||||
:param message_id: 消息 ID
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
message_id (int): 要获取的消息的 ID。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据,包含消息详情。
|
||||
"""
|
||||
return await self.call_api("get_msg", {"message_id": message_id})
|
||||
|
||||
async def get_forward_msg(self, id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
获取合并转发消息
|
||||
获取合并转发消息的内容。
|
||||
|
||||
:param id: 合并转发 ID
|
||||
:return: API 响应结果
|
||||
Args:
|
||||
id (str): 合并转发消息的 ID。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据,包含转发消息的节点列表。
|
||||
"""
|
||||
return await self.call_api("get_forward_msg", {"id": id})
|
||||
|
||||
async def send_group_forward_msg(self, group_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""
|
||||
发送群聊合并转发消息。
|
||||
|
||||
Args:
|
||||
group_id (int): 目标群组的群号。
|
||||
messages (List[Dict[str, Any]]): 消息节点列表。
|
||||
推荐使用 `bot.build_forward_node` 来构建节点。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("send_group_forward_msg", {"group_id": group_id, "messages": messages})
|
||||
|
||||
async def send_private_forward_msg(self, user_id: int, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""
|
||||
发送私聊合并转发消息。
|
||||
|
||||
Args:
|
||||
user_id (int): 目标用户的 QQ 号。
|
||||
messages (List[Dict[str, Any]]): 消息节点列表。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("send_private_forward_msg", {"user_id": user_id, "messages": messages})
|
||||
|
||||
async def can_send_image(self) -> Dict[str, Any]:
|
||||
"""
|
||||
检查是否可以发送图片
|
||||
检查当前机器人账号是否可以发送图片。
|
||||
|
||||
:return: API 响应结果
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("can_send_image")
|
||||
|
||||
async def can_send_record(self) -> Dict[str, Any]:
|
||||
"""
|
||||
检查是否可以发送语音
|
||||
检查当前机器人账号是否可以发送语音。
|
||||
|
||||
:return: API 响应结果
|
||||
Returns:
|
||||
Dict[str, Any]: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.call_api("can_send_record")
|
||||
|
||||
def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]:
|
||||
"""
|
||||
处理消息内容,将其转换为 API 可接受的格式
|
||||
内部方法:将消息内容处理成 OneBot API 可接受的格式。
|
||||
|
||||
:param message: 原始消息内容
|
||||
:return: 处理后的消息内容
|
||||
- `str` -> `str`
|
||||
- `MessageSegment` -> `List[Dict]`
|
||||
- `List[MessageSegment]` -> `List[Dict]`
|
||||
|
||||
Args:
|
||||
message: 原始消息内容。
|
||||
|
||||
Returns:
|
||||
处理后的消息内容。
|
||||
"""
|
||||
if isinstance(message, str):
|
||||
return message
|
||||
@@ -130,12 +192,16 @@ class MessageAPI(BaseAPI):
|
||||
|
||||
def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]:
|
||||
"""
|
||||
将 MessageSegment 对象转换为字典
|
||||
内部方法:将 `MessageSegment` 对象转换为字典。
|
||||
|
||||
:param segment: MessageSegment 对象
|
||||
:return: 字典格式的消息段
|
||||
Args:
|
||||
segment (MessageSegment): 消息段对象。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 符合 OneBot 规范的消息段字典。
|
||||
"""
|
||||
return {
|
||||
"type": segment.type,
|
||||
"data": segment.data
|
||||
}
|
||||
|
||||
|
||||
102
core/bot.py
102
core/bot.py
@@ -1,36 +1,114 @@
|
||||
"""
|
||||
Bot 抽象模块
|
||||
Bot 核心抽象模块
|
||||
|
||||
定义了 Bot 类,封装了 OneBot API 的调用逻辑,提供了便捷的消息发送方法。
|
||||
该模块定义了 `Bot` 类,它是与 OneBot v11 API 进行交互的主要接口。
|
||||
`Bot` 类通过继承 `api` 目录下的各个 Mixin 类,将不同类别的 API 调用
|
||||
整合在一起,提供了一个统一、便捷的调用入口。
|
||||
|
||||
主要职责包括:
|
||||
- 封装 WebSocket 通信,提供 `call_api` 方法。
|
||||
- 提供高级消息发送功能,如 `send_forwarded_messages`。
|
||||
- 整合所有细分的 API 调用(消息、群组、好友等)。
|
||||
"""
|
||||
from typing import TYPE_CHECKING, Dict, Any
|
||||
from typing import TYPE_CHECKING, Dict, Any, List, Union
|
||||
from models.events.base import OneBotEvent
|
||||
from models.message import MessageSegment
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .ws import WS
|
||||
from .WS import WS
|
||||
|
||||
from .api import MessageAPI, GroupAPI, FriendAPI, AccountAPI
|
||||
|
||||
|
||||
class Bot(MessageAPI, GroupAPI, FriendAPI, AccountAPI):
|
||||
"""
|
||||
Bot 抽象类,封装 API 调用和常用操作
|
||||
继承各个 API Mixin 以提高代码的可维护性
|
||||
机器人核心类,封装了所有与 OneBot API 的交互。
|
||||
|
||||
通过 Mixin 模式继承了所有 API 功能,使得结构清晰且易于扩展。
|
||||
实例由 `WS` 客户端在连接成功后创建,并传递给所有事件处理器和插件。
|
||||
"""
|
||||
|
||||
def __init__(self, ws_client: "WS"):
|
||||
"""
|
||||
初始化 Bot 实例
|
||||
初始化 Bot 实例。
|
||||
|
||||
:param ws_client: WebSocket 客户端实例,用于底层通信
|
||||
Args:
|
||||
ws_client (WS): WebSocket 客户端实例,负责底层的 API 请求和响应处理。
|
||||
"""
|
||||
self.ws = ws_client
|
||||
|
||||
async def call_api(self, action: str, params: Dict[str, Any] = None) -> Any:
|
||||
"""
|
||||
调用 OneBot API
|
||||
底层 API 调用方法。
|
||||
|
||||
:param action: API 动作名称
|
||||
:param params: API 参数
|
||||
:return: API 响应结果
|
||||
所有具体的 API 实现最终都会调用此方法,通过 WebSocket 发送请求。
|
||||
|
||||
Args:
|
||||
action (str): API 的动作名称,例如 "send_group_msg"。
|
||||
params (Dict[str, Any], optional): API 请求的参数字典。Defaults to None.
|
||||
|
||||
Returns:
|
||||
Any: OneBot API 的响应数据。
|
||||
"""
|
||||
return await self.ws.call_api(action, params)
|
||||
|
||||
def build_forward_node(self, user_id: int, nickname: str, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Dict[str, Any]:
|
||||
"""
|
||||
构建一个用于合并转发的消息节点 (Node)。
|
||||
|
||||
这是一个辅助方法,用于方便地创建符合 OneBot v11 规范的消息节点,
|
||||
以便在 `send_forwarded_messages` 中使用。
|
||||
|
||||
Args:
|
||||
user_id (int): 发送者的 QQ 号。
|
||||
nickname (str): 发送者在消息中显示的昵称。
|
||||
message (Union[str, MessageSegment, List[MessageSegment]]): 该节点的消息内容,
|
||||
可以是纯文本、单个消息段或消息段列表。
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 构造好的消息节点字典。
|
||||
"""
|
||||
return {
|
||||
"type": "node",
|
||||
"data": {
|
||||
"uin": user_id,
|
||||
"name": nickname,
|
||||
"content": self._process_message(message)
|
||||
}
|
||||
}
|
||||
|
||||
async def send_forwarded_messages(self, target: Union[int, "OneBotEvent"], nodes: List[Dict[str, Any]]):
|
||||
"""
|
||||
发送合并转发消息。
|
||||
|
||||
该方法实现了智能判断,可以根据 `target` 的类型自动发送群聊合并转发
|
||||
或私聊合并转发消息。
|
||||
|
||||
Args:
|
||||
target (Union[int, OneBotEvent]): 发送目标。
|
||||
- 如果是 `OneBotEvent` 对象,则自动判断是群聊还是私聊。
|
||||
- 如果是 `int`,则默认为群号,发送群聊合并转发。
|
||||
nodes (List[Dict[str, Any]]): 消息节点列表。
|
||||
推荐使用 `build_forward_node` 方法来构建列表中的每个节点。
|
||||
|
||||
Raises:
|
||||
ValueError: 如果事件对象中既没有 `group_id` 也没有 `user_id`。
|
||||
"""
|
||||
if isinstance(target, OneBotEvent):
|
||||
group_id = getattr(target, "group_id", None)
|
||||
user_id = getattr(target, "user_id", None)
|
||||
|
||||
if group_id:
|
||||
# 直接发送群聊合并转发
|
||||
await self.send_group_forward_msg(group_id, nodes)
|
||||
elif user_id:
|
||||
# 发送私聊合并转发
|
||||
await self.send_private_forward_msg(user_id, nodes)
|
||||
else:
|
||||
raise ValueError("Event has neither group_id nor user_id")
|
||||
|
||||
else:
|
||||
# 默认行为是发送到群聊
|
||||
group_id = target
|
||||
await self.send_group_forward_msg(group_id, nodes)
|
||||
|
||||
|
||||
@@ -1,7 +1,17 @@
|
||||
"""
|
||||
命令管理器模块
|
||||
命令与事件管理器模块
|
||||
|
||||
提供装饰器用于注册消息指令、通知处理器和请求处理器,并负责事件的分发。
|
||||
该模块定义了 `CommandManager` 类,它是整个机器人框架事件处理的核心。
|
||||
它通过装饰器模式,为插件提供了注册消息指令、通知事件处理器和
|
||||
请求事件处理器的能力。
|
||||
|
||||
主要职责:
|
||||
- 提供 `@matcher.command()` 装饰器来注册命令。
|
||||
- 提供 `@matcher.on_notice()` 装饰器来注册通知处理器。
|
||||
- 提供 `@matcher.on_request()` 装饰器来注册请求处理器。
|
||||
- 负责解析收到的消息,匹配命令前缀并分发给对应的处理器。
|
||||
- 统一处理所有类型的事件,并将其分发给所有已注册的处理器。
|
||||
- 内置一个 `/help` 命令,用于展示所有已加载插件的帮助信息。
|
||||
"""
|
||||
import inspect
|
||||
from typing import Any, Callable, Dict, List, Tuple
|
||||
@@ -14,22 +24,25 @@ comm_prefixes = global_config.bot.get("command", ("/",))
|
||||
|
||||
class CommandManager:
|
||||
"""
|
||||
命令管理器,负责注册和分发指令、通知和请求事件
|
||||
命令管理器,负责注册和分发所有类型的事件。
|
||||
|
||||
这是一个单例对象(`matcher`),在整个应用中共享。
|
||||
"""
|
||||
|
||||
def __init__(self, prefixes: Tuple[str, ...] = ("/",)):
|
||||
def __init__(self, prefixes: Tuple[str, ...]):
|
||||
"""
|
||||
初始化命令管理器
|
||||
初始化命令管理器。
|
||||
|
||||
:param prefixes: 命令前缀元组
|
||||
Args:
|
||||
prefixes (Tuple[str, ...]): 一个包含所有合法命令前缀的元组。
|
||||
"""
|
||||
self.prefixes = prefixes
|
||||
self.commands: Dict[str, Callable] = {} # 存储消息指令
|
||||
self.notice_handlers: List[Dict] = [] # 存储通知处理器
|
||||
self.request_handlers: List[Dict] = [] # 存储请求处理器
|
||||
self.plugins: Dict[str, Dict[str, Any]] = {} # 存储插件元数据
|
||||
self.commands: Dict[str, Callable] = {} # 存储消息指令处理器
|
||||
self.notice_handlers: List[Dict] = [] # 存储通知事件处理器
|
||||
self.request_handlers: List[Dict] = [] # 存储请求事件处理器
|
||||
self.plugins: Dict[str, Dict[str, Any]] = {} # 存储已加载插件的元数据
|
||||
|
||||
# --- 内置 help 指令 ---
|
||||
# --- 注册内置 help 指令 ---
|
||||
self.commands["help"] = self._help_command
|
||||
self.plugins["core.help"] = {
|
||||
"name": "帮助",
|
||||
@@ -39,10 +52,13 @@ class CommandManager:
|
||||
|
||||
async def _help_command(self, bot, event):
|
||||
"""
|
||||
内置的 /help 指令处理器
|
||||
内置的 `/help` 命令的实现。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 消息事件对象
|
||||
该命令会遍历所有已加载插件的元数据,并生成一段格式化的帮助文本。
|
||||
|
||||
Args:
|
||||
bot: Bot 实例。
|
||||
event: 消息事件对象。
|
||||
"""
|
||||
help_text = "--- 可用指令列表 ---\n"
|
||||
|
||||
@@ -57,58 +73,78 @@ class CommandManager:
|
||||
|
||||
await bot.send(event, help_text.strip())
|
||||
|
||||
# --- 1. 消息指令装饰器 ---
|
||||
def command(self, name: str):
|
||||
def command(self, name: str) -> Callable:
|
||||
"""
|
||||
装饰器:注册消息指令
|
||||
装饰器:用于注册一个消息指令处理器。
|
||||
|
||||
:param name: 指令名称(不含前缀)
|
||||
:return: 装饰器函数
|
||||
Example:
|
||||
@matcher.command("echo")
|
||||
async def handle_echo(bot, event, args):
|
||||
await bot.send(event, " ".join(args))
|
||||
|
||||
Args:
|
||||
name (str): 指令的名称(不包含命令前缀)。
|
||||
|
||||
Returns:
|
||||
Callable: 原函数,使其可以继续被调用。
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
def decorator(func: Callable) -> Callable:
|
||||
self.commands[name] = func
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
# --- 2. 通知事件装饰器 ---
|
||||
def on_notice(self, notice_type: str = None):
|
||||
def on_notice(self, notice_type: str = None) -> Callable:
|
||||
"""
|
||||
装饰器:注册通知处理器
|
||||
装饰器:用于注册一个通知事件处理器。
|
||||
|
||||
:param notice_type: 通知类型,如果为 None 则处理所有通知
|
||||
:return: 装饰器函数
|
||||
如果 `notice_type` 未指定,则该处理器会接收所有类型的通知事件。
|
||||
|
||||
Args:
|
||||
notice_type (str, optional): 要处理的通知类型 (e.g., "group_increase")。
|
||||
Defaults to None.
|
||||
|
||||
Returns:
|
||||
Callable: 原函数。
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
def decorator(func: Callable) -> Callable:
|
||||
self.notice_handlers.append({"type": notice_type, "func": func})
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
# --- 3. 请求事件装饰器 ---
|
||||
def on_request(self, request_type: str = None):
|
||||
def on_request(self, request_type: str = None) -> Callable:
|
||||
"""
|
||||
装饰器:注册请求处理器
|
||||
装饰器:用于注册一个请求事件处理器。
|
||||
|
||||
:param request_type: 请求类型,如果为 None 则处理所有请求
|
||||
:return: 装饰器函数
|
||||
如果 `request_type` 未指定,则该处理器会接收所有类型的请求事件。
|
||||
|
||||
Args:
|
||||
request_type (str, optional): 要处理的请求类型 (e.g., "friend", "group")。
|
||||
Defaults to None.
|
||||
|
||||
Returns:
|
||||
Callable: 原函数。
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
def decorator(func: Callable) -> Callable:
|
||||
self.request_handlers.append({"type": request_type, "func": func})
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
# --- 统一事件分发入口 ---
|
||||
async def handle_event(self, bot, event):
|
||||
"""
|
||||
统一事件分发入口
|
||||
统一的事件分发入口。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 事件对象
|
||||
由 `WS` 客户端在接收到事件后调用。该方法会根据事件的 `post_type`
|
||||
将其分发给对应的具体处理方法。
|
||||
|
||||
Args:
|
||||
bot: Bot 实例。
|
||||
event: 已解析的事件对象。
|
||||
"""
|
||||
post_type = event.post_type
|
||||
|
||||
@@ -119,13 +155,16 @@ class CommandManager:
|
||||
elif post_type == 'request':
|
||||
await self.handle_request(bot, event)
|
||||
|
||||
# --- 消息分发逻辑 ---
|
||||
async def handle_message(self, bot, event):
|
||||
"""
|
||||
解析并分发消息指令
|
||||
处理消息事件,解析并分发指令。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 消息事件对象
|
||||
该方法会检查消息是否以已配置的命令前缀开头,如果是,则解析出
|
||||
指令名称和参数,并调用对应的处理器。
|
||||
|
||||
Args:
|
||||
bot: Bot 实例。
|
||||
event: 消息事件对象。
|
||||
"""
|
||||
if not event.raw_message:
|
||||
return
|
||||
@@ -155,39 +194,43 @@ class CommandManager:
|
||||
func = self.commands[cmd_name]
|
||||
await self._run_handler(func, bot, event, args)
|
||||
|
||||
# --- 通知分发逻辑 ---
|
||||
async def handle_notice(self, bot, event):
|
||||
"""
|
||||
分发通知事件
|
||||
分发通知事件给所有匹配的处理器。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 通知事件对象
|
||||
Args:
|
||||
bot: Bot 实例。
|
||||
event: 通知事件对象。
|
||||
"""
|
||||
for handler in self.notice_handlers:
|
||||
if handler["type"] is None or handler["type"] == event.notice_type:
|
||||
await self._run_handler(handler["func"], bot, event)
|
||||
|
||||
# --- 请求分发逻辑 ---
|
||||
async def handle_request(self, bot, event):
|
||||
"""
|
||||
分发请求事件
|
||||
分发请求事件给所有匹配的处理器。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 请求事件对象
|
||||
Args:
|
||||
bot: Bot 实例。
|
||||
event: 请求事件对象。
|
||||
"""
|
||||
for handler in self.request_handlers:
|
||||
if handler["type"] is None or handler["type"] == event.request_type:
|
||||
await self._run_handler(handler["func"], bot, event)
|
||||
|
||||
# --- 通用执行器:自动注入参数 ---
|
||||
async def _run_handler(self, func, bot, event, args=None):
|
||||
async def _run_handler(self, func: Callable, bot, event, args: List[str] = None):
|
||||
"""
|
||||
根据函数签名自动注入 bot, event 或 args
|
||||
智能执行事件处理器。
|
||||
|
||||
:param func: 目标处理函数
|
||||
:param bot: Bot 实例
|
||||
:param event: 事件对象
|
||||
:param args: 指令参数(仅消息指令有效)
|
||||
该方法会检查目标处理器的函数签名,并根据签名动态地传入所需的参数
|
||||
(如 `bot`, `event`, `args`),实现了依赖注入。
|
||||
|
||||
Args:
|
||||
func (Callable): 目标处理器函数。
|
||||
bot: Bot 实例。
|
||||
event: 事件对象。
|
||||
args (List[str], optional): 指令参数列表(仅对消息事件有效)。
|
||||
Defaults to None.
|
||||
"""
|
||||
sig = inspect.signature(func)
|
||||
params = sig.parameters
|
||||
@@ -204,11 +247,14 @@ class CommandManager:
|
||||
await func(**kwargs)
|
||||
|
||||
|
||||
# 确保前缀是元组格式
|
||||
# --- 全局单例 ---
|
||||
|
||||
# 确保前缀配置是元组格式
|
||||
if isinstance(comm_prefixes, list):
|
||||
comm_prefixes = tuple[Any, ...](comm_prefixes)
|
||||
comm_prefixes = tuple(comm_prefixes)
|
||||
elif isinstance(comm_prefixes, str):
|
||||
comm_prefixes = (comm_prefixes,)
|
||||
|
||||
# 实例化全局管理器
|
||||
# 实例化全局唯一的命令管理器
|
||||
matcher = CommandManager(prefixes=comm_prefixes)
|
||||
|
||||
|
||||
@@ -64,6 +64,15 @@ class Config:
|
||||
"""
|
||||
return self._data.get("features", {})
|
||||
|
||||
@property
|
||||
def redis(self) -> dict:
|
||||
"""
|
||||
获取 Redis 配置
|
||||
|
||||
:return: 配置字典
|
||||
"""
|
||||
return self._data.get("redis", {})
|
||||
|
||||
|
||||
# 实例化全局配置对象
|
||||
global_config = Config()
|
||||
|
||||
50
core/logger.py
Normal file
50
core/logger.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""
|
||||
日志模块
|
||||
|
||||
该模块负责初始化和配置 loguru 日志记录器,为整个应用程序提供统一的日志记录接口。
|
||||
"""
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
# 定义日志格式
|
||||
LOG_FORMAT = (
|
||||
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
|
||||
"<level>{level: <8}</level> | "
|
||||
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
|
||||
"<level>{message}</level>"
|
||||
)
|
||||
|
||||
# 移除 loguru 默认的处理器
|
||||
logger.remove()
|
||||
|
||||
# 添加控制台输出处理器
|
||||
logger.add(
|
||||
sys.stderr,
|
||||
level="INFO",
|
||||
format=LOG_FORMAT,
|
||||
colorize=True,
|
||||
enqueue=True # 异步写入
|
||||
)
|
||||
|
||||
# 定义日志文件路径
|
||||
log_dir = Path("logs")
|
||||
log_dir.mkdir(exist_ok=True)
|
||||
log_file_path = log_dir / "{time:YYYY-MM-DD}.log"
|
||||
|
||||
# 添加文件输出处理器
|
||||
logger.add(
|
||||
log_file_path,
|
||||
level="DEBUG",
|
||||
format=LOG_FORMAT,
|
||||
colorize=False,
|
||||
rotation="00:00", # 每天午夜创建新文件
|
||||
retention="7 days", # 保留最近 7 天的日志
|
||||
encoding="utf-8",
|
||||
enqueue=True, # 异步写入
|
||||
backtrace=True, # 记录完整的异常堆栈
|
||||
diagnose=True # 添加异常诊断信息
|
||||
)
|
||||
|
||||
# 导出配置好的 logger
|
||||
__all__ = ["logger"]
|
||||
@@ -11,6 +11,7 @@ import pkgutil
|
||||
import sys
|
||||
|
||||
from core.command_manager import matcher
|
||||
from .logger import logger
|
||||
|
||||
|
||||
def load_all_plugins():
|
||||
@@ -28,7 +29,7 @@ def load_all_plugins():
|
||||
)
|
||||
package_name = "plugins"
|
||||
|
||||
print(f" 正在从 {package_name} 加载插件...")
|
||||
logger.info(f"正在从 {package_name} 加载插件...")
|
||||
|
||||
for loader, module_name, is_pkg in pkgutil.iter_modules([plugin_dir]):
|
||||
full_module_name = f"{package_name}.{module_name}"
|
||||
@@ -47,7 +48,7 @@ def load_all_plugins():
|
||||
matcher.plugins[full_module_name] = meta
|
||||
|
||||
type_str = "包" if is_pkg else "文件"
|
||||
print(f" [{type_str}] 成功{action}: {module_name}")
|
||||
logger.success(f" [{type_str}] 成功{action}: {module_name}")
|
||||
except Exception as e:
|
||||
print(
|
||||
f" {action if 'action' in locals() else '加载'}插件 {module_name} 失败: {e}"
|
||||
|
||||
61
core/redis_manager.py
Normal file
61
core/redis_manager.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import redis
|
||||
from .config_loader import global_config as config
|
||||
from .logger import logger
|
||||
|
||||
class RedisManager:
|
||||
"""
|
||||
Redis 连接管理器
|
||||
"""
|
||||
_pool = None
|
||||
_client = None
|
||||
|
||||
@classmethod
|
||||
def initialize(cls):
|
||||
"""
|
||||
初始化 Redis 连接并进行健康检查
|
||||
"""
|
||||
if cls._pool is None:
|
||||
try:
|
||||
host = config.redis['host']
|
||||
port = config.redis['port']
|
||||
db = config.redis['db']
|
||||
password = config.redis.get('password')
|
||||
|
||||
logger.info(f"正在尝试连接 Redis: {host}:{port}, DB: {db}")
|
||||
|
||||
cls._pool = redis.ConnectionPool(
|
||||
host=host,
|
||||
port=port,
|
||||
db=db,
|
||||
password=password,
|
||||
decode_responses=True
|
||||
)
|
||||
cls._client = redis.Redis(connection_pool=cls._pool)
|
||||
if cls._client.ping():
|
||||
logger.success("Redis 连接成功!")
|
||||
else:
|
||||
logger.error("Redis 连接失败: PING 命令无响应")
|
||||
except redis.exceptions.ConnectionError as e:
|
||||
logger.error(f"Redis 连接失败: {e}")
|
||||
cls._pool = None
|
||||
cls._client = None
|
||||
except Exception as e:
|
||||
logger.exception(f"Redis 初始化时发生未知错误: {e}")
|
||||
cls._pool = None
|
||||
cls._client = None
|
||||
|
||||
@classmethod
|
||||
def get_redis(cls):
|
||||
"""
|
||||
获取 Redis 连接
|
||||
|
||||
:return: Redis 连接实例
|
||||
"""
|
||||
if cls._client is None:
|
||||
# 理论上 initialize 应该在程序启动时被调用,这里作为备用
|
||||
cls.initialize()
|
||||
return cls._client
|
||||
|
||||
# 在模块加载时直接初始化
|
||||
RedisManager.initialize()
|
||||
redis_client = RedisManager.get_redis()
|
||||
89
core/ws.py
89
core/ws.py
@@ -1,7 +1,15 @@
|
||||
"""
|
||||
WebSocket 核心模块
|
||||
WebSocket 核心通信模块
|
||||
|
||||
负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。
|
||||
该模块定义了 `WS` 类,负责与 OneBot v11 实现(如 NapCat)建立和管理
|
||||
WebSocket 连接。它是整个机器人框架的底层通信基础。
|
||||
|
||||
主要职责包括:
|
||||
- 建立 WebSocket 连接并处理认证。
|
||||
- 实现断线自动重连机制。
|
||||
- 监听并接收来自 OneBot 的事件和 API 响应。
|
||||
- 分发事件给 `CommandManager` 进行处理。
|
||||
- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
@@ -16,16 +24,19 @@ from models import EventFactory
|
||||
from .bot import Bot
|
||||
from .command_manager import matcher
|
||||
from .config_loader import global_config
|
||||
from .logger import logger
|
||||
|
||||
|
||||
class WS:
|
||||
"""
|
||||
WebSocket 客户端类,负责与 OneBot 实现端建立连接并处理通信
|
||||
WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
初始化 WebSocket 客户端
|
||||
初始化 WebSocket 客户端。
|
||||
|
||||
从全局配置中读取 WebSocket URI、访问令牌(Token)和重连间隔。
|
||||
"""
|
||||
# 读取参数
|
||||
cfg = global_config.napcat_ws
|
||||
@@ -39,37 +50,43 @@ class WS:
|
||||
|
||||
async def connect(self):
|
||||
"""
|
||||
主连接循环,负责建立连接和自动重连
|
||||
启动并管理 WebSocket 连接。
|
||||
|
||||
这是一个无限循环,负责建立连接。如果连接断开,它会根据配置的
|
||||
`reconnect_interval` 时间间隔后自动尝试重新连接。
|
||||
"""
|
||||
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
|
||||
|
||||
while True:
|
||||
try:
|
||||
print(f" 正在尝试连接至 NapCat: {self.url}")
|
||||
logger.info(f"正在尝试连接至 NapCat: {self.url}")
|
||||
async with websockets.connect(
|
||||
self.url, additional_headers=headers
|
||||
) as websocket:
|
||||
self.ws = websocket
|
||||
print(" 连接成功!")
|
||||
logger.success("连接成功!")
|
||||
await self._listen_loop(websocket)
|
||||
|
||||
except (
|
||||
websockets.exceptions.ConnectionClosed,
|
||||
ConnectionRefusedError,
|
||||
) as e:
|
||||
print(f" 连接断开或服务器拒绝访问: {e}")
|
||||
logger.warning(f"连接断开或服务器拒绝访问: {e}")
|
||||
except Exception as e:
|
||||
print(f" 运行异常: {e}")
|
||||
traceback.print_exc()
|
||||
logger.exception(f"运行异常: {e}")
|
||||
|
||||
print(f" {self.reconnect_interval}秒后尝试重连...")
|
||||
logger.info(f"{self.reconnect_interval}秒后尝试重连...")
|
||||
await asyncio.sleep(self.reconnect_interval)
|
||||
|
||||
async def _listen_loop(self, websocket):
|
||||
"""
|
||||
核心监听循环,处理接收到的 WebSocket 消息
|
||||
核心监听循环,处理所有接收到的 WebSocket 消息。
|
||||
|
||||
:param websocket: WebSocket 连接对象
|
||||
此循环会持续从 WebSocket 连接中读取消息,并根据消息内容
|
||||
判断是 API 响应还是上报的事件,然后分发给相应的处理逻辑。
|
||||
|
||||
Args:
|
||||
websocket: 当前活动的 WebSocket 连接对象。
|
||||
"""
|
||||
async for message in websocket:
|
||||
try:
|
||||
@@ -91,14 +108,20 @@ class WS:
|
||||
asyncio.create_task(self.on_event(data))
|
||||
|
||||
except Exception as e:
|
||||
print(f" 解析消息异常: {e}")
|
||||
traceback.print_exc()
|
||||
logger.exception(f"解析消息异常: {e}")
|
||||
|
||||
async def on_event(self, raw_data: dict):
|
||||
"""
|
||||
事件分发层:根据 post_type 调用 matcher 对应的处理器
|
||||
事件处理和分发层。
|
||||
|
||||
:param raw_data: 原始事件数据字典
|
||||
当接收到一个 OneBot 事件时,此方法负责:
|
||||
1. 使用 `EventFactory` 将原始 JSON 数据解析成对应的事件对象。
|
||||
2. 为事件对象注入 `Bot` 实例,以便在插件中可以调用 API。
|
||||
3. 打印格式化的事件日志。
|
||||
4. 将事件对象传递给 `CommandManager` (`matcher`) 进行后续处理。
|
||||
|
||||
Args:
|
||||
raw_data (dict): 从 WebSocket 接收到的原始事件字典。
|
||||
"""
|
||||
try:
|
||||
# 使用工厂创建事件对象
|
||||
@@ -106,36 +129,46 @@ class WS:
|
||||
event.bot = self.bot # 注入 Bot 实例
|
||||
|
||||
# 打印日志
|
||||
t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")
|
||||
if event.post_type == "message":
|
||||
sender_name = event.sender.nickname if event.sender else "Unknown"
|
||||
print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
|
||||
logger.info(f"[消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
|
||||
elif event.post_type == "notice":
|
||||
print(f" [{t}] [通知] {event.notice_type}")
|
||||
logger.info(f"[通知] {event.notice_type}")
|
||||
elif event.post_type == "request":
|
||||
print(f" [{t}] [请求] {event.request_type}")
|
||||
logger.info(f"[请求] {event.request_type}")
|
||||
elif event.post_type == "meta_event":
|
||||
logger.debug(f"[元事件] {event.meta_event_type}")
|
||||
|
||||
|
||||
# 分发事件
|
||||
await matcher.handle_event(self.bot, event)
|
||||
|
||||
except Exception as e:
|
||||
print(f" 事件处理异常: {e}")
|
||||
traceback.print_exc()
|
||||
logger.exception(f"事件处理异常: {e}")
|
||||
|
||||
async def call_api(self, action: str, params: dict = None):
|
||||
"""
|
||||
调用 OneBot API
|
||||
向 OneBot v11 实现端发送一个 API 请求。
|
||||
|
||||
:param action: API 动作名称
|
||||
:param params: API 参数
|
||||
:return: API 响应结果
|
||||
该方法通过 WebSocket 发送请求,并使用 `echo` 字段来匹配对应的响应。
|
||||
它创建了一个 `Future` 对象来异步等待响应,并设置了超时机制。
|
||||
|
||||
Args:
|
||||
action (str): API 的动作名称,例如 "send_group_msg"。
|
||||
params (dict, optional): API 请求的参数字典。 Defaults to None.
|
||||
|
||||
Returns:
|
||||
dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
|
||||
表示失败的字典。
|
||||
"""
|
||||
if not self.ws:
|
||||
logger.error("调用 API 失败: WebSocket 未初始化")
|
||||
return {"status": "failed", "msg": "websocket not initialized"}
|
||||
|
||||
from websockets.protocol import State
|
||||
|
||||
if getattr(self.ws, "state", None) is not State.OPEN:
|
||||
logger.error("调用 API 失败: WebSocket 连接未打开")
|
||||
return {"status": "failed", "msg": "websocket is not open"}
|
||||
|
||||
echo_id = str(uuid.uuid4())
|
||||
@@ -151,4 +184,6 @@ class WS:
|
||||
return await asyncio.wait_for(future, timeout=30.0)
|
||||
except asyncio.TimeoutError:
|
||||
self._pending_requests.pop(echo_id, None)
|
||||
logger.warning(f"API 调用超时: action={action}, params={params}")
|
||||
return {"status": "failed", "retcode": -1, "msg": "api timeout"}
|
||||
|
||||
|
||||
16
main.py
16
main.py
@@ -10,6 +10,9 @@ import time
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
|
||||
# 初始化日志系统,必须在其他 core 模块导入之前执行
|
||||
from core.logger import logger
|
||||
|
||||
from core import WS
|
||||
from core.plugin_manager import load_all_plugins
|
||||
|
||||
@@ -54,17 +57,18 @@ class PluginReloadHandler(FileSystemEventHandler):
|
||||
|
||||
self.last_reload_time = current_time
|
||||
|
||||
print(f"\n[HotReload] 检测到文件变更: {event.src_path}")
|
||||
print("[HotReload] 正在重载插件...")
|
||||
logger.info(f"检测到文件变更: {event.src_path}")
|
||||
logger.info("正在重载插件...")
|
||||
|
||||
try:
|
||||
# 重新扫描并加载插件
|
||||
load_all_plugins()
|
||||
print("[HotReload] 插件重载完成")
|
||||
logger.success("插件重载完成")
|
||||
except Exception as e:
|
||||
print(f"[HotReload] 重载失败: {e}")
|
||||
logger.exception(f"重载失败: {e}")
|
||||
|
||||
|
||||
@logger.catch
|
||||
async def main():
|
||||
"""
|
||||
主函数
|
||||
@@ -86,9 +90,9 @@ async def main():
|
||||
if os.path.exists(plugin_path):
|
||||
observer.schedule(event_handler, plugin_path, recursive=True)
|
||||
observer.start()
|
||||
print(f"[HotReload] 已启动插件热重载监控: {plugin_path}")
|
||||
logger.info(f"已启动插件热重载监控: {plugin_path}")
|
||||
else:
|
||||
print(f"[HotReload] 警告: 插件目录不存在 {plugin_path}")
|
||||
logger.warning(f"插件目录不存在 {plugin_path}")
|
||||
|
||||
try:
|
||||
bot = WS()
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
"""
|
||||
基础事件模型模块
|
||||
|
||||
定义了所有 OneBot 11 事件的基类和事件类型枚举。
|
||||
该模块定义了所有 OneBot v11 事件模型的基类 `OneBotEvent` 和
|
||||
事件类型常量 `EventType`。所有具体的事件模型都应继承自 `OneBotEvent`。
|
||||
"""
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
@@ -13,46 +14,60 @@ if TYPE_CHECKING:
|
||||
|
||||
class EventType:
|
||||
"""
|
||||
事件类型枚举
|
||||
OneBot v11 事件类型常量。
|
||||
|
||||
用于标识不同种类的事件上报。
|
||||
"""
|
||||
META = 'meta_event' # 元事件
|
||||
REQUEST = 'request' # 请求事件
|
||||
NOTICE = 'notice' # 通知事件
|
||||
MESSAGE = 'message' # 消息事件
|
||||
MESSAGE_SENT = 'message_sent' # 消息发送事件
|
||||
META = 'meta_event'
|
||||
"""元事件 (meta_event): 如心跳、生命周期等。"""
|
||||
REQUEST = 'request'
|
||||
"""请求事件 (request): 如加好友请求、加群请求等。"""
|
||||
NOTICE = 'notice'
|
||||
"""通知事件 (notice): 如群成员增加、文件上传等。"""
|
||||
MESSAGE = 'message'
|
||||
"""消息事件 (message): 如私聊消息、群消息等。"""
|
||||
MESSAGE_SENT = 'message_sent'
|
||||
"""消息发送事件 (message_sent): 机器人自己发送消息的上报。"""
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class OneBotEvent(ABC):
|
||||
"""
|
||||
OneBot 事件基类
|
||||
所有具体的事件类型都应该继承自此类
|
||||
OneBot v11 事件的抽象基类。
|
||||
|
||||
所有具体的事件模型都必须继承此类,并实现 `post_type` 属性。
|
||||
|
||||
Attributes:
|
||||
time (int): 事件发生的时间戳 (秒)。
|
||||
self_id (int): 收到事件的机器人 QQ 号。
|
||||
_bot (Optional[Bot]): 内部持有的 Bot 实例引用,用于快捷 API 调用。
|
||||
"""
|
||||
|
||||
time: int
|
||||
"""事件发生的时间戳"""
|
||||
|
||||
self_id: int
|
||||
"""收到事件的机器人 QQ 号"""
|
||||
|
||||
_bot: Optional["Bot"] = field(default=None, init=False)
|
||||
"""Bot 实例引用,用于快捷调用 API"""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def post_type(self) -> str:
|
||||
"""
|
||||
上报类型
|
||||
抽象属性,代表事件的上报类型。
|
||||
|
||||
子类必须重写此属性,并返回对应的 `EventType` 常量值。
|
||||
例如: `return EventType.MESSAGE`
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
def bot(self) -> "Bot":
|
||||
"""
|
||||
获取 Bot 实例
|
||||
获取与此事件关联的 `Bot` 实例,以便快捷调用 API。
|
||||
|
||||
:return: Bot 实例
|
||||
:raises ValueError: 如果 Bot 实例未设置
|
||||
Returns:
|
||||
Bot: 当前事件所对应的 `Bot` 实例。
|
||||
|
||||
Raises:
|
||||
ValueError: 如果 `Bot` 实例尚未被设置到事件对象中。
|
||||
"""
|
||||
if self._bot is None:
|
||||
raise ValueError("Bot instance not set for this event")
|
||||
@@ -61,8 +76,10 @@ class OneBotEvent(ABC):
|
||||
@bot.setter
|
||||
def bot(self, value: "Bot"):
|
||||
"""
|
||||
设置 Bot 实例
|
||||
为事件对象设置关联的 `Bot` 实例。
|
||||
|
||||
:param value: Bot 实例
|
||||
Args:
|
||||
value (Bot): 要设置的 `Bot` 实例。
|
||||
"""
|
||||
self._bot = value
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ from models.sender import Sender
|
||||
from .base import OneBotEvent, EventType
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class Anonymous:
|
||||
"""
|
||||
匿名信息
|
||||
|
||||
@@ -8,7 +8,7 @@ from typing import Optional
|
||||
from .base import OneBotEvent, EventType
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class HeartbeatStatus:
|
||||
"""
|
||||
心跳状态接口
|
||||
|
||||
@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
|
||||
from .base import OneBotEvent, EventType
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class NoticeEvent(OneBotEvent):
|
||||
"""
|
||||
通知事件基类
|
||||
|
||||
@@ -7,7 +7,7 @@ from dataclasses import dataclass
|
||||
from .base import OneBotEvent, EventType
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class RequestEvent(OneBotEvent):
|
||||
"""
|
||||
请求事件基类
|
||||
|
||||
@@ -1,49 +1,57 @@
|
||||
"""
|
||||
消息段模型模块
|
||||
|
||||
定义了 MessageSegment 类,用于封装 OneBot 11 的消息段。
|
||||
该模块定义了 `MessageSegment` 类,用于构建和表示 OneBot v11 协议中的消息段。
|
||||
通过此类,可以方便地创建文本、图片、At 等不同类型的消息内容,并支持链式操作。
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class MessageSegment:
|
||||
"""
|
||||
消息段,对应 OneBot 11 标准中的消息段对象
|
||||
表示一个 OneBot v11 消息段。
|
||||
|
||||
Attributes:
|
||||
type (str): 消息段的类型,例如 'text', 'image', 'at'。
|
||||
data (Dict[str, Any]): 消息段的具体数据,是一个键值对字典。
|
||||
"""
|
||||
|
||||
type: str
|
||||
"""消息段类型,如 text, image, at 等"""
|
||||
|
||||
data: Dict[str, Any]
|
||||
"""消息段数据"""
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
"""
|
||||
获取文本内容(仅当 type 为 text 时有效)
|
||||
当消息段类型为 'text' 时,快速获取其文本内容。
|
||||
|
||||
:return: 文本内容
|
||||
Returns:
|
||||
str: 消息段的文本内容。如果类型不是 'text',则返回空字符串。
|
||||
"""
|
||||
return self.data.get("text", "") if self.type == "text" else ""
|
||||
|
||||
@property
|
||||
def image_url(self) -> str:
|
||||
"""
|
||||
获取图片 URL(仅当 type 为 image 时有效)
|
||||
当消息段类型为 'image' 时,快速获取其图片 URL。
|
||||
|
||||
:return: 图片 URL
|
||||
Returns:
|
||||
str: 图片的 URL。如果类型不是 'image' 或数据中不含 'url',则返回空字符串。
|
||||
"""
|
||||
return self.data.get("url", "") if self.type == "image" else ""
|
||||
|
||||
def is_at(self, user_id: int = None) -> bool:
|
||||
"""
|
||||
判断是否为 @某人
|
||||
检查当前消息段是否是一个 'at' (提及) 消息段。
|
||||
|
||||
:param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型
|
||||
:return: 是否匹配
|
||||
Args:
|
||||
user_id (int, optional): 如果提供,则进一步检查被提及的 QQ 号是否匹配。
|
||||
Defaults to None.
|
||||
|
||||
Returns:
|
||||
bool: 如果消息段是 'at' 类型且 user_id 匹配 (如果提供),则返回 True。
|
||||
"""
|
||||
if self.type != "at":
|
||||
return False
|
||||
@@ -52,6 +60,9 @@ class MessageSegment:
|
||||
return str(self.data.get("qq")) == str(user_id)
|
||||
|
||||
def __repr__(self):
|
||||
"""
|
||||
返回消息段对象的字符串表示形式,便于调试。
|
||||
"""
|
||||
return f"[MS:{self.type}:{self.data}]"
|
||||
|
||||
# --- 快捷构造方法 ---
|
||||
@@ -59,39 +70,52 @@ class MessageSegment:
|
||||
@staticmethod
|
||||
def text(text: str) -> "MessageSegment": # noqa: F811
|
||||
"""
|
||||
构造文本消息段
|
||||
创建一个文本消息段。
|
||||
|
||||
:param text: 文本内容
|
||||
:return: MessageSegment 对象
|
||||
Args:
|
||||
text (str): 文本内容。
|
||||
|
||||
Returns:
|
||||
MessageSegment: 一个类型为 'text' 的消息段对象。
|
||||
"""
|
||||
return MessageSegment(type="text", data={"text": text})
|
||||
|
||||
@staticmethod
|
||||
def at(user_id: int | str) -> "MessageSegment":
|
||||
"""
|
||||
构造 @某人 消息段
|
||||
创建一个 @某人 的消息段。
|
||||
|
||||
:param user_id: 目标 QQ 号,"all" 表示 @全体成员
|
||||
:return: MessageSegment 对象
|
||||
Args:
|
||||
user_id (int | str): 要提及的 QQ 号。若为 "all",则表示 @全体成员。
|
||||
|
||||
Returns:
|
||||
MessageSegment: 一个类型为 'at' 的消息段对象。
|
||||
"""
|
||||
return MessageSegment(type="at", data={"qq": str(user_id)})
|
||||
|
||||
@staticmethod
|
||||
def image(file: str) -> "MessageSegment":
|
||||
"""
|
||||
构造图片消息段
|
||||
创建一个图片消息段。
|
||||
|
||||
:param file: 图片文件名、URL 或 Base64
|
||||
:return: MessageSegment 对象
|
||||
Args:
|
||||
file (str): 图片的路径、URL 或 Base64 编码的字符串。
|
||||
|
||||
Returns:
|
||||
MessageSegment: 一个类型为 'image' 的消息段对象。
|
||||
"""
|
||||
return MessageSegment(type="image", data={"file": file})
|
||||
|
||||
@staticmethod
|
||||
def face(id: int) -> "MessageSegment":
|
||||
"""
|
||||
构造表情消息段
|
||||
创建一个 QQ 表情消息段。
|
||||
|
||||
:param id: 表情 ID
|
||||
:return: MessageSegment 对象
|
||||
Args:
|
||||
id (int): QQ 表情的 ID。
|
||||
|
||||
Returns:
|
||||
MessageSegment: 一个类型为 'face' 的消息段对象。
|
||||
"""
|
||||
return MessageSegment(type="face", data={"id": str(id)})
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class GroupInfo:
|
||||
"""
|
||||
群信息
|
||||
|
||||
@@ -7,7 +7,7 @@ from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class Sender:
|
||||
"""
|
||||
发送者信息类,对应 OneBot 11 标准中的 sender 字段
|
||||
|
||||
42
plugins/forward_test.py
Normal file
42
plugins/forward_test.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""
|
||||
合并转发消息测试插件
|
||||
"""
|
||||
from core.command_manager import matcher
|
||||
from core.bot import Bot
|
||||
from models import MessageEvent
|
||||
from models.message import MessageSegment
|
||||
|
||||
__plugin_meta__ = {
|
||||
"name": "furry",
|
||||
"description": "处理 /furry 指令,发送furry图片,同时也是bot.build_forward_node演示",
|
||||
"usage": "/furry - 发送一条furry图",
|
||||
}
|
||||
|
||||
@matcher.command("furry")
|
||||
async def handle_forward_test(bot: Bot, event: MessageEvent, args: list[str]):
|
||||
"""
|
||||
处理 /furry 指令,发送furry图片,同时也是bot.build_forward_node实例
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 消息事件对象
|
||||
:param args: 指令参数
|
||||
"""
|
||||
# 1. 构建消息节点列表
|
||||
nodes = [
|
||||
bot.build_forward_node(user_id=event.self_id, nickname="机器人", message="你要的furry来了"),
|
||||
bot.build_forward_node(user_id=event.user_id, nickname=event.sender.nickname, message="让我看看"),
|
||||
bot.build_forward_node(
|
||||
user_id=event.self_id,
|
||||
nickname="机器人",
|
||||
message=[
|
||||
MessageSegment.text("你要的福瑞图"),
|
||||
MessageSegment.image("https://api.furry.ist/furry-img/")
|
||||
]
|
||||
)
|
||||
]
|
||||
|
||||
try:
|
||||
# 2. 发送合并转发消息
|
||||
await bot.send_forwarded_messages(event, nodes)
|
||||
except Exception as e:
|
||||
await event.reply(f"发送失败: {e}")
|
||||
@@ -1,3 +1,8 @@
|
||||
"""
|
||||
今日人品插件
|
||||
|
||||
提供 /jrcd 和 /bbcd 指令,用于娱乐。
|
||||
"""
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
@@ -25,7 +30,7 @@ JRCDMSG_2 = [
|
||||
JRCDMSG_3 = [
|
||||
"今天的长度是%scm,哦豁?听说你很勇哦?(✧◡✧)",
|
||||
"今天的长度是%scm,嘶哈嘶哈(((o(*°▽°*)o)))...",
|
||||
"今天的长度是%scm,我靠,让哥哥爽一爽吧!(((o(*°▽°*)o)))...",
|
||||
"今天的长度是%scm,我靠,让哥哥爽一-爽吧!(((o(*°▽°*)o)))...",
|
||||
"今天的长度是%scm,单是看到哥哥的长度就....(〃w〃)",
|
||||
]
|
||||
|
||||
@@ -44,6 +49,12 @@ BBCDMSG7 = ["试试刺刀看看谁能赢吧!"]
|
||||
|
||||
|
||||
def get_jrcd(user_id: int) -> int:
|
||||
"""
|
||||
根据用户ID和当前日期生成一个伪随机的“长度”值。
|
||||
|
||||
:param user_id: 用户QQ号。
|
||||
:return: 返回一个1到30之间的整数。
|
||||
"""
|
||||
current_time = (
|
||||
datetime.now().year * 100 + datetime.now().month * 100 + datetime.now().day
|
||||
)
|
||||
@@ -58,11 +69,11 @@ def get_jrcd(user_id: int) -> int:
|
||||
@matcher.command("jrcd")
|
||||
async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
|
||||
"""
|
||||
处理 jrcd 指令,来看看你的长度吧!
|
||||
处理 jrcd 指令,回复用户的“今日长度”。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 消息事件对象
|
||||
:param args: 指令参数列表
|
||||
:param bot: Bot 实例。
|
||||
:param event: 消息事件对象。
|
||||
:param args: 指令参数列表(未使用)。
|
||||
"""
|
||||
user_id = event.user_id
|
||||
jrcd = get_jrcd(user_id)
|
||||
@@ -79,11 +90,11 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
|
||||
@matcher.command("bbcd")
|
||||
async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]):
|
||||
"""
|
||||
处理 bbcd 指令,和别人比比长度吧!
|
||||
处理 bbcd 指令,比较两位用户的“长度”。
|
||||
|
||||
:param bot: Bot 实例
|
||||
:param event: 消息事件对象
|
||||
:param args: 指令参数列表
|
||||
:param bot: Bot 实例。
|
||||
:param event: 消息事件对象。
|
||||
:param args: 指令参数列表(未使用)。
|
||||
"""
|
||||
message = event.message
|
||||
print(message)
|
||||
|
||||
@@ -18,4 +18,11 @@ __plugin_meta__ = {
|
||||
|
||||
@matcher.command("thpic")
|
||||
async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
|
||||
"""
|
||||
处理 thpic 指令,发送一张随机的东方Project图片。
|
||||
|
||||
:param bot: Bot 实例(未使用)。
|
||||
:param event: 消息事件对象。
|
||||
:param args: 指令参数列表(未使用)。
|
||||
"""
|
||||
await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random"))
|
||||
|
||||
@@ -11,3 +11,5 @@ urllib3==2.6.2
|
||||
websockets==15.0.1
|
||||
yarg==0.1.10
|
||||
watchdog==6.0.0
|
||||
redis==5.0.7
|
||||
loguru
|
||||
|
||||
Reference in New Issue
Block a user