fix(discord): 修复 WebSocket 连接检测并增强跨平台文件处理

修复 Discord WebSocket 连接检测逻辑,使用正确的属性检查连接状态
为跨平台消息处理添加文件类型支持,并增加详细的调试日志
优化附件处理逻辑,确保所有文件类型都能正确识别和转发
This commit is contained in:
2026-03-21 14:26:54 +08:00
committed by 镀铬酸钾
parent 6fa8dd27c4
commit f6d431cb5f
3 changed files with 833 additions and 114 deletions

View File

@@ -21,10 +21,10 @@ try:
except ImportError:
DISCORD_AVAILABLE = False
from neobot.core.utils.logger import ModuleLogger
from core.utils.logger import ModuleLogger
from .router import DiscordToOneBotConverter
from neobot.core.managers.redis_manager import redis_manager
from neobot.core.config_loader import global_config
from core.managers.redis_manager import redis_manager
from core.config_loader import global_config
class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
"""
@@ -41,7 +41,6 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
self.proxy = None
self.proxy_type = "http"
self._redis_sub_task = None
if global_config.discord.proxy:
self.proxy = global_config.discord.proxy
self.proxy_type = global_config.discord.proxy_type or "http"
@@ -66,29 +65,8 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
self.start_heartbeat_task(interval=30)
if self._redis_sub_task is None or self._redis_sub_task.done():
if self._redis_sub_task is not None and not self._redis_sub_task.done():
self._redis_sub_task.cancel()
try:
await self._redis_sub_task
except asyncio.CancelledError:
pass
self._redis_sub_task = asyncio.create_task(self.start_redis_subscription())
async def on_resumed(self):
"""当 Bot 重新连接到 Discord 时触发"""
self.logger.success(f"Discord Bot 已重新连接: {self.user} (ID: {self.user.id})")
self.start_heartbeat_task(interval=30)
if self._redis_sub_task is None or self._redis_sub_task.done():
if self._redis_sub_task is not None and not self._redis_sub_task.done():
self._redis_sub_task.cancel()
try:
await self._redis_sub_task
except asyncio.CancelledError:
pass
self._redis_sub_task = asyncio.create_task(self.start_redis_subscription())
# 启动 Redis 订阅以处理跨平台消息
asyncio.create_task(self.start_redis_subscription())
async def on_message(self, message: 'discord.Message'):
"""当收到 Discord 消息时触发"""
@@ -101,7 +79,7 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
# 1. 将 discord.Message 伪装成 OneBot 事件模型
# 2. 触发业务逻辑
# 将伪装后的事件丢给现有的命令管理器 (matcher)
from neobot.core.managers.command_manager import matcher
from core.managers.command_manager import matcher
# matcher.handle_event 需要 bot 实例和 event 实例
# 我们在 create_mock_event 中已经注入了一个假的 bot 对象
@@ -110,13 +88,10 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
await matcher.handle_event(mock_event.bot, mock_event)
except Exception as e:
self.logger.error(f"处理 Discord 消息时发生异常: {e}")
# 记录详细的异常信息
import traceback
self.logger.error(f"异常堆栈: {traceback.format_exc()}")
async def start_redis_subscription(self):
"""启动 Redis 订阅以处理跨平台消息发送"""
if redis_manager._redis is None:
if redis_manager.redis is None:
self.logger.warning("[DiscordAdapter] Redis 未初始化,跳过订阅")
return
@@ -132,8 +107,7 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
try:
data = json.loads(message["data"])
if data.get("type") == "send_message":
# 使用 asyncio.create_task 异步处理消息,避免阻塞订阅循环
asyncio.create_task(self.handle_send_message(data))
await self.handle_send_message(data)
except json.JSONDecodeError as e:
self.logger.error(f"[DiscordAdapter] 解析 Redis 消息失败: {e}")
except Exception as e:
@@ -214,12 +188,7 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
self.logger.error(f"[DiscordAdapter] 未找到频道: {channel_id}")
return
# 检查会话状态
if not self.is_closed():
self.logger.info(f"[DiscordAdapter] 正在发送消息到频道 {channel_id}")
else:
self.logger.warning(f"[DiscordAdapter] 会话已关闭,消息将被丢弃: channel_id={channel_id}")
return
self.logger.info(f"[DiscordAdapter] 正在发送消息到频道 {channel_id}")
embed = None
if embed_data:
@@ -244,12 +213,7 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
if attachment_url.startswith('http'):
try:
import aiohttp
proxy_url = self.proxy if self.proxy else None
async with aiohttp.ClientSession() as session:
async with session.get(attachment_url, proxy=proxy_url, timeout=30) as response:
content_bytes = await response.read()
response = requests.get(attachment_url, proxies=proxies, timeout=30)
if not filename:
filename = os.path.basename(attachment_url.split('?')[0]) or "attachment"
@@ -258,7 +222,7 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
if is_voice:
# 尝试转换为 OGG Opus
ogg_bytes = await self.convert_to_ogg_opus(content_bytes)
ogg_bytes = await self.convert_to_ogg_opus(response.content)
if ogg_bytes:
# 转换成功,作为语音消息发送
# discord.py 官方 API 目前不支持直接发送语音消息
@@ -300,21 +264,16 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
files.append(discord.File(fp=io.BytesIO(ogg_bytes), filename="voice.ogg"))
else:
# 转换失败,作为普通文件发送
files.append(discord.File(fp=io.BytesIO(content_bytes), filename=filename))
files.append(discord.File(fp=io.BytesIO(response.content), filename=filename))
else:
files.append(discord.File(fp=io.BytesIO(content_bytes), filename=filename))
files.append(discord.File(fp=io.BytesIO(response.content), filename=filename))
except Exception as e:
self.logger.error(f"[DiscordAdapter] 下载附件失败: {attachment_url}, 错误: {e}")
if content or files or embed:
try:
await channel.send(content=content, files=files if files else None, embed=embed)
self.logger.success(f"[DiscordAdapter] 消息已发送到频道 {channel_id}")
except Exception as send_error:
self.logger.error(f"[DiscordAdapter] 发送消息失败 (channel.send): {send_error}")
raise
else:
self.logger.debug(f"[DiscordAdapter] 没有内容需要发送到频道 {channel_id}")
await channel.send(content=content, files=files if files else None, embed=embed)
self.logger.success(f"[DiscordAdapter] 消息已发送到频道 {channel_id}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] 发送消息失败: {e}")
@@ -340,64 +299,21 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
except asyncio.CancelledError:
self.logger.info("连接被取消")
break
except discord.ConnectionClosed as e:
retry_count += 1
self.logger.warning(f"Discord 连接关闭: code={e.code}, reason={e.reason}")
# 如果是正常关闭,不计入重连次数
if e.code == 1000:
self.logger.info("连接正常关闭,等待重新连接...")
continue
if max_retries != -1 and retry_count >= max_retries:
self.logger.error(f"已达到最大重连次数 ({max_retries}),停止重连")
break
self.logger.info(f"将在 {retry_delay} 秒后重连 ({retry_count}/{max_retries if max_retries != -1 else '无限'})...")
await self._cleanup_connection()
await asyncio.sleep(retry_delay)
except Exception as e:
retry_count += 1
self.logger.error(f"Discord 连接异常: {e}")
self.logger.error(f"Discord 连接失败: {e}")
if max_retries != -1 and retry_count >= max_retries:
self.logger.error(f"已达到最大重连次数 ({max_retries}),停止重连")
break
self.logger.info(f"将在 {retry_delay} 秒后重连 ({retry_count}/{max_retries if max_retries != -1 else '无限'})...")
await self._cleanup_connection()
# 清理旧的连接状态
self.clear()
await asyncio.sleep(retry_delay)
self.logger.info("Discord 客户端已停止")
async def _cleanup_connection(self):
"""
清理旧的连接状态
"""
try:
# 停止心跳任务
if hasattr(self, 'heartbeat_task') and not self.heartbeat_task.done():
self.heartbeat_task.cancel()
try:
await self.heartbeat_task
except asyncio.CancelledError:
pass
except Exception as e:
self.logger.error(f"清理心跳任务时出错: {e}")
try:
# 清理 HTTP 连接
if hasattr(self, 'http') and self.http:
await self.http.close()
except Exception as e:
self.logger.error(f"清理 HTTP 连接时出错: {e}")
try:
# 清理客户端状态
self.clear()
except Exception as e:
self.logger.error(f"清理客户端状态时出错: {e}")
async def start_heartbeat(self, interval: int = 30):
"""
启动心跳机制,定期检查连接状态
@@ -407,20 +323,16 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
"""
self.logger.info(f"心跳机制已启动,间隔: {interval}")
while not self.is_closed():
while self.is_closed() is False:
try:
await asyncio.sleep(interval)
# 检查 WebSocket 连接状态
if self.ws is not None:
# 正确检查 WebSocket 状态
if not getattr(self.ws, 'open', False):
self.logger.warning("检测到 WebSocket 连接已关闭,触发重连...")
try:
await self.ws.close(code=4000)
except Exception as close_error:
self.logger.error(f"关闭 WebSocket 连接时出错: {close_error}")
break
# discord.py 的 ws 对象是 DiscordWebSocket它没有 closed 属性
# 我们可以通过检查 self.is_closed() 或者 ws.open 来判断
if self.ws is not None and not getattr(self.ws, 'open', True):
self.logger.warning("检测到 WebSocket 连接已关闭,触发重连...")
await self.close()
break
self.logger.debug(f"心跳正常: {self.user}")