Files
NeoBot/core/api/base.py
K2cr2O1 cd5875be24 refactor(websocket): 移除连接池模式并改进资源清理
移除 WebSocket 连接池实现,改为单连接模式以简化代码结构
在 main 函数中添加资源清理逻辑,确保程序退出时正确关闭所有资源
改进 base64 数据处理逻辑,支持递归处理嵌套结构中的敏感数据

呵呵线程池加WS是神人
2026-01-23 18:24:59 +08:00

93 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API 基础模块
定义了 API 调用的基础接口和统一处理逻辑。
"""
import copy
from typing import Any, Dict, Optional, TYPE_CHECKING
from ..utils.logger import logger
if TYPE_CHECKING:
from ..ws import WS
class BaseAPI:
"""
API 基础类,提供了统一的 `call_api` 方法,包含日志记录和异常处理。
"""
_ws: "WS"
self_id: int
def __init__(self, ws_client: "WS", self_id: int):
self._ws = ws_client
self.self_id = self_id
async def call_api(self, action: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""
调用 OneBot v11 API并提供统一的日志和异常处理。
:param action: API 动作名称
:param params: API 参数
:return: API 响应结果的数据部分
:raises Exception: 当 API 调用失败或发生网络错误时
"""
if params is None:
params = {}
try:
# 日志记录前,对敏感或过长的参数进行处理
log_params = copy.deepcopy(params)
# 处理各种可能包含base64数据的字段
def truncate_base64_recursive(obj):
"""递归处理可能包含base64数据的对象"""
if isinstance(obj, dict):
for key, value in obj.items():
if isinstance(value, str):
if value.startswith('data:image/') or value.startswith('data:video/') or value.startswith('data:audio/'):
obj[key] = f"{value[:50]}... (base64 truncated)"
elif len(value) > 100 and ('/' in value[:50] and '+' in value[:50] and '=' in value[-10:]):
# 检查是否是base64编码的字符串
obj[key] = f"{value[:50]}... (base64-like truncated)"
elif isinstance(value, (dict, list)):
truncate_base64_recursive(value)
elif isinstance(obj, list):
for item in obj:
if isinstance(item, (dict, list)):
truncate_base64_recursive(item)
truncate_base64_recursive(log_params)
# 如果是发送消息的动作,则原子化地增加发送消息总数
if action in ["send_private_msg", "send_group_msg", "send_msg"]:
from ..managers.redis_manager import redis_manager
try:
lua_script = "return redis.call('INCR', KEYS[1])"
await redis_manager.execute_lua_script(
script=lua_script,
keys=["neobot:stats:messages_sent"],
args=[]
)
except Exception as e:
logger.error(f"发送消息计数失败: {e}")
logger.debug(f"调用API -> action: {action}, params: {log_params}")
response = await self._ws.call_api(action, params)
# 对响应也做类似的处理
log_response = copy.deepcopy(response)
truncate_base64_recursive(log_response)
logger.debug(f"API响应 <- {log_response}")
if response.get("status") == "failed":
logger.warning(f"API调用失败: {response}")
return response.get("data")
except Exception as e:
logger.error(f"API调用异常: action={action}, params={params}, error={e}")
raise