抽象send方法,添加注释

This commit is contained in:
2026-01-01 17:58:17 +08:00
parent 9146ffbb1a
commit 046dd0860f
9 changed files with 366 additions and 38 deletions

72
core/bot.py Normal file
View File

@@ -0,0 +1,72 @@
"""
Bot 抽象模块
定义了 Bot 类,封装了 OneBot API 的调用逻辑,提供了便捷的消息发送方法。
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .ws import WS
from ..models.event import Event
class Bot:
"""
Bot 抽象类,封装 API 调用和常用操作
"""
def __init__(self, ws_client: "WS"):
"""
初始化 Bot 实例
:param ws_client: WebSocket 客户端实例,用于底层通信
"""
self.ws = ws_client
async def call_api(self, action: str, params: dict = None) -> dict:
"""
调用 OneBot API
:param action: API 动作名称
:param params: API 参数
:return: API 响应结果
"""
return await self.ws.call_api(action, params)
async def send_group_msg(self, group_id: int, message: str) -> dict:
"""
发送群消息
:param group_id: 群号
:param message: 消息内容
:return: API 响应结果
"""
return await self.call_api(
"send_group_msg", {"group_id": group_id, "message": message}
)
async def send_private_msg(self, user_id: int, message: str) -> dict:
"""
发送私聊消息
:param user_id: 用户 QQ 号
:param message: 消息内容
:return: API 响应结果
"""
return await self.call_api(
"send_private_msg", {"user_id": user_id, "message": message}
)
async def send(self, event: "Event", message: str) -> dict:
"""
智能发送消息,根据事件类型自动选择发送方式
:param event: 触发事件对象
:param message: 消息内容
:return: API 响应结果
"""
if event.message_type == "group" and event.group_id:
return await self.send_group_msg(event.group_id, message)
elif event.user_id:
return await self.send_private_msg(event.user_id, message)
return {"status": "failed", "msg": "Unknown message target"}

View File

@@ -1,3 +1,8 @@
"""
命令管理器模块
提供装饰器用于注册消息指令、通知处理器和请求处理器,并负责事件的分发。
"""
import inspect
from typing import Any, Callable, Dict, List, Tuple
@@ -8,7 +13,16 @@ comm_prefixes = global_config.bot.get("command", ("/",))
class CommandManager:
"""
命令管理器,负责注册和分发指令、通知和请求事件
"""
def __init__(self, prefixes: Tuple[str, ...] = ("/",)):
"""
初始化命令管理器
:param prefixes: 命令前缀元组
"""
self.prefixes = prefixes
self.commands: Dict[str, Callable] = {} # 存储消息指令
self.notice_handlers: List[Dict] = [] # 存储通知处理器
@@ -16,7 +30,12 @@ class CommandManager:
# --- 1. 消息指令装饰器 ---
def command(self, name: str):
"""装饰器:注册消息指令,例如 @matcher.command("echo")"""
"""
装饰器:注册消息指令
:param name: 指令名称(不含前缀)
:return: 装饰器函数
"""
def decorator(func):
self.commands[name] = func
@@ -26,7 +45,12 @@ class CommandManager:
# --- 2. 通知事件装饰器 ---
def on_notice(self, notice_type: str = None):
"""装饰器:注册通知处理器"""
"""
装饰器:注册通知处理器
:param notice_type: 通知类型,如果为 None 则处理所有通知
:return: 装饰器函数
"""
def decorator(func):
self.notice_handlers.append({"type": notice_type, "func": func})
@@ -36,7 +60,12 @@ class CommandManager:
# --- 3. 请求事件装饰器 ---
def on_request(self, request_type: str = None):
"""装饰器:注册请求处理器"""
"""
装饰器:注册请求处理器
:param request_type: 请求类型,如果为 None 则处理所有请求
:return: 装饰器函数
"""
def decorator(func):
self.request_handlers.append({"type": request_type, "func": func})
@@ -46,7 +75,12 @@ class CommandManager:
# --- 消息分发逻辑 ---
async def handle_message(self, bot, event):
"""解析并分发消息指令"""
"""
解析并分发消息指令
:param bot: Bot 实例
:param event: 消息事件对象
"""
if not event.raw_message:
return
@@ -77,21 +111,38 @@ class CommandManager:
# --- 通知分发逻辑 ---
async def handle_notice(self, bot, event):
"""分发通知事件"""
"""
分发通知事件
:param bot: Bot 实例
:param event: 通知事件对象
"""
for handler in self.notice_handlers:
if handler["type"] is None or handler["type"] == event.notice_type:
await self._run_handler(handler["func"], bot, event)
# --- 请求分发逻辑 ---
async def handle_request(self, bot, event):
"""分发请求事件"""
"""
分发请求事件
:param bot: Bot 实例
:param event: 请求事件对象
"""
for handler in self.request_handlers:
if handler["type"] is None or handler["type"] == event.request_type:
await self._run_handler(handler["func"], bot, event)
# --- 通用执行器:自动注入参数 ---
async def _run_handler(self, func, bot, event, args=None):
"""根据函数签名自动注入 bot, event 或 args"""
"""
根据函数签名自动注入 bot, event 或 args
:param func: 目标处理函数
:param bot: Bot 实例
:param event: 事件对象
:param args: 指令参数(仅消息指令有效)
"""
sig = inspect.signature(func)
params = sig.parameters
kwargs = {}

View File

@@ -1,3 +1,8 @@
"""
配置加载模块
负责读取和解析 config.toml 配置文件,提供全局配置对象。
"""
from pathlib import Path
from typing import Any, Dict
@@ -5,12 +10,26 @@ import tomllib
class Config:
"""
配置加载类,负责读取和解析 config.toml 文件
"""
def __init__(self, file_path: str = "config.toml"):
"""
初始化配置加载器
:param file_path: 配置文件路径,默认为 "config.toml"
"""
self.path = Path(file_path)
self._data: Dict[str, Any] = {}
self.load()
def load(self):
"""
加载配置文件
:raises FileNotFoundError: 如果配置文件不存在
"""
if not self.path.exists():
raise FileNotFoundError(f"配置文件 {self.path} 未找到!")
@@ -20,14 +39,29 @@ class Config:
# 通过属性访问配置
@property
def napcat_ws(self) -> dict:
"""
获取 NapCat WebSocket 配置
:return: 配置字典
"""
return self._data.get("napcat_ws", {})
@property
def bot(self) -> dict:
"""
获取 Bot 基础配置
:return: 配置字典
"""
return self._data.get("bot", {})
@property
def features(self) -> dict:
"""
获取功能特性配置
:return: 配置字典
"""
return self._data.get("features", {})

View File

@@ -1,3 +1,8 @@
"""
WebSocket 核心模块
负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。
"""
import asyncio
import json
import traceback
@@ -8,12 +13,20 @@ import websockets
from models import Event
from .bot import Bot
from .command_manager import matcher
from .config_loader import global_config
class WS:
"""
WebSocket 客户端类,负责与 OneBot 实现端建立连接并处理通信
"""
def __init__(self):
"""
初始化 WebSocket 客户端
"""
# 读取参数
cfg = global_config.napcat_ws
self.url = cfg.get("uri")
@@ -22,9 +35,12 @@ class WS:
self.ws = None
self._pending_requests = {}
self.bot = Bot(self)
async def connect(self):
"""主连接循环"""
"""
主连接循环,负责建立连接和自动重连
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
while True:
@@ -50,7 +66,11 @@ class WS:
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket):
"""核心监听循环"""
"""
核心监听循环,处理接收到的 WebSocket 消息
:param websocket: WebSocket 连接对象
"""
async for message in websocket:
try:
data = json.loads(message)
@@ -72,10 +92,15 @@ class WS:
print(f" 解析消息异常: {e}")
async def on_event(self, raw_data: dict):
"""事件分发层:根据 post_type 调用 matcher 对应的处理器"""
"""
事件分发层:根据 post_type 调用 matcher 对应的处理器
:param raw_data: 原始事件数据字典
"""
try:
# 解析为 Event 对象
event = Event.from_dict(raw_data)
event.bot = self.bot
# 格式化时间用于打印
t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")
@@ -87,19 +112,19 @@ class WS:
print(
f" [{t}] [消息] {event.message_type} | {event.user_id}: {event.raw_message}"
)
await matcher.handle_message(self, event)
await matcher.handle_message(self.bot, event)
# B. 通知事件 (Notice)
elif event.post_type == "notice":
print(
f" [{t}] [通知] {event.notice_type} | 来自: {event.group_id or '私聊'}"
)
await matcher.handle_notice(self, event)
await matcher.handle_notice(self.bot, event)
# C. 请求事件 (Request)
elif event.post_type == "request":
print(f" [{t}] [请求] {event.request_type} | 内容: {event.comment}")
await matcher.handle_request(self, event)
await matcher.handle_request(self.bot, event)
# D. 元事件 (Meta Event) - 通常用来心跳检测,可不处理
elif event.post_type == "meta_event":
@@ -110,7 +135,13 @@ class WS:
traceback.print_exc()
async def call_api(self, action: str, params: dict = None):
"""调用 OneBot API"""
"""
调用 OneBot API
:param action: API 动作名称
:param params: API 参数
:return: API 响应结果
"""
if not self.ws:
return {"status": "failed", "msg": "websocket not initialized"}