# -*- coding: utf-8 -*-
import re
import json
import abc

import aiohttp
from typing import Optional, Dict, Any, List, Union
from cachetools import TTLCache

from core.utils.logger import logger
from models import MessageEvent, MessageSegment


class BaseParser(metaclass=abc.ABCMeta):
    """Abstract base class for web-link parsers.

    Defines the shared pipeline every concrete parser inherits:
    URL extraction from message segments (JSON cards and plain text),
    short-link expansion, parsing, response formatting, and message-level
    deduplication. Subclasses must implement :meth:`parse`,
    :meth:`get_real_url` and :meth:`format_response`.
    """

    # Plugin metadata consumed by the plugin loader.
    __plugin_meta__ = {
        "name": "web_parser",
        "description": "Web链接解析插件",
        "usage": "自动解析各种Web链接"
    }

    # Default request headers (desktop Chrome UA to avoid mobile redirects).
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    # Module-wide shared ClientSession, lazily created by get_session().
    _session: Optional[aiohttp.ClientSession] = None

    def __init__(self):
        """Initialize parser name, URL regex and the dedup cache."""
        self.name = "Base Parser"
        self.url_pattern = re.compile(r"https?://[^\s]+")
        # BUG FIX: handle_message() reads self.processed_messages, but it was
        # never initialized, raising AttributeError on the first message.
        # A TTLCache (the previously unused import) both deduplicates recent
        # message ids and bounds memory, unlike a plain ever-growing dict.
        self.processed_messages: TTLCache = TTLCache(maxsize=1024, ttl=600)

    @classmethod
    def get_session(cls) -> aiohttp.ClientSession:
        """Return the shared aiohttp ClientSession, creating it if needed.

        Returns:
            aiohttp.ClientSession: a live (non-closed) session with the
            default HEADERS applied.
        """
        if cls._session is None or cls._session.closed:
            cls._session = aiohttp.ClientSession(headers=cls.HEADERS)
        return cls._session

    @classmethod
    async def close_session(cls) -> None:
        """Close the shared ClientSession, if one is open.

        New, backward-compatible helper: the session created by
        get_session() was never released anywhere; call this on shutdown
        to avoid leaking the underlying connector.
        """
        if cls._session is not None and not cls._session.closed:
            await cls._session.close()
        cls._session = None

    @abc.abstractmethod
    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """Parse *url* and extract its information.

        Args:
            url (str): the (already expanded) URL to parse.

        Returns:
            Optional[Dict[str, Any]]: parsed data, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def get_real_url(self, short_url: str) -> Optional[str]:
        """Resolve a short link to its real URL.

        Args:
            short_url (str): the short link.

        Returns:
            Optional[str]: the resolved URL, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """Build the reply message from parsed data.

        Args:
            event (MessageEvent): the triggering message event.
            data (Dict[str, Any]): result returned by :meth:`parse`.

        Returns:
            List[Any]: list of message segments/nodes to send.
        """
        pass

    def extract_url_from_json_segments(self, segments) -> Optional[str]:
        """Extract a URL from the JSON (card) segments of a message.

        Looks for the ``meta.detail_1.qqdocurl`` field used by QQ share
        cards.

        Args:
            segments: iterable of message segments.

        Returns:
            Optional[str]: the extracted URL, or None if absent.
        """
        for segment in segments:
            if segment.type == "json":
                logger.info(f"[{self.name}] 检测到JSON CQ码: {segment.data}")
                try:
                    json_data = json.loads(segment.data.get("data", "{}"))
                    short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
                    if short_url:
                        logger.success(f"[{self.name}] 成功从JSON卡片中提取到链接: {short_url}")
                        return short_url
                except (json.JSONDecodeError, KeyError) as e:
                    logger.error(f"[{self.name}] 解析JSON失败: {e}")
                    continue
        return None

    def extract_url_from_text_segments(self, segments) -> Optional[str]:
        """Extract the first http(s) URL from text segments of a message.

        Args:
            segments: iterable of message segments.

        Returns:
            Optional[str]: the extracted URL, or None if absent.
        """
        for segment in segments:
            if segment.type == "text":
                text_content = segment.data.get("text", "")
                match = self.url_pattern.search(text_content)
                if match:
                    extracted_url = match.group(0)
                    logger.success(f"[{self.name}] 成功从文本中提取到链接: {extracted_url}")
                    return extracted_url
        return None

    async def process_url(self, event: MessageEvent, url: str) -> None:
        """Expand, parse and answer a URL found in *event*.

        Args:
            event (MessageEvent): the triggering message event.
            url (str): the URL to process (may be a short link).
        """
        try:
            # Expand known short links first; bail out with a user-facing
            # message if expansion fails.
            if self.is_short_url(url):
                real_url = await self.get_real_url(url)
                if not real_url:
                    logger.error(f"[{self.name}] 无法从 {url} 获取真实URL。")
                    await event.reply("无法解析短链接。")
                    return
            else:
                real_url = url

            data = await self.parse(real_url)
            if not data:
                logger.error(f"[{self.name}] 无法从 {real_url} 解析信息。")
                await event.reply("无法获取链接信息,可能是接口变动或链接不存在。")
                return

            response = await self.format_response(event, data)
            if response:
                # Responses are sent as a forwarded-message bundle.
                await event.bot.send_forwarded_messages(target=event, nodes=response)
            else:
                await event.reply("解析成功,但无法生成响应。")
        except Exception as e:
            # Top-level boundary: log and degrade to a friendly reply instead
            # of letting the bot framework swallow the traceback.
            logger.error(f"[{self.name}] 处理链接时发生错误: {e}")
            await event.reply("处理链接时发生错误,请稍后再试。")

    def is_short_url(self, url: str) -> bool:
        """Return True if *url* belongs to a known short-link domain.

        Args:
            url (str): URL to check.

        Returns:
            bool: whether the URL should be expanded via get_real_url().
        """
        short_domains = ["b23.tv", "v.douyin.com", "t.cn", "url.cn"]
        return any(domain in url for domain in short_domains)

    async def handle_message(self, event: MessageEvent) -> None:
        """Entry point: detect a link in *event* and process it.

        Args:
            event (MessageEvent): the incoming message event.
        """
        # Deduplicate: the same message may be delivered more than once.
        if event.message_id in self.processed_messages:
            return
        self.processed_messages[event.message_id] = True

        # Ignore messages sent by the bot itself.
        if event.user_id == event.self_id:
            return

        # 1. Prefer URLs embedded in JSON share cards.
        url_to_process = self.extract_url_from_json_segments(event.message)

        # 2. Fall back to plain-text URL extraction.
        if not url_to_process:
            url_to_process = self.extract_url_from_text_segments(event.message)

        # 3. Process the URL if this parser claims it.
        if url_to_process and self.should_handle_url(url_to_process):
            await self.process_url(event, url_to_process)

    def should_handle_url(self, url: str) -> bool:
        """Decide whether this parser should handle *url*.

        Base implementation accepts any http(s) URL; subclasses should
        override with domain-specific matching.

        Args:
            url (str): URL candidate.

        Returns:
            bool: True if the URL should be processed.
        """
        return bool(self.url_pattern.search(url))

    @staticmethod
    def format_count(num: Union[int, str]) -> str:
        """Format a count into a human-readable string (万 = 10,000).

        Args:
            num (Union[int, str]): the number to format.

        Returns:
            str: e.g. 9999 -> "9999", 123456 -> "12.3万"; non-numeric
            input is returned via str() unchanged.
        """
        try:
            n = int(num)
            if n < 10000:
                return str(n)
            return f"{n / 10000:.1f}万"
        except (ValueError, TypeError):
            return str(num)