Files
NeoBot/adapters/router.py
K2Cr2O1 f868553342 feat: 添加Discord适配器与跨平台消息互通功能
新增Discord适配器支持,实现Discord与QQ之间的消息互通
添加通用数据模型用于跨平台消息转换
扩展配置系统以支持Discord和日志配置
重构日志系统以使用配置中的日志级别
在反向WebSocket管理器中注册Bot实例
更新主程序以支持Discord客户端启动
添加测试脚本验证核心功能
2026-03-15 13:36:17 +08:00

407 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
事件路由与转换器 (Event Router & Converter)
此模块负责在不同平台(如 Discord和 OneBot 业务逻辑之间进行数据转换。
核心目标是:**让现有的 OneBot 插件(如 bili.py在不修改任何代码的情况下能够处理 Discord 消息。**
实现原理:
1. 接收 Discord 消息 (`discord.Message`)。
2. 将其“伪装”成 OneBot 的 `GroupMessageEvent` 或 `PrivateMessageEvent`。
3. 拦截插件调用的 `event.reply()` 方法。
4. 将插件返回的 OneBot `MessageSegment` 转换为 Discord 格式并发送。
"""
import asyncio
from typing import Union, List, Any, Optional
try:
import discord
DISCORD_AVAILABLE = True
except ImportError:
DISCORD_AVAILABLE = False
from models.events.message import GroupMessageEvent, PrivateMessageEvent
from models.message import MessageSegment as OneBotMessageSegment
from models.sender import Sender
from core.utils.logger import ModuleLogger
logger = ModuleLogger("EventRouter")
class DiscordToOneBotConverter:
"""
将 Discord 消息转换为 OneBot 消息事件的转换器。
"""
@staticmethod
def create_mock_event(discord_message: 'discord.Message', adapter: Any) -> Union[GroupMessageEvent, PrivateMessageEvent]:
"""
将 discord.Message 伪装成 OneBot 的 MessageEvent。
Args:
discord_message: 原始的 Discord 消息对象
adapter: DiscordAdapter 实例,用于回调发送消息
Returns:
伪装后的 OneBot 事件对象
"""
# 1. 提取基础信息
user_id = discord_message.author.id
message_id = discord_message.id
# 处理 Discord 的 raw_message
# 如果消息是以 @机器人 开头Discord 的 content 会是 "<@机器人ID> /echo 1"
# 我们需要把前面的 @ 提及去掉,否则命令匹配器 (matcher) 无法识别以 "/" 开头的命令
raw_message = discord_message.content
# 添加附件信息到 raw_message
if discord_message.attachments:
for attachment in discord_message.attachments:
raw_message += f"\n{attachment.url}"
bot_mention = f"<@{adapter.user.id}>"
if raw_message.startswith(bot_mention):
raw_message = raw_message[len(bot_mention):].strip()
# 构造发送者信息
sender = Sender(
user_id=user_id,
nickname=discord_message.author.display_name,
card=getattr(discord_message.author, 'nick', ''), # 群名片
role="member" # 简化处理,默认都是普通成员
)
# 2. 判断是群聊还是私聊
is_private = isinstance(discord_message.channel, discord.DMChannel)
# 构造 message 列表 (将纯文本转换为 MessageSegment)
message_list = [OneBotMessageSegment.text(raw_message)]
import time
current_time = int(time.time())
self_id = adapter.user.id if adapter.user else 0
# 注入 Discord 特定信息(用于跨平台插件识别)
discord_channel_id = discord_message.channel.id if not isinstance(discord_message.channel, discord.DMChannel) else None
discord_username = discord_message.author.name
discord_discriminator = f"#{discord_message.author.discriminator}" if discord_message.author.discriminator != "0" else ""
if is_private:
# 构造私聊事件
event = PrivateMessageEvent(
time=current_time,
self_id=self_id,
message_type="private",
sub_type="friend",
message_id=message_id,
user_id=user_id,
raw_message=raw_message,
message=message_list,
sender=sender
)
else:
# 构造群聊事件
group_id = discord_message.channel.id
event = GroupMessageEvent(
time=current_time,
self_id=self_id,
message_type="group",
sub_type="normal",
message_id=message_id,
user_id=user_id,
group_id=group_id,
raw_message=raw_message,
message=message_list,
sender=sender
)
# 注入 Discord 特定属性(用于跨平台插件识别)
event._is_discord_message = True
event.discord_channel_id = discord_channel_id
event.discord_username = discord_username
event.discord_discriminator = discord_discriminator
# 3. 拦截并重写 reply 方法 (核心魔法)
# 插件调用 event.reply() 时,实际上会执行这个闭包
async def mock_reply(message: Union[str, OneBotMessageSegment, List[OneBotMessageSegment]], auto_escape: bool = False):
await DiscordToOneBotConverter.send_discord_reply(discord_message, message, adapter)
# 覆盖实例方法
event.reply = mock_reply
# 注入一个假的 bot 对象,防止插件调用 event.bot.xxx 时报错
# 这里只提供最基础的属性,如果插件调用了复杂的 API可能会报错
class MockBot:
def __init__(self):
self.self_id = adapter.user.id if adapter.user else 0
async def send(self, event, message, **kwargs):
await DiscordToOneBotConverter.send_discord_reply(discord_message, message, adapter)
async def send_forwarded_messages(self, target, nodes):
"""
模拟发送合并转发消息。
Discord 不支持像 QQ 那样的合并转发,所以我们将其转换为普通消息发送。
"""
content = ""
files = []
for node in nodes:
if node.get("type") == "node":
node_data = node.get("data", {})
node_content = node_data.get("content", [])
# 提取节点中的文本和图片
if isinstance(node_content, str):
# 尝试解析 CQ 码
import re
cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]'
matches = list(re.finditer(cq_pattern, node_content))
if not matches:
content += f"{node_content}\n"
else:
last_end = 0
for match in matches:
if match.start() > last_end:
content += node_content[last_end:match.start()]
cq_type = match.group(1)
cq_params_str = match.group(2) or ""
params = {}
if cq_params_str:
for param in cq_params_str.split(','):
if '=' in param:
k, v = param.split('=', 1)
params[k] = v
if cq_type in ("image", "video"):
file_url = params.get("url") or params.get("file")
if file_url:
if str(file_url).startswith("http"):
content += f"\n{file_url}\n"
elif str(file_url).startswith("base64://"):
import base64
import io
b64_data = str(file_url)[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
elif cq_type == "at":
qq_id = params.get("qq")
if qq_id == "all":
content += "@everyone "
else:
content += f"<@{qq_id}> "
last_end = match.end()
if last_end < len(node_content):
content += node_content[last_end:]
content += "\n"
elif isinstance(node_content, list):
for seg in node_content:
if isinstance(seg, dict):
seg_type = seg.get("type")
seg_data = seg.get("data", {})
if seg_type == "text":
content += seg_data.get("text", "")
elif seg_type == "image" or seg_type == "video":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
if isinstance(file_url, bytes):
import io
try:
files.append(discord.File(fp=io.BytesIO(file_url), filename="image.png"))
except Exception as e:
logger.error(f"解析 bytes 图片失败: {e}")
elif str(file_url).startswith("http"):
content += f"\n{file_url}\n"
elif str(file_url).startswith("base64://") or "data:image" in str(file_url):
import base64
import io
b64_data = str(file_url)
if b64_data.startswith("base64://"):
b64_data = b64_data[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
content += "\n"
try:
if content or files:
await discord_message.channel.send(content=content, files=files if files else None)
except Exception as e:
logger.error(f"发送 Discord 合并转发消息失败: {e}")
event.bot = MockBot()
return event
@staticmethod
async def send_discord_reply(
original_message: 'discord.Message',
message: Union[str, OneBotMessageSegment, List[OneBotMessageSegment]],
adapter: Any
):
"""
将 OneBot 的消息段转换为 Discord 格式并发送。
Args:
original_message: 触发此回复的原始 Discord 消息
message: 插件返回的 OneBot 消息内容 (字符串或 MessageSegment 列表)
adapter: DiscordAdapter 实例
"""
content = ""
files = []
# 统一转换为列表处理
if not isinstance(message, list):
message = [message]
import re
for segment in message:
if isinstance(segment, str):
# 尝试解析 CQ 码
cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]'
matches = list(re.finditer(cq_pattern, segment))
if not matches:
content += segment
continue
last_end = 0
for match in matches:
# 添加 CQ 码之前的纯文本
if match.start() > last_end:
content += segment[last_end:match.start()]
cq_type = match.group(1)
cq_params_str = match.group(2) or ""
# 解析参数
params = {}
if cq_params_str:
for param in cq_params_str.split(','):
if '=' in param:
k, v = param.split('=', 1)
params[k] = v
if cq_type in ("image", "video"):
file_url = params.get("url") or params.get("file")
if file_url:
if str(file_url).startswith("http"):
content += f"\n{file_url}"
elif str(file_url).startswith("base64://") or "data:image" in str(file_url):
import base64
import io
b64_data = str(file_url)
if b64_data.startswith("base64://"):
b64_data = b64_data[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
elif cq_type == "at":
qq_id = params.get("qq")
if qq_id == "all":
content += "@everyone "
else:
content += f"<@{qq_id}> "
last_end = match.end()
# 添加最后一个 CQ 码之后的纯文本
if last_end < len(segment):
content += segment[last_end:]
elif isinstance(segment, OneBotMessageSegment):
# 解析 OneBot 的 MessageSegment
seg_type = segment.type
seg_data = segment.data
if seg_type == "text":
content += seg_data.get("text", "")
elif seg_type == "image" or seg_type == "video":
# OneBot 的图片/视频通常有 file (URL或本地路径) 或 url 字段
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
# 处理 bytes 类型
if isinstance(file_url, bytes):
import io
try:
files.append(discord.File(fp=io.BytesIO(file_url), filename="image.png"))
except Exception as e:
logger.error(f"解析 bytes 图片失败: {e}")
elif str(file_url).startswith("http"):
# 如果是网络 URL直接拼接到文本中Discord 会自动解析预览
content += f"\n{file_url}"
elif str(file_url).startswith("base64://") or "data:image" in str(file_url):
# 处理 Base64 图片 (需要解码并作为文件上传)
import base64
import io
b64_data = str(file_url)
if b64_data.startswith("base64://"):
b64_data = b64_data[9:]
if b64_data.startswith("data:image"):
b64_data = b64_data.split(",", 1)[1]
try:
image_bytes = base64.b64decode(b64_data)
files.append(discord.File(fp=io.BytesIO(image_bytes), filename="image.png"))
except Exception as e:
logger.error(f"解析 Base64 图片失败: {e}")
else:
# 假设是本地文件路径
try:
files.append(discord.File(file_url))
except Exception as e:
logger.error(f"无法读取本地文件 {file_url}: {e}")
elif seg_type == "at":
qq_id = seg_data.get("qq")
if qq_id == "all":
content += "@everyone "
else:
# 尝试将 QQ 号映射回 Discord ID (这里简单处理,直接拼接)
content += f"<@{qq_id}> "
elif seg_type == "reply":
# 忽略回复段,或者你可以尝试映射 message_id
pass
# 发送消息到 Discord
try:
# 如果内容为空但有文件Discord 允许发送
if content or files:
await original_message.channel.send(content=content, files=files if files else None)
else:
logger.warning("尝试发送空消息到 Discord已拦截")
except Exception as e:
logger.error(f"发送 Discord 消息失败: {e}")