Merge branch 'main' into dev
This commit is contained in:
228
plugins/github_parser.py
Normal file
228
plugins/github_parser.py
Normal file
@@ -0,0 +1,228 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
import json
|
||||
import aiohttp
|
||||
from typing import Optional, Dict, Any, Union
|
||||
from cachetools import TTLCache
|
||||
|
||||
from core.utils.logger import logger
|
||||
from core.managers.command_manager import matcher
|
||||
from core.managers.image_manager import image_manager
|
||||
from models import MessageEvent, MessageSegment
|
||||
|
||||
# 插件元数据
|
||||
__plugin_meta__ = {
|
||||
"name": "github_parser",
|
||||
"description": "自动解析GitHub仓库链接,或通过命令查询仓库信息。",
|
||||
"usage": "(自动触发)当检测到GitHub仓库链接时,自动发送仓库信息。\n(命令触发)/查仓库 作者/仓库名",
|
||||
}
|
||||
|
||||
# 常量定义
|
||||
GITHUB_NICKNAME = "GitHub仓库信息"
|
||||
|
||||
HEADERS = {
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||||
}
|
||||
|
||||
# 全局共享的 ClientSession
|
||||
_session: Optional[aiohttp.ClientSession] = None
|
||||
|
||||
# 缓存GitHub API响应,避免频繁请求
|
||||
api_cache = TTLCache(maxsize=100, ttl=3600) # 100个缓存项,1小时过期
|
||||
|
||||
|
||||
def get_session() -> aiohttp.ClientSession:
|
||||
"""
|
||||
获取或创建全局的aiohttp ClientSession
|
||||
|
||||
Returns:
|
||||
aiohttp.ClientSession: 客户端会话对象
|
||||
"""
|
||||
global _session
|
||||
if _session is None or _session.closed:
|
||||
_session = aiohttp.ClientSession(headers=HEADERS)
|
||||
return _session
|
||||
|
||||
|
||||
async def get_github_repo_info(owner: str, repo: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
通过GitHub API获取仓库信息
|
||||
|
||||
Args:
|
||||
owner (str): 仓库所有者用户名
|
||||
repo (str): 仓库名称
|
||||
|
||||
Returns:
|
||||
Optional[Dict[str, Any]]: 仓库信息字典,如果失败则返回None
|
||||
"""
|
||||
cache_key = f"{owner}/{repo}"
|
||||
if cache_key in api_cache:
|
||||
logger.info(f"[github_parser] 使用缓存的仓库信息: {cache_key}")
|
||||
return api_cache[cache_key]
|
||||
|
||||
api_url = f"https://api.github.com/repos/{owner}/{repo}"
|
||||
try:
|
||||
session = get_session()
|
||||
async with session.get(api_url, timeout=10) as response:
|
||||
response.raise_for_status()
|
||||
repo_data = await response.json()
|
||||
|
||||
# 将数据存入缓存
|
||||
api_cache[cache_key] = repo_data
|
||||
logger.info(f"[github_parser] 成功获取仓库信息并缓存: {cache_key}")
|
||||
return repo_data
|
||||
|
||||
except aiohttp.ClientError as e:
|
||||
logger.error(f"[github_parser] GitHub API请求失败: {e}")
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"[github_parser] 解析GitHub API响应失败: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"[github_parser] 获取仓库信息时发生未知错误: {e}")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def generate_repo_image(repo_data: Dict[str, Any]) -> Optional[str]:
|
||||
"""
|
||||
使用Jinja2模板渲染仓库信息为图片
|
||||
|
||||
Args:
|
||||
repo_data (Dict[str, Any]): 仓库信息字典
|
||||
|
||||
Returns:
|
||||
Optional[str]: 生成的图片Base64编码,如果失败则返回None
|
||||
"""
|
||||
try:
|
||||
# 准备模板数据
|
||||
template_data = {
|
||||
"full_name": repo_data.get("full_name", ""),
|
||||
"description": repo_data.get("description", "暂无描述"),
|
||||
"owner_avatar": repo_data.get("owner", {}).get("avatar_url", ""),
|
||||
"stargazers_count": repo_data.get("stargazers_count", 0),
|
||||
"forks_count": repo_data.get("forks_count", 0),
|
||||
"open_issues_count": repo_data.get("open_issues_count", 0),
|
||||
"watchers_count": repo_data.get("watchers_count", 0),
|
||||
}
|
||||
|
||||
# 渲染模板为图片,使用高质量设置
|
||||
base64_image = await image_manager.render_template_to_base64(
|
||||
template_name="github_repo.html",
|
||||
data=template_data,
|
||||
output_name=f"github_{repo_data.get('name', 'repo')}.png",
|
||||
quality=100, # 使用最高质量
|
||||
image_type="png" # PNG格式为无损压缩
|
||||
)
|
||||
|
||||
return base64_image
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[github_parser] 生成仓库信息图片失败: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def process_github_repo(event: MessageEvent, owner: str, repo: str):
|
||||
"""
|
||||
处理GitHub仓库信息查询,获取信息并回复
|
||||
|
||||
Args:
|
||||
event (MessageEvent): 消息事件对象
|
||||
owner (str): 仓库所有者用户名
|
||||
repo (str): 仓库名称
|
||||
"""
|
||||
try:
|
||||
# 获取仓库信息
|
||||
repo_data = await get_github_repo_info(owner, repo)
|
||||
if not repo_data:
|
||||
logger.error(f"[github_parser] 无法获取仓库信息: {owner}/{repo}")
|
||||
await event.reply("无法获取仓库信息,可能是仓库不存在或网络问题。")
|
||||
return
|
||||
|
||||
# 生成图片
|
||||
image_base64 = await generate_repo_image(repo_data)
|
||||
if image_base64:
|
||||
# 发送图片
|
||||
await event.reply(MessageSegment.image(image_base64))
|
||||
else:
|
||||
# 如果图片生成失败,发送文本信息
|
||||
text_message = (
|
||||
f"GitHub 仓库信息\n"
|
||||
f"--------------------\n"
|
||||
f"仓库: {repo_data.get('full_name', '')}\n"
|
||||
f"描述: {repo_data.get('description', '暂无描述')}\n"
|
||||
f"--------------------\n"
|
||||
f"数据:\n"
|
||||
f" 星标: {repo_data.get('stargazers_count', 0)}\n"
|
||||
f" Fork: {repo_data.get('forks_count', 0)}\n"
|
||||
f" Issues: {repo_data.get('open_issues_count', 0)}\n"
|
||||
f" 关注: {repo_data.get('watchers_count', 0)}\n"
|
||||
)
|
||||
await event.reply(text_message)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[github_parser] 处理仓库信息时发生错误: {e}")
|
||||
await event.reply("处理仓库信息时发生错误,请稍后再试。")
|
||||
|
||||
|
||||
# GitHub仓库链接正则表达式
|
||||
GITHUB_URL_PATTERN = re.compile(r"https?://(?:www\.)?github\.com/([\w\-]+)/([\w\-\.]+)(?:/[^\s]*)?")
|
||||
|
||||
|
||||
# 注册命令处理器
|
||||
@matcher.command("查仓库", "github", "github_repo")
|
||||
async def handle_github_command(bot, event: MessageEvent):
|
||||
"""
|
||||
处理命令调用:/查仓库 作者/仓库名
|
||||
|
||||
Args:
|
||||
bot: 机器人对象
|
||||
event (MessageEvent): 消息事件对象
|
||||
"""
|
||||
# 提取命令参数
|
||||
command_text = event.raw_message
|
||||
# 移除命令前缀和命令名
|
||||
prefix = command_text.split()[0] if command_text.split() else ""
|
||||
params = command_text[len(prefix):].strip()
|
||||
|
||||
if not params:
|
||||
await event.reply("请输入仓库地址,格式:/查仓库 作者/仓库名")
|
||||
return
|
||||
|
||||
# 解析参数格式
|
||||
if "/" in params:
|
||||
owner, repo = params.split("/", 1)
|
||||
# 移除可能的.git后缀
|
||||
repo = repo.replace(".git", "")
|
||||
await process_github_repo(event, owner, repo)
|
||||
else:
|
||||
await event.reply("参数格式错误,请输入:/查仓库 作者/仓库名")
|
||||
|
||||
|
||||
# 注册消息处理器
|
||||
@matcher.on_message()
|
||||
async def handle_github_link(event: MessageEvent):
|
||||
"""
|
||||
处理消息,检测GitHub仓库链接并自动解析
|
||||
|
||||
Args:
|
||||
event (MessageEvent): 消息事件对象
|
||||
"""
|
||||
# 忽略机器人自己发送的消息,防止无限循环
|
||||
if hasattr(event, "user_id") and hasattr(event, "self_id") and event.user_id == event.self_id:
|
||||
return
|
||||
|
||||
# 提取消息文本
|
||||
message_text = ""
|
||||
for segment in event.message:
|
||||
if segment.type == "text":
|
||||
message_text += segment.data.get("text", "")
|
||||
|
||||
# 查找GitHub仓库链接
|
||||
match = GITHUB_URL_PATTERN.search(message_text)
|
||||
if match:
|
||||
owner = match.group(1)
|
||||
repo = match.group(2)
|
||||
# 移除可能的.git后缀
|
||||
repo = repo.replace(".git", "")
|
||||
|
||||
logger.info(f"[github_parser] 检测到GitHub仓库链接: {owner}/{repo}")
|
||||
await process_github_repo(event, owner, repo)
|
||||
Reference in New Issue
Block a user