# -*- coding: utf-8 -*- """ Discord 适配器 (Discord Adapter) 此模块负责与 Discord API 建立连接,接收 Discord 消息, 并将其转换为本地 OneBot 数据模型, 同时提供将本地消息段发送回 Discord 的能力。 """ import asyncio import json import os import io import requests import tempfile import subprocess from typing import Union, List, Optional try: import discord DISCORD_AVAILABLE = True except ImportError: DISCORD_AVAILABLE = False from core.utils.logger import ModuleLogger from .router import DiscordToOneBotConverter from core.managers.redis_manager import redis_manager from core.config_loader import global_config class DiscordAdapter(discord.Client if DISCORD_AVAILABLE else object): """ Discord 客户端适配器。 继承自 discord.Client,负责处理 Discord 的底层事件。 """ def __init__(self, token: str): if not DISCORD_AVAILABLE: raise ImportError("discord.py 未安装,请运行 `pip install discord.py`") self.logger = ModuleLogger("DiscordAdapter") self.token = token self.send_channel = None self.proxy = None self.proxy_type = "http" if global_config.discord.proxy: self.proxy = global_config.discord.proxy self.proxy_type = global_config.discord.proxy_type or "http" proxy_url = self.proxy if self.proxy_type.lower() in ["socks5", "socks4"]: if not proxy_url.startswith(("socks5://", "socks4://")): proxy_url = f"{self.proxy_type.lower()}://{proxy_url.split('://')[-1]}" os.environ["HTTP_PROXY"] = proxy_url os.environ["HTTPS_PROXY"] = proxy_url self.logger.info(f"[DiscordAdapter] 代理已设置: {proxy_url} (类型: {self.proxy_type})") intents = discord.Intents.default() intents.message_content = True super().__init__(intents=intents) async def on_ready(self): """当 Bot 成功连接到 Discord 时触发""" self.logger.success(f"Discord Bot 已登录: {self.user} (ID: {self.user.id})") self.start_heartbeat_task(interval=30) # 启动 Redis 订阅以处理跨平台消息 asyncio.create_task(self.start_redis_subscription()) async def on_message(self, message: 'discord.Message'): """当收到 Discord 消息时触发""" # 忽略机器人自己的消息 if message.author.bot: return self.logger.info(f"[Discord 消息] {message.author}: {message.content}") # 1. 将 discord.Message 伪装成 OneBot 事件模型 # 2. 触发业务逻辑 # 将伪装后的事件丢给现有的命令管理器 (matcher) from core.managers.command_manager import matcher # matcher.handle_event 需要 bot 实例和 event 实例 # 我们在 create_mock_event 中已经注入了一个假的 bot 对象 try: mock_event = DiscordToOneBotConverter.create_mock_event(message, self) await matcher.handle_event(mock_event.bot, mock_event) except Exception as e: self.logger.error(f"处理 Discord 消息时发生异常: {e}") async def start_redis_subscription(self): """启动 Redis 订阅以处理跨平台消息发送""" if redis_manager.redis is None: self.logger.warning("[DiscordAdapter] Redis 未初始化,跳过订阅") return try: channel_name = "neobot_discord_send" pubsub = redis_manager.redis.pubsub() await pubsub.subscribe(channel_name) self.logger.success(f"[DiscordAdapter] 已订阅 Redis 频道: {channel_name}") async for message in pubsub.listen(): if message["type"] == "message": try: data = json.loads(message["data"]) if data.get("type") == "send_message": await self.handle_send_message(data) except json.JSONDecodeError as e: self.logger.error(f"[DiscordAdapter] 解析 Redis 消息失败: {e}") except Exception as e: self.logger.error(f"[DiscordAdapter] 处理 Redis 消息失败: {e}") except Exception as e: self.logger.error(f"[DiscordAdapter] Redis 订阅异常: {e}") async def convert_to_ogg_opus(self, audio_bytes: bytes) -> Optional[bytes]: """ 将音频文件转换为 OGG Opus 格式,用于 Discord 语音消息 """ try: # 创建临时文件 with tempfile.NamedTemporaryFile(delete=False, suffix=".tmp") as temp_in: temp_in.write(audio_bytes) temp_in_path = temp_in.name with tempfile.NamedTemporaryFile(delete=False, suffix=".ogg") as temp_out: temp_out_path = temp_out.name # 使用 ffmpeg 转换 # -c:a libopus: 使用 Opus 编码器 # -b:a 64k: 比特率 64k # -vbr on: 开启可变比特率 # -compression_level 10: 最高压缩级别 # -frame_duration 20: 帧时长 20ms # -application voip: 针对语音优化 cmd = [ "ffmpeg", "-y", "-i", temp_in_path, "-c:a", "libopus", "-b:a", "64k", "-vbr", "on", "-compression_level", "10", "-frame_duration", "20", "-application", "voip", temp_out_path ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode == 0: with open(temp_out_path, "rb") as f: ogg_bytes = f.read() return ogg_bytes else: self.logger.error(f"[DiscordAdapter] ffmpeg 转换失败: {stderr.decode('utf-8', errors='ignore')}") return None except Exception as e: self.logger.error(f"[DiscordAdapter] 音频转换异常: {e}") return None finally: # 清理临时文件 try: if os.path.exists(temp_in_path): os.remove(temp_in_path) if os.path.exists(temp_out_path): os.remove(temp_out_path) except: pass async def handle_send_message(self, data: dict): """处理来自 Redis 的消息发送请求""" try: channel_id = data.get("channel_id") content = data.get("content", "") attachments = data.get("attachments", []) embed_data = data.get("embed") if channel_id is None: self.logger.error("[DiscordAdapter] 缺少 channel_id") return channel = self.get_channel(channel_id) if channel is None: self.logger.error(f"[DiscordAdapter] 未找到频道: {channel_id}") return self.logger.info(f"[DiscordAdapter] 正在发送消息到频道 {channel_id}") embed = None if embed_data: embed = discord.Embed.from_dict(embed_data) files = [] if attachments: proxies = None if self.proxy: proxies = { "http": self.proxy, "https": self.proxy } for attachment in attachments: if isinstance(attachment, dict): attachment_url = attachment.get("url", "") filename = attachment.get("filename", "") else: attachment_url = str(attachment) filename = "" if attachment_url.startswith('http'): try: response = requests.get(attachment_url, proxies=proxies, timeout=30) if not filename: filename = os.path.basename(attachment_url.split('?')[0]) or "attachment" # 检查是否是语音文件 is_voice = filename.lower().endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')) if is_voice: # 尝试转换为 OGG Opus ogg_bytes = await self.convert_to_ogg_opus(response.content) if ogg_bytes: # 转换成功,作为语音消息发送 # discord.py 官方 API 目前不支持直接发送语音消息 # 我们需要使用内部的 HTTP 客户端来发送 try: # 构造文件数据 file_data = { "name": "voice-message.ogg", "value": ogg_bytes, "content_type": "audio/ogg" } # 构造 payload payload = { "flags": 8192 # IS_VOICE_MESSAGE } if content: payload["content"] = content content = "" # 清空 content,避免重复发送 if embed: payload["embeds"] = [embed.to_dict()] embed = None # 清空 embed,避免重复发送 # 使用内部 HTTP 客户端发送 route = discord.http.Route('POST', '/channels/{channel_id}/messages', channel_id=channel_id) await self.http.request( route, form=[ {'name': 'payload_json', 'value': json.dumps(payload)}, {'name': 'files[0]', 'value': ogg_bytes, 'filename': 'voice-message.ogg', 'content_type': 'audio/ogg'} ] ) self.logger.success(f"[DiscordAdapter] 语音消息已发送到频道 {channel_id}") continue # 跳过后面的普通发送逻辑 except Exception as e: self.logger.error(f"[DiscordAdapter] 发送语音消息失败: {e},将作为普通文件发送") files.append(discord.File(fp=io.BytesIO(ogg_bytes), filename="voice.ogg")) else: # 转换失败,作为普通文件发送 files.append(discord.File(fp=io.BytesIO(response.content), filename=filename)) else: files.append(discord.File(fp=io.BytesIO(response.content), filename=filename)) except Exception as e: self.logger.error(f"[DiscordAdapter] 下载附件失败: {attachment_url}, 错误: {e}") if content or files or embed: await channel.send(content=content, files=files if files else None, embed=embed) self.logger.success(f"[DiscordAdapter] 消息已发送到频道 {channel_id}") except Exception as e: self.logger.error(f"[DiscordAdapter] 发送消息失败: {e}") async def start_client(self, max_retries: int = -1, retry_delay: int = 5): """ 启动 Discord 客户端 Args: max_retries: 最大重连次数,-1 表示无限重连 retry_delay: 重连延迟(秒) """ if not DISCORD_AVAILABLE: self.logger.error("无法启动 Discord 客户端:discord.py 未安装") return retry_count = 0 while max_retries == -1 or retry_count < max_retries: try: self.logger.info("正在连接 Discord...") await self.start(self.token) except asyncio.CancelledError: self.logger.info("连接被取消") break except Exception as e: retry_count += 1 self.logger.error(f"Discord 连接失败: {e}") if max_retries != -1 and retry_count >= max_retries: self.logger.error(f"已达到最大重连次数 ({max_retries}),停止重连") break self.logger.info(f"将在 {retry_delay} 秒后重连 ({retry_count}/{max_retries if max_retries != -1 else '无限'})...") # 清理旧的连接状态 self.clear() await asyncio.sleep(retry_delay) self.logger.info("Discord 客户端已停止") async def start_heartbeat(self, interval: int = 30): """ 启动心跳机制,定期检查连接状态 Args: interval: 心跳间隔(秒) """ self.logger.info(f"心跳机制已启动,间隔: {interval}秒") while self.is_closed() is False: try: await asyncio.sleep(interval) # discord.py 的 ws 对象是 DiscordWebSocket,它没有 closed 属性 # 我们可以通过检查 self.is_closed() 或者 ws.open 来判断 if self.ws is not None and not getattr(self.ws, 'open', True): self.logger.warning("检测到 WebSocket 连接已关闭,触发重连...") await self.close() break self.logger.debug(f"心跳正常: {self.user}") except Exception as e: self.logger.error(f"心跳检测异常: {e}") break def start_heartbeat_task(self, interval: int = 30): """ 启动心跳任务(非阻塞) Args: interval: 心跳间隔(秒) """ if not hasattr(self, 'heartbeat_task') or self.heartbeat_task.done(): self.heartbeat_task = asyncio.create_task(self.start_heartbeat(interval)) self.logger.info("心跳任务已启动")