Files
NeoBot/core/ws.py
2026-01-01 18:43:14 +08:00

155 lines
5.1 KiB
Python

"""
WebSocket 核心模块
负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。
"""
import asyncio
import json
import traceback
import uuid
from datetime import datetime
import websockets
from models import EventFactory
from .bot import Bot
from .command_manager import matcher
from .config_loader import global_config
class WS:
"""
WebSocket 客户端类,负责与 OneBot 实现端建立连接并处理通信
"""
def __init__(self):
"""
初始化 WebSocket 客户端
"""
# 读取参数
cfg = global_config.napcat_ws
self.url = cfg.get("uri")
self.token = cfg.get("token")
self.reconnect_interval = cfg.get("reconnect_interval", 5)
self.ws = None
self._pending_requests = {}
self.bot = Bot(self)
async def connect(self):
"""
主连接循环,负责建立连接和自动重连
"""
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
while True:
try:
print(f" 正在尝试连接至 NapCat: {self.url}")
async with websockets.connect(
self.url, additional_headers=headers
) as websocket:
self.ws = websocket
print(" 连接成功!")
await self._listen_loop(websocket)
except (
websockets.exceptions.ConnectionClosed,
ConnectionRefusedError,
) as e:
print(f" 连接断开或服务器拒绝访问: {e}")
except Exception as e:
print(f" 运行异常: {e}")
traceback.print_exc()
print(f" {self.reconnect_interval}秒后尝试重连...")
await asyncio.sleep(self.reconnect_interval)
async def _listen_loop(self, websocket):
"""
核心监听循环,处理接收到的 WebSocket 消息
:param websocket: WebSocket 连接对象
"""
async for message in websocket:
try:
data = json.loads(message)
# 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id)
if not future.done():
future.set_result(data)
continue
# 2. 处理上报事件
# 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件
if "post_type" in data:
# 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data))
except Exception as e:
print(f" 解析消息异常: {e}")
traceback.print_exc()
async def on_event(self, raw_data: dict):
"""
事件分发层:根据 post_type 调用 matcher 对应的处理器
:param raw_data: 原始事件数据字典
"""
try:
# 使用工厂创建事件对象
event = EventFactory.create_event(raw_data)
event.bot = self.bot # 注入 Bot 实例
# 打印日志
t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")
if event.post_type == "message":
sender_name = event.sender.nickname if event.sender else "Unknown"
print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
elif event.post_type == "notice":
print(f" [{t}] [通知] {event.notice_type}")
elif event.post_type == "request":
print(f" [{t}] [请求] {event.request_type}")
# 分发事件
await matcher.handle_event(self.bot, event)
except Exception as e:
print(f" 事件处理异常: {e}")
traceback.print_exc()
async def call_api(self, action: str, params: dict = None):
"""
调用 OneBot API
:param action: API 动作名称
:param params: API 参数
:return: API 响应结果
"""
if not self.ws:
return {"status": "failed", "msg": "websocket not initialized"}
from websockets.protocol import State
if getattr(self.ws, "state", None) is not State.OPEN:
return {"status": "failed", "msg": "websocket is not open"}
echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id}
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
await self.ws.send(json.dumps(payload))
try:
return await asyncio.wait_for(future, timeout=30.0)
except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
return {"status": "failed", "retcode": -1, "msg": "api timeout"}