This commit is contained in:
baby20162016
2026-01-20 09:52:32 +08:00
115 changed files with 13739 additions and 1524 deletions

View File

@@ -1,77 +1,125 @@
"""
WebSocket 核心模块
WebSocket 核心通信模块
负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。
该模块定义了 `WS` 类,负责与 OneBot v11 实现(如 NapCat建立和管理
WebSocket 连接。它是整个机器人框架的底层通信基础。
主要职责包括:
- 建立 WebSocket 连接并处理认证。
- 实现断线自动重连机制。
- 监听并接收来自 OneBot 的事件和 API 响应。
- 分发事件给 `CommandManager` 进行处理。
- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
"""
import asyncio
import json
import traceback
from typing import Any, Dict, Optional, cast
import uuid
from datetime import datetime
import websockets
from websockets.legacy.client import WebSocketClientProtocol
from models import EventFactory
from models.events.factory import EventFactory
from .bot import Bot
from .command_manager import matcher
from .config_loader import global_config
from .managers.command_manager import matcher
from .utils.executor import CodeExecutor
from .utils.logger import logger, ModuleLogger
from .utils.exceptions import (
WebSocketError, WebSocketConnectionError, WebSocketAuthenticationError
)
from .utils.error_codes import ErrorCode, create_error_response
class WS:
"""
WebSocket 客户端,负责与 OneBot 实现端建立连接并处理通信
WebSocket 客户端,负责与 OneBot v11 实现进行底层通信
"""
def __init__(self):
def __init__(self, code_executor: Optional[CodeExecutor] = None) -> None:
"""
初始化 WebSocket 客户端
初始化 WebSocket 客户端
从全局配置中读取 WebSocket URI、访问令牌Token和重连间隔。
"""
# 读取参数
cfg = global_config.napcat_ws
self.url = cfg.get("uri")
self.token = cfg.get("token")
self.reconnect_interval = cfg.get("reconnect_interval", 5)
self.url = cfg.uri
self.token = cfg.token
self.reconnect_interval = cfg.reconnect_interval
self.ws = None
self._pending_requests = {}
self.bot = Bot(self)
# 初始化状态
self.ws: Optional[WebSocketClientProtocol] = None
self._pending_requests: Dict[str, asyncio.Future] = {} # echo: future
self.bot: Bot | None = None
self.self_id: int | None = None
self.code_executor = code_executor
# 创建模块专用日志记录器
self.logger = ModuleLogger("WebSocket")
async def connect(self):
async def connect(self) -> None:
"""
主连接循环,负责建立连接和自动重连
启动并管理 WebSocket 连接。
这是一个无限循环,负责建立连接。如果连接断开,它会根据配置的
`reconnect_interval` 时间间隔后自动尝试重新连接。
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
while True:
try:
print(f" 正在尝试连接至 NapCat: {self.url}")
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket:
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
print(" 连接成功!")
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except websockets.exceptions.AuthenticationError as e:
error = WebSocketAuthenticationError(
message=f"WebSocket认证失败: {str(e)}",
code=ErrorCode.WS_AUTH_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {error.message}")
self.logger.log_custom_exception(error)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
print(f" 连接断开或服务器拒绝访问: {e}")
error = WebSocketConnectionError(
message=f"连接断开或服务器拒绝访问: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.warning(f"连接失败: {error.message}")
except Exception as e:
print(f" 运行异常: {e}")
traceback.print_exc()
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
print(f" {self.reconnect_interval}秒后尝试重连...")
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket):
async def _listen_loop(self, websocket_connection: WebSocketClientProtocol) -> None:
"""
核心监听循环,处理接收到的 WebSocket 消息
核心监听循环,处理所有接收到的 WebSocket 消息
:param websocket: WebSocket 连接对象
此循环会持续从 WebSocket 连接中读取消息,并根据消息内容
判断是 API 响应还是上报的事件,然后分发给相应的处理逻辑。
Args:
websocket_connection: 当前活动的 WebSocket 连接对象。
"""
async for message in websocket:
async for message in websocket_connection:
try:
data = json.loads(message)
@@ -90,53 +138,121 @@ class WS:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except json.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
self.logger.debug(f"原始消息: {message}")
except Exception as e:
print(f" 解析消息异常: {e}")
traceback.print_exc()
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"解析消息异常: {error.message}")
self.logger.log_custom_exception(error)
async def on_event(self, raw_data: dict):
async def on_event(self, event_data: Dict[str, Any]) -> None:
"""
事件分发层:根据 post_type 调用 matcher 对应的处理器
事件处理和分发层。
:param raw_data: 原始事件数据字典
当接收到一个 OneBot 事件时,此方法负责:
1. 使用 `EventFactory` 将原始 JSON 数据解析成对应的事件对象。
2. 为事件对象注入 `Bot` 实例,以便在插件中可以调用 API。
3. 打印格式化的事件日志。
4. 将事件对象传递给 `CommandManager` (`matcher`) 进行后续处理。
Args:
event_data (dict): 从 WebSocket 接收到的原始事件字典。
"""
try:
# 使用工厂创建事件对象
event = EventFactory.create_event(raw_data)
event = EventFactory.create_event(event_data)
# 尝试初始化 Bot 实例 (如果尚未初始化且事件包含 self_id)
# 只要事件中包含 self_id我们就可以初始化 Bot不必非要等待 meta_event
if self.bot is None and hasattr(event, 'self_id'):
self.self_id = event.self_id
self.bot = Bot(self)
self.logger.success(f"Bot 实例初始化完成: self_id={self.self_id}")
# 将代码执行器注入到 Bot 和执行器自身
if self.code_executor:
self.bot.code_executor = self.code_executor
self.code_executor.bot = self.bot
self.logger.info("代码执行器已成功注入 Bot 实例。")
# 如果 bot 尚未初始化,则不处理后续事件
if self.bot is None:
self.logger.warning("Bot 尚未初始化,跳过事件处理。")
return
event.bot = self.bot # 注入 Bot 实例
# 打印日志
t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")
if event.post_type == "message":
sender_name = event.sender.nickname if event.sender else "Unknown"
print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
sender_name = event.sender.nickname if hasattr(event, "sender") and event.sender else "Unknown"
message_type = getattr(event, "message_type", "Unknown")
user_id = getattr(event, "user_id", "Unknown")
raw_message = getattr(event, "raw_message", "")
self.logger.info(f"[消息] {message_type} | {user_id}({sender_name}): {raw_message}")
elif event.post_type == "notice":
print(f" [{t}] [通知] {event.notice_type}")
notice_type = getattr(event, "notice_type", "Unknown")
self.logger.info(f"[通知] {notice_type}")
elif event.post_type == "request":
print(f" [{t}] [请求] {event.request_type}")
request_type = getattr(event, "request_type", "Unknown")
self.logger.info(f"[请求] {request_type}")
elif event.post_type == "meta_event":
meta_event_type = getattr(event, "meta_event_type", "Unknown")
self.logger.debug(f"[元事件] {meta_event_type}")
# 分发事件
await matcher.handle_event(self.bot, event)
except Exception as e:
print(f" 事件处理异常: {e}")
traceback.print_exc()
self.logger.exception(f"事件处理异常: {str(e)}")
error = WebSocketError(
message=f"事件处理异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.log_custom_exception(error)
async def call_api(self, action: str, params: dict = None):
async def call_api(self, action: str, params: Optional[Dict[Any, Any]] = None) -> Dict[Any, Any]:
"""
调用 OneBot API
OneBot v11 实现端发送一个 API 请求。
:param action: API 动作名称
:param params: API 参数
:return: API 响应结果
该方法通过 WebSocket 发送请求,并使用 `echo` 字段来匹配对应的响应。
它创建了一个 `Future` 对象来异步等待响应,并设置了超时机制。
Args:
action (str): API 的动作名称,例如 "send_group_msg"
params (dict, optional): API 请求的参数字典。 Defaults to None.
Returns:
dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
表示失败的字典。
"""
if not self.ws:
return {"status": "failed", "msg": "websocket not initialized"}
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
from websockets.protocol import State
if getattr(self.ws, "state", None) is not State.OPEN:
return {"status": "failed", "msg": "websocket is not open"}
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
@@ -145,10 +261,23 @@ class WS:
future = loop.create_future()
self._pending_requests[echo_id] = future
await self.ws.send(json.dumps(payload))
try:
await self.ws.send(json.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
return {"status": "failed", "retcode": -1, "msg": "api timeout"}
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)