feat(ws_pool): 新增 WebSocket 连接池实现 perf(json): 使用 orjson 替代标准 json 库提升性能 style: 清理未使用的导入和冗余代码 docs: 更新架构文档和开发规范 test: 添加 WebSocket 连接池测试用例 fix(plugins): 修复自动审批插件 API 调用参数格式
231 lines
7.6 KiB
Python
231 lines
7.6 KiB
Python
"""
|
|
WebSocket 连接池模块
|
|
|
|
该模块实现了 WebSocket 连接池功能,用于管理多个 WebSocket 连接,
|
|
提高并发处理能力和连接复用效率。
|
|
"""
|
|
import asyncio
|
|
import websockets
|
|
from websockets.legacy.client import WebSocketClientProtocol
|
|
from typing import Optional, Dict, Any, cast
|
|
import uuid
|
|
from loguru import logger
|
|
|
|
from .config_loader import global_config
|
|
from .utils.exceptions import WebSocketError, WebSocketConnectionError
|
|
|
|
|
|
class WSConnection:
|
|
"""
|
|
WebSocket 连接包装类
|
|
|
|
封装单个 WebSocket 连接的状态和操作
|
|
"""
|
|
def __init__(self, conn: WebSocketClientProtocol, conn_id: str):
|
|
self.conn = conn
|
|
self.conn_id = conn_id
|
|
self.last_used = asyncio.get_event_loop().time()
|
|
self.is_active = True
|
|
self._pending_requests: Dict[str, asyncio.Future] = {}
|
|
|
|
async def send(self, data: dict):
|
|
"""
|
|
发送数据到 WebSocket 连接
|
|
"""
|
|
if not self.is_active:
|
|
raise WebSocketError(f"连接 {self.conn_id} 已关闭")
|
|
|
|
try:
|
|
await self.conn.send(data)
|
|
self.last_used = asyncio.get_event_loop().time()
|
|
except Exception as e:
|
|
self.is_active = False
|
|
raise WebSocketError(f"发送数据失败: {e}")
|
|
|
|
async def recv(self):
|
|
"""
|
|
从 WebSocket 连接接收数据
|
|
"""
|
|
if not self.is_active:
|
|
raise WebSocketError(f"连接 {self.conn_id} 已关闭")
|
|
|
|
try:
|
|
data = await self.conn.recv()
|
|
self.last_used = asyncio.get_event_loop().time()
|
|
return data
|
|
except Exception as e:
|
|
self.is_active = False
|
|
raise WebSocketError(f"接收数据失败: {e}")
|
|
|
|
async def close(self):
|
|
"""
|
|
关闭 WebSocket 连接
|
|
"""
|
|
if self.is_active:
|
|
self.is_active = False
|
|
await self.conn.close()
|
|
|
|
|
|
class WSConnectionPool:
|
|
"""
|
|
WebSocket 连接池
|
|
|
|
管理多个 WebSocket 连接,提供连接的获取、释放和回收功能
|
|
"""
|
|
def __init__(self, pool_size: int = 3, max_idle_time: int = 300):
|
|
"""
|
|
初始化连接池
|
|
|
|
:param pool_size: 连接池大小
|
|
:param max_idle_time: 连接最大空闲时间(秒)
|
|
"""
|
|
self.pool_size = pool_size
|
|
self.max_idle_time = max_idle_time
|
|
self.pool: asyncio.Queue[WSConnection] = asyncio.Queue(maxsize=pool_size)
|
|
self._closed = False
|
|
self._cleanup_task: Optional[asyncio.Task] = None
|
|
|
|
# 从全局配置读取参数
|
|
self.url = global_config.napcat_ws.uri
|
|
self.token = global_config.napcat_ws.token
|
|
self.reconnect_interval = global_config.napcat_ws.reconnect_interval
|
|
|
|
logger.info(f"WebSocket 连接池初始化完成,大小: {pool_size}")
|
|
|
|
async def initialize(self):
|
|
"""
|
|
初始化连接池,创建初始连接
|
|
"""
|
|
if self._closed:
|
|
raise WebSocketError("连接池已关闭")
|
|
|
|
# 启动连接清理任务
|
|
self._cleanup_task = asyncio.create_task(self._cleanup_idle_connections())
|
|
|
|
# 创建初始连接
|
|
for _ in range(self.pool_size):
|
|
try:
|
|
conn = await self._create_connection()
|
|
await self.pool.put(conn)
|
|
logger.info(f"WebSocket 连接 {conn.conn_id} 已创建并加入连接池")
|
|
except Exception as e:
|
|
logger.error(f"创建初始连接失败: {e}")
|
|
|
|
async def _create_connection(self) -> WSConnection:
|
|
"""
|
|
创建新的 WebSocket 连接
|
|
"""
|
|
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
|
|
|
|
try:
|
|
conn_id = str(uuid.uuid4())
|
|
websocket_raw = await websockets.connect(
|
|
self.url, additional_headers=headers
|
|
)
|
|
websocket = cast(WebSocketClientProtocol, websocket_raw)
|
|
|
|
conn = WSConnection(websocket, conn_id)
|
|
logger.info(f"WebSocket 连接 {conn_id} 已建立")
|
|
return conn
|
|
except Exception as e:
|
|
raise WebSocketConnectionError(f"创建 WebSocket 连接失败: {e}")
|
|
|
|
async def get_connection(self) -> WSConnection:
|
|
"""
|
|
从连接池获取一个连接
|
|
"""
|
|
if self._closed:
|
|
raise WebSocketError("连接池已关闭")
|
|
|
|
try:
|
|
# 尝试从连接池获取连接
|
|
conn = await asyncio.wait_for(self.pool.get(), timeout=5)
|
|
|
|
# 检查连接是否活跃
|
|
if not conn.is_active:
|
|
logger.warning(f"连接 {conn.conn_id} 已失效,重新创建")
|
|
return await self._create_connection()
|
|
|
|
return conn
|
|
except asyncio.TimeoutError:
|
|
# 连接池为空,创建新连接
|
|
logger.warning("连接池为空,创建临时连接")
|
|
return await self._create_connection()
|
|
except Exception as e:
|
|
raise WebSocketError(f"获取连接失败: {e}")
|
|
|
|
async def release_connection(self, conn: WSConnection):
|
|
"""
|
|
释放连接回连接池
|
|
"""
|
|
if self._closed:
|
|
await conn.close()
|
|
return
|
|
|
|
if not conn.is_active:
|
|
logger.warning(f"连接 {conn.conn_id} 已失效,不返回连接池")
|
|
return
|
|
|
|
try:
|
|
if self.pool.full():
|
|
# 连接池已满,关闭该连接
|
|
await conn.close()
|
|
logger.info(f"连接池已满,关闭连接 {conn.conn_id}")
|
|
else:
|
|
await self.pool.put(conn)
|
|
logger.debug(f"连接 {conn.conn_id} 已返回连接池")
|
|
except Exception as e:
|
|
logger.error(f"释放连接失败: {e}")
|
|
await conn.close()
|
|
|
|
async def _cleanup_idle_connections(self):
|
|
"""
|
|
清理空闲连接任务
|
|
"""
|
|
while not self._closed:
|
|
await asyncio.sleep(60) # 每分钟检查一次
|
|
|
|
try:
|
|
# 检查连接池中的连接
|
|
new_pool = asyncio.Queue(maxsize=self.pool_size)
|
|
current_time = asyncio.get_event_loop().time()
|
|
|
|
while not self.pool.empty():
|
|
conn = await self.pool.get()
|
|
|
|
if current_time - conn.last_used > self.max_idle_time:
|
|
# 连接空闲时间过长,关闭
|
|
await conn.close()
|
|
logger.info(f"清理空闲连接 {conn.conn_id}")
|
|
else:
|
|
# 放回新队列
|
|
await new_pool.put(conn)
|
|
|
|
# 替换原连接池
|
|
self.pool = new_pool
|
|
except Exception as e:
|
|
logger.error(f"清理空闲连接失败: {e}")
|
|
|
|
async def close(self):
|
|
"""
|
|
关闭连接池
|
|
"""
|
|
if self._closed:
|
|
return
|
|
|
|
self._closed = True
|
|
|
|
# 停止清理任务
|
|
if self._cleanup_task:
|
|
self._cleanup_task.cancel()
|
|
try:
|
|
await self._cleanup_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# 关闭所有连接
|
|
while not self.pool.empty():
|
|
conn = await self.pool.get()
|
|
await conn.close()
|
|
|
|
logger.info("WebSocket 连接池已关闭") |