import asyncio
import json
import traceback
import uuid
from datetime import datetime
from typing import Optional

import websockets

from models import Event

from .command_manager import matcher
from .config_loader import global_config


class WS:
    """WebSocket client for a NapCat endpoint speaking the OneBot protocol.

    The client keeps one connection alive (reconnecting on failure), routes
    reported events to ``matcher`` and lets callers issue API requests whose
    responses are matched back through the ``echo`` field.
    """

    def __init__(self):
        # Connection parameters come from the ``napcat_ws`` config section.
        cfg = global_config.napcat_ws
        self.url = cfg.get("uri")
        self.token = cfg.get("token")
        self.reconnect_interval = cfg.get("reconnect_interval", 5)
        # Currently active connection, or None before the first connect.
        self.ws = None
        # echo id -> Future resolved with the matching API response.
        self._pending_requests = {}
        # Strong references to in-flight event tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._event_tasks = set()

    async def connect(self):
        """Connect and listen forever, reconnecting after every failure.

        Sends an ``Authorization: Bearer <token>`` header when a token is
        configured. This coroutine never returns normally; cancel it to stop.
        """
        headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}

        while True:
            try:
                print(f" 正在尝试连接至 NapCat: {self.url}")
                async with websockets.connect(
                    self.url, additional_headers=headers
                ) as websocket:
                    self.ws = websocket
                    print(" 连接成功!")
                    await self._listen_loop(websocket)
            except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError) as e:
                print(f" 连接断开或服务器拒绝访问: {e}")
            except Exception as e:
                # Top-level boundary: log and fall through to the reconnect.
                print(f" 运行异常: {e}")
                traceback.print_exc()
            finally:
                # Responses for requests sent on this connection can no
                # longer arrive; fail them now instead of letting every
                # caller wait for its full timeout.
                self._fail_pending_requests("connection closed")

            print(f" {self.reconnect_interval}秒后尝试重连...")
            await asyncio.sleep(self.reconnect_interval)

    def _fail_pending_requests(self, reason: str):
        """Resolve every outstanding API future with a failed response."""
        outstanding = list(self._pending_requests.values())
        self._pending_requests.clear()
        for future in outstanding:
            if not future.done():
                future.set_result(
                    {"status": "failed", "retcode": -1, "msg": reason}
                )

    async def _listen_loop(self, websocket):
        """Read messages until the connection ends and route each one.

        A message whose ``echo`` matches a pending request resolves that
        request's future; a message containing ``post_type`` is dispatched to
        ``on_event`` in a background task. Errors while handling a single
        message are printed and do not stop the loop.
        """
        async for message in websocket:
            try:
                data = json.loads(message)

                # 1. API response: hand it to the waiting call_api().
                echo_id = data.get("echo")
                if echo_id and echo_id in self._pending_requests:
                    future = self._pending_requests.pop(echo_id)
                    if not future.done():
                        future.set_result(data)
                    continue

                # 2. Reported event: run the handler as a task so a slow
                #    handler does not block reading further messages.
                if "post_type" in data:
                    task = asyncio.create_task(self.on_event(data))
                    self._event_tasks.add(task)
                    task.add_done_callback(self._event_tasks.discard)
            except Exception as e:
                print(f" 解析消息异常: {e}")

    async def on_event(self, raw_data: dict):
        """Dispatch one reported event to the ``matcher`` handler for its post_type.

        Any exception raised while parsing or handling the event is printed
        together with its traceback and then swallowed.
        """
        try:
            # Parse the raw payload into an Event object.
            event = Event.from_dict(raw_data)

            # Timestamp formatted for the log lines below.
            t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")

            # A. Message events
            if event.post_type == "message":
                print(f" [{t}] [消息] {event.message_type} | {event.user_id}: {event.raw_message}")
                await matcher.handle_message(self, event)

            # B. Notice events
            elif event.post_type == "notice":
                print(f" [{t}] [通知] {event.notice_type} | 来自: {event.group_id or '私聊'}")
                await matcher.handle_notice(self, event)

            # C. Request events
            elif event.post_type == "request":
                print(f" [{t}] [请求] {event.request_type} | 内容: {event.comment}")
                await matcher.handle_request(self, event)

            # D. Meta events - typically heartbeats; intentionally ignored.
            elif event.post_type == "meta_event":
                pass

        except Exception as e:
            print(f"事件分发失败: {e}")
            traceback.print_exc()

    async def call_api(self, action: str, params: Optional[dict] = None):
        """Call a OneBot API action and wait for its response.

        Args:
            action: Name of the API action to invoke.
            params: Parameters for the action; ``None`` is sent as ``{}``.

        Returns:
            The response dict received for this request, or a dict with
            ``"status": "failed"`` when the socket is missing or not open,
            when the connection closes first, or when no response arrives
            within 30 seconds.

        Raises:
            Whatever ``self.ws.send`` raises if sending the request fails.
        """
        if self.ws is None:
            return {"status": "failed", "msg": "websocket not initialized"}

        from websockets.protocol import State

        if getattr(self.ws, "state", None) is not State.OPEN:
            return {"status": "failed", "msg": "websocket is not open"}

        echo_id = str(uuid.uuid4())
        payload = {"action": action, "params": params or {}, "echo": echo_id}

        future = asyncio.get_running_loop().create_future()
        self._pending_requests[echo_id] = future

        try:
            await self.ws.send(json.dumps(payload))
            return await asyncio.wait_for(future, timeout=30.0)
        except asyncio.TimeoutError:
            return {"status": "failed", "retcode": -1, "msg": "api timeout"}
        finally:
            # Always drop the bookkeeping entry, including when send() raises
            # or the caller is cancelled, so the dict cannot leak futures.
            self._pending_requests.pop(echo_id, None)