refactor(WS): 使用连接池上下文管理器简化连接管理

重构 WS 类中的连接获取和释放逻辑,使用 connection 上下文管理器确保连接正确释放。
同时改进消息处理循环中的异常处理和连接管理。

refactor(ws_pool): 增强连接池的健壮性和管理能力

1. 添加连接上下文管理器支持
2. 改进连接获取和释放逻辑,增加连接计数和锁保护
3. 优化连接健康检查和清理机制
4. 增强错误处理和日志记录

fix(bot_status): 增加系统信息获取和渲染的错误处理

1. 为系统信息获取添加超时和错误处理
2. 为Redis数据获取添加异常捕获
3. 为图片渲染添加异常处理
4. 改进日志记录和用户反馈
This commit is contained in:
2026-01-23 17:15:07 +08:00
parent 15dcf0f764
commit c851b49db9
3 changed files with 232 additions and 114 deletions

View File

@@ -7,9 +7,10 @@ WebSocket 连接池模块
import asyncio
import websockets
from websockets.legacy.client import WebSocketClientProtocol
from typing import Optional, Dict, Any, cast, Union
from typing import Optional, Dict, Any, cast, Union, AsyncGenerator
import uuid
from loguru import logger
import contextlib
from .config_loader import global_config
from .utils.exceptions import WebSocketError, WebSocketConnectionError
@@ -64,9 +65,11 @@ class WSConnection:
if not self.is_active:
return False
try:
await asyncio.wait_for(self.conn.ping(), timeout=timeout)
# 使用 wait_for 包装 ping
pong_waiter = await self.conn.ping()
await asyncio.wait_for(pong_waiter, timeout=timeout)
return True
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed, Exception):
self.is_active = False
return False
@@ -76,7 +79,10 @@ class WSConnection:
"""
if self.is_active:
self.is_active = False
await self.conn.close()
try:
await self.conn.close()
except Exception:
pass
class WSConnectionPool:
@@ -97,6 +103,8 @@ class WSConnectionPool:
self.pool: asyncio.Queue[WSConnection] = asyncio.Queue(maxsize=pool_size)
self._closed = False
self._cleanup_task: Optional[asyncio.Task] = None
self._current_size = 0 # 当前管理的连接数(包括池中和借出的)
self._lock = asyncio.Lock() # 用于保护 _current_size 的修改
# 从全局配置读取参数
self.url = global_config.napcat_ws.uri
@@ -115,14 +123,17 @@ class WSConnectionPool:
# 启动连接清理任务
self._cleanup_task = asyncio.create_task(self._cleanup_idle_connections())
# 创建初始连接
# 预热连接
for _ in range(self.pool_size):
try:
conn = await self._create_connection()
await self.pool.put(conn)
async with self._lock:
self._current_size += 1
logger.info(f"WebSocket 连接 {conn.conn_id} 已创建并加入连接池")
except Exception as e:
logger.error(f"创建初始连接失败: {e}")
# 初始连接失败不抛出异常,允许后续动态创建
async def _create_connection(self) -> WSConnection:
"""
@@ -143,6 +154,17 @@ class WSConnectionPool:
except Exception as e:
raise WebSocketConnectionError(f"创建 WebSocket 连接失败: {e}")
@contextlib.asynccontextmanager
async def connection(self) -> AsyncGenerator[WSConnection, None]:
"""
获取连接的上下文管理器
"""
conn = await self.get_connection()
try:
yield conn
finally:
await self.release_connection(conn)
async def get_connection(self) -> WSConnection:
"""
从连接池获取一个健康的连接,包含健康检查。
@@ -150,25 +172,64 @@ class WSConnectionPool:
if self._closed:
raise WebSocketError("连接池已关闭")
try:
# 尝试从连接池获取连接
conn = await asyncio.wait_for(self.pool.get(), timeout=5)
# 健康检查
if await conn.ping():
logger.debug(f"连接 {conn.conn_id} 健康检查通过")
return conn
else:
logger.warning(f"连接 {conn.conn_id} 健康检查失败,丢弃并获取新连接")
await conn.close()
return await self.get_connection() # 递归获取下一个
start_time = asyncio.get_event_loop().time()
timeout = 10 # 获取连接的总超时时间
except asyncio.TimeoutError:
# 连接池为空,创建新连接
logger.warning("连接池在5秒内无可用连接创建新连接")
return await self._create_connection()
except Exception as e:
raise WebSocketError(f"获取连接时发生未知错误: {e}")
while True:
if asyncio.get_event_loop().time() - start_time > timeout:
raise WebSocketError("获取连接超时")
try:
# 1. 尝试从池中获取
conn = self.pool.get_nowait()
# 健康检查
if await conn.ping():
logger.debug(f"连接 {conn.conn_id} 健康检查通过")
return conn
else:
logger.warning(f"连接 {conn.conn_id} 健康检查失败,丢弃")
await conn.close()
async with self._lock:
self._current_size -= 1
# 继续循环,尝试获取下一个或创建新的
continue
except asyncio.QueueEmpty:
# 池为空,检查是否可以创建新连接
async with self._lock:
if self._current_size < self.pool_size:
# 有配额,创建新连接
self._current_size += 1 # 先占位
create_new = True
else:
create_new = False
if create_new:
try:
conn = await self._create_connection()
return conn
except Exception as e:
async with self._lock:
self._current_size -= 1 # 回滚占位
logger.error(f"创建新连接失败: {e}")
await asyncio.sleep(1) # 避免快速失败循环
continue
else:
# 没有配额,等待池中有可用连接
try:
conn = await asyncio.wait_for(self.pool.get(), timeout=1.0)
# 获取到了,进行健康检查(在下一次循环中处理,或者这里直接处理)
# 为了代码复用,我们把 conn 放回去(或者直接用),这里直接用
if await conn.ping():
return conn
else:
await conn.close()
async with self._lock:
self._current_size -= 1
continue
except asyncio.TimeoutError:
continue
async def release_connection(self, conn: WSConnection):
"""
@@ -180,19 +241,26 @@ class WSConnectionPool:
if not conn.is_active:
logger.warning(f"连接 {conn.conn_id} 已失效,不返回连接池")
await conn.close()
async with self._lock:
self._current_size -= 1
return
try:
if self.pool.full():
# 连接池已满,关闭该连接
await conn.close()
logger.info(f"连接池已满,关闭连接 {conn.conn_id}")
else:
await self.pool.put(conn)
logger.debug(f"连接 {conn.conn_id} 已返回连接池")
# 尝试放回池中
self.pool.put_nowait(conn)
logger.debug(f"连接 {conn.conn_id} 已返回连接池")
except asyncio.QueueFull:
# 理论上不应该发生,除非 _current_size 逻辑有误
logger.warning(f"连接池已满,关闭多余连接 {conn.conn_id}")
await conn.close()
async with self._lock:
self._current_size -= 1
except Exception as e:
logger.error(f"释放连接失败: {e}")
await conn.close()
async with self._lock:
self._current_size -= 1
async def _cleanup_idle_connections(self):
"""
@@ -202,23 +270,33 @@ class WSConnectionPool:
await asyncio.sleep(60) # 每分钟检查一次
try:
# 检查连接池中的连接
new_pool = asyncio.Queue(maxsize=self.pool_size)
current_time = asyncio.get_event_loop().time()
# 我们不替换队列,而是取出检查再放回
# 这样比较安全,但可能会暂时清空池子
# 更好的做法是只检查队头的连接
while not self.pool.empty():
conn = await self.pool.get()
# 获取当前队列大小
qsize = self.pool.qsize()
for _ in range(qsize):
try:
conn = self.pool.get_nowait()
except asyncio.QueueEmpty:
break
current_time = asyncio.get_event_loop().time()
if current_time - conn.last_used > self.max_idle_time:
# 连接空闲时间过长,关闭
await conn.close()
logger.info(f"清理空闲连接 {conn.conn_id}")
await conn.close()
async with self._lock:
self._current_size -= 1
else:
# 放回新队列
await new_pool.put(conn)
# 替换原连接池
self.pool = new_pool
# 还没过期,放回去
try:
self.pool.put_nowait(conn)
except asyncio.QueueFull:
# 竞争条件下可能满了
await conn.close()
async with self._lock:
self._current_size -= 1
except Exception as e:
logger.error(f"清理空闲连接失败: {e}")
@@ -241,7 +319,10 @@ class WSConnectionPool:
# 关闭所有连接
while not self.pool.empty():
conn = await self.pool.get()
await conn.close()
try:
conn = self.pool.get_nowait()
await conn.close()
except asyncio.QueueEmpty:
break
logger.info("WebSocket 连接池已关闭")
logger.info("WebSocket 连接池已关闭")