feat(web_parser): 新增通用web链接解析插件框架

refactor: 重构B站、抖音、GitHub解析器为模块化结构

fix(executor): 增强docker容器错误处理和回调稳定性

style(templates): 优化帮助页面和代码执行结果的样式

perf(web_parser): 添加API缓存和消息去重机制

docs: 更新插件元信息和注释

chore: 移除旧的独立解析器插件文件
This commit is contained in:
2026-01-22 01:58:13 +08:00
parent 5f943c1792
commit 1420d0f0b2
13 changed files with 1665 additions and 995 deletions

View File

@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
from core.managers.command_manager import matcher
from models import MessageEvent
from .parsers.bili import BiliParser
from .parsers.douyin import DouyinParser
from .parsers.github import GitHubParser
# Plugin metadata
__plugin_meta__ = {
    "name": "web_parser",
    "description": "自动解析各种Web链接包括B站、抖音和GitHub仓库",
    "usage": "(自动触发)当检测到支持的链接时,自动进行解析"
}
# Module-level parser instances shared by the handlers below
bili_parser = BiliParser()
douyin_parser = DouyinParser()
github_parser = GitHubParser()
@matcher.on_message()
async def handle_web_links(event: MessageEvent):
    """
    Inspect every incoming message and offer it to each web-link parser.

    Args:
        event (MessageEvent): the incoming message event.
    """
    # Each parser performs its own URL matching and message
    # de-duplication, so the event is simply handed to all of them
    # in a fixed order: Bilibili, then Douyin, then GitHub.
    for parser in (bili_parser, douyin_parser, github_parser):
        await parser.handle_message(event)
# Register the GitHub repository lookup command
@matcher.command("查仓库", "github", "github_repo")
async def handle_github_command(bot, event: MessageEvent):
    """
    Handle the command: /查仓库 owner/repo

    Args:
        bot: the bot instance.
        event (MessageEvent): the triggering message event.
    """
    # Drop the command token itself and keep the argument tail.
    command_text = event.raw_message
    tokens = command_text.split()
    prefix = tokens[0] if tokens else ""
    params = command_text[len(prefix):].strip()
    if not params:
        await event.reply("请输入仓库地址,格式:/查仓库 作者/仓库名")
        return
    # Expect "owner/repo" (extra path parts go into repo and are kept).
    if "/" in params:
        owner, repo = params.split("/", 1)
        owner = owner.strip()
        repo = repo.strip()
        # Fix: strip only a trailing ".git" — str.replace(".git", "")
        # would also mangle repo names that merely contain ".git".
        if repo.endswith(".git"):
            repo = repo[:-4]
        # Robustness: reject empty owner or repo (e.g. "a/" or "/b"),
        # which previously produced a broken github.com URL.
        if not owner or not repo:
            await event.reply("参数格式错误,请输入:/查仓库 作者/仓库名")
            return
        # Build the canonical repository URL and delegate to the parser.
        repo_url = f"https://github.com/{owner}/{repo}"
        await github_parser.process_url(event, repo_url)
    else:
        await event.reply("参数格式错误,请输入:/查仓库 作者/仓库名")

246
plugins/web_parser/base.py Normal file
View File

