# -*- coding: utf-8 -*-
import abc
import re
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlsplit

import aiohttp
import orjson

from core.utils.logger import logger
from models import MessageEvent


class BaseParser(metaclass=abc.ABCMeta):
    """Base class for web-link parsers.

    Defines the plugin metadata, the shared HTTP session, URL-extraction
    helpers and the message-handling pipeline common to all concrete
    web parsers. Subclasses must implement :meth:`parse`,
    :meth:`get_real_url` and :meth:`format_response`.
    """

    # Plugin metadata consumed by the plugin loader.
    __plugin_meta__ = {
        "name": "web_parser",
        "description": "Web链接解析插件",
        "usage": "自动解析各种Web链接"
    }

    # Default request headers (desktop Chrome UA) used for outgoing HTTP calls.
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    # Globally shared ClientSession, lazily created by get_session().
    _session: Optional[aiohttp.ClientSession] = None

    # Upper bound on the message de-duplication cache. Without a cap the
    # cache grows for the lifetime of the bot process (memory leak).
    _MAX_PROCESSED_MESSAGES: int = 1024

    def __init__(self) -> None:
        """Initialize the parser name, URL regex and de-duplication cache."""
        self.name = "Base Parser"
        self.url_pattern = re.compile(r"https?://[^\s]+")
        # Maps message_id -> True for already-handled messages. Ordered so
        # the oldest entries can be evicted first (FIFO) once the cap is hit.
        self.processed_messages: "OrderedDict[Any, bool]" = OrderedDict()

    @classmethod
    def get_session(cls) -> aiohttp.ClientSession:
        """Return the shared aiohttp ClientSession, creating it if needed.

        Recreates the session if it was closed, so callers always receive
        a usable session.

        Returns:
            aiohttp.ClientSession: the shared client session.
        """
        if cls._session is None or cls._session.closed:
            cls._session = aiohttp.ClientSession(headers=cls.HEADERS)
        return cls._session

    @classmethod
    async def close_session(cls) -> None:
        """Close the shared ClientSession.

        Call this on bot shutdown; otherwise the underlying connector is
        leaked and aiohttp logs an "Unclosed client session" warning.
        """
        if cls._session is not None and not cls._session.closed:
            await cls._session.close()
        cls._session = None

    @abc.abstractmethod
    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """Parse *url* and return the extracted information.

        Args:
            url (str): the URL to parse.

        Returns:
            Optional[Dict[str, Any]]: parsed data, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def get_real_url(self, short_url: str) -> Optional[str]:
        """Resolve a short link to its real URL.

        Args:
            short_url (str): the short link.

        Returns:
            Optional[str]: the resolved URL, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """Build the reply message from parsed data.

        Args:
            event (MessageEvent): the triggering message event.
            data (Dict[str, Any]): data returned by :meth:`parse`.

        Returns:
            List[Any]: a list of message segments/nodes to send.
        """
        pass

    def extract_url_from_json_segments(self, segments) -> Optional[str]:
        """Extract a URL from the JSON (card) segments of a message.

        Looks for the QQ "share card" payload and pulls the link from
        meta.detail_1.qqdocurl.

        Args:
            segments: iterable of message segments.

        Returns:
            Optional[str]: the extracted URL, or None if not found.
        """
        for segment in segments:
            if segment.type == "json":
                logger.info(f"[{self.name}] 检测到JSON CQ码: {segment.data}")
                try:
                    json_data = orjson.loads(segment.data.get("data", "{}"))
                    # Card schema: meta.detail_1.qqdocurl holds the shared link.
                    short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
                    if short_url:
                        logger.success(f"[{self.name}] 成功从JSON卡片中提取到链接: {short_url}")
                        return short_url
                # AttributeError/TypeError cover malformed cards where
                # "meta"/"detail_1" are not dicts — a chained .get() would
                # otherwise raise and crash the whole handler.
                except (orjson.JSONDecodeError, KeyError, AttributeError, TypeError) as e:
                    logger.error(f"[{self.name}] 解析JSON失败: {e}")
                    continue
        return None

    def extract_url_from_text_segments(self, segments) -> Optional[str]:
        """Extract the first URL found in the text segments of a message.

        Args:
            segments: iterable of message segments.

        Returns:
            Optional[str]: the extracted URL, or None if not found.
        """
        for segment in segments:
            if segment.type == "text":
                text_content = segment.data.get("text", "")
                match = self.url_pattern.search(text_content)
                if match:
                    extracted_url = match.group(0)
                    logger.success(f"[{self.name}] 成功从文本中提取到链接: {extracted_url}")
                    return extracted_url
        return None

    async def process_url(self, event: MessageEvent, url: str) -> None:
        """Resolve, parse and reply for a single URL.

        Short links are first resolved via :meth:`get_real_url`; the result
        is parsed and formatted, and the reply is sent as forwarded nodes.
        Errors are reported back to the user rather than raised.

        Args:
            event (MessageEvent): the triggering message event.
            url (str): the URL to process.
        """
        try:
            # Resolve short links to their real destination first.
            if self.is_short_url(url):
                real_url = await self.get_real_url(url)
                if not real_url:
                    logger.error(f"[{self.name}] 无法从 {url} 获取真实URL。")
                    await event.reply("无法解析短链接。")
                    return
            else:
                real_url = url

            data = await self.parse(real_url)
            if not data:
                logger.error(f"[{self.name}] 无法从 {real_url} 解析信息。")
                await event.reply("无法获取链接信息,可能是接口变动或链接不存在。")
                return

            response = await self.format_response(event, data)
            if response:
                await event.bot.send_forwarded_messages(target=event, nodes=response)
            else:
                await event.reply("解析成功,但无法生成响应。")
        # Boundary catch-all: a parser failure must not take down the bot;
        # log it and tell the user instead.
        except Exception as e:
            logger.error(f"[{self.name}] 处理链接时发生错误: {e}")
            await event.reply("处理链接时发生错误,请稍后再试。")

    def is_short_url(self, url: str) -> bool:
        """Return True if *url*'s host belongs to a known short-link service.

        Matches the parsed hostname (exact match or subdomain) instead of a
        raw substring test, so e.g. "https://example.com/t.cn" in a path is
        not misclassified as a short link.

        Args:
            url (str): the URL to test.

        Returns:
            bool: whether the URL points at a short-link domain.
        """
        short_domains = ("b23.tv", "v.douyin.com", "t.cn", "url.cn")
        host = (urlsplit(url).hostname or "").lower()
        return any(host == d or host.endswith("." + d) for d in short_domains)

    async def handle_message(self, event: MessageEvent) -> None:
        """Entry point: detect a link in *event* and process it.

        Deduplicates by message_id, ignores the bot's own messages, then
        looks for a URL first in JSON card segments and then in plain text.

        Args:
            event (MessageEvent): the incoming message event.
        """
        # Message de-duplication.
        if event.message_id in self.processed_messages:
            return
        self.processed_messages[event.message_id] = True
        # Evict oldest entries so the cache stays bounded.
        while len(self.processed_messages) > self._MAX_PROCESSED_MESSAGES:
            self.processed_messages.popitem(last=False)

        # Ignore messages sent by the bot itself.
        if event.user_id == event.self_id:
            return

        # 1. Prefer links embedded in JSON cards.
        url_to_process = self.extract_url_from_json_segments(event.message)

        # 2. Fall back to plain-text segments.
        if not url_to_process:
            url_to_process = self.extract_url_from_text_segments(event.message)

        # 3. Process the link if one was found and this parser wants it.
        if url_to_process and self.should_handle_url(url_to_process):
            await self.process_url(event, url_to_process)

    def should_handle_url(self, url: str) -> bool:
        """Return True if this parser should handle *url*.

        Base implementation accepts anything matching the generic URL
        regex; subclasses should override with a domain-specific check.

        Args:
            url (str): the URL to test.

        Returns:
            bool: whether to process the URL.
        """
        return bool(self.url_pattern.search(url))

    @staticmethod
    def format_count(num: Union[int, str]) -> str:
        """Format a number into a human-readable string.

        Values below 10000 are returned verbatim; larger values are
        expressed in units of 万 (10k) with one decimal place.

        Args:
            num (Union[int, str]): the number to format.

        Returns:
            str: the formatted string; non-numeric input is returned as str.
        """
        try:
            n = int(num)
            if n < 10000:
                return str(n)
            return f"{n / 10000:.1f}万"
        except (ValueError, TypeError):
            return str(num)