""" API 基础模块 定义了 API 调用的基础接口和统一处理逻辑。 """ import copy from typing import Any, Dict, Optional, TYPE_CHECKING from ..utils.logger import logger if TYPE_CHECKING: from ..ws import WS class BaseAPI: """ API 基础类,提供了统一的 `call_api` 方法,包含日志记录和异常处理。 """ _ws: "WS" self_id: int def __init__(self, ws_client: "WS", self_id: int): self._ws = ws_client self.self_id = self_id async def call_api(self, action: str, params: Optional[Dict[str, Any]] = None) -> Any: """ 调用 OneBot v11 API,并提供统一的日志和异常处理。 :param action: API 动作名称 :param params: API 参数 :return: API 响应结果的数据部分 :raises Exception: 当 API 调用失败或发生网络错误时 """ if params is None: params = {} try: # 日志记录前,对敏感或过长的参数进行处理 log_params = copy.deepcopy(params) if 'message' in log_params: if isinstance(log_params['message'], list): for segment in log_params['message']: if segment.get('type') == 'image' and 'file' in segment.get('data', {}): file_data = segment['data']['file'] if file_data.startswith('data:image/'): segment['data']['file'] = f"{file_data[:50]}... (base64 truncated)" elif isinstance(log_params['message'], str) and log_params['message'].startswith('data:image/'): log_params['message'] = f"{log_params['message'][:50]}... (base64 truncated)" # 如果是发送消息的动作,则原子化地增加发送消息总数 if action in ["send_private_msg", "send_group_msg", "send_msg"]: from ..managers.redis_manager import redis_manager try: lua_script = "return redis.call('INCR', KEYS[1])" await redis_manager.execute_lua_script( script=lua_script, keys=["neobot:stats:messages_sent"], args=[] ) except Exception as e: logger.error(f"发送消息计数失败: {e}") logger.debug(f"调用API -> action: {action}, params: {log_params}") response = await self._ws.call_api(action, params) logger.debug(f"API响应 <- {response}") if response.get("status") == "failed": logger.warning(f"API调用失败: {response}") return response.get("data") except Exception as e: logger.error(f"API调用异常: action={action}, params={params}, error={e}") raise