@@ -0,0 +1,246 @@
# -*- coding: utf-8 -*-
import re
import json
import abc
import aiohttp
from typing import Optional, Dict, Any, List, Union
from cachetools import TTLCache
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
class BaseParser(metaclass=abc.ABCMeta):
    """
    Abstract base class shared by all web link parsers.

    Provides the common pipeline: extract a URL from a message (JSON
    share card or plain text), resolve short links, parse the target and
    send the formatted reply. Subclasses implement parse(),
    get_real_url() and format_response(), and usually narrow url_pattern.
    """
    # Plugin metadata
    __plugin_meta__ = {
        "name": "web_parser",
        "description": "Web链接解析插件",
        "usage": "自动解析各种Web链接"
    }
    # Default request headers (desktop Chrome user agent)
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    # Class-wide shared aiohttp ClientSession, created lazily
    _session: Optional[aiohttp.ClientSession] = None

    def __init__(self):
        """Initialize state common to every parser."""
        self.name = "Base Parser"
        # Generic URL pattern; subclasses override with domain-specific ones.
        self.url_pattern = re.compile(r"https?://[^\s]+")
        # Message de-duplication cache consumed by handle_message().
        # Fix: previously only the subclasses defined this attribute, so
        # the base class silently depended on subclass state (and the
        # TTLCache import in this module was unused). Subclasses may
        # still override it with their own cache.
        self.processed_messages: TTLCache = TTLCache(maxsize=100, ttl=10)

    @classmethod
    def get_session(cls) -> aiohttp.ClientSession:
        """
        Return the shared aiohttp ClientSession, creating it if needed.

        Returns:
            aiohttp.ClientSession: the client session object.
        """
        if cls._session is None or cls._session.closed:
            cls._session = aiohttp.ClientSession(headers=cls.HEADERS)
        return cls._session

    @abc.abstractmethod
    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a URL and collect its information.

        Args:
            url (str): the URL to parse.

        Returns:
            Optional[Dict[str, Any]]: parsed data, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a short link to its real URL.

        Args:
            short_url (str): the short link.

        Returns:
            Optional[str]: the real URL, or None on failure.
        """
        pass

    @abc.abstractmethod
    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Format the parsed data into reply message nodes.

        Args:
            event (MessageEvent): the triggering message event.
            data (Dict[str, Any]): the parsed data.

        Returns:
            List[Any]: list of message segments / forward nodes.
        """
        pass

    def extract_url_from_json_segments(self, segments):
        """
        Extract a URL from the JSON (share-card) segments of a message.

        Args:
            segments: message segment list.

        Returns:
            Optional[str]: the extracted URL, or None.
        """
        for segment in segments:
            if segment.type == "json":
                logger.info(f"[{self.name}] 检测到JSON CQ码: {segment.data}")
                try:
                    json_data = json.loads(segment.data.get("data", "{}"))
                    # QQ share cards keep the target link under
                    # meta.detail_1.qqdocurl.
                    short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
                    if short_url:
                        logger.success(f"[{self.name}] 成功从JSON卡片中提取到链接: {short_url}")
                        return short_url
                except (json.JSONDecodeError, KeyError) as e:
                    logger.error(f"[{self.name}] 解析JSON失败: {e}")
                    continue
        return None

    def extract_url_from_text_segments(self, segments):
        """
        Extract the first URL found in the text segments of a message.

        Args:
            segments: message segment list.

        Returns:
            Optional[str]: the extracted URL, or None.
        """
        for segment in segments:
            if segment.type == "text":
                text_content = segment.data.get("text", "")
                match = self.url_pattern.search(text_content)
                if match:
                    extracted_url = match.group(0)
                    logger.success(f"[{self.name}] 成功从文本中提取到链接: {extracted_url}")
                    return extracted_url
        return None

    async def process_url(self, event: MessageEvent, url: str):
        """
        Resolve, parse and reply for a single URL.

        Args:
            event (MessageEvent): the triggering message event.
            url (str): the URL to process.
        """
        try:
            # Resolve short links to their real target first.
            if self.is_short_url(url):
                real_url = await self.get_real_url(url)
                if not real_url:
                    logger.error(f"[{self.name}] 无法从 {url} 获取真实URL。")
                    await event.reply("无法解析短链接。")
                    return
            else:
                real_url = url
            # Parse the resolved URL.
            data = await self.parse(real_url)
            if not data:
                logger.error(f"[{self.name}] 无法从 {real_url} 解析信息。")
                await event.reply("无法获取链接信息,可能是接口变动或链接不存在。")
                return
            # Format and send the reply.
            response = await self.format_response(event, data)
            if response:
                await event.bot.send_forwarded_messages(target=event, nodes=response)
            else:
                await event.reply("解析成功,但无法生成响应。")
        except Exception as e:
            # Last-resort guard so one bad link never crashes the handler.
            logger.error(f"[{self.name}] 处理链接时发生错误: {e}")
            await event.reply("处理链接时发生错误,请稍后再试。")

    def is_short_url(self, url: str) -> bool:
        """
        Whether the URL belongs to a known short-link domain.

        Args:
            url (str): the URL.

        Returns:
            bool: True if the URL looks like a short link.
        """
        short_domains = ["b23.tv", "v.douyin.com", "t.cn", "url.cn"]
        return any(domain in url for domain in short_domains)

    async def handle_message(self, event: MessageEvent):
        """
        Entry point: detect a supported link in the message and process it.

        Args:
            event (MessageEvent): the incoming message event.
        """
        # Skip messages this parser has already seen recently.
        if event.message_id in self.processed_messages:
            return
        self.processed_messages[event.message_id] = True
        # Ignore the bot's own messages.
        if event.user_id == event.self_id:
            return
        # 1. Prefer links embedded in JSON share cards.
        url_to_process = self.extract_url_from_json_segments(event.message)
        # 2. Otherwise look for a link in the plain text.
        if not url_to_process:
            url_to_process = self.extract_url_from_text_segments(event.message)
        # 3. Process the link if this parser is responsible for it.
        if url_to_process and self.should_handle_url(url_to_process):
            await self.process_url(event, url_to_process)

    def should_handle_url(self, url: str) -> bool:
        """
        Whether this parser should handle the URL.

        Base implementation matches the generic pattern; subclasses
        normally override this with a domain check.

        Args:
            url (str): the URL.

        Returns:
            bool: True if this parser should handle the URL.
        """
        return bool(self.url_pattern.search(url))

    @staticmethod
    def format_count(num: Union[int, str]) -> str:
        """
        Format a count into a short human-readable string.

        Values below 10000 are returned verbatim; larger values are
        scaled to 万 (ten-thousands) with one decimal place. Unparsable
        input is returned unchanged as a string.

        Args:
            num (Union[int, str]): the number to format.

        Returns:
            str: the formatted string.
        """
        try:
            n = int(num)
            if n < 10000:
                return str(n)
            # Fix: append the 万 unit — without it 15000 was rendered as
            # the misleading bare "1.5".
            return f"{n / 10000:.1f}万"
        except (ValueError, TypeError):
            return str(num)

