Files
NeoBot/plugins/cross_platform.py
K2Cr2O1 3814f49fcf feat(跨平台): 增强跨平台消息互通功能
- 支持合并转发消息解析和展示
- 优化附件处理逻辑,支持文件名和类型识别
- 添加 Discord Embed 卡片支持,提升消息展示效果
- 重构消息格式化和转发逻辑,提高可维护性
- 更新代理配置和日志级别设置
2026-03-15 16:48:26 +08:00

1016 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
跨平台消息互通插件
功能:
- Discord 频道与 QQ 群之间的消息互通
- 在消息中自动标注来源平台和子频道/群组 ID
- 支持 OneBot v11 协议和数据结构
- 支持图片、视频等媒体消息
- 支持合并转发消息
"""
import asyncio
import html
import json
import os
import re
import time
from typing import Dict, List, Optional, Any
from core.managers.command_manager import matcher
from models.events.message import GroupMessageEvent, PrivateMessageEvent, MessageEvent
from models.message import MessageSegment
from core.permission import Permission
from core.utils.logger import logger
from core.managers.redis_manager import redis_manager
# --- 配置 ---
# 跨平台映射配置
# 格式: {discord_channel_id: {"qq_group_id": qq_group_id, "name": "显示名称"}}
CROSS_PLATFORM_MAP: Dict[int, Dict[str, Any]] = {
# 示例配置:
# 123456789012345678: {"qq_group_id": 123456789, "name": "主群"},
# 987654321098765432: {"qq_group_id": 987654321, "name": "测试群"},
}
# Redis 通道名称
CROSS_PLATFORM_CHANNEL = "neobot_cross_platform"
# 是否启用跨平台转发
ENABLE_CROSS_PLATFORM = True
async def parse_forward_nodes(nodes: List[Dict[str, Any]]) -> tuple[str, List[dict]]:
"""
解析 OneBot 合并转发消息节点
Args:
nodes: 合并转发消息节点列表
Returns:
格式化后的消息内容和附件列表
"""
content_parts = []
attachments = []
for node in nodes:
if not isinstance(node, dict):
continue
node_data = node.get("data", {})
node_content = node_data.get("content", "")
# 获取发送者信息
sender_name = node_data.get("name", node_data.get("uin", "Unknown"))
# 解析节点内容
if isinstance(node_content, str):
# 检查是否是 [object Object] 格式OneBot 协议的特殊格式)
if "[object Object]" in node_content:
# OneBot 协议中,合并转发消息的 content 可能是 [object Object],[object Object]
# 实际的消息内容在 nodes 中,直接使用节点作为消息内容
content = f"[合并转发消息: {sender_name}]"
content_parts.append(f"**{sender_name}**:\n{content}")
elif '[CQ:' in node_content:
# CQ 码字符串格式
content = parse_cq_code(node_content, attachments)
content_parts.append(f"**{sender_name}**:\n{content}")
else:
content = node_content
content_parts.append(f"**{sender_name}**:\n{content}")
elif isinstance(node_content, list):
# MessageSegment 列表格式
content = parse_message_segments(node_content, attachments)
content_parts.append(f"**{sender_name}**:\n{content}")
# 组合完整消息
full_content = "\n\n".join(content_parts) if content_parts else ""
return full_content, attachments
def parse_cq_code(cq_code: str, attachments: List[dict]) -> str:
"""
解析 CQ 码字符串
Args:
cq_code: CQ 码字符串
attachments: 附件列表(用于添加图片/视频 URL
Returns:
解析后的文本内容
"""
import re
# 匹配 CQ 码
cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]'
matches = list(re.finditer(cq_pattern, cq_code))
if not matches:
return cq_code
result = []
last_end = 0
for match in matches:
if match.start() > last_end:
result.append(cq_code[last_end:match.start()])
cq_type = match.group(1)
cq_params_str = match.group(2) or ""
params = {}
if cq_params_str:
for param in cq_params_str.split(','):
if '=' in param:
k, v = param.split('=', 1)
params[k] = v
if cq_type == "text":
result.append(params.get("text", ""))
elif cq_type == "image":
file_url = params.get("url") or params.get("file")
if file_url:
file_name = params.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "image"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[图片: {file_name}]\n")
elif cq_type == "video":
file_url = params.get("url") or params.get("file")
if file_url:
file_name = params.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "video"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[视频: {file_name}]\n")
elif cq_type == "at":
qq_id = params.get("qq")
if qq_id == "all":
result.append("@所有人 ")
else:
result.append(f"@{qq_id} ")
elif cq_type == "face":
face_id = params.get("id", "")
result.append(f"[表情:{face_id}] ")
elif cq_type == "reply":
reply_id = params.get("id", "")
result.append(f"[回复:{reply_id}] ")
elif cq_type == "file":
file_url = params.get("file", "")
if file_url:
file_name = os.path.basename(str(file_url).split('?')[0]) or "file"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[文件: {file_name}]\n")
last_end = match.end()
if last_end < len(cq_code):
result.append(cq_code[last_end:])
return "".join(result)
def parse_message_segments(segments: List[Any], attachments: List[dict]) -> str:
"""
解析 MessageSegment 列表
Args:
segments: MessageSegment 列表
attachments: 附件列表(用于添加图片/视频 URL
Returns:
解析后的文本内容
"""
result = []
for seg in segments:
if isinstance(seg, str):
result.append(seg)
elif isinstance(seg, MessageSegment):
seg_type = seg.type
seg_data = seg.data
if seg_type == "text":
result.append(seg_data.get("text", ""))
elif seg_type == "image":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "image"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[图片: {file_name}]\n")
elif seg_type == "video":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "video"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[视频: {file_name}]\n")
elif seg_type == "at":
qq_id = seg_data.get("qq")
if qq_id == "all":
result.append("@所有人 ")
else:
result.append(f"@{qq_id} ")
elif seg_type == "face":
face_id = seg_data.get("id", "")
result.append(f"[表情:{face_id}] ")
elif seg_type == "reply":
reply_id = seg_data.get("id", "")
result.append(f"[回复:{reply_id}] ")
elif seg_type == "file":
file_url = seg_data.get("file", "")
if file_url:
file_name = os.path.basename(str(file_url).split('?')[0]) or "file"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[文件: {file_name}]\n")
elif seg_type == "json":
# 尝试解析 JSON 数据
json_data = seg_data.get("data", "")
try:
parsed = json.loads(json_data)
if isinstance(parsed, dict):
result.append(f"\n[JSON数据: {json_data[:100]}...]\n")
except:
result.append(f"\n[JSON数据]\n")
elif seg_type == "xml":
result.append(f"\n[XML数据]\n")
elif isinstance(seg, dict):
seg_type = seg.get("type")
seg_data = seg.get("data", {})
if seg_type == "text":
result.append(seg_data.get("text", ""))
elif seg_type == "image":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "image"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[图片: {file_name}]\n")
elif seg_type == "video":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "video"
attachments.append({"url": str(file_url), "filename": file_name})
result.append(f"\n[视频: {file_name}]\n")
elif seg_type == "at":
qq_id = seg_data.get("qq")
if qq_id == "all":
result.append("@所有人 ")
else:
result.append(f"@{qq_id} ")
return "".join(result)
def get_platform_info(platform: str, identifier: Any) -> str:
"""
获取平台信息字符串,用于在消息中标注来源
Args:
platform: 平台名称 ('discord''qq')
identifier: 频道 ID 或群组 ID
Returns:
格式化的平台信息字符串
"""
if platform == "discord":
channel_id = int(identifier)
if channel_id in CROSS_PLATFORM_MAP:
group_info = CROSS_PLATFORM_MAP[channel_id]
group_name = group_info.get("name", f"群组 {group_info['qq_group_id']}")
return f"[Discord {group_name}]"
return f"[Discord]"
elif platform == "qq":
group_id = int(identifier)
return f"[PAW qq]"
return ""
async def format_discord_to_qq_content(
discord_username: str,
discord_discriminator: str,
content: str,
channel_id: int,
attachments: List[dict] = None
) -> tuple[str, List[str]]:
"""
将 Discord 消息格式化为 QQ 消息格式
Args:
discord_username: Discord 用户名
discord_discriminator: Discord discriminator (如 #1234)
content: 消息内容
channel_id: Discord 频道 ID
attachments: 附件列表
Returns:
格式化后的消息内容和图片列表
"""
platform_info = get_platform_info("discord", channel_id)
# 构建消息头(简化版,只显示名字)
message_header = f"{platform_info}\n {discord_username}:"
# 构建消息体
message_body = content.strip() if content else ""
# 组合完整消息
if message_body:
full_message = f"{message_header}\n{message_body}"
else:
full_message = message_header
# 提取图片 URL
image_list = []
if attachments:
for att in attachments:
if isinstance(att, dict):
url = att.get("url", "")
if url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
image_list.append(url)
else:
image_list.append(str(att))
return full_message, image_list
async def format_qq_to_discord_content(
qq_nickname: str,
qq_user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[dict] = None
) -> tuple[str, List[dict], dict]:
"""
将 QQ 消息格式化为 Discord 消息格式Embed 卡片)
Args:
qq_nickname: QQ 昵称
qq_user_id: QQ 用户 ID
group_name: 群名称
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表
Returns:
格式化后的消息内容、附件列表和 Embed 字典
"""
platform_info = get_platform_info("qq", group_id)
# 构建 Embed 卡片
embed = {
"type": "rich",
"color": 0x5865F2, # Discord 蓝色
"author": {
"name": f"{platform_info} {qq_nickname}",
"icon_url": f"https://q1.qlogo.cn/g?b=qq&nk={qq_user_id}&s=640"
},
"description": content if content else "",
"timestamp": None,
"footer": {
"text": f"来自 QQPAW"
}
}
# 如果有附件,添加到 description
if attachments:
image_urls = []
other_urls = []
for att in attachments:
if att.get("url", "").lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
image_urls.append(att.get("url"))
else:
other_urls.append(att.get("url"))
if image_urls:
embed["description"] = f"{content}\n\n{chr(10).join(image_urls[:3])}" if content else chr(10).join(image_urls[:3])
if len(image_urls) > 3:
embed["description"] += f"\n...还有 {len(image_urls) - 3} 张图片"
# 附加文件列表
if other_urls:
file_list = "\n".join([f"📄 {os.path.basename(u.split('?')[0])}" for u in other_urls[:5]])
embed["description"] += f"\n\n**附加文件:**\n{file_list}"
if len(other_urls) > 5:
embed["description"] += f"\n...还有 {len(other_urls) - 5} 个文件"
# 对于合并转发消息content 为空,只发送 embed
# 对于普通消息content 也为空,只发送 embed
return "", attachments or [], embed
async def send_to_discord(channel_id: int, content: str, attachments: List[dict] = None, embed: dict = None):
"""
发送消息到 Discord 频道
通过 Redis 发布消息,由 Discord 适配器接收并发送
这样可以避免跨模块导入实例的问题
Args:
channel_id: Discord 频道 ID
content: 消息内容
attachments: 附件列表,每个元素为 {"url": str, "filename": str}
embed: Discord Embed 字典
"""
try:
publish_data = {
"type": "send_message",
"channel_id": channel_id,
"content": content,
"attachments": attachments or [],
"embed": embed
}
await redis_manager.redis.publish("neobot_discord_send", json.dumps(publish_data))
logger.info(f"[CrossPlatform] 消息已发布到 Redis 供 Discord 适配器发送: {channel_id}")
except Exception as e:
logger.error(f"[CrossPlatform] 发送消息到 Discord 失败: {e}")
async def send_to_qq(group_id: int, content: str, attachments: List[dict] = None):
"""
发送消息到 QQ 群
Args:
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表,每个元素为 {"url": str, "filename": str}
"""
try:
from core.managers.bot_manager import bot_manager
from models.message import MessageSegment
# 获取所有 QQ 机器人实例
all_bots = bot_manager.get_all_bots()
if not all_bots:
logger.error(f"[CrossPlatform] 没有可用的 QQ 机器人实例")
return
logger.debug(f"[CrossPlatform] 找到 {len(all_bots)} 个 QQ 机器人实例")
for bot in all_bots:
try:
# 构建消息
message = content
# 发送消息(如果有附件,使用 OneBot 的图片格式)
if attachments:
# 构建完整消息:文本 + 图片
from models.message import MessageSegment
full_message = []
if content:
full_message.append(MessageSegment.text(content))
for attachment in attachments:
if isinstance(attachment, dict):
attachment_url = attachment.get("url", "")
else:
attachment_url = str(attachment)
full_message.append(MessageSegment.image(attachment_url, cache=True, proxy=True, timeout=30))
logger.debug(f"[CrossPlatform] 准备发送消息到 QQ 群 {group_id}: {full_message}")
# 一次性发送
await bot.send_group_msg(group_id, full_message)
logger.info(f"[CrossPlatform] 消息已发送到 QQ 群 {group_id}")
else:
# 只发送文本
await bot.send_group_msg(group_id, message)
logger.info(f"[CrossPlatform] 消息已发送到 QQ 群 {group_id}")
break # 只需要发送一次
except Exception as e:
logger.error(f"[CrossPlatform] 发送消息到 QQ 群 {group_id} 失败: {e}")
except Exception as e:
logger.error(f"[CrossPlatform] 发送消息到 QQ 失败: {e}")
async def forward_discord_to_qq(
discord_username: str,
discord_discriminator: str,
content: str,
channel_id: int,
attachments: List[dict] = None
):
"""
将 Discord 消息转发到所有映射的 QQ 群
Args:
discord_username: Discord 用户名
discord_discriminator: Discord discriminator
content: 消息内容
channel_id: Discord 频道 ID
attachments: 附件列表
"""
if channel_id not in CROSS_PLATFORM_MAP:
logger.warning(f"[CrossPlatform] 未找到 Discord 频道 {channel_id} 的映射配置")
return
group_info = CROSS_PLATFORM_MAP[channel_id]
target_qq_group = group_info["qq_group_id"]
# 格式化消息
formatted_content, image_list = await format_discord_to_qq_content(
discord_username,
discord_discriminator,
content,
channel_id,
attachments
)
# 发送到 QQ
await send_to_qq(target_qq_group, formatted_content, image_list)
logger.success(f"[CrossPlatform] Discord 频道 {channel_id} -> QQ 群 {target_qq_group}")
async def forward_qq_to_discord(
qq_nickname: str,
qq_user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[dict] = None
):
"""
将 QQ 消息转发到所有映射的 Discord 频道
Args:
qq_nickname: QQ 昵称
qq_user_id: QQ 用户 ID
group_name: 群名称
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表
"""
# 查找映射的 Discord 频道
target_channels = []
for discord_channel_id, info in CROSS_PLATFORM_MAP.items():
if info["qq_group_id"] == group_id:
target_channels.append(discord_channel_id)
if not target_channels:
logger.warning(f"[CrossPlatform] 未找到 QQ 群 {group_id} 的映射配置")
return
# 格式化消息
formatted_content, image_list, embed = await format_qq_to_discord_content(
qq_nickname,
qq_user_id,
group_name,
group_id,
content,
attachments
)
# 发送到所有映射的 Discord 频道
for channel_id in target_channels:
await send_to_discord(channel_id, formatted_content, image_list, embed)
logger.success(f"[CrossPlatform] QQ 群 {group_id} -> Discord 频道 {target_channels}")
async def publish_to_redis(platform: str, data: dict):
"""
通过 Redis 发布跨平台消息
Args:
platform: 平台名称
data: 消息数据
"""
try:
if redis_manager.redis:
publish_data = {
"platform": platform,
"data": data,
"timestamp": int(__import__('time').time())
}
await redis_manager.redis.publish(CROSS_PLATFORM_CHANNEL, json.dumps(publish_data))
logger.debug(f"[CrossPlatform] 已通过 Redis 发布消息: platform={platform}")
except Exception as e:
logger.error(f"[CrossPlatform] Redis 发布失败: {e}")
async def handle_discord_message(
username: str,
discriminator: str,
content: str,
channel_id: int,
attachments: List[dict] = None,
embed: dict = None
):
"""
处理 Discord 消息并转发
Args:
username: Discord 用户名
discriminator: Discord discriminator
content: 消息内容
channel_id: Discord 频道 ID
attachments: 附件列表
embed: Discord Embed 字典
"""
if not ENABLE_CROSS_PLATFORM:
return
logger.info(f"[CrossPlatform] 收到 Discord 消息: {username}#{discriminator} in {channel_id}")
# 转发到映射的 QQ 群
await forward_discord_to_qq(username, discriminator, content, channel_id, attachments)
async def handle_qq_message(
nickname: str,
user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[dict] = None
):
"""
处理 QQ 消息并转发
Args:
nickname: QQ 昵称
user_id: QQ 用户 ID
group_name: 群名称
group_id: QQ 群 ID
content: 消息内容
attachments: 附件列表
"""
if not ENABLE_CROSS_PLATFORM:
return
logger.info(f"[CrossPlatform] 收到 QQ 消息: {nickname} ({user_id}) in {group_name}({group_id})")
# 转发到映射的 Discord 频道
await forward_qq_to_discord(nickname, user_id, group_name, group_id, content, attachments)
@matcher.on_message()
async def handle_qq_group_message(event: GroupMessageEvent):
"""
处理 QQ 群消息,转发到 Discord
"""
if not ENABLE_CROSS_PLATFORM:
return
# 检查是否是映射的群组
group_id = event.group_id
mapped_channel = None
for discord_channel_id, info in CROSS_PLATFORM_MAP.items():
if info["qq_group_id"] == group_id:
mapped_channel = discord_channel_id
break
if mapped_channel is None:
return
# 提取消息内容
content = ""
attachments = []
if isinstance(event.message, list):
# 检查是否是合并转发消息
has_forward_node = any(isinstance(seg, MessageSegment) and seg.type == "node" for seg in event.message)
if has_forward_node:
# 解析合并转发消息
forward_nodes = [seg for seg in event.message if isinstance(seg, MessageSegment) and seg.type == "node"]
# 将 MessageSegment 转换为字典格式
forward_nodes_dict = [{"type": seg.type, "data": seg.data} for seg in forward_nodes]
content, attachments = await parse_forward_nodes(forward_nodes_dict)
else:
# 普通消息
for segment in event.message:
if isinstance(segment, MessageSegment):
if segment.type == "text":
content += segment.data.get("text", "")
elif segment.type == "image":
file_url = segment.data.get("url") or segment.data.get("file")
file_name = segment.data.get("file", "")
if file_url:
file_url = html.unescape(str(file_url))
if not file_name:
file_name = os.path.basename(file_url.split('?')[0]) or f"image_{len(attachments)}.jpg"
attachments.append({"url": file_url, "filename": file_name})
elif segment.type == "video":
file_url = segment.data.get("url") or segment.data.get("file")
file_name = segment.data.get("file", "")
if file_url:
file_url = html.unescape(str(file_url))
if not file_name:
file_name = os.path.basename(file_url.split('?')[0]) or f"video_{len(attachments)}.mp4"
attachments.append({"url": file_url, "filename": file_name})
elif segment.type == "at":
qq_id = segment.data.get("qq")
if qq_id and qq_id != "all":
content += f"@{qq_id} "
elif qq_id == "all":
content += "@所有人 "
elif isinstance(segment, str):
content += segment
elif isinstance(event.message, str):
content = event.message
# 清理多余空白
content = content.strip()
# 获取群名称
group_name = ""
try:
group_info = await event.bot.get_group_info(event.group_id)
group_name = group_info.get("group_name", "")
except Exception:
group_name = f"{group_id}"
# 处理消息
await handle_qq_message(
nickname=event.sender.nickname or event.sender.card or str(event.user_id),
user_id=event.user_id,
group_name=group_name,
group_id=group_id,
content=content,
attachments=attachments
)
@matcher.on_message()
async def handle_discord_message_event(event: Any):
"""
处理 Discord 消息事件(通过适配器注入)
"""
if not ENABLE_CROSS_PLATFORM:
return
# 检查事件是否包含 Discord 特定信息
if not hasattr(event, '_is_discord_message'):
return
discord_channel_id = getattr(event, 'discord_channel_id', None)
if discord_channel_id is None:
return
# 提取消息内容
content = event.raw_message or ""
attachments = []
# 从 raw_message 中提取附件 URLDiscord 附件已添加到 raw_message
import re
url_pattern = r'https?://[^\s<>"]+|www\.\S+'
raw_message_lines = content.split('\n')
content_lines = []
for line in raw_message_lines:
line = line.strip()
if re.match(url_pattern, line):
# 这是附件 URL
attachment_url = line
# 尝试从 URL 提取文件名
filename = os.path.basename(attachment_url.split('?')[0]) or "attachment"
attachment_item = {"url": attachment_url, "filename": filename}
if attachment_item not in attachments:
attachments.append(attachment_item)
else:
# 这是普通文本内容
if line:
content_lines.append(line)
content = '\n'.join(content_lines).strip()
# 从 message 列表中提取(备用方案)
if hasattr(event, 'message') and isinstance(event.message, list):
for segment in event.message:
if isinstance(segment, MessageSegment):
if segment.type == "text":
pass # 已经在 raw_message 中
elif segment.type == "image":
file_url = segment.data.get("url") or segment.data.get("file")
file_name = segment.data.get("file", "")
if file_url:
file_name = file_name or os.path.basename(str(file_url).split('?')[0]) or "image"
attachment_item = {"url": str(file_url), "filename": file_name}
if attachment_item not in attachments:
attachments.append(attachment_item)
elif segment.type == "video":
file_url = segment.data.get("url") or segment.data.get("file")
file_name = segment.data.get("file", "")
if file_url:
file_name = file_name or os.path.basename(str(file_url).split('?')[0]) or "video"
attachment_item = {"url": str(file_url), "filename": file_name}
if attachment_item not in attachments:
attachments.append(attachment_item)
# 获取用户信息
discord_username = getattr(event, 'discord_username', 'Unknown')
discord_discriminator = getattr(event, 'discord_discriminator', '')
# 处理消息
await handle_discord_message(
username=discord_username,
discriminator=discord_discriminator,
content=content,
channel_id=discord_channel_id,
attachments=attachments,
embed=None
)
async def cross_platform_subscription_loop():
"""
Redis 跨平台消息订阅循环
"""
if redis_manager.redis is None:
logger.warning("[CrossPlatform] Redis 未初始化,无法启动订阅")
return
try:
pubsub = redis_manager.redis.pubsub()
await pubsub.subscribe(CROSS_PLATFORM_CHANNEL)
logger.success("[CrossPlatform] 已订阅 Redis 跨平台频道")
async for message in pubsub.listen():
if message["type"] == "message":
try:
data = json.loads(message["data"])
platform = data.get("platform", "")
message_data = data.get("data", {})
logger.info(f"[CrossPlatform] 收到跨平台消息: {platform}")
if platform == "discord":
# 从 Discord 转发到 QQ
await forward_discord_to_qq(
discord_username=message_data.get("username", "Unknown"),
discord_discriminator=message_data.get("discriminator", ""),
content=message_data.get("content", ""),
channel_id=message_data.get("channel_id", 0),
attachments=message_data.get("attachments", [])
)
elif platform == "qq":
# 从 QQ 转发到 Discord
await forward_qq_to_discord(
qq_nickname=message_data.get("nickname", "Unknown"),
qq_user_id=message_data.get("user_id", 0),
group_name=message_data.get("group_name", ""),
group_id=message_data.get("group_id", 0),
content=message_data.get("content", ""),
attachments=message_data.get("attachments", []),
embed=message_data.get("embed")
)
except json.JSONDecodeError as e:
logger.error(f"[CrossPlatform] 解析消息失败: {e}")
except Exception as e:
logger.error(f"[CrossPlatform] 处理跨平台消息失败: {e}")
except Exception as e:
logger.error(f"[CrossPlatform] 订阅循环异常: {e}")
# 全局订阅任务
_subscription_task = None
async def start_cross_platform_subscription():
"""
启动跨平台消息订阅
"""
global _subscription_task
if _subscription_task is None and ENABLE_CROSS_PLATFORM:
_subscription_task = asyncio.create_task(cross_platform_subscription_loop())
logger.success("[CrossPlatform] 跨平台消息订阅已启动")
async def stop_cross_platform_subscription():
"""
停止跨平台消息订阅
"""
global _subscription_task
if _subscription_task:
_subscription_task.cancel()
try:
await _subscription_task
except asyncio.CancelledError:
pass
_subscription_task = None
logger.info("[CrossPlatform] 跨平台消息订阅已停止")
async def reload_config():
"""
重新加载配置
"""
global CROSS_PLATFORM_MAP, ENABLE_CROSS_PLATFORM
try:
import os
config_path = os.path.join(os.path.dirname(__file__), "..", "config.toml")
if os.path.exists(config_path):
try:
import tomllib
except ImportError:
import tomli as tomllib
with open(config_path, "rb") as f:
config = tomllib.load(f)
cross_platform_config = config.get("cross_platform", {})
ENABLE_CROSS_PLATFORM = cross_platform_config.get("enabled", True)
# 重新加载映射配置(支持两种格式)
mappings = cross_platform_config.get("mappings", {})
CROSS_PLATFORM_MAP = {}
# 格式1: [cross_platform.mappings.123456789012345678] 子表形式
if isinstance(mappings, dict) and mappings:
for key, value in mappings.items():
if isinstance(value, dict) and "qq_group_id" in value:
try:
discord_id = int(key) if str(key).isdigit() else int(str(key).split('.')[-1])
CROSS_PLATFORM_MAP[discord_id] = {
"qq_group_id": int(value.get("qq_group_id", 0)),
"name": value.get("name", "")
}
except (ValueError, AttributeError):
continue
# 格式2: 旧的字典形式(向后兼容)
if not CROSS_PLATFORM_MAP:
for key, value in mappings.items():
if isinstance(key, str) and key.isdigit():
CROSS_PLATFORM_MAP[int(key)] = {
"qq_group_id": int(value.get("qq_group_id", 0)),
"name": value.get("name", "")
}
logger.success(f"[CrossPlatform] 配置已重新加载: {len(CROSS_PLATFORM_MAP)} 个映射")
except Exception as e:
logger.error(f"[CrossPlatform] 重新加载配置失败: {e}")
# 插件加载时自动启动和加载配置
import asyncio
try:
asyncio.create_task(reload_config())
except Exception as e:
logger.error(f"[CrossPlatform] 重新加载配置失败: {e}")
try:
asyncio.create_task(start_cross_platform_subscription())
except Exception as e:
logger.error(f"[CrossPlatform] 启动订阅失败: {e}")
# 命令处理器
@matcher.command("cross_config", "跨平台配置", permission=Permission.ADMIN)
async def cross_config_command(event: MessageEvent):
"""
查看跨平台配置
"""
if not ENABLE_CROSS_PLATFORM:
await event.reply("跨平台功能已禁用")
return
config_lines = ["=== 跨平台映射配置 ==="]
if not CROSS_PLATFORM_MAP:
config_lines.append("当前没有配置任何映射")
else:
for discord_id, info in CROSS_PLATFORM_MAP.items():
discord_channel = f"Discord: {discord_id}"
qq_group = f"QQ: {info['qq_group_id']}"
name = info.get("name", "")
if name:
config_lines.append(f"{discord_channel}{qq_group} ({name})")
else:
config_lines.append(f"{discord_channel}{qq_group}")
await event.reply("\n".join(config_lines))
@matcher.command("cross_reload", "跨平台重载", permission=Permission.ADMIN)
async def cross_reload_command(event: MessageEvent):
"""
重新加载跨平台配置
"""
await reload_config()
await event.reply("跨平台配置已重载")
# 清理函数
def cleanup():
"""清理资源"""
asyncio.create_task(stop_cross_platform_subscription())