Files
NeoBot/adapters/discord_adapter.py
镀铬酸钾 95672989ac Dev (#82)
* fix(discord): 修复 WebSocket 连接检测并增强跨平台文件处理

修复 Discord WebSocket 连接检测逻辑,使用正确的属性检查连接状态
为跨平台消息处理添加文件类型支持,并增加详细的调试日志
优化附件处理逻辑,确保所有文件类型都能正确识别和转发

* feat(跨平台): 优化消息处理并添加纯文本提取功能

添加 extract_text_only 函数过滤非文本标记
修改翻译逻辑仅处理纯文本内容
完善附件处理和消息内容拼接
修复仅包含表情时的消息处理问题

* refactor(discord-cross): 使用模块专用日志记录器替换全局日志记录器

将各模块中的全局日志记录器替换为模块专用日志记录器,以提供更清晰的日志来源标识
同时在适配器中添加会话状态检查和重连机制,提升消息发送的可靠性

* feat(翻译): 改进翻译功能,同时显示原文和译文

修改翻译功能,不再替换原文而是同时显示原文和翻译内容,方便用户对照
更新 DeepSeek API 配置为官方地址和模型
优化 Discord 适配器的重连逻辑,直接关闭 WebSocket 触发重连
修复 Discord 频道 ID 转换逻辑,简化处理流程

* feat(cross-platform): 添加跨平台功能支持及配置优化

- 新增跨平台配置模型和全局配置支持
- 优化 Discord 适配器的连接管理和错误处理
- 添加 watchdog 和 discord.py 依赖
- 创建 DeepSeek API 配置文档
- 移除重复的同步帮助图片代码
- 改进跨平台插件配置加载逻辑

* fix(jrcd): 修正群组ID检查条件

删除不再使用的示例插件文件

* feat: 改进配置加载逻辑并更新项目配置

当配置文件不存在时自动生成示例配置
添加pyproject.toml作为项目构建配置
更新.gitignore忽略更多文件类型
删除不再使用的反向WebSocket示例文件

* docs: 更新架构文档和项目结构说明

添加反向WebSocket连接模式说明
补充核心管理器文档
更新项目结构文件
在文档首页添加特色功能说明

* fix(discord): 修复WebSocket连接检查并添加错误日志

refactor(config): 更新配置文件的网络和认证信息

feat(cross-platform): 为跨平台消息处理添加异常捕获和日志

* fix(discord-cross): 修复跨平台消息处理和附件下载问题

修复QQ群消息处理中的非群消息过滤问题
优化Discord附件下载逻辑,使用aiohttp替代requests
修复Redis订阅任务重复创建问题
调整消息格式化的embed字段处理逻辑

* feat(vectordb): 添加向量数据库支持及集成功能

新增向量数据库管理器模块,支持文本的存储、检索和相似度查询
添加知识库插件和AI聊天插件,利用向量数据库实现记忆功能
优化跨平台翻译模块,集成向量数据库存储历史翻译记录
改进消息处理逻辑,优先使用用户显示名称

* feat(plugins): add furry_assistant plugin by Calgau

- Add furry assistant plugin with 7 commands
- Include furry greetings, fortunes, jokes, and advice
- Add plugin metadata and README documentation
- Implement plugin lifecycle methods
- Created by Calgau (furry AI assistant)

* fix: 调整昵称和用户名的获取优先级

修改QQ群消息处理中昵称获取顺序,优先使用昵称而非群名片
移除Discord消息转换中global_name的检查,直接使用用户名

* refactor(插件): 优化插件元信息和命令配置

- 为 AI 聊天和知识库插件添加元信息配置
- 简化插件命令配置,移除冗余别名
- 更新 Discord 适配器的 Redis 频道名称
- 增强向量数据库管理器的日志信息

---------

Co-authored-by: K2cr2O1 <indoec@163.com>
2026-03-24 14:58:04 +08:00

431 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Discord 适配器 (Discord Adapter)
此模块负责与 Discord API 建立连接,接收 Discord 消息,
并将其转换为本地 OneBot 数据模型,
同时提供将本地消息段发送回 Discord 的能力。
"""
import asyncio
import json
import os
import io
import requests
import tempfile
import subprocess
from typing import Union, List, Optional
try:
import discord
DISCORD_AVAILABLE = True
except ImportError:
DISCORD_AVAILABLE = False
from core.utils.logger import ModuleLogger
from .router import DiscordToOneBotConverter
from core.managers.redis_manager import redis_manager
from core.config_loader import global_config
class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
"""
Discord 客户端适配器。
继承自 discord.Client负责处理 Discord 的底层事件。
"""
def __init__(self, token: str):
if not DISCORD_AVAILABLE:
raise ImportError("discord.py 未安装,请运行 `pip install discord.py`")
self.logger = ModuleLogger("DiscordAdapter")
self.token = token
self.send_channel = None
self.proxy = None
self.proxy_type = "http"
self._redis_sub_task = None
if global_config.discord.proxy:
self.proxy = global_config.discord.proxy
self.proxy_type = global_config.discord.proxy_type or "http"
proxy_url = self.proxy
if self.proxy_type.lower() in ["socks5", "socks4"]:
if not proxy_url.startswith(("socks5://", "socks4://")):
proxy_url = f"{self.proxy_type.lower()}://{proxy_url.split('://')[-1]}"
os.environ["HTTP_PROXY"] = proxy_url
os.environ["HTTPS_PROXY"] = proxy_url
self.logger.info(f"[DiscordAdapter] 代理已设置: {proxy_url} (类型: {self.proxy_type})")
intents = discord.Intents.default()
intents.message_content = True
super().__init__(intents=intents)
async def on_ready(self):
"""当 Bot 成功连接到 Discord 时触发"""
self.logger.success(f"Discord Bot 已登录: {self.user} (ID: {self.user.id})")
self.start_heartbeat_task(interval=30)
# 启动 Redis 订阅以处理跨平台消息
if self._redis_sub_task is None or self._redis_sub_task.done():
self._redis_sub_task = asyncio.create_task(self.start_redis_subscription())
async def on_message(self, message: 'discord.Message'):
"""当收到 Discord 消息时触发"""
# 忽略机器人自己的消息
if message.author.bot:
return
self.logger.info(f"[Discord 消息] {message.author}: {message.content}")
# 1. 将 discord.Message 伪装成 OneBot 事件模型
# 2. 触发业务逻辑
# 将伪装后的事件丢给现有的命令管理器 (matcher)
from core.managers.command_manager import matcher
# matcher.handle_event 需要 bot 实例和 event 实例
# 我们在 create_mock_event 中已经注入了一个假的 bot 对象
try:
mock_event = DiscordToOneBotConverter.create_mock_event(message, self)
await matcher.handle_event(mock_event.bot, mock_event)
except Exception as e:
self.logger.error(f"处理 Discord 消息时发生异常: {e}")
# 记录详细的异常信息
import traceback
self.logger.error(f"异常堆栈: {traceback.format_exc()}")
async def start_redis_subscription(self):
"""启动 Redis 订阅以处理跨平台消息发送"""
if redis_manager._redis is None:
self.logger.warning("[DiscordAdapter] Redis 未初始化,跳过订阅")
return
try:
channel_name = "neobot_discord_send"
pubsub = redis_manager.redis.pubsub()
await pubsub.subscribe(channel_name)
self.logger.success(f"[DiscordAdapter] 已订阅 Redis 频道: {channel_name}")
async for message in pubsub.listen():
if message["type"] == "message":
try:
data = json.loads(message["data"])
if data.get("type") == "send_message":
# 使用 asyncio.create_task 异步处理消息,避免阻塞订阅循环
asyncio.create_task(self.handle_send_message(data))
except json.JSONDecodeError as e:
self.logger.error(f"[DiscordAdapter] 解析 Redis 消息失败: {e}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] 处理 Redis 消息失败: {e}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] Redis 订阅异常: {e}")
async def convert_to_ogg_opus(self, audio_bytes: bytes) -> Optional[bytes]:
"""
将音频文件转换为 OGG Opus 格式,用于 Discord 语音消息
"""
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False, suffix=".tmp") as temp_in:
temp_in.write(audio_bytes)
temp_in_path = temp_in.name
with tempfile.NamedTemporaryFile(delete=False, suffix=".ogg") as temp_out:
temp_out_path = temp_out.name
# 使用 ffmpeg 转换
# -c:a libopus: 使用 Opus 编码器
# -b:a 64k: 比特率 64k
# -vbr on: 开启可变比特率
# -compression_level 10: 最高压缩级别
# -frame_duration 20: 帧时长 20ms
# -application voip: 针对语音优化
cmd = [
"ffmpeg", "-y", "-i", temp_in_path,
"-c:a", "libopus", "-b:a", "64k", "-vbr", "on",
"-compression_level", "10", "-frame_duration", "20",
"-application", "voip", temp_out_path
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
with open(temp_out_path, "rb") as f:
ogg_bytes = f.read()
return ogg_bytes
else:
self.logger.error(f"[DiscordAdapter] ffmpeg 转换失败: {stderr.decode('utf-8', errors='ignore')}")
return None
except Exception as e:
self.logger.error(f"[DiscordAdapter] 音频转换异常: {e}")
return None
finally:
# 清理临时文件
try:
if os.path.exists(temp_in_path):
os.remove(temp_in_path)
if os.path.exists(temp_out_path):
os.remove(temp_out_path)
except:
pass
async def handle_send_message(self, data: dict):
"""处理来自 Redis 的消息发送请求"""
try:
channel_id = data.get("channel_id")
content = data.get("content", "")
attachments = data.get("attachments", [])
embed_data = data.get("embed")
if channel_id is None:
self.logger.error("[DiscordAdapter] 缺少 channel_id")
return
channel = self.get_channel(channel_id)
if channel is None:
self.logger.error(f"[DiscordAdapter] 未找到频道: {channel_id}")
return
# 检查会话状态
if not self.is_closed():
self.logger.info(f"[DiscordAdapter] 正在发送消息到频道 {channel_id}")
else:
self.logger.error(f"[DiscordAdapter] 会话已关闭,无法发送消息到频道 {channel_id}")
# 触发重连
self.logger.warning(f"[DiscordAdapter] 会话已关闭,将触发重连")
if self.ws is not None:
# 关闭 WebSocket 连接,让 discord.py 自动重连
await self.ws.close(4000)
return
embed = None
if embed_data:
embed = discord.Embed.from_dict(embed_data)
files = []
if attachments:
proxies = None
if self.proxy:
proxies = {
"http": self.proxy,
"https": self.proxy
}
for attachment in attachments:
if isinstance(attachment, dict):
attachment_url = attachment.get("url", "")
filename = attachment.get("filename", "")
else:
attachment_url = str(attachment)
filename = ""
if attachment_url.startswith('http'):
try:
import aiohttp
proxy_url = self.proxy if self.proxy else None
async with aiohttp.ClientSession() as session:
async with session.get(attachment_url, proxy=proxy_url, timeout=30) as response:
content_bytes = await response.read()
if not filename:
filename = os.path.basename(attachment_url.split('?')[0]) or "attachment"
# 检查是否是语音文件
is_voice = filename.lower().endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a'))
if is_voice:
# 尝试转换为 OGG Opus
ogg_bytes = await self.convert_to_ogg_opus(content_bytes)
if ogg_bytes:
# 转换成功,作为语音消息发送
# discord.py 官方 API 目前不支持直接发送语音消息
# 我们需要使用内部的 HTTP 客户端来发送
try:
# 构造文件数据
file_data = {
"name": "voice-message.ogg",
"value": ogg_bytes,
"content_type": "audio/ogg"
}
# 构造 payload
payload = {
"flags": 8192 # IS_VOICE_MESSAGE
}
if content:
payload["content"] = content
content = "" # 清空 content避免重复发送
if embed:
payload["embeds"] = [embed.to_dict()]
embed = None # 清空 embed避免重复发送
# 使用内部 HTTP 客户端发送
route = discord.http.Route('POST', '/channels/{channel_id}/messages', channel_id=channel_id)
await self.http.request(
route,
form=[
{'name': 'payload_json', 'value': json.dumps(payload)},
{'name': 'files[0]', 'value': ogg_bytes, 'filename': 'voice-message.ogg', 'content_type': 'audio/ogg'}
]
)
self.logger.success(f"[DiscordAdapter] 语音消息已发送到频道 {channel_id}")
continue # 跳过后面的普通发送逻辑
except Exception as e:
self.logger.error(f"[DiscordAdapter] 发送语音消息失败: {e},将作为普通文件发送")
files.append(discord.File(fp=io.BytesIO(ogg_bytes), filename="voice.ogg"))
else:
# 转换失败,作为普通文件发送
files.append(discord.File(fp=io.BytesIO(content_bytes), filename=filename))
else:
files.append(discord.File(fp=io.BytesIO(content_bytes), filename=filename))
except Exception as e:
self.logger.error(f"[DiscordAdapter] 下载附件失败: {attachment_url}, 错误: {e}")
if content or files or embed:
try:
await channel.send(content=content, files=files if files else None, embed=embed)
self.logger.success(f"[DiscordAdapter] 消息已发送到频道 {channel_id}")
except Exception as send_error:
self.logger.error(f"[DiscordAdapter] 发送消息失败 (channel.send): {send_error}")
# 如果发送失败,尝试检查会话状态
if self.is_closed():
self.logger.warning(f"[DiscordAdapter] 会话已关闭,将触发重连")
if self.ws is not None:
await self.ws.close(4000)
raise
else:
self.logger.debug(f"[DiscordAdapter] 没有内容需要发送到频道 {channel_id}")
except Exception as e:
self.logger.error(f"[DiscordAdapter] 发送消息失败: {e}")
async def start_client(self, max_retries: int = -1, retry_delay: int = 5):
"""
启动 Discord 客户端
Args:
max_retries: 最大重连次数,-1 表示无限重连
retry_delay: 重连延迟(秒)
"""
if not DISCORD_AVAILABLE:
self.logger.error("无法启动 Discord 客户端discord.py 未安装")
return
retry_count = 0
while max_retries == -1 or retry_count < max_retries:
try:
self.logger.info("正在连接 Discord...")
await self.start(self.token)
except asyncio.CancelledError:
self.logger.info("连接被取消")
break
except discord.ConnectionClosed as e:
retry_count += 1
self.logger.warning(f"Discord 连接关闭: code={e.code}, reason={e.reason}")
# 如果是正常关闭,不计入重连次数
if e.code == 1000:
self.logger.info("连接正常关闭,等待重新连接...")
continue
if max_retries != -1 and retry_count >= max_retries:
self.logger.error(f"已达到最大重连次数 ({max_retries}),停止重连")
break
self.logger.info(f"将在 {retry_delay} 秒后重连 ({retry_count}/{max_retries if max_retries != -1 else '无限'})...")
await self._cleanup_connection()
await asyncio.sleep(retry_delay)
except Exception as e:
retry_count += 1
self.logger.error(f"Discord 连接异常: {e}")
if max_retries != -1 and retry_count >= max_retries:
self.logger.error(f"已达到最大重连次数 ({max_retries}),停止重连")
break
self.logger.info(f"将在 {retry_delay} 秒后重连 ({retry_count}/{max_retries if max_retries != -1 else '无限'})...")
await self._cleanup_connection()
await asyncio.sleep(retry_delay)
self.logger.info("Discord 客户端已停止")
async def _cleanup_connection(self):
"""
清理旧的连接状态
"""
try:
# 停止心跳任务
if hasattr(self, 'heartbeat_task') and not self.heartbeat_task.done():
self.heartbeat_task.cancel()
try:
await self.heartbeat_task
except asyncio.CancelledError:
pass
except Exception as e:
self.logger.error(f"清理心跳任务时出错: {e}")
try:
# 清理 HTTP 连接
if hasattr(self, 'http') and self.http:
await self.http.close()
except Exception as e:
self.logger.error(f"清理 HTTP 连接时出错: {e}")
try:
# 清理客户端状态
self.clear()
except Exception as e:
self.logger.error(f"清理客户端状态时出错: {e}")
async def start_heartbeat(self, interval: int = 30):
"""
启动心跳机制,定期检查连接状态
Args:
interval: 心跳间隔(秒)
"""
self.logger.info(f"心跳机制已启动,间隔: {interval}")
while not self.is_closed():
try:
await asyncio.sleep(interval)
# 检查 WebSocket 连接状态
if self.ws is not None:
# 正确检查 WebSocket 状态
if not getattr(self.ws, 'open', False):
self.logger.warning("检测到 WebSocket 连接已关闭,触发重连...")
try:
await self.ws.close(code=4000)
except Exception as close_error:
self.logger.error(f"关闭 WebSocket 连接时出错: {close_error}")
break
self.logger.debug(f"心跳正常: {self.user}")
except Exception as e:
self.logger.error(f"心跳检测异常: {e}")
break
def start_heartbeat_task(self, interval: int = 30):
"""
启动心跳任务(非阻塞)
Args:
interval: 心跳间隔(秒)
"""
if not hasattr(self, 'heartbeat_task') or self.heartbeat_task.done():
self.heartbeat_task = asyncio.create_task(self.start_heartbeat(interval))
self.logger.info("心跳任务已启动")