Files
NeoBot/plugins/discord-cross/parser.py
镀铬酸钾 be552a90ea Dev (#75)
* fix(discord): 修复 WebSocket 连接检测并增强跨平台文件处理

修复 Discord WebSocket 连接检测逻辑,使用正确的属性检查连接状态
为跨平台消息处理添加文件类型支持,并增加详细的调试日志
优化附件处理逻辑,确保所有文件类型都能正确识别和转发

* feat(跨平台): 优化消息处理并添加纯文本提取功能

添加 extract_text_only 函数过滤非文本标记
修改翻译逻辑仅处理纯文本内容
完善附件处理和消息内容拼接
修复仅包含表情时的消息处理问题

* refactor(discord-cross): 使用模块专用日志记录器替换全局日志记录器

将各模块中的全局日志记录器替换为模块专用日志记录器,以提供更清晰的日志来源标识
同时在适配器中添加会话状态检查和重连机制,提升消息发送的可靠性
2026-03-21 18:04:35 +08:00

394 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
跨平台消息互通插件解析器模块
"""
import os
import json
import re
from typing import Dict, List, Any
from models.message import MessageSegment
from core.utils.logger import ModuleLogger
from .config import config
# 创建模块专用日志记录器
logger = ModuleLogger("CrossPlatformParser")
def extract_text_only(content: str) -> str:
"""从消息内容中提取纯文本,过滤掉非文本标记"""
if not content:
return ""
# 移除所有 [图片: xxx]、[视频: xxx]、[语音: xxx]、[文件: xxx] 等标记
text_only = re.sub(r'\s*\[(图片|视频|语音|文件):[^\]]+\]\s*', ' ', content)
# 移除连续空格
text_only = re.sub(r'\s+', ' ', text_only).strip()
return text_only
async def parse_forward_nodes(nodes: List[Dict[str, Any]]) -> tuple[str, List[dict]]:
"""解析 OneBot 合并转发消息节点"""
content_parts = []
attachments = []
for node in nodes:
if not isinstance(node, dict):
continue
node_data = node.get("data", {})
node_content = node_data.get("content", "")
sender_name = node_data.get("name", node_data.get("uin", "Unknown"))
if isinstance(node_content, str):
if "[object Object]" in node_content:
content = f"[合并转发消息: {sender_name}]"
content_parts.append(f"**{sender_name}**:\n{content}")
elif '[CQ:' in node_content:
content = parse_cq_code(node_content, attachments)
content_parts.append(f"**{sender_name}**:\n{content}")
else:
content = node_content
content_parts.append(f"**{sender_name}**:\n{content}")
elif isinstance(node_content, list):
content = parse_message_segments(node_content, attachments)
content_parts.append(f"**{sender_name}**:\n{content}")
full_content = "\n\n".join(content_parts) if content_parts else ""
return full_content, attachments
def parse_cq_code(cq_code: str, attachments: List[dict]) -> str:
"""解析 CQ 码字符串"""
import re
cq_pattern = r'\[CQ:([^,]+)(?:,([^\]]+))?\]'
matches = list(re.finditer(cq_pattern, cq_code))
if not matches:
return cq_code
result = []
last_end = 0
for match in matches:
if match.start() > last_end:
result.append(cq_code[last_end:match.start()])
cq_type = match.group(1)
cq_params_str = match.group(2) or ""
params = {}
if cq_params_str:
for param in cq_params_str.split(','):
if '=' in param:
k, v = param.split('=', 1)
params[k] = v
if cq_type == "text":
result.append(params.get("text", ""))
elif cq_type == "image":
file_url = params.get("url") or params.get("file")
if file_url:
file_name = params.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "image"
attachments.append({"type": "image", "url": str(file_url), "filename": file_name})
result.append(f"\n[图片: {file_name}]\n")
elif cq_type == "video":
file_url = params.get("url") or params.get("file")
if file_url:
file_name = params.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "video"
attachments.append({"type": "video", "url": str(file_url), "filename": file_name})
result.append(f"\n[视频: {file_name}]\n")
elif cq_type == "record":
file_url = params.get("url") or params.get("file")
if file_url:
file_name = params.get("file", "")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "record"
attachments.append({"type": "record", "url": str(file_url), "filename": file_name})
result.append(f"\n[语音: {file_name}]\n")
elif cq_type == "at":
qq_id = params.get("qq")
if qq_id == "all":
result.append("@所有人 ")
else:
result.append(f"@{qq_id} ")
elif cq_type == "face":
face_id = params.get("id", "")
result.append(f"[表情:{face_id}] ")
elif cq_type == "reply":
reply_id = params.get("id", "")
result.append(f"[回复:{reply_id}] ")
elif cq_type == "file":
file_url = params.get("file", "")
if file_url:
file_name = os.path.basename(str(file_url).split('?')[0]) or "file"
attachments.append({"type": "file", "url": str(file_url), "filename": file_name})
result.append(f"\n[文件: {file_name}]\n")
last_end = match.end()
if last_end < len(cq_code):
result.append(cq_code[last_end:])
return "".join(result)
def parse_message_segments(segments: List[Any], attachments: List[dict]) -> str:
"""解析 MessageSegment 列表"""
result = []
for seg in segments:
if isinstance(seg, str):
result.append(seg)
elif isinstance(seg, MessageSegment):
seg_type = seg.type
seg_data = seg.data
if seg_type == "text":
result.append(seg_data.get("text", ""))
elif seg_type == "image":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("filename")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "image"
attachments.append({"type": "image", "url": str(file_url), "filename": file_name})
result.append(f"\n[图片: {file_name}]\n")
elif seg_type == "video":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("filename")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "video"
attachments.append({"type": "video", "url": str(file_url), "filename": file_name})
result.append(f"\n[视频: {file_name}]\n")
elif seg_type == "record":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("filename")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "record"
attachments.append({"type": "record", "url": str(file_url), "filename": file_name})
result.append(f"\n[语音: {file_name}]\n")
elif seg_type == "at":
qq_id = seg_data.get("qq")
if qq_id == "all":
result.append("@所有人 ")
else:
result.append(f"@{qq_id} ")
elif seg_type == "face":
face_id = seg_data.get("id", "")
result.append(f"[表情:{face_id}] ")
elif seg_type == "reply":
reply_id = seg_data.get("id", "")
result.append(f"[回复:{reply_id}] ")
elif seg_type == "file":
file_url = seg_data.get("file", "")
if file_url:
file_name = os.path.basename(str(file_url).split('?')[0]) or "file"
attachments.append({"type": "file", "url": str(file_url), "filename": file_name})
result.append(f"\n[文件: {file_name}]\n")
elif seg_type == "json":
json_data = seg_data.get("data", "")
try:
parsed = json.loads(json_data)
if isinstance(parsed, dict):
result.append(f"\n[JSON数据: {json_data[:100]}...]\n")
except:
result.append(f"\n[JSON数据]\n")
elif seg_type == "xml":
result.append(f"\n[XML数据]\n")
elif isinstance(seg, dict):
seg_type = seg.get("type")
seg_data = seg.get("data", {})
if seg_type == "text":
result.append(seg_data.get("text", ""))
elif seg_type == "image":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("filename")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "image"
attachments.append({"type": "image", "url": str(file_url), "filename": file_name})
result.append(f"\n[图片: {file_name}]\n")
elif seg_type == "video":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("filename")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "video"
attachments.append({"type": "video", "url": str(file_url), "filename": file_name})
result.append(f"\n[视频: {file_name}]\n")
elif seg_type == "record":
file_url = seg_data.get("url") or seg_data.get("file")
if file_url:
file_name = seg_data.get("filename")
if not file_name:
file_name = os.path.basename(str(file_url).split('?')[0]) or "record"
attachments.append({"type": "record", "url": str(file_url), "filename": file_name})
result.append(f"\n[语音: {file_name}]\n")
elif seg_type == "at":
qq_id = seg_data.get("qq")
if qq_id == "all":
result.append("@所有人 ")
else:
result.append(f"@{qq_id} ")
return "".join(result)
def get_platform_info(platform: str, identifier: Any) -> str:
"""获取平台信息字符串"""
if platform == "discord":
channel_id = int(identifier)
if channel_id in config.CROSS_PLATFORM_MAP:
group_info = config.CROSS_PLATFORM_MAP[channel_id]
group_name = group_info.get("name", f"群组 {group_info['qq_group_id']}")
return f"[Discord {group_name}]"
return f"[Discord]"
elif platform == "qq":
group_id = int(identifier)
return f"[PAW qq]"
return ""
async def format_discord_to_qq_content(
discord_username: str,
discord_discriminator: str,
content: str,
channel_id: int,
attachments: List[dict] = None
) -> tuple[str, List[dict]]:
"""将 Discord 消息格式化为 QQ 消息格式"""
logger.debug(f"[CrossPlatform] format_discord_to_qq_content: username={discord_username}, content='{content}', attachments={attachments}")
platform_info = get_platform_info("discord", channel_id)
message_header = f"{discord_username}:"
message_body = content.strip() if content else ""
if message_body:
full_message = f"{message_header}\n{message_body}"
else:
full_message = message_header
processed_attachments = []
if attachments:
logger.debug(f"[CrossPlatform] 处理附件: {attachments}")
for att in attachments:
if isinstance(att, dict):
url = att.get("url", "")
filename = att.get("filename", "").lower()
att_type = att.get("type", "")
if att_type == "image" or filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
processed_attachments.append({"type": "image", "url": url})
elif att_type == "record" or filename.endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')):
processed_attachments.append({"type": "record", "url": url})
elif att_type == "video" or filename.endswith(('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv')):
processed_attachments.append({"type": "video", "url": url})
else:
processed_attachments.append({"type": "file", "url": url, "filename": filename})
logger.debug(f"[CrossPlatform] Discord 消息格式化: 识别为文件 {filename}")
else:
url = str(att)
logger.debug(f"[CrossPlatform] 处理非字典附件: {url}")
if url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
processed_attachments.append({"type": "image", "url": url})
elif url.lower().endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')):
processed_attachments.append({"type": "record", "url": url})
elif url.lower().endswith(('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv')):
processed_attachments.append({"type": "video", "url": url})
else:
filename = os.path.basename(url.split('?')[0]) or "file"
processed_attachments.append({"type": "file", "url": url, "filename": filename})
logger.debug(f"[CrossPlatform] Discord 消息格式化: 通过扩展名识别为文件 {filename}")
logger.debug(f"[CrossPlatform] format_discord_to_qq_content 完成: full_message='{full_message}', processed_attachments={processed_attachments}")
return full_message, processed_attachments
async def format_qq_to_discord_content(
qq_nickname: str,
qq_user_id: int,
group_name: str,
group_id: int,
content: str,
attachments: List[dict] = None
) -> tuple[str, List[dict], dict]:
"""将 QQ 消息格式化为 Discord 消息格式Embed 卡片)"""
platform_info = get_platform_info("qq", group_id)
embed = {
"type": "rich",
"color": 0x5865F2,
"author": {
"name": f"{qq_nickname}",
"icon_url": f"https://q1.qlogo.cn/g?b=qq&nk={qq_user_id}&s=640"
},
"description": content if content else "",
"timestamp": None,
"footer": {
"text": f"来自 QQ"
}
}
if attachments:
image_urls = []
voice_urls = []
video_urls = []
other_urls = []
filtered_attachments = []
for att in attachments:
url = att.get("url", "")
filename = att.get("filename", "").lower()
att_type = att.get("type", "")
if att_type == "image" or filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
image_urls.append(url)
if len(image_urls) > 1:
filtered_attachments.append(att)
elif att_type == "record" or filename.endswith(('.amr', '.silk', '.mp3', '.wav', '.ogg', '.m4a')):
voice_urls.append(url)
other_urls.append(url)
filtered_attachments.append(att)
elif att_type == "video" or filename.endswith(('.mp4', '.avi', '.mkv', '.mov', '.flv', '.wmv')):
video_urls.append(url)
other_urls.append(url)
filtered_attachments.append(att)
else:
other_urls.append(url)
filtered_attachments.append(att)
attachments = filtered_attachments
embed["description"] = content if content else ""
if image_urls:
embed["image"] = {"url": image_urls[0]}
if voice_urls:
voice_filenames = [att.get("filename", "voice") for att in attachments if att.get("url") in voice_urls]
voice_list = "\n".join([f"🎤 {fname}" for fname in voice_filenames[:5]])
embed["description"] += f"\n\n**语音消息:**\n{voice_list}"
if len(voice_urls) > 5:
embed["description"] += f"\n...还有 {len(voice_urls) - 5} 条语音"
if video_urls:
video_filenames = [att.get("filename", "video") for att in attachments if att.get("url") in video_urls]
video_list = "\n".join([f"🎬 {fname}" for fname in video_filenames[:5]])
embed["description"] += f"\n\n**视频文件:**\n{video_list}"
if len(video_urls) > 5:
embed["description"] += f"\n...还有 {len(video_urls) - 5} 个视频"
non_media_other_urls = [u for u in other_urls if u not in voice_urls and u not in video_urls]
if non_media_other_urls:
file_filenames = [att.get("filename", "file") for att in attachments if att.get("url") in non_media_other_urls]
file_list = "\n".join([f"📄 {fname}" for fname in file_filenames[:5]])
embed["description"] += f"\n\n**附加文件:**\n{file_list}"
if len(non_media_other_urls) > 5:
embed["description"] += f"\n...还有 {len(non_media_other_urls) - 5} 个文件"
return "", attachments or [], embed