feat: 新增跨平台消息互通插件及适配器优化

refactor(discord_adapter): 优化音频处理与心跳机制
feat(plugins/discord-cross): 实现QQ与Discord消息互通功能
fix(events/base): 添加platform字段到基础事件模型
This commit is contained in:
2026-03-21 13:44:36 +08:00
parent 3814f49fcf
commit 51fb77e6e0
23 changed files with 1562 additions and 2148 deletions

View File

@@ -3,14 +3,16 @@
Discord 适配器 (Discord Adapter)
此模块负责与 Discord API 建立连接,接收 Discord 消息,
并将其转换为通用数据模型 (Universal Data Models)
同时提供将通用消息段发送回 Discord 的能力。
并将其转换为本地 OneBot 数据模型
同时提供将本地消息段发送回 Discord 的能力。
"""
import asyncio
import json
import os
import io
import requests
import tempfile
import subprocess
from typing import Union, List, Optional
try:
@@ -61,6 +63,8 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
"""当 Bot 成功连接到 Discord 时触发"""
self.logger.success(f"Discord Bot 已登录: {self.user} (ID: {self.user.id})")
self.start_heartbeat_task(interval=30)
# 启动 Redis 订阅以处理跨平台消息
asyncio.create_task(self.start_redis_subscription())
@@ -112,6 +116,61 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
except Exception as e:
self.logger.error(f"[DiscordAdapter] Redis 订阅异常: {e}")
async def convert_to_ogg_opus(self, audio_bytes: bytes) -> Optional[bytes]:
"""
将音频文件转换为 OGG Opus 格式,用于 Discord 语音消息
"""
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False, suffix=".tmp") as temp_in:
temp_in.write(audio_bytes)
temp_in_path = temp_in.name
with tempfile.NamedTemporaryFile(delete=False, suffix=".ogg") as temp_out:
temp_out_path = temp_out.name
# 使用 ffmpeg 转换
# -c:a libopus: 使用 Opus 编码器
# -b:a 64k: 比特率 64k
# -vbr on: 开启可变比特率
# -compression_level 10: 最高压缩级别
# -frame_duration 20: 帧时长 20ms
# -application voip: 针对语音优化
cmd = [
"ffmpeg", "-y", "-i", temp_in_path,
"-c:a", "libopus", "-b:a", "64k", "-vbr", "on",
"-compression_level", "10", "-frame_duration", "20",
"-application", "voip", temp_out_path
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
with open(temp_out_path, "rb") as f:
ogg_bytes = f.read()
return ogg_bytes
else:
self.logger.error(f"[DiscordAdapter] ffmpeg 转换失败: {stderr.decode('utf-8', errors='ignore')}")
return None
except Exception as e:
self.logger.error(f"[DiscordAdapter] 音频转换异常: {e}")
return None
finally:
# 清理临时文件
try:
if os.path.exists(temp_in_path):
os.remove(temp_in_path)
if os.path.exists(temp_out_path):
os.remove(temp_out_path)
except:
pass
async def handle_send_message(self, data: dict):
"""处理来自 Redis 的消息发送请求"""
try:
@@ -131,6 +190,10 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
self.logger.info(f"[DiscordAdapter] 正在发送消息到频道 {channel_id}")
embed = None
if embed_data:
embed = discord.Embed.from_dict(embed_data)
files = []
if attachments:
proxies = None
@@ -153,14 +216,60 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
response = requests.get(attachment_url, proxies=proxies, timeout=30)
if not filename:
filename = os.path.basename(attachment_url.split('?')[0]) or "attachment"
files.append(discord.File(fp=io.BytesIO(response.content), filename=filename))
# 检查是否是语音文件
is_voice = filename.lower().endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a'))
if is_voice:
# 尝试转换为 OGG Opus
ogg_bytes = await self.convert_to_ogg_opus(response.content)
if ogg_bytes:
# 转换成功,作为语音消息发送
# discord.py 官方 API 目前不支持直接发送语音消息
# 我们需要使用内部的 HTTP 客户端来发送
try:
# 构造文件数据
file_data = {
"name": "voice-message.ogg",
"value": ogg_bytes,
"content_type": "audio/ogg"
}
# 构造 payload
payload = {
"flags": 8192 # IS_VOICE_MESSAGE
}
if content:
payload["content"] = content
content = "" # 清空 content避免重复发送
if embed:
payload["embeds"] = [embed.to_dict()]
embed = None # 清空 embed避免重复发送
# 使用内部 HTTP 客户端发送
route = discord.http.Route('POST', '/channels/{channel_id}/messages', channel_id=channel_id)
await self.http.request(
route,
form=[
{'name': 'payload_json', 'value': json.dumps(payload)},
{'name': 'files[0]', 'value': ogg_bytes, 'filename': 'voice-message.ogg', 'content_type': 'audio/ogg'}
]
)
self.logger.success(f"[DiscordAdapter] 语音消息已发送到频道 {channel_id}")
continue # 跳过后面的普通发送逻辑
except Exception as e:
self.logger.error(f"[DiscordAdapter] 发送语音消息失败: {e},将作为普通文件发送")
files.append(discord.File(fp=io.BytesIO(ogg_bytes), filename="voice.ogg"))
else:
# 转换失败,作为普通文件发送
files.append(discord.File(fp=io.BytesIO(response.content), filename=filename))
else:
files.append(discord.File(fp=io.BytesIO(response.content), filename=filename))
except Exception as e:
self.logger.error(f"[DiscordAdapter] 下载附件失败: {attachment_url}, 错误: {e}")
embed = None
if embed_data:
embed = discord.Embed.from_dict(embed_data)
if content or files or embed:
await channel.send(content=content, files=files if files else None, embed=embed)
@@ -169,14 +278,73 @@ class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object):
except Exception as e:
self.logger.error(f"[DiscordAdapter] 发送消息失败: {e}")
async def start_client(self):
"""启动 Discord 客户端(非阻塞方式)"""
async def start_client(self, max_retries: int = -1, retry_delay: int = 5):
"""
启动 Discord 客户端
Args:
max_retries: 最大重连次数,-1 表示无限重连
retry_delay: 重连延迟(秒)
"""
if not DISCORD_AVAILABLE:
self.logger.error("无法启动 Discord 客户端discord.py 未安装")
return
try:
self.logger.info("正在连接 Discord...")
await self.start(self.token)
except Exception as e:
self.logger.error(f"Discord 连接失败: {e}")
retry_count = 0
while max_retries == -1 or retry_count < max_retries:
try:
self.logger.info("正在连接 Discord...")
await self.start(self.token)
except asyncio.CancelledError:
self.logger.info("连接被取消")
break
except Exception as e:
retry_count += 1
self.logger.error(f"Discord 连接失败: {e}")
if max_retries != -1 and retry_count >= max_retries:
self.logger.error(f"已达到最大重连次数 ({max_retries}),停止重连")
break
self.logger.info(f"将在 {retry_delay} 秒后重连 ({retry_count}/{max_retries if max_retries != -1 else '无限'})...")
# 清理旧的连接状态
self.clear()
await asyncio.sleep(retry_delay)
self.logger.info("Discord 客户端已停止")
async def start_heartbeat(self, interval: int = 30):
"""
启动心跳机制,定期检查连接状态
Args:
interval: 心跳间隔(秒)
"""
self.logger.info(f"心跳机制已启动,间隔: {interval}")
while self.is_closed() is False:
try:
await asyncio.sleep(interval)
if self.ws is not None and self.ws.closed:
self.logger.warning("检测到 WebSocket 连接已关闭,触发重连...")
await self.close()
break
self.logger.debug(f"心跳正常: {self.user}")
except Exception as e:
self.logger.error(f"心跳检测异常: {e}")
break
def start_heartbeat_task(self, interval: int = 30):
"""
启动心跳任务(非阻塞)
Args:
interval: 心跳间隔(秒)
"""
if not hasattr(self, 'heartbeat_task') or self.heartbeat_task.done():
self.heartbeat_task = asyncio.create_task(self.start_heartbeat(interval))
self.logger.info("心跳任务已启动")