View File

@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from typing import Optional, Dict, Any, List
from bs4 import BeautifulSoup
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from ..base import BaseParser
from ..utils import format_duration, clean_url
from cachetools import TTLCache
class BiliParser(BaseParser):
    """
    Bilibili video parser.

    Scrapes the video page for window.__INITIAL_STATE__, falls back to a
    minimal result built from the page <title> when the state blob is
    unavailable, and resolves a playable direct link via a third-party
    API.
    """
    def __init__(self):
        super().__init__()
        self.name = "B站解析器"
        # Matches bilibili.com video pages and b23.tv short links.
        self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
        self.nickname = "B站视频解析"
        # Message de-duplication cache
        self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)

    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Parse Bilibili video information from the video page.

        Args:
            url (str): the Bilibili video URL.

        Returns:
            Optional[Dict[str, Any]]: video info dict, or None on failure.
        """
        try:
            # Strip the query string / '#/' fragment. Fix: the original
            # bound the result to a local named `clean_url`, shadowing
            # (and therefore never using) the helper imported from
            # ..utils; use the helper instead.
            target_url = clean_url(url)
            session = self.get_session()
            async with session.get(target_url, headers=self.HEADERS, timeout=5) as response:
                response.raise_for_status()
                text = await response.text()
            soup = BeautifulSoup(text, 'html.parser')
            # Locate the embedded JSON state. `string=` replaces the
            # deprecated `text=` keyword of BeautifulSoup.find().
            script_tag = soup.find('script', string=re.compile('window.__INITIAL_STATE__'))
            if not script_tag or not script_tag.string:
                # Fallback: build a minimal result from the page title.
                # Fix: the original first searched for window.__PLAYINFO__
                # here but then parsed it with the __INITIAL_STATE__
                # regex, which can never match — so finding __PLAYINFO__
                # only suppressed this title fallback and returned None.
                title_tag = soup.find('title')
                if title_tag:
                    title = title_tag.get_text().strip()
                    # Extract the BV id from the URL.
                    bv_match = re.search(r'(BV\w{10})', target_url)
                    bvid = bv_match.group(1) if bv_match else '未知BV号'
                    return {
                        "title": title.replace('_哔哩哔哩_bilibili', '').strip(),
                        "bvid": bvid,
                        "duration": 0,
                        "cover_url": '',
                        "play": 0,
                        "like": 0,
                        "coin": 0,
                        "favorite": 0,
                        "share": 0,
                        "owner_name": '未知UP主',
                        "owner_avatar": '',
                        "followers": 0,
                    }
                return None
            # Extract the JSON object assigned to window.__INITIAL_STATE__.
            match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string)
            if not match:
                # Retry with a DOTALL non-greedy pattern for nested objects.
                match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL)
            if not match:
                return None
            json_str = match.group(1)
            # Trim whitespace and a stray trailing semicolon.
            json_str = json_str.strip().rstrip(';')
            try:
                data = json.loads(json_str)
            except json.JSONDecodeError:
                # Clean up trailing commas and comments, then retry once.
                cleaned_json = re.sub(r',\s*[}]', '}', json_str)   # trailing commas
                cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json)  # block comments
                cleaned_json = re.sub(r'//.*', '', cleaned_json)   # line comments
                data = json.loads(cleaned_json)
            video_data = data.get('videoData', {})
            up_data = data.get('upData', {})
            stat = video_data.get('stat', {})
            owner = video_data.get('owner', {})
            # Normalize the cover URL (drop image-service suffix, fix
            # protocol-relative links).
            cover_url = video_data.get('pic', '')
            if cover_url:
                cover_url = cover_url.split('@')[0]
                if cover_url.startswith('//'):
                    cover_url = 'https:' + cover_url
            # Normalize the uploader avatar URL the same way.
            owner_avatar = owner.get('face', '')
            if owner_avatar:
                if owner_avatar.startswith('//'):
                    owner_avatar = 'https:' + owner_avatar
                owner_avatar = owner_avatar.split('@')[0]
            return {
                "title": video_data.get('title', '未知标题'),
                "bvid": video_data.get('bvid', '未知BV号'),
                "duration": video_data.get('duration', 0),
                "cover_url": cover_url,
                "play": stat.get('view', 0),
                "like": stat.get('like', 0),
                "coin": stat.get('coin', 0),
                "favorite": stat.get('favorite', 0),
                "share": stat.get('share', 0),
                "owner_name": owner.get('name', '未知UP主'),
                "owner_avatar": owner_avatar,
                "followers": up_data.get('fans', 0),
            }
        except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
            logger.error(f"[{self.name}] 解析视频信息失败: {e}")
            logger.debug(f"失败的URL: {url}")
        except Exception as e:
            logger.error(f"[{self.name}] 解析视频信息时发生未知错误: {e}")
            logger.debug(f"失败的URL: {url}")
        return None

    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a b23.tv short link to the full video URL.

        Args:
            short_url (str): the Bilibili short link.

        Returns:
            Optional[str]: the real URL, or None on failure.
        """
        try:
            session = self.get_session()
            # HEAD without following redirects: the Location header of
            # the 302 response is the real URL.
            async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response:
                if response.status == 302:
                    return response.headers.get('Location')
        except Exception as e:
            logger.error(f"[{self.name}] 获取真实URL失败: {e}")
        return None

    async def get_direct_video_url(self, video_url: str) -> Optional[str]:
        """
        Resolve a playable direct link via a third-party API.

        Args:
            video_url (str): the full Bilibili video URL.

        Returns:
            Optional[str]: the direct video URL, or None on failure.
        """
        api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(api_url, headers=self.HEADERS, timeout=10) as response:
                    response.raise_for_status()
                    # content_type=None skips the Content-Type check —
                    # the API does not return application/json.
                    data = await response.json(content_type=None)
                    if data.get("code") == 200 and data.get("data"):
                        return data["data"][0].get("video_url")
        except (aiohttp.ClientError, json.JSONDecodeError, KeyError, IndexError) as e:
            logger.error(f"[{self.name}] 调用第三方API解析视频失败: {e}")
        return None

    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Format Bilibili video info into forward-message nodes.

        Args:
            event (MessageEvent): the triggering message event.
            data (Dict[str, Any]): the video info.

        Returns:
            List[Any]: forward-message node list.
        """
        # Skip direct-link resolution for long videos.
        if data['duration'] > 1200:  # 20 minutes = 1200 seconds
            video_message = "视频时长超过20分钟不进行解析。"
        else:
            # Build the canonical video URL and try to fetch a direct link.
            video_url = f"https://www.bilibili.com/video/{data.get('bvid', '')}"
            direct_url = await self.get_direct_video_url(video_url)
            if direct_url:
                video_message = MessageSegment.video(direct_url)
            else:
                video_message = "视频解析失败,无法获取直链。"
        text_message = (
            f"BiliBili 视频解析\n"
            f"--------------------\n"
            f" UP主: {data['owner_name']}\n"
            f" 粉丝: {self.format_count(data['followers'])}\n"
            f"--------------------\n"
            f" 标题: {data['title']}\n"
            f" BV号: {data['bvid']}\n"
            f" 时长: {format_duration(data['duration'])}\n"
            f"--------------------\n"
            f" 数据:\n"
            f" 播放: {self.format_count(data['play'])}\n"
            f" 点赞: {self.format_count(data['like'])}\n"
            f" 投币: {self.format_count(data['coin'])}\n"
            f" 收藏: {self.format_count(data['favorite'])}\n"
            f" 转发: {self.format_count(data['share'])}\n"
        )
        image_message_segment = [
            MessageSegment.text("B站封面"),
            MessageSegment.image(data['cover_url'])
        ]
        up_info_segment = [
            MessageSegment.text("UP主头像"),
            MessageSegment.image(data['owner_avatar'])
        ]
        # One forward node per section: text, cover, avatar, video.
        nodes = [
            event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=text_message),
            event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=image_message_segment),
            event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=up_info_segment),
            event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=video_message)
        ]
        return nodes

    def should_handle_url(self, url: str) -> bool:
        """
        Whether this parser should handle the URL.

        Args:
            url (str): the URL.

        Returns:
            bool: True for bilibili.com video pages and b23.tv links.
        """
        return bool(self.url_pattern.search(url))

View File

@@ -0,0 +1,261 @@
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from typing import Optional, Dict, Any, List
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from ..base import BaseParser
from ..utils import extract_original_text
from cachetools import TTLCache
class DouyinParser(BaseParser):
    """
    Douyin video parser.

    Resolves v.douyin.com short links, fetches the video metadata from a
    third-party API and replies with a forward-message bundle (info
    text, cover, author avatar and the direct video / first image link).
    """
    def __init__(self):
        super().__init__()
        self.name = "抖音解析器"
        self.url_pattern = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE)
        # Also matches scheme-less pastes like "v.douyin.com/xxxx".
        self.short_pattern = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE)
        self.nickname = "抖音视频解析"
        # Message de-duplication cache
        self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)

    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Parse Douyin video information via a third-party API.

        Args:
            url (str): the Douyin video URL.

        Returns:
            Optional[Dict[str, Any]]: video info dict, or None on failure.
        """
        try:
            # Fix: percent-encode the url parameter — resolved Douyin
            # links usually carry their own query string, which would
            # otherwise truncate the value sent to the API.
            from urllib.parse import quote
            api_url = f"http://api.xhus.cn/api/douyin?url={quote(url, safe='')}"
            session = self.get_session()
            async with session.get(api_url, headers=self.HEADERS, timeout=10) as response:
                if response.status != 200:
                    logger.error(f"[{self.name}] API请求失败状态码: {response.status}")
                    return None
                response_data = await response.json()
                if not isinstance(response_data, dict):
                    logger.error(f"[{self.name}] API返回格式错误: {response_data}")
                    return None
                if response_data.get("code") != 200:
                    logger.error(f"[{self.name}] API返回错误: {response_data}")
                    return None
                data = response_data.get("data", {})
                if not data:
                    logger.error(f"[{self.name}] API返回数据为空")
                    return None
                # Normalize the API payload into the common result shape.
                # A non-empty "images" list marks an image post; anything
                # else is treated as a video.
                return {
                    "type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image",
                    "video_url": data.get("url", ""),
                    "video_url_HQ": data.get("url", ""),
                    "nickname": data.get("author", "未知作者"),
                    "desc": data.get("title", "无描述"),
                    "aweme_id": data.get("uid", ""),
                    "like": data.get("like", 0),
                    "cover": data.get("cover", ""),
                    "time": data.get("time", 0),
                    "author_avatar": data.get("avatar", ""),
                    "music": data.get("music", {}),
                }
        except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
            logger.error(f"[{self.name}] 解析抖音视频信息失败: {e}")
            logger.debug(f"失败的URL: {url}")
        except Exception as e:
            logger.error(f"[{self.name}] 解析抖音视频时发生未知错误: {e}")
            logger.debug(f"失败的URL: {url}")
        return None

    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a Douyin short link by following its redirects.

        Args:
            short_url (str): the Douyin short link.

        Returns:
            Optional[str]: the redirected URL, or None on failure.
        """
        try:
            async with aiohttp.ClientSession() as session:
                # Extra headers to mimic a mobile browser — Douyin serves
                # different redirect targets to non-browser clients.
                mobile_headers = self.HEADERS.copy()
                mobile_headers.update({
                    'Sec-Fetch-Dest': 'document',
                    'Sec-Fetch-Mode': 'navigate',
                    'Sec-Fetch-Site': 'none',
                    'Cache-Control': 'max-age=0',
                    'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1',
                    'X-Requested-With': 'XMLHttpRequest',
                    'Referer': 'https://www.douyin.com/'
                })
                async with session.get(short_url, headers=mobile_headers, allow_redirects=True, timeout=10) as response:
                    redirected_url = str(response.url)
                    # The final URL is returned in every case; the checks
                    # below only classify it for logging.
                    if 'video/' in redirected_url or '/note/' in redirected_url:
                        logger.info(f"[{self.name}] 重定向后的视频URL: {redirected_url}")
                        return redirected_url
                    elif 'share_item' in redirected_url:
                        logger.info(f"[{self.name}] 重定向后的分享URL: {redirected_url}")
                        return redirected_url
                    else:
                        logger.warning(f"[{self.name}] 重定向到了非预期页面: {redirected_url}")
                        return redirected_url
        except Exception as e:
            logger.error(f"[{self.name}] 获取真实URL失败: {e}")
        return None

    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Format Douyin video info into forward-message nodes.

        Args:
            event (MessageEvent): the triggering message event.
            data (Dict[str, Any]): the video info.

        Returns:
            List[Any]: forward-message node list.
        """
        # Include any text the user shared alongside the link.
        original_text = extract_original_text(event.message, self.url_pattern)
        # Assemble the info text block.
        text_parts = ["抖音视频解析"]
        text_parts.append("--------------------")
        if original_text:
            text_parts.append(f" 分享内容: {original_text}")
            text_parts.append("--------------------")
        text_parts.append(f" 作者: {data['nickname']}")
        text_parts.append(f" 抖音号: {data['aweme_id']}")
        text_parts.append(f" 标题: {data['desc']}")
        text_parts.append(f" 点赞: {self.format_count(data['like'])}")
        text_parts.append(f" 类型: {data['type']}")
        # Append background-music details when present.
        if data.get('music'):
            music_info = data['music']
            text_parts.append("--------------------")
            text_parts.append(" 背景音乐:")
            text_parts.append(f" 标题: {music_info.get('title', '')}")
            text_parts.append(f" 作者: {music_info.get('author', '')}")
            text_parts.append("--------------------")
        text_message = "\n".join(text_parts)
        # Build the forward-message nodes.
        nodes = []
        # Info text node.
        text_node = event.bot.build_forward_node(
            user_id=event.self_id,
            nickname=self.nickname,
            message=text_message
        )
        nodes.append(text_node)
        # Cover image node (if available).
        if data.get('cover'):
            try:
                cover_node = event.bot.build_forward_node(
                    user_id=event.self_id,
                    nickname=self.nickname,
                    message=[
                        MessageSegment.text("抖音视频封面:\n"),
                        MessageSegment.image(data['cover'])
                    ]
                )
                nodes.append(cover_node)
            except Exception as e:
                logger.warning(f"[{self.name}] 无法添加封面图片: {e}")
        # Author avatar node (if available).
        if data.get('author_avatar'):
            try:
                avatar_node = event.bot.build_forward_node(
                    user_id=event.self_id,
                    nickname=self.nickname,
                    message=[
                        MessageSegment.text("作者头像:\n"),
                        MessageSegment.image(data['author_avatar'])
                    ]
                )
                nodes.append(avatar_node)
            except Exception as e:
                logger.warning(f"[{self.name}] 无法添加作者头像: {e}")
        # Direct video / first image node.
        video_success = False
        try:
            if data.get('video_url'):
                video_url = data.get('video_url', '')
                if data.get('type') == 'video':
                    video_message = MessageSegment.video(video_url)
                    video_type_text = "视频直链:"
                else:  # image post — only the first image is attached
                    video_message = MessageSegment.image(video_url)
                    video_type_text = "图集首图:"
                video_node = event.bot.build_forward_node(
                    user_id=event.self_id,
                    nickname=self.nickname,
                    message=[
                        MessageSegment.text(video_type_text + "\n"),
                        video_message
                    ]
                )
                nodes.append(video_node)
                video_success = True
        except Exception as e:
            logger.error(f"[{self.name}] 无法添加视频/图片: {e}")
        # Fallback notice when no playable media could be attached.
        if not video_success:
            no_video_node = event.bot.build_forward_node(
                user_id=event.self_id,
                nickname=self.nickname,
                message="视频解析成功,但无法获取直链或播放视频。"
            )
            nodes.append(no_video_node)
        return nodes

    def should_handle_url(self, url: str) -> bool:
        """
        Whether this parser should handle the URL.

        Args:
            url (str): the URL.

        Returns:
            bool: True for douyin.com domains and v.douyin.com links.
        """
        return ('douyin.com' in url or bool(self.url_pattern.search(url)) or bool(self.short_pattern.search(url)))

