# -*- coding: utf-8 -*- """ 跨平台消息互通插件 功能: - Discord 频道与 QQ 群之间的消息互通 - 在消息中自动标注来源平台和子频道/群组 ID - 支持 OneBot v11 协议和数据结构 - 支持图片、视频等媒体消息 - 支持合并转发消息 """ import asyncio import html import json import os import re import time from typing import Dict, List, Optional, Any from core.managers.command_manager import matcher from models.events.message import GroupMessageEvent, PrivateMessageEvent, MessageEvent from models.message import MessageSegment from core.permission import Permission from core.utils.logger import logger from core.managers.redis_manager import redis_manager # --- 配置 --- # 跨平台映射配置 # 格式: {discord_channel_id: {"qq_group_id": qq_group_id, "name": "显示名称"}} CROSS_PLATFORM_MAP: Dict[int, Dict[str, Any]] = { # 示例配置: # 123456789012345678: {"qq_group_id": 123456789, "name": "主群"}, # 987654321098765432: {"qq_group_id": 987654321, "name": "测试群"}, } # Redis 通道名称 CROSS_PLATFORM_CHANNEL = "neobot_cross_platform" # 是否启用跨平台转发 ENABLE_CROSS_PLATFORM = True async def parse_forward_nodes(nodes: List[Dict[str, Any]]) -> tuple[str, List[dict]]: """ 解析 OneBot 合并转发消息节点 Args: nodes: 合并转发消息节点列表 Returns: 格式化后的消息内容和附件列表 """ content_parts = [] attachments = [] for node in nodes: if not isinstance(node, dict): continue node_data = node.get("data", {}) node_content = node_data.get("content", "") # 获取发送者信息 sender_name = node_data.get("name", node_data.get("uin", "Unknown")) # 解析节点内容 if isinstance(node_content, str): # 检查是否是 [object Object] 格式(OneBot 协议的特殊格式) if "[object Object]" in node_content: # OneBot 协议中,合并转发消息的 content 可能是 [object Object],[object Object] # 实际的消息内容在 nodes 中,直接使用节点作为消息内容 content = f"[合并转发消息: {sender_name}]" content_parts.append(f"**{sender_name}**:\n{content}") elif '[CQ:' in node_content: # CQ 码字符串格式 content = parse_cq_code(node_content, attachments) content_parts.append(f"**{sender_name}**:\n{content}") else: content = node_content content_parts.append(f"**{sender_name}**:\n{content}") elif isinstance(node_content, list): # MessageSegment 列表格式 content = parse_message_segments(node_content, attachments) content_parts.append(f"**{sender_name}**:\n{content}") # 组合完整消息 full_content = "\n\n".join(content_parts) if content_parts else "" return full_content, attachments def parse_cq_code(cq_code: str, attachments: List[dict]) -> str: """ 解析 CQ 码字符串 Args: cq_code: CQ 码字符串 attachments: 附件列表(用于添加图片/视频 URL) Returns: 解析后的文本内容 """ import re # 匹配 CQ 码 cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]' matches = list(re.finditer(cq_pattern, cq_code)) if not matches: return cq_code result = [] last_end = 0 for match in matches: if match.start() > last_end: result.append(cq_code[last_end:match.start()]) cq_type = match.group(1) cq_params_str = match.group(2) or "" params = {} if cq_params_str: for param in cq_params_str.split(','): if '=' in param: k, v = param.split('=', 1) params[k] = v if cq_type == "text": result.append(params.get("text", "")) elif cq_type == "image": file_url = params.get("url") or params.get("file") if file_url: file_name = params.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "image" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[图片: {file_name}]\n") elif cq_type == "video": file_url = params.get("url") or params.get("file") if file_url: file_name = params.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "video" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[视频: {file_name}]\n") elif cq_type == "at": qq_id = params.get("qq") if qq_id == "all": result.append("@所有人 ") else: result.append(f"@{qq_id} ") elif cq_type == "face": face_id = params.get("id", "") result.append(f"[表情:{face_id}] ") elif cq_type == "reply": reply_id = params.get("id", "") result.append(f"[回复:{reply_id}] ") elif cq_type == "file": file_url = params.get("file", "") if file_url: file_name = os.path.basename(str(file_url).split('?')[0]) or "file" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[文件: {file_name}]\n") last_end = match.end() if last_end < len(cq_code): result.append(cq_code[last_end:]) return "".join(result) def parse_message_segments(segments: List[Any], attachments: List[dict]) -> str: """ 解析 MessageSegment 列表 Args: segments: MessageSegment 列表 attachments: 附件列表(用于添加图片/视频 URL) Returns: 解析后的文本内容 """ result = [] for seg in segments: if isinstance(seg, str): result.append(seg) elif isinstance(seg, MessageSegment): seg_type = seg.type seg_data = seg.data if seg_type == "text": result.append(seg_data.get("text", "")) elif seg_type == "image": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "image" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[图片: {file_name}]\n") elif seg_type == "video": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "video" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[视频: {file_name}]\n") elif seg_type == "at": qq_id = seg_data.get("qq") if qq_id == "all": result.append("@所有人 ") else: result.append(f"@{qq_id} ") elif seg_type == "face": face_id = seg_data.get("id", "") result.append(f"[表情:{face_id}] ") elif seg_type == "reply": reply_id = seg_data.get("id", "") result.append(f"[回复:{reply_id}] ") elif seg_type == "file": file_url = seg_data.get("file", "") if file_url: file_name = os.path.basename(str(file_url).split('?')[0]) or "file" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[文件: {file_name}]\n") elif seg_type == "json": # 尝试解析 JSON 数据 json_data = seg_data.get("data", "") try: parsed = json.loads(json_data) if isinstance(parsed, dict): result.append(f"\n[JSON数据: {json_data[:100]}...]\n") except: result.append(f"\n[JSON数据]\n") elif seg_type == "xml": result.append(f"\n[XML数据]\n") elif isinstance(seg, dict): seg_type = seg.get("type") seg_data = seg.get("data", {}) if seg_type == "text": result.append(seg_data.get("text", "")) elif seg_type == "image": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "image" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[图片: {file_name}]\n") elif seg_type == "video": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "video" attachments.append({"url": str(file_url), "filename": file_name}) result.append(f"\n[视频: {file_name}]\n") elif seg_type == "at": qq_id = seg_data.get("qq") if qq_id == "all": result.append("@所有人 ") else: result.append(f"@{qq_id} ") return "".join(result) def get_platform_info(platform: str, identifier: Any) -> str: """ 获取平台信息字符串,用于在消息中标注来源 Args: platform: 平台名称 ('discord' 或 'qq') identifier: 频道 ID 或群组 ID Returns: 格式化的平台信息字符串 """ if platform == "discord": channel_id = int(identifier) if channel_id in CROSS_PLATFORM_MAP: group_info = CROSS_PLATFORM_MAP[channel_id] group_name = group_info.get("name", f"群组 {group_info['qq_group_id']}") return f"[Discord {group_name}]" return f"[Discord]" elif platform == "qq": group_id = int(identifier) return f"[PAW qq]" return "" async def format_discord_to_qq_content( discord_username: str, discord_discriminator: str, content: str, channel_id: int, attachments: List[dict] = None ) -> tuple[str, List[str]]: """ 将 Discord 消息格式化为 QQ 消息格式 Args: discord_username: Discord 用户名 discord_discriminator: Discord discriminator (如 #1234) content: 消息内容 channel_id: Discord 频道 ID attachments: 附件列表 Returns: 格式化后的消息内容和图片列表 """ platform_info = get_platform_info("discord", channel_id) # 构建消息头(简化版,只显示名字) message_header = f"{platform_info}\n {discord_username}:" # 构建消息体 message_body = content.strip() if content else "" # 组合完整消息 if message_body: full_message = f"{message_header}\n{message_body}" else: full_message = message_header # 提取图片 URL image_list = [] if attachments: for att in attachments: if isinstance(att, dict): url = att.get("url", "") if url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): image_list.append(url) else: image_list.append(str(att)) return full_message, image_list async def format_qq_to_discord_content( qq_nickname: str, qq_user_id: int, group_name: str, group_id: int, content: str, attachments: List[dict] = None ) -> tuple[str, List[dict], dict]: """ 将 QQ 消息格式化为 Discord 消息格式(Embed 卡片) Args: qq_nickname: QQ 昵称 qq_user_id: QQ 用户 ID group_name: 群名称 group_id: QQ 群 ID content: 消息内容 attachments: 附件列表 Returns: 格式化后的消息内容、附件列表和 Embed 字典 """ platform_info = get_platform_info("qq", group_id) # 构建 Embed 卡片 embed = { "type": "rich", "color": 0x5865F2, # Discord 蓝色 "author": { "name": f"{platform_info} {qq_nickname}", "icon_url": f"https://q1.qlogo.cn/g?b=qq&nk={qq_user_id}&s=640" }, "description": content if content else "", "timestamp": None, "footer": { "text": f"来自 QQPAW" } } # 如果有附件,添加到 description if attachments: image_urls = [] other_urls = [] for att in attachments: if att.get("url", "").lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): image_urls.append(att.get("url")) else: other_urls.append(att.get("url")) if image_urls: embed["description"] = f"{content}\n\n{chr(10).join(image_urls[:3])}" if content else chr(10).join(image_urls[:3]) if len(image_urls) > 3: embed["description"] += f"\n...还有 {len(image_urls) - 3} 张图片" # 附加文件列表 if other_urls: file_list = "\n".join([f"📄 {os.path.basename(u.split('?')[0])}" for u in other_urls[:5]]) embed["description"] += f"\n\n**附加文件:**\n{file_list}" if len(other_urls) > 5: embed["description"] += f"\n...还有 {len(other_urls) - 5} 个文件" # 对于合并转发消息,content 为空,只发送 embed # 对于普通消息,content 也为空,只发送 embed return "", attachments or [], embed async def send_to_discord(channel_id: int, content: str, attachments: List[dict] = None, embed: dict = None): """ 发送消息到 Discord 频道 通过 Redis 发布消息,由 Discord 适配器接收并发送 这样可以避免跨模块导入实例的问题 Args: channel_id: Discord 频道 ID content: 消息内容 attachments: 附件列表,每个元素为 {"url": str, "filename": str} embed: Discord Embed 字典 """ try: publish_data = { "type": "send_message", "channel_id": channel_id, "content": content, "attachments": attachments or [], "embed": embed } await redis_manager.redis.publish("neobot_discord_send", json.dumps(publish_data)) logger.info(f"[CrossPlatform] 消息已发布到 Redis 供 Discord 适配器发送: {channel_id}") except Exception as e: logger.error(f"[CrossPlatform] 发送消息到 Discord 失败: {e}") async def send_to_qq(group_id: int, content: str, attachments: List[dict] = None): """ 发送消息到 QQ 群 Args: group_id: QQ 群 ID content: 消息内容 attachments: 附件列表,每个元素为 {"url": str, "filename": str} """ try: from core.managers.bot_manager import bot_manager from models.message import MessageSegment # 获取所有 QQ 机器人实例 all_bots = bot_manager.get_all_bots() if not all_bots: logger.error(f"[CrossPlatform] 没有可用的 QQ 机器人实例") return logger.debug(f"[CrossPlatform] 找到 {len(all_bots)} 个 QQ 机器人实例") for bot in all_bots: try: # 构建消息 message = content # 发送消息(如果有附件,使用 OneBot 的图片格式) if attachments: # 构建完整消息:文本 + 图片 from models.message import MessageSegment full_message = [] if content: full_message.append(MessageSegment.text(content)) for attachment in attachments: if isinstance(attachment, dict): attachment_url = attachment.get("url", "") else: attachment_url = str(attachment) full_message.append(MessageSegment.image(attachment_url, cache=True, proxy=True, timeout=30)) logger.debug(f"[CrossPlatform] 准备发送消息到 QQ 群 {group_id}: {full_message}") # 一次性发送 await bot.send_group_msg(group_id, full_message) logger.info(f"[CrossPlatform] 消息已发送到 QQ 群 {group_id}") else: # 只发送文本 await bot.send_group_msg(group_id, message) logger.info(f"[CrossPlatform] 消息已发送到 QQ 群 {group_id}") break # 只需要发送一次 except Exception as e: logger.error(f"[CrossPlatform] 发送消息到 QQ 群 {group_id} 失败: {e}") except Exception as e: logger.error(f"[CrossPlatform] 发送消息到 QQ 失败: {e}") async def forward_discord_to_qq( discord_username: str, discord_discriminator: str, content: str, channel_id: int, attachments: List[dict] = None ): """ 将 Discord 消息转发到所有映射的 QQ 群 Args: discord_username: Discord 用户名 discord_discriminator: Discord discriminator content: 消息内容 channel_id: Discord 频道 ID attachments: 附件列表 """ if channel_id not in CROSS_PLATFORM_MAP: logger.warning(f"[CrossPlatform] 未找到 Discord 频道 {channel_id} 的映射配置") return group_info = CROSS_PLATFORM_MAP[channel_id] target_qq_group = group_info["qq_group_id"] # 格式化消息 formatted_content, image_list = await format_discord_to_qq_content( discord_username, discord_discriminator, content, channel_id, attachments ) # 发送到 QQ await send_to_qq(target_qq_group, formatted_content, image_list) logger.success(f"[CrossPlatform] Discord 频道 {channel_id} -> QQ 群 {target_qq_group}") async def forward_qq_to_discord( qq_nickname: str, qq_user_id: int, group_name: str, group_id: int, content: str, attachments: List[dict] = None ): """ 将 QQ 消息转发到所有映射的 Discord 频道 Args: qq_nickname: QQ 昵称 qq_user_id: QQ 用户 ID group_name: 群名称 group_id: QQ 群 ID content: 消息内容 attachments: 附件列表 """ # 查找映射的 Discord 频道 target_channels = [] for discord_channel_id, info in CROSS_PLATFORM_MAP.items(): if info["qq_group_id"] == group_id: target_channels.append(discord_channel_id) if not target_channels: logger.warning(f"[CrossPlatform] 未找到 QQ 群 {group_id} 的映射配置") return # 格式化消息 formatted_content, image_list, embed = await format_qq_to_discord_content( qq_nickname, qq_user_id, group_name, group_id, content, attachments ) # 发送到所有映射的 Discord 频道 for channel_id in target_channels: await send_to_discord(channel_id, formatted_content, image_list, embed) logger.success(f"[CrossPlatform] QQ 群 {group_id} -> Discord 频道 {target_channels}") async def publish_to_redis(platform: str, data: dict): """ 通过 Redis 发布跨平台消息 Args: platform: 平台名称 data: 消息数据 """ try: if redis_manager.redis: publish_data = { "platform": platform, "data": data, "timestamp": int(__import__('time').time()) } await redis_manager.redis.publish(CROSS_PLATFORM_CHANNEL, json.dumps(publish_data)) logger.debug(f"[CrossPlatform] 已通过 Redis 发布消息: platform={platform}") except Exception as e: logger.error(f"[CrossPlatform] Redis 发布失败: {e}") async def handle_discord_message( username: str, discriminator: str, content: str, channel_id: int, attachments: List[dict] = None, embed: dict = None ): """ 处理 Discord 消息并转发 Args: username: Discord 用户名 discriminator: Discord discriminator content: 消息内容 channel_id: Discord 频道 ID attachments: 附件列表 embed: Discord Embed 字典 """ if not ENABLE_CROSS_PLATFORM: return logger.info(f"[CrossPlatform] 收到 Discord 消息: {username}#{discriminator} in {channel_id}") # 转发到映射的 QQ 群 await forward_discord_to_qq(username, discriminator, content, channel_id, attachments) async def handle_qq_message( nickname: str, user_id: int, group_name: str, group_id: int, content: str, attachments: List[dict] = None ): """ 处理 QQ 消息并转发 Args: nickname: QQ 昵称 user_id: QQ 用户 ID group_name: 群名称 group_id: QQ 群 ID content: 消息内容 attachments: 附件列表 """ if not ENABLE_CROSS_PLATFORM: return logger.info(f"[CrossPlatform] 收到 QQ 消息: {nickname} ({user_id}) in {group_name}({group_id})") # 转发到映射的 Discord 频道 await forward_qq_to_discord(nickname, user_id, group_name, group_id, content, attachments) @matcher.on_message() async def handle_qq_group_message(event: GroupMessageEvent): """ 处理 QQ 群消息,转发到 Discord """ if not ENABLE_CROSS_PLATFORM: return # 检查是否是映射的群组 group_id = event.group_id mapped_channel = None for discord_channel_id, info in CROSS_PLATFORM_MAP.items(): if info["qq_group_id"] == group_id: mapped_channel = discord_channel_id break if mapped_channel is None: return # 提取消息内容 content = "" attachments = [] if isinstance(event.message, list): # 检查是否是合并转发消息 has_forward_node = any(isinstance(seg, MessageSegment) and seg.type == "node" for seg in event.message) if has_forward_node: # 解析合并转发消息 forward_nodes = [seg for seg in event.message if isinstance(seg, MessageSegment) and seg.type == "node"] # 将 MessageSegment 转换为字典格式 forward_nodes_dict = [{"type": seg.type, "data": seg.data} for seg in forward_nodes] content, attachments = await parse_forward_nodes(forward_nodes_dict) else: # 普通消息 for segment in event.message: if isinstance(segment, MessageSegment): if segment.type == "text": content += segment.data.get("text", "") elif segment.type == "image": file_url = segment.data.get("url") or segment.data.get("file") file_name = segment.data.get("file", "") if file_url: file_url = html.unescape(str(file_url)) if not file_name: file_name = os.path.basename(file_url.split('?')[0]) or f"image_{len(attachments)}.jpg" attachments.append({"url": file_url, "filename": file_name}) elif segment.type == "video": file_url = segment.data.get("url") or segment.data.get("file") file_name = segment.data.get("file", "") if file_url: file_url = html.unescape(str(file_url)) if not file_name: file_name = os.path.basename(file_url.split('?')[0]) or f"video_{len(attachments)}.mp4" attachments.append({"url": file_url, "filename": file_name}) elif segment.type == "at": qq_id = segment.data.get("qq") if qq_id and qq_id != "all": content += f"@{qq_id} " elif qq_id == "all": content += "@所有人 " elif isinstance(segment, str): content += segment elif isinstance(event.message, str): content = event.message # 清理多余空白 content = content.strip() # 获取群名称 group_name = "" try: group_info = await event.bot.get_group_info(event.group_id) group_name = group_info.get("group_name", "") except Exception: group_name = f"群{group_id}" # 处理消息 await handle_qq_message( nickname=event.sender.nickname or event.sender.card or str(event.user_id), user_id=event.user_id, group_name=group_name, group_id=group_id, content=content, attachments=attachments ) @matcher.on_message() async def handle_discord_message_event(event: Any): """ 处理 Discord 消息事件(通过适配器注入) """ if not ENABLE_CROSS_PLATFORM: return # 检查事件是否包含 Discord 特定信息 if not hasattr(event, '_is_discord_message'): return discord_channel_id = getattr(event, 'discord_channel_id', None) if discord_channel_id is None: return # 提取消息内容 content = event.raw_message or "" attachments = [] # 从 raw_message 中提取附件 URL(Discord 附件已添加到 raw_message) import re url_pattern = r'https?://[^\s<>"]+|www\.\S+' raw_message_lines = content.split('\n') content_lines = [] for line in raw_message_lines: line = line.strip() if re.match(url_pattern, line): # 这是附件 URL attachment_url = line # 尝试从 URL 提取文件名 filename = os.path.basename(attachment_url.split('?')[0]) or "attachment" attachment_item = {"url": attachment_url, "filename": filename} if attachment_item not in attachments: attachments.append(attachment_item) else: # 这是普通文本内容 if line: content_lines.append(line) content = '\n'.join(content_lines).strip() # 从 message 列表中提取(备用方案) if hasattr(event, 'message') and isinstance(event.message, list): for segment in event.message: if isinstance(segment, MessageSegment): if segment.type == "text": pass # 已经在 raw_message 中 elif segment.type == "image": file_url = segment.data.get("url") or segment.data.get("file") file_name = segment.data.get("file", "") if file_url: file_name = file_name or os.path.basename(str(file_url).split('?')[0]) or "image" attachment_item = {"url": str(file_url), "filename": file_name} if attachment_item not in attachments: attachments.append(attachment_item) elif segment.type == "video": file_url = segment.data.get("url") or segment.data.get("file") file_name = segment.data.get("file", "") if file_url: file_name = file_name or os.path.basename(str(file_url).split('?')[0]) or "video" attachment_item = {"url": str(file_url), "filename": file_name} if attachment_item not in attachments: attachments.append(attachment_item) # 获取用户信息 discord_username = getattr(event, 'discord_username', 'Unknown') discord_discriminator = getattr(event, 'discord_discriminator', '') # 处理消息 await handle_discord_message( username=discord_username, discriminator=discord_discriminator, content=content, channel_id=discord_channel_id, attachments=attachments, embed=None ) async def cross_platform_subscription_loop(): """ Redis 跨平台消息订阅循环 """ if redis_manager.redis is None: logger.warning("[CrossPlatform] Redis 未初始化,无法启动订阅") return try: pubsub = redis_manager.redis.pubsub() await pubsub.subscribe(CROSS_PLATFORM_CHANNEL) logger.success("[CrossPlatform] 已订阅 Redis 跨平台频道") async for message in pubsub.listen(): if message["type"] == "message": try: data = json.loads(message["data"]) platform = data.get("platform", "") message_data = data.get("data", {}) logger.info(f"[CrossPlatform] 收到跨平台消息: {platform}") if platform == "discord": # 从 Discord 转发到 QQ await forward_discord_to_qq( discord_username=message_data.get("username", "Unknown"), discord_discriminator=message_data.get("discriminator", ""), content=message_data.get("content", ""), channel_id=message_data.get("channel_id", 0), attachments=message_data.get("attachments", []) ) elif platform == "qq": # 从 QQ 转发到 Discord await forward_qq_to_discord( qq_nickname=message_data.get("nickname", "Unknown"), qq_user_id=message_data.get("user_id", 0), group_name=message_data.get("group_name", ""), group_id=message_data.get("group_id", 0), content=message_data.get("content", ""), attachments=message_data.get("attachments", []), embed=message_data.get("embed") ) except json.JSONDecodeError as e: logger.error(f"[CrossPlatform] 解析消息失败: {e}") except Exception as e: logger.error(f"[CrossPlatform] 处理跨平台消息失败: {e}") except Exception as e: logger.error(f"[CrossPlatform] 订阅循环异常: {e}") # 全局订阅任务 _subscription_task = None async def start_cross_platform_subscription(): """ 启动跨平台消息订阅 """ global _subscription_task if _subscription_task is None and ENABLE_CROSS_PLATFORM: _subscription_task = asyncio.create_task(cross_platform_subscription_loop()) logger.success("[CrossPlatform] 跨平台消息订阅已启动") async def stop_cross_platform_subscription(): """ 停止跨平台消息订阅 """ global _subscription_task if _subscription_task: _subscription_task.cancel() try: await _subscription_task except asyncio.CancelledError: pass _subscription_task = None logger.info("[CrossPlatform] 跨平台消息订阅已停止") async def reload_config(): """ 重新加载配置 """ global CROSS_PLATFORM_MAP, ENABLE_CROSS_PLATFORM try: import os config_path = os.path.join(os.path.dirname(__file__), "..", "config.toml") if os.path.exists(config_path): try: import tomllib except ImportError: import tomli as tomllib with open(config_path, "rb") as f: config = tomllib.load(f) cross_platform_config = config.get("cross_platform", {}) ENABLE_CROSS_PLATFORM = cross_platform_config.get("enabled", True) # 重新加载映射配置(支持两种格式) mappings = cross_platform_config.get("mappings", {}) CROSS_PLATFORM_MAP = {} # 格式1: [cross_platform.mappings.123456789012345678] 子表形式 if isinstance(mappings, dict) and mappings: for key, value in mappings.items(): if isinstance(value, dict) and "qq_group_id" in value: try: discord_id = int(key) if str(key).isdigit() else int(str(key).split('.')[-1]) CROSS_PLATFORM_MAP[discord_id] = { "qq_group_id": int(value.get("qq_group_id", 0)), "name": value.get("name", "") } except (ValueError, AttributeError): continue # 格式2: 旧的字典形式(向后兼容) if not CROSS_PLATFORM_MAP: for key, value in mappings.items(): if isinstance(key, str) and key.isdigit(): CROSS_PLATFORM_MAP[int(key)] = { "qq_group_id": int(value.get("qq_group_id", 0)), "name": value.get("name", "") } logger.success(f"[CrossPlatform] 配置已重新加载: {len(CROSS_PLATFORM_MAP)} 个映射") except Exception as e: logger.error(f"[CrossPlatform] 重新加载配置失败: {e}") # 插件加载时自动启动和加载配置 import asyncio try: asyncio.create_task(reload_config()) except Exception as e: logger.error(f"[CrossPlatform] 重新加载配置失败: {e}") try: asyncio.create_task(start_cross_platform_subscription()) except Exception as e: logger.error(f"[CrossPlatform] 启动订阅失败: {e}") # 命令处理器 @matcher.command("cross_config", "跨平台配置", permission=Permission.ADMIN) async def cross_config_command(event: MessageEvent): """ 查看跨平台配置 """ if not ENABLE_CROSS_PLATFORM: await event.reply("跨平台功能已禁用") return config_lines = ["=== 跨平台映射配置 ==="] if not CROSS_PLATFORM_MAP: config_lines.append("当前没有配置任何映射") else: for discord_id, info in CROSS_PLATFORM_MAP.items(): discord_channel = f"Discord: {discord_id}" qq_group = f"QQ: {info['qq_group_id']}" name = info.get("name", "") if name: config_lines.append(f"• {discord_channel} ↔ {qq_group} ({name})") else: config_lines.append(f"• {discord_channel} ↔ {qq_group}") await event.reply("\n".join(config_lines)) @matcher.command("cross_reload", "跨平台重载", permission=Permission.ADMIN) async def cross_reload_command(event: MessageEvent): """ 重新加载跨平台配置 """ await reload_config() await event.reply("跨平台配置已重载") # 清理函数 def cleanup(): """清理资源""" asyncio.create_task(stop_cross_platform_subscription())