# -*- coding: utf-8 -*- """ 跨平台消息互通插件解析器模块 """ import os import json from typing import Dict, List, Any from models.message import MessageSegment from .config import config async def parse_forward_nodes(nodes: List[Dict[str, Any]]) -> tuple[str, List[dict]]: """解析 OneBot 合并转发消息节点""" content_parts = [] attachments = [] for node in nodes: if not isinstance(node, dict): continue node_data = node.get("data", {}) node_content = node_data.get("content", "") sender_name = node_data.get("name", node_data.get("uin", "Unknown")) if isinstance(node_content, str): if "[object Object]" in node_content: content = f"[合并转发消息: {sender_name}]" content_parts.append(f"**{sender_name}**:\n{content}") elif '[CQ:' in node_content: content = parse_cq_code(node_content, attachments) content_parts.append(f"**{sender_name}**:\n{content}") else: content = node_content content_parts.append(f"**{sender_name}**:\n{content}") elif isinstance(node_content, list): content = parse_message_segments(node_content, attachments) content_parts.append(f"**{sender_name}**:\n{content}") full_content = "\n\n".join(content_parts) if content_parts else "" return full_content, attachments def parse_cq_code(cq_code: str, attachments: List[dict]) -> str: """解析 CQ 码字符串""" import re cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]' matches = list(re.finditer(cq_pattern, cq_code)) if not matches: return cq_code result = [] last_end = 0 for match in matches: if match.start() > last_end: result.append(cq_code[last_end:match.start()]) cq_type = match.group(1) cq_params_str = match.group(2) or "" params = {} if cq_params_str: for param in cq_params_str.split(','): if '=' in param: k, v = param.split('=', 1) params[k] = v if cq_type == "text": result.append(params.get("text", "")) elif cq_type == "image": file_url = params.get("url") or params.get("file") if file_url: file_name = params.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "image" attachments.append({"type": "image", "url": str(file_url), "filename": file_name}) result.append(f"\n[图片: {file_name}]\n") elif cq_type == "video": file_url = params.get("url") or params.get("file") if file_url: file_name = params.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "video" attachments.append({"type": "video", "url": str(file_url), "filename": file_name}) result.append(f"\n[视频: {file_name}]\n") elif cq_type == "record": file_url = params.get("url") or params.get("file") if file_url: file_name = params.get("file", "") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "record" attachments.append({"type": "record", "url": str(file_url), "filename": file_name}) result.append(f"\n[语音: {file_name}]\n") elif cq_type == "at": qq_id = params.get("qq") if qq_id == "all": result.append("@所有人 ") else: result.append(f"@{qq_id} ") elif cq_type == "face": face_id = params.get("id", "") result.append(f"[表情:{face_id}] ") elif cq_type == "reply": reply_id = params.get("id", "") result.append(f"[回复:{reply_id}] ") elif cq_type == "file": file_url = params.get("file", "") if file_url: file_name = os.path.basename(str(file_url).split('?')[0]) or "file" attachments.append({"type": "file", "url": str(file_url), "filename": file_name}) result.append(f"\n[文件: {file_name}]\n") last_end = match.end() if last_end < len(cq_code): result.append(cq_code[last_end:]) return "".join(result) def parse_message_segments(segments: List[Any], attachments: List[dict]) -> str: """解析 MessageSegment 列表""" result = [] for seg in segments: if isinstance(seg, str): result.append(seg) elif isinstance(seg, MessageSegment): seg_type = seg.type seg_data = seg.data if seg_type == "text": result.append(seg_data.get("text", "")) elif seg_type == "image": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("filename") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "image" attachments.append({"type": "image", "url": str(file_url), "filename": file_name}) result.append(f"\n[图片: {file_name}]\n") elif seg_type == "video": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("filename") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "video" attachments.append({"type": "video", "url": str(file_url), "filename": file_name}) result.append(f"\n[视频: {file_name}]\n") elif seg_type == "record": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("filename") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "record" attachments.append({"type": "record", "url": str(file_url), "filename": file_name}) result.append(f"\n[语音: {file_name}]\n") elif seg_type == "at": qq_id = seg_data.get("qq") if qq_id == "all": result.append("@所有人 ") else: result.append(f"@{qq_id} ") elif seg_type == "face": face_id = seg_data.get("id", "") result.append(f"[表情:{face_id}] ") elif seg_type == "reply": reply_id = seg_data.get("id", "") result.append(f"[回复:{reply_id}] ") elif seg_type == "file": file_url = seg_data.get("file", "") if file_url: file_name = os.path.basename(str(file_url).split('?')[0]) or "file" attachments.append({"type": "file", "url": str(file_url), "filename": file_name}) result.append(f"\n[文件: {file_name}]\n") elif seg_type == "json": json_data = seg_data.get("data", "") try: parsed = json.loads(json_data) if isinstance(parsed, dict): result.append(f"\n[JSON数据: {json_data[:100]}...]\n") except: result.append(f"\n[JSON数据]\n") elif seg_type == "xml": result.append(f"\n[XML数据]\n") elif isinstance(seg, dict): seg_type = seg.get("type") seg_data = seg.get("data", {}) if seg_type == "text": result.append(seg_data.get("text", "")) elif seg_type == "image": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("filename") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "image" attachments.append({"type": "image", "url": str(file_url), "filename": file_name}) result.append(f"\n[图片: {file_name}]\n") elif seg_type == "video": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("filename") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "video" attachments.append({"type": "video", "url": str(file_url), "filename": file_name}) result.append(f"\n[视频: {file_name}]\n") elif seg_type == "record": file_url = seg_data.get("url") or seg_data.get("file") if file_url: file_name = seg_data.get("filename") if not file_name: file_name = os.path.basename(str(file_url).split('?')[0]) or "record" attachments.append({"type": "record", "url": str(file_url), "filename": file_name}) result.append(f"\n[语音: {file_name}]\n") elif seg_type == "at": qq_id = seg_data.get("qq") if qq_id == "all": result.append("@所有人 ") else: result.append(f"@{qq_id} ") return "".join(result) def get_platform_info(platform: str, identifier: Any) -> str: """获取平台信息字符串""" if platform == "discord": channel_id = int(identifier) if channel_id in config.CROSS_PLATFORM_MAP: group_info = config.CROSS_PLATFORM_MAP[channel_id] group_name = group_info.get("name", f"群组 {group_info['qq_group_id']}") return f"[Discord {group_name}]" return f"[Discord]" elif platform == "qq": group_id = int(identifier) return f"[PAW qq]" return "" async def format_discord_to_qq_content( discord_username: str, discord_discriminator: str, content: str, channel_id: int, attachments: List[dict] = None ) -> tuple[str, List[dict]]: """将 Discord 消息格式化为 QQ 消息格式""" logger.debug(f"[CrossPlatform] format_discord_to_qq_content: username={discord_username}, content='{content}', attachments={attachments}") platform_info = get_platform_info("discord", channel_id) message_header = f"{discord_username}:" message_body = content.strip() if content else "" if message_body: full_message = f"{message_header}\n{message_body}" else: full_message = message_header processed_attachments = [] if attachments: logger.debug(f"[CrossPlatform] 处理附件: {attachments}") for att in attachments: if isinstance(att, dict): url = att.get("url", "") filename = att.get("filename", "").lower() att_type = att.get("type", "") if att_type == "image" or filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): processed_attachments.append({"type": "image", "url": url}) elif att_type == "record" or filename.endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')): processed_attachments.append({"type": "record", "url": url}) elif att_type == "video" or filename.endswith(('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv')): processed_attachments.append({"type": "video", "url": url}) else: processed_attachments.append({"type": "file", "url": url, "filename": filename}) logger.debug(f"[CrossPlatform] Discord 消息格式化: 识别为文件 {filename}") else: url = str(att) logger.debug(f"[CrossPlatform] 处理非字典附件: {url}") if url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): processed_attachments.append({"type": "image", "url": url}) elif url.lower().endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')): processed_attachments.append({"type": "record", "url": url}) elif url.lower().endswith(('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv')): processed_attachments.append({"type": "video", "url": url}) else: filename = os.path.basename(url.split('?')[0]) or "file" processed_attachments.append({"type": "file", "url": url, "filename": filename}) logger.debug(f"[CrossPlatform] Discord 消息格式化: 通过扩展名识别为文件 {filename}") logger.debug(f"[CrossPlatform] format_discord_to_qq_content 完成: full_message='{full_message}', processed_attachments={processed_attachments}") return full_message, processed_attachments async def format_qq_to_discord_content( qq_nickname: str, qq_user_id: int, group_name: str, group_id: int, content: str, attachments: List[dict] = None ) -> tuple[str, List[dict], dict]: """将 QQ 消息格式化为 Discord 消息格式(Embed 卡片)""" platform_info = get_platform_info("qq", group_id) embed = { "type": "rich", "color": 0x5865F2, "author": { "name": f"{qq_nickname}", "icon_url": f"https://q1.qlogo.cn/g?b=qq&nk={qq_user_id}&s=640" }, "description": content if content else "", "timestamp": None, "footer": { "text": f"来自 QQ" } } if attachments: image_urls = [] voice_urls = [] video_urls = [] other_urls = [] filtered_attachments = [] for att in attachments: url = att.get("url", "") filename = att.get("filename", "").lower() att_type = att.get("type", "") if att_type == "image" or filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): image_urls.append(url) if len(image_urls) > 1: filtered_attachments.append(att) elif att_type == "record" or filename.endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')): voice_urls.append(url) other_urls.append(url) filtered_attachments.append(att) elif att_type == "video" or filename.endswith(('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv')): video_urls.append(url) other_urls.append(url) filtered_attachments.append(att) else: other_urls.append(url) filtered_attachments.append(att) attachments = filtered_attachments embed["description"] = content if content else "" if image_urls: embed["image"] = {"url": image_urls[0]} if voice_urls: voice_filenames = [att.get("filename", "voice") for att in attachments if att.get("url") in voice_urls] voice_list = "\n".join([f"🎤 {fname}" for fname in voice_filenames[:5]]) embed["description"] += f"\n\n**语音消息:**\n{voice_list}" if len(voice_urls) > 5: embed["description"] += f"\n...还有 {len(voice_urls) - 5} 条语音" if video_urls: video_filenames = [att.get("filename", "video") for att in attachments if att.get("url") in video_urls] video_list = "\n".join([f"🎬 {fname}" for fname in video_filenames[:5]]) embed["description"] += f"\n\n**视频文件:**\n{video_list}" if len(video_urls) > 5: embed["description"] += f"\n...还有 {len(video_urls) - 5} 个视频" non_media_other_urls = [u for u in other_urls if u not in voice_urls and u not in video_urls] if non_media_other_urls: file_filenames = [att.get("filename", "file") for att in attachments if att.get("url") in non_media_other_urls] file_list = "\n".join([f"📄 {fname}" for fname in file_filenames[:5]]) embed["description"] += f"\n\n**附加文件:**\n{file_list}" if len(non_media_other_urls) > 5: embed["description"] += f"\n...还有 {len(non_media_other_urls) - 5} 个文件" return "", attachments or [], embed