View File

@@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from typing import Optional, Dict, Any, List
from cachetools import TTLCache
from core.utils.logger import logger
from core.managers.image_manager import image_manager
from models import MessageEvent, MessageSegment
from ..base import BaseParser
class GitHubParser(BaseParser):
    """
    GitHub repository parser.

    Extracts owner/repo from a github.com URL, fetches repository data
    from the GitHub REST API (cached with a 1-hour TTL) and replies with
    a rendered image, falling back to plain text when rendering fails.
    """
    def __init__(self):
        super().__init__()
        self.name = "GitHub解析器"
        # Captures owner (group 1) and repo (group 2) from repo URLs.
        self.url_pattern = re.compile(r"https?://(?:www\.)?github\.com/([\w\-]+)/([\w\-\.]+)(?:/[^\s]*)?")
        self.nickname = "GitHub仓库信息"
        # Message de-duplication cache
        self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
        # GitHub API response cache to avoid hammering the API
        self.api_cache = TTLCache(maxsize=100, ttl=3600)  # 100 entries, 1-hour TTL

    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Parse GitHub repository information from a URL.

        Args:
            url (str): the GitHub repository URL.

        Returns:
            Optional[Dict[str, Any]]: repository info, or None on failure.
        """
        match = self.url_pattern.search(url)
        if not match:
            return None
        owner = match.group(1)
        repo = match.group(2)
        # Fix: strip only a trailing ".git" — str.replace(".git", "")
        # would also mangle repo names that merely contain ".git".
        if repo.endswith(".git"):
            repo = repo[:-4]
        return await self.get_github_repo_info(owner, repo)

    async def get_real_url(self, short_url: str) -> Optional[str]:
        """
        Resolve a short link to its real URL.

        Args:
            short_url (str): the short link.

        Returns:
            Optional[str]: the real URL, or None on failure.
        """
        try:
            session = self.get_session()
            async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response:
                if response.status == 302:
                    return response.headers.get('Location')
        except Exception as e:
            logger.error(f"[{self.name}] 获取真实URL失败: {e}")
        return None

    async def get_github_repo_info(self, owner: str, repo: str) -> Optional[Dict[str, Any]]:
        """
        Fetch repository info from the GitHub REST API, with caching.

        Args:
            owner (str): repository owner login.
            repo (str): repository name.

        Returns:
            Optional[Dict[str, Any]]: repository info, or None on failure.
        """
        cache_key = f"{owner}/{repo}"
        if cache_key in self.api_cache:
            logger.info(f"[{self.name}] 使用缓存的仓库信息: {cache_key}")
            return self.api_cache[cache_key]
        api_url = f"https://api.github.com/repos/{owner}/{repo}"
        try:
            session = self.get_session()
            async with session.get(api_url, timeout=10) as response:
                response.raise_for_status()
                repo_data = await response.json()
                # Cache the successful response for an hour.
                self.api_cache[cache_key] = repo_data
                logger.info(f"[{self.name}] 成功获取仓库信息并缓存: {cache_key}")
                return repo_data
        except aiohttp.ClientError as e:
            logger.error(f"[{self.name}] GitHub API请求失败: {e}")
        except json.JSONDecodeError as e:
            logger.error(f"[{self.name}] 解析GitHub API响应失败: {e}")
        except Exception as e:
            logger.error(f"[{self.name}] 获取仓库信息时发生未知错误: {e}")
        return None

    async def generate_repo_image(self, repo_data: Dict[str, Any]) -> Optional[str]:
        """
        Render the repository info into an image via a Jinja2 template.

        Args:
            repo_data (Dict[str, Any]): repository info.

        Returns:
            Optional[str]: base64-encoded image, or None on failure.
        """
        try:
            # Prepare the template context.
            template_data = {
                "full_name": repo_data.get("full_name", ""),
                "description": repo_data.get("description", "暂无描述"),
                "owner_avatar": repo_data.get("owner", {}).get("avatar_url", ""),
                "stargazers_count": repo_data.get("stargazers_count", 0),
                "forks_count": repo_data.get("forks_count", 0),
                "open_issues_count": repo_data.get("open_issues_count", 0),
                "watchers_count": repo_data.get("watchers_count", 0),
            }
            # Render at maximum quality as PNG.
            base64_image = await image_manager.render_template_to_base64(
                template_name="github_repo.html",
                data=template_data,
                output_name=f"github_{repo_data.get('name', 'repo')}.png",
                quality=100,
                image_type="png"
            )
            return base64_image
        except Exception as e:
            logger.error(f"[{self.name}] 生成仓库信息图片失败: {e}")
            return None

    async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
        """
        Format repository info into forward-message nodes.

        Args:
            event (MessageEvent): the triggering message event.
            data (Dict[str, Any]): repository info.

        Returns:
            List[Any]: forward-message node list.
        """
        nodes = []
        # Prefer the rendered image.
        image_base64 = await self.generate_repo_image(data)
        if image_base64:
            image_node = event.bot.build_forward_node(
                user_id=event.self_id,
                nickname=self.nickname,
                message=MessageSegment.image(image_base64)
            )
            nodes.append(image_node)
        else:
            # Plain-text fallback when rendering fails.
            text_message = (
                f"GitHub 仓库信息\n"
                f"--------------------\n"
                f"仓库: {data.get('full_name', '')}\n"
                f"描述: {data.get('description', '暂无描述')}\n"
                f"--------------------\n"
                f"数据:\n"
                f" 星标: {data.get('stargazers_count', 0)}\n"
                f" Fork: {data.get('forks_count', 0)}\n"
                f" Issues: {data.get('open_issues_count', 0)}\n"
                f" 关注: {data.get('watchers_count', 0)}\n"
            )
            text_node = event.bot.build_forward_node(
                user_id=event.self_id,
                nickname=self.nickname,
                message=text_message
            )
            nodes.append(text_node)
        return nodes

    def should_handle_url(self, url: str) -> bool:
        """
        Whether this parser should handle the URL.

        Args:
            url (str): the URL.

        Returns:
            bool: True for github.com repository URLs.
        """
        return bool(self.url_pattern.search(url)) and 'github.com' in url

144
plugins/web_parser/utils.py Normal file
View File

@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
import re
import json
from typing import Optional, Dict, Any, Union, List
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
def format_duration(seconds: int) -> str:
    """
    Render a whole-second duration as ``MM:SS``.

    Anything that is not a non-negative int (floats, strings, None,
    negatives) falls back to "00:00". Minutes are not capped at 59.

    Args:
        seconds (int): duration in seconds.

    Returns:
        str: the formatted time string.
    """
    if not isinstance(seconds, int) or seconds < 0:
        return "00:00"
    return "{:02d}:{:02d}".format(*divmod(seconds, 60))
def clean_url(url: str) -> str:
    """
    Strip the query string and any ``#/`` route fragment from a URL.

    Args:
        url (str): the raw URL.

    Returns:
        str: the cleaned URL.
    """
    stripped, _, _ = url.partition('?')
    if '#/' in stripped:
        stripped = stripped.split('#/')[0]
    return stripped
def extract_original_text(segments: List[Any], url_pattern: re.Pattern) -> str:
    """
    Recover the user's own text from a share message.

    Scans the text segments in order, removes any link matching
    *url_pattern* and the common share-prompt boilerplate, and returns
    the first segment that still contains something; "" otherwise.

    Args:
        segments (List[Any]): message segment list.
        url_pattern (re.Pattern): compiled URL pattern to strip.

    Returns:
        str: the cleaned original text, or "".
    """
    share_hint = re.compile(r'复制此链接.*?打开.*?搜索.*?直接观看视频!')
    for seg in segments:
        if seg.type != "text":
            continue
        raw = seg.data.get("text", "")
        without_links = url_pattern.sub('', raw)
        cleaned = share_hint.sub('', without_links).strip()
        if cleaned:
            return cleaned
    return ""
def build_forward_nodes(event: MessageEvent, nickname: str, messages: List[Any]) -> List[Any]:
    """
    Build forward-message nodes for a list of message payloads.

    Args:
        event (MessageEvent): the triggering message event.
        nickname (str): display name for the forwarded sender.
        messages (List[Any]): message payloads (str or segment lists).

    Returns:
        List[Any]: forward-message node list.
    """
    nodes = []
    for msg in messages:
        # Fix: the str and list cases were two byte-identical branches;
        # collapse them. Payloads of any other type are still skipped
        # silently, as before.
        if isinstance(msg, (str, list)):
            nodes.append(event.bot.build_forward_node(
                user_id=event.self_id,
                nickname=nickname,
                message=msg
            ))
    return nodes
def safe_get(data: Dict[str, Any], keys: List[str], default: Any = None) -> Any:
    """
    Walk a key path through nested dicts, returning a default on any miss.

    Args:
        data (Dict[str, Any]): the nested dictionary.
        keys (List[str]): the key path to follow.
        default (Any, optional): value returned when the path breaks.

    Returns:
        Any: the value at the end of the path, or *default*.
    """
    node: Any = data
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node
def normalize_url(url: str) -> str:
    """
    Prefix ``https://`` when the URL does not already start with "http".

    Args:
        url (str): the raw URL.

    Returns:
        str: the normalized URL.
    """
    return url if url.startswith('http') else 'https://' + url
def validate_url(url: str) -> bool:
    """
    Check that a string starts with a plausible http(s) URL.

    Fix: the previous pattern ``r'https?://[^]+'`` contained an
    unterminated character class — a "]" placed right after "[^" is a
    literal member in Python ``re``, so the class was never closed and
    every call raised ``re.error``. Use a non-whitespace class instead.

    Args:
        url (str): the URL to validate.

    Returns:
        bool: True if the string starts with an http(s) URL.
    """
    url_pattern = re.compile(r'https?://\S+')
    return bool(url_pattern.match(url))