# -*- coding: utf-8 -*-
import abc
import re
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse

import aiohttp
import orjson

from core.utils.logger import logger
from models import MessageEvent


class BaseParser(metaclass=abc.ABCMeta):
    """
    Base class for web parsers.

    Defines the behaviour shared by every web link parser: a globally
    shared aiohttp session, URL extraction from JSON-card and text
    message segments, short-link detection and expansion, per-message
    de-duplication, and the parse -> format -> reply pipeline.

    Subclasses must implement ``parse``, ``get_real_url`` and
    ``format_response``; they may override ``should_handle_url`` to
    restrict which URLs they accept.
    """

    # Plugin metadata consumed by the plugin loader.
    __plugin_meta__ = {
        "name": "web_parser",
        "description": "Web链接解析插件",
        "usage": "自动解析各种Web链接"
    }

    # Default request headers applied to the shared HTTP session.
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    # Globally shared aiohttp ClientSession, created lazily on first use.
    _session: Optional[aiohttp.ClientSession] = None

    # Cap on the de-duplication cache so it cannot grow without bound
    # over the bot's lifetime (the original dict was never pruned).
    _MAX_PROCESSED: int = 1024

    def __init__(self) -> None:
        """Initialize the parser name, URL regex and de-duplication cache."""
        self.name = "Base Parser"
        self.url_pattern = re.compile(r"https?://[^\s]+")
        # Already-handled message IDs; bounded with FIFO eviction in
        # handle_message to prevent unbounded memory growth.
        self.processed_messages: Dict[Any, bool] = {}

    @classmethod
    def get_session(cls) -> aiohttp.ClientSession:
        """
        Get or lazily (re)create the global aiohttp ClientSession.

        Returns:
            aiohttp.ClientSession: the shared client session, recreated
            if it was never opened or has been closed.
        """
        if cls._session is None or cls._session.closed:
            cls._session = aiohttp.ClientSession(headers=cls.HEADERS)
        return cls._session

    @abc.abstractmethod
    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Parse *url* and extract its information.

        Args:
            url (str): the URL to parse.

        Returns:
            Optional[Dict[str, Any]]: parsed data, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a short link to its real URL.

        Args:
            short_url (str): the shortened link.

        Returns:
            Optional[str]: the resolved URL, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Build the reply message from parsed data.

        Args:
            event (MessageEvent): the originating message event.
            data (Dict[str, Any]): the parse result.

        Returns:
            List[Any]: a list of message segments/forward nodes.
        """
        pass

    def extract_url_from_json_segments(self, segments) -> Optional[str]:
        """
        Extract a URL from the JSON (card) segments of a message.

        Args:
            segments: message segment list.

        Returns:
            Optional[str]: the extracted URL, or None if no segment
            yields one.
        """
        for segment in segments:
            if segment.type != "json":
                continue
            logger.info(f"[{self.name}] 检测到JSON CQ码: {segment.data}")
            try:
                payload = orjson.loads(segment.data.get("data", "{}"))
                # QQ card links live at meta.detail_1.qqdocurl.
                # AttributeError/TypeError cover payloads whose top level
                # is not a dict (e.g. a JSON array), which .get() raises on.
                short_url = payload.get("meta", {}).get("detail_1", {}).get("qqdocurl")
            except (orjson.JSONDecodeError, KeyError, AttributeError, TypeError) as e:
                logger.error(f"[{self.name}] 解析JSON失败: {e}")
                continue
            if short_url:
                logger.success(f"[{self.name}] 成功从JSON卡片中提取到链接: {short_url}")
                return short_url
        return None

    def extract_url_from_text_segments(self, segments) -> Optional[str]:
        """
        Extract a URL from the text segments of a message.

        All text segments are concatenated first so a link split across
        several segments is re-assembled before matching.

        Args:
            segments: message segment list.

        Returns:
            Optional[str]: the extracted URL, or None.
        """
        # 1. Join every text segment without a separator so a link that
        #    was split across adjacent segments is stitched back together.
        full_text = "".join(
            segment.data.get("text", "") for segment in segments if segment.type == "text"
        )

        # 2. Match with this parser's own url_pattern (stops at whitespace).
        match = self.url_pattern.search(full_text)
        if not match:
            return None

        extracted_url = match.group(0)
        # Strip a trailing punctuation mark that may have been captured.
        extracted_url = re.sub(r'[,.!?]$', '', extracted_url)
        logger.success(f"[{self.name}] 成功从合并后的文本中提取到链接: {extracted_url}")
        return extracted_url

    async def process_url(self, event: MessageEvent, url: str) -> None:
        """
        Process a URL: resolve it if short, parse it, and reply.

        Args:
            event (MessageEvent): the originating message event.
            url (str): the URL to process.
        """
        try:
            # Expand short links to their real URL first.
            if self.is_short_url(url):
                real_url = await self.get_real_url(url)
                if not real_url:
                    logger.error(f"[{self.name}] 无法从 {url} 获取真实URL。")
                    await event.reply("无法解析短链接。")
                    return
            else:
                real_url = url

            # Delegate to the subclass parser.
            data = await self.parse(real_url)
            if not data:
                logger.error(f"[{self.name}] 无法从 {real_url} 解析信息。")
                await event.reply("无法获取链接信息,可能是接口变动或链接不存在。")
                return

            # Format and send the response as forwarded messages.
            response = await self.format_response(event, data)
            if response:
                await event.bot.send_forwarded_messages(target=event, nodes=response)
            else:
                await event.reply("解析成功,但无法生成响应。")

        except Exception as e:
            # Top-level boundary: never let a parser error crash the bot.
            logger.error(f"[{self.name}] 处理链接时发生错误: {e}")
            await event.reply("处理链接时发生错误,请稍后再试。")

    def is_short_url(self, url: str) -> bool:
        """
        Return whether *url* points at a known short-link service.

        The URL's host is compared against the known short-link domains
        (exact match or subdomain). A plain substring test would
        false-positive on hosts such as "cat.cn" or on URLs merely
        containing "t.cn" in their path.

        Args:
            url (str): the URL to test.

        Returns:
            bool: True if the URL belongs to a short-link service.
        """
        short_domains = ["b23.tv", "v.douyin.com", "t.cn", "url.cn"]
        host = urlparse(url).netloc.lower()
        return any(host == domain or host.endswith("." + domain) for domain in short_domains)

    async def handle_message(self, event: MessageEvent) -> None:
        """
        Handle an incoming message: detect a link and process it.

        Args:
            event (MessageEvent): the originating message event.
        """
        # De-duplicate: skip messages we have already handled.
        if event.message_id in self.processed_messages:
            return
        if len(self.processed_messages) >= self._MAX_PROCESSED:
            # Evict the oldest entry (dicts preserve insertion order)
            # so the cache stays bounded.
            self.processed_messages.pop(next(iter(self.processed_messages)))
        self.processed_messages[event.message_id] = True

        # Ignore the bot's own messages.
        if event.user_id == event.self_id:
            return

        # 1. Prefer a link embedded in a JSON card.
        url_to_process = self.extract_url_from_json_segments(event.message)

        # 2. Fall back to scanning the plain-text segments.
        if not url_to_process:
            url_to_process = self.extract_url_from_text_segments(event.message)

        # 3. Process the link if this parser accepts it.
        if url_to_process and self.should_handle_url(url_to_process):
            await self.process_url(event, url_to_process)

    def should_handle_url(self, url: str) -> bool:
        """
        Return whether this parser should handle *url*.

        Args:
            url (str): the URL to test.

        Returns:
            bool: True if the URL should be processed.
        """
        # Default implementation accepts any URL matching url_pattern;
        # subclasses are expected to override with a stricter check.
        return bool(self.url_pattern.search(url))

    @staticmethod
    def format_count(num: Union[int, str]) -> str:
        """
        Format a number into a human-readable string.

        Args:
            num (Union[int, str]): the number to format.

        Returns:
            str: the number itself below 10000, otherwise scaled to
            "万" (tens of thousands) with one decimal; the original
            value stringified if it is not numeric.
        """
        try:
            n = int(num)
            if n < 10000:
                return str(n)
            return f"{n / 10000:.1f}万"
        except (ValueError, TypeError):
            return str(num)