""" WebSocket 核心模块 负责与 OneBot 实现端建立 WebSocket 连接,处理消息接收、事件分发和 API 调用。 """ import asyncio import json import traceback import uuid from datetime import datetime import websockets from models import EventFactory from .bot import Bot from .command_manager import matcher from .config_loader import global_config class WS: """ WebSocket 客户端类,负责与 OneBot 实现端建立连接并处理通信 """ def __init__(self): """ 初始化 WebSocket 客户端 """ # 读取参数 cfg = global_config.napcat_ws self.url = cfg.get("uri") self.token = cfg.get("token") self.reconnect_interval = cfg.get("reconnect_interval", 5) self.ws = None self._pending_requests = {} self.bot = Bot(self) async def connect(self): """ 主连接循环,负责建立连接和自动重连 """ headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} while True: try: print(f" 正在尝试连接至 NapCat: {self.url}") async with websockets.connect( self.url, additional_headers=headers ) as websocket: self.ws = websocket print(" 连接成功!") await self._listen_loop(websocket) except ( websockets.exceptions.ConnectionClosed, ConnectionRefusedError, ) as e: print(f" 连接断开或服务器拒绝访问: {e}") except Exception as e: print(f" 运行异常: {e}") traceback.print_exc() print(f" {self.reconnect_interval}秒后尝试重连...") await asyncio.sleep(self.reconnect_interval) async def _listen_loop(self, websocket): """ 核心监听循环,处理接收到的 WebSocket 消息 :param websocket: WebSocket 连接对象 """ async for message in websocket: try: data = json.loads(message) # 1. 处理 API 响应 # 如果消息中包含 echo 字段,说明是 API 调用的响应 echo_id = data.get("echo") if echo_id and echo_id in self._pending_requests: future = self._pending_requests.pop(echo_id) if not future.done(): future.set_result(data) continue # 2. 处理上报事件 # 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件 if "post_type" in data: # 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环 asyncio.create_task(self.on_event(data)) except Exception as e: print(f" 解析消息异常: {e}") traceback.print_exc() async def on_event(self, raw_data: dict): """ 事件分发层:根据 post_type 调用 matcher 对应的处理器 :param raw_data: 原始事件数据字典 """ try: # 使用工厂创建事件对象 event = EventFactory.create_event(raw_data) event.bot = self.bot # 注入 Bot 实例 # 打印日志 t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S") if event.post_type == "message": sender_name = event.sender.nickname if event.sender else "Unknown" print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}") elif event.post_type == "notice": print(f" [{t}] [通知] {event.notice_type}") elif event.post_type == "request": print(f" [{t}] [请求] {event.request_type}") # 分发事件 await matcher.handle_event(self.bot, event) except Exception as e: print(f" 事件处理异常: {e}") traceback.print_exc() async def call_api(self, action: str, params: dict = None): """ 调用 OneBot API :param action: API 动作名称 :param params: API 参数 :return: API 响应结果 """ if not self.ws: return {"status": "failed", "msg": "websocket not initialized"} from websockets.protocol import State if getattr(self.ws, "state", None) is not State.OPEN: return {"status": "failed", "msg": "websocket is not open"} echo_id = str(uuid.uuid4()) payload = {"action": action, "params": params or {}, "echo": echo_id} loop = asyncio.get_running_loop() future = loop.create_future() self._pending_requests[echo_id] = future await self.ws.send(json.dumps(payload)) try: return await asyncio.wait_for(future, timeout=30.0) except asyncio.TimeoutError: self._pending_requests.pop(echo_id, None) return {"status": "failed", "retcode": -1, "msg": "api timeout"}