Files
NeoBot/core/ws.py
K2cr2O1 a97c1f2b3f feat(core): 添加 WebSocket 核心通信模块实现
实现 WebSocket 客户端类 `WS`,负责与 OneBot v11 建立连接、处理消息和自动重连
2026-01-23 18:40:52 +08:00

300 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
WebSocket 核心通信模块
该模块定义了 `WS` 类,负责与 OneBot v11 实现(如 NapCat建立和管理
WebSocket 连接。它是整个机器人框架的底层通信基础。
主要职责包括:
- 建立 WebSocket 连接并处理认证。
- 实现断线自动重连机制。
- 监听并接收来自 OneBot 的事件和 API 响应。
- 分发事件给 `CommandManager` 进行处理。
- 提供 `call_api` 方法,用于异步发送 API 请求并等待响应。
"""
import asyncio
import orjson
from typing import TYPE_CHECKING, Any, Dict, Optional, cast
import uuid
if TYPE_CHECKING:
from .bot import Bot
import websockets
from websockets.legacy.client import WebSocketClientProtocol
from models.events.factory import EventFactory
from .config_loader import global_config
from .managers.command_manager import matcher
from .utils.executor import CodeExecutor
from .utils.logger import ModuleLogger
from .utils.exceptions import (
WebSocketError, WebSocketConnectionError
)
from .utils.error_codes import ErrorCode, create_error_response
class WS:
"""
WebSocket 客户端,负责与 OneBot v11 实现进行底层通信。
"""
def __init__(self, code_executor: Optional[CodeExecutor] = None) -> None:
"""
初始化 WebSocket 客户端。
从全局配置中读取 WebSocket URI、访问令牌Token和重连间隔。
:param code_executor: 代码执行器实例
"""
# 读取参数
cfg = global_config.napcat_ws
self.url = cfg.uri
self.token = cfg.token
self.reconnect_interval = cfg.reconnect_interval
# 初始化状态
self.ws: Optional[WebSocketClientProtocol] = None
self._pending_requests: Dict[str, asyncio.Future] = {} # echo: future
self.bot: 'Bot' | None = None
self.self_id: int | None = None
self.code_executor = code_executor
# 创建模块专用日志记录器
self.logger = ModuleLogger("WebSocket")
async def connect(self) -> None:
"""
启动并管理 WebSocket 连接。
这是一个无限循环,负责建立连接。如果连接断开,它会根据配置的
`reconnect_interval` 时间间隔后自动尝试重新连接。
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
while True:
try:
self.logger.info(f"正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket_raw:
websocket = cast(WebSocketClientProtocol, websocket_raw)
self.ws = websocket
self.logger.success("连接成功!")
await self._listen_loop(websocket)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
conn_error = WebSocketConnectionError(
message=f"WebSocket连接失败: {str(e)}",
code=ErrorCode.WS_CONNECTION_FAILED,
original_error=e
)
self.logger.error(f"连接失败: {conn_error.message}")
self.logger.log_custom_exception(conn_error)
except Exception as e:
error = WebSocketError(
message=f"WebSocket运行异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"运行异常: {error.message}")
self.logger.log_custom_exception(error)
self.logger.info(f"{self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket_connection: WebSocketClientProtocol) -> None:
"""
核心监听循环,处理所有接收到的 WebSocket 消息。
此循环会持续从 WebSocket 连接中读取消息,并根据消息内容
判断是 API 响应还是上报的事件,然后分发给相应的处理逻辑。
Args:
websocket_connection: 当前活动的 WebSocket 连接对象。
"""
async for message in websocket_connection:
try:
data = orjson.loads(message)
# 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id)
if not future.done():
future.set_result(data)
continue
# 2. 处理上报事件
# 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件
if "post_type" in data:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except orjson.JSONDecodeError as e:
error = WebSocketError(
message=f"JSON解析失败: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.error(f"解析消息异常: {error.message}")
# 如果message是bytes类型需要先解码
decoded_message = message.decode('utf-8') if isinstance(message, bytes) else message
self.logger.debug(f"原始消息: {decoded_message}")
except Exception as e:
error = WebSocketError(
message=f"处理消息异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.exception(f"解析消息异常: {error.message}")
self.logger.log_custom_exception(error)
async def on_event(self, event_data: Dict[str, Any]) -> None:
"""
事件处理和分发层。
当接收到一个 OneBot 事件时,此方法负责:
1. 使用 `EventFactory` 将原始 JSON 数据解析成对应的事件对象。
2. 为事件对象注入 `Bot` 实例,以便在插件中可以调用 API。
3. 打印格式化的事件日志。
4. 将事件对象传递给 `CommandManager` (`matcher`) 进行后续处理。
Args:
event_data (dict): 从 WebSocket 接收到的原始事件字典。
"""
try:
# 使用工厂创建事件对象
event = EventFactory.create_event(event_data)
# 尝试初始化 Bot 实例 (如果尚未初始化且事件包含 self_id)
# 只要事件中包含 self_id我们就可以初始化 Bot不必非要等待 meta_event
if self.bot is None and hasattr(event, 'self_id'):
from .bot import Bot
self.self_id = event.self_id
self.bot = Bot(self)
self.logger.success(f"Bot 实例初始化完成: self_id={self.self_id}")
# 将代码执行器注入到 Bot 和执行器自身
if self.code_executor:
self.bot.code_executor = self.code_executor
self.code_executor.bot = self.bot
self.logger.info("代码执行器已成功注入 Bot 实例。")
# 如果 bot 尚未初始化,则不处理后续事件
if self.bot is None:
self.logger.warning("Bot 尚未初始化,跳过事件处理。")
return
event.bot = self.bot # 注入 Bot 实例
# 打印日志
if event.post_type == "message":
sender_name = event.sender.nickname if hasattr(event, "sender") and event.sender else "Unknown"
message_type = getattr(event, "message_type", "Unknown")
user_id = getattr(event, "user_id", "Unknown")
raw_message = getattr(event, "raw_message", "")
self.logger.info(f"[消息] {message_type} | {user_id}({sender_name}): {raw_message}")
elif event.post_type == "notice":
notice_type = getattr(event, "notice_type", "Unknown")
self.logger.info(f"[通知] {notice_type}")
elif event.post_type == "request":
request_type = getattr(event, "request_type", "Unknown")
self.logger.info(f"[请求] {request_type}")
elif event.post_type == "meta_event":
meta_event_type = getattr(event, "meta_event_type", "Unknown")
self.logger.debug(f"[元事件] {meta_event_type}")
# 分发事件
await matcher.handle_event(self.bot, event)
except Exception as e:
self.logger.exception(f"事件处理异常: {str(e)}")
error = WebSocketError(
message=f"事件处理异常: {str(e)}",
code=ErrorCode.WS_MESSAGE_ERROR,
original_error=e
)
self.logger.log_custom_exception(error)
async def close(self) -> None:
"""
关闭 WebSocket 客户端,释放资源。
"""
self.logger.info("正在关闭 WebSocket 客户端...")
if self.ws:
await self.ws.close()
# 取消所有挂起的请求
for future in self._pending_requests.values():
if not future.done():
future.cancel()
self._pending_requests.clear()
self.logger.success("WebSocket 客户端已关闭")
async def call_api(self, action: str, params: Optional[Dict[Any, Any]] = None) -> Dict[Any, Any]:
"""
向 OneBot v11 实现端发送一个 API 请求。
该方法通过 WebSocket 发送请求,并使用 `echo` 字段来匹配对应的响应。
它创建了一个 `Future` 对象来异步等待响应,并设置了超时机制。
Args:
action (str): API 的动作名称,例如 "send_group_msg"
params (dict, optional): API 请求的参数字典。 Defaults to None.
Returns:
dict: OneBot API 的响应数据。如果超时或连接断开,则返回一个
表示失败的字典。
"""
if not self.ws:
self.logger.error("调用 API 失败: WebSocket 未初始化")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket未初始化",
data={"action": action, "params": params}
)
from websockets.protocol import State
if getattr(self.ws, "state", None) is not State.OPEN:
self.logger.error("调用 API 失败: WebSocket 连接未打开")
return create_error_response(
code=ErrorCode.WS_DISCONNECTED,
message="WebSocket连接未打开",
data={"action": action, "params": params}
)
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
try:
await self.ws.send(orjson.dumps(payload))
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response(
code=ErrorCode.TIMEOUT_ERROR,
message="API调用超时",
data={"action": action, "params": params}
)
except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR,
message=f"API调用异常: {str(e)}",
data={"action": action, "params": params}
)