Python 代码执行结果
+{{ timestamp }}
+diff --git a/core/utils/executor.py b/core/utils/executor.py index 79f2103..1c9843d 100644 --- a/core/utils/executor.py +++ b/core/utils/executor.py @@ -110,7 +110,8 @@ class CodeExecutor: logger.error(f"[CodeExecutor] 镜像 '{self.sandbox_image}' 不存在!") result_message = f"执行失败:沙箱基础镜像 '{self.sandbox_image}' 不存在,请联系管理员构建。" except docker.errors.ContainerError as e: - error_output = e.stderr.decode('utf-8').strip() + # 确保 stderr 是字符串 + error_output = e.stderr.decode('utf-8').strip() if isinstance(e.stderr, bytes) else e.stderr.strip() result_message = f"代码执行出错:\n{error_output}" logger.warning(f"[CodeExecutor] 代码执行时发生错误: {error_output}") except docker.errors.APIError as e: @@ -124,7 +125,11 @@ class CodeExecutor: result_message = "执行引擎发生内部错误,请联系管理员。" # 调用回调函数回复结果 - await task['callback'](result_message) + try: + await task['callback'](result_message) + except Exception as callback_error: + logger.error(f"[CodeExecutor] 执行回调函数时发生错误: {callback_error}") + # 即使回调失败,也要确保任务被标记为完成 self.task_queue.task_done() @@ -160,8 +165,10 @@ class CodeExecutor: # 5. 检查退出码,如果不为 0,则手动抛出 ContainerError if result.get('StatusCode', 0) != 0: + # 确保 stderr 是字符串 + error_message = stderr.decode('utf-8') if isinstance(stderr, bytes) else stderr raise docker.errors.ContainerError( - container, result['StatusCode'], f"python -c '{code}'", self.sandbox_image, stderr.decode('utf-8') + container, result['StatusCode'], f"python -c '{code}'", self.sandbox_image, error_message ) return stdout diff --git a/plugins/bili_parser.py b/plugins/bili_parser.py deleted file mode 100644 index 5ea5003..0000000 --- a/plugins/bili_parser.py +++ /dev/null @@ -1,340 +0,0 @@ -# -*- coding: utf-8 -*- -import re -import json -import aiohttp -from bs4 import BeautifulSoup -from typing import Optional, Dict, Any, Union -from cachetools import TTLCache - -from core.utils.logger import logger -from core.managers.command_manager import matcher -from models import MessageEvent, MessageSegment - -# 创建一个TTL缓存,最大容量100,缓存时间10秒 -processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) - -# 插件元数据 -__plugin_meta__ = { - "name": "bili_parser", - "description": "自动解析B站分享卡片,提取视频封面和播放量等信息。", - "usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。", -} - -# 常量定义 -BILI_NICKNAME = "B站视频解析" - -HEADERS = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' -} - -# 全局共享的 ClientSession -_session: Optional[aiohttp.ClientSession] = None - -def get_session() -> aiohttp.ClientSession: - global _session - if _session is None or _session.closed: - _session = aiohttp.ClientSession(headers=HEADERS) - return _session - - -def format_count(num: int) -> str: - if not isinstance(num, int): - return str(num) - if num < 10000: - return str(num) - return f"{num / 10000:.1f}万" - - -def format_duration(seconds: int) -> str: - """将秒数格式化为 MM:SS 的形式""" - if not isinstance(seconds, int) or seconds < 0: - return "滚木" - minutes, seconds = divmod(seconds, 60) - return f"{minutes:02d}:{seconds:02d}" - - -async def get_real_url(short_url: str) -> Optional[str]: - try: - session = get_session() - async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response: - if response.status == 302: - return response.headers.get('Location') - except Exception as e: - logger.error(f"获取真实URL失败: {e}") - return None - -async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: - try: - # 清理URL,去掉不必要的查询参数,只保留基本的视频URL - clean_url = video_url.split('?')[0] - if '#/' in clean_url: - clean_url = clean_url.split('#/')[0] - - session = get_session() - async with session.get(clean_url, headers=HEADERS, timeout=5) as response: - response.raise_for_status() - text = await response.text() - soup = BeautifulSoup(text, 'html.parser') - - # 尝试多种方式获取视频数据 - # 方式1: 尝试获取 __INITIAL_STATE__ - script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) - if not script_tag or not script_tag.string: - # 方式2: 尝试获取 __PLAYINFO__ - script_tag = soup.find('script', text=re.compile('window.__PLAYINFO__')) - - if not script_tag or not script_tag.string: - # 方式3: 尝试获取页面标题和其他信息 - title_tag = soup.find('title') - if title_tag: - title = title_tag.get_text().strip() - # 提取BV号 - bv_match = re.search(r'(BV\w{10})', clean_url) - bvid = bv_match.group(1) if bv_match else '未知BV号' - - return { - "title": title.replace('_哔哩哔哩_bilibili', '').strip(), - "bvid": bvid, - "duration": 0, - "cover_url": '', - "play": 0, - "like": 0, - "coin": 0, - "favorite": 0, - "share": 0, - "owner_name": '未知UP主', - "owner_avatar": '', - "followers": 0, - } - return None - - # 原始解析逻辑 - match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string) - if not match: - # 尝试另一种正则表达式 - match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL) - - if not match: - return None - - json_str = match.group(1) - # 清理JSON字符串中的潜在问题字符 - json_str = json_str.strip().rstrip(';') - - try: - data = json.loads(json_str) - except json.JSONDecodeError: - # 如果直接解析失败,尝试清理JSON字符串 - # 移除可能的注释或无效字符 - cleaned_json = re.sub(r',\s*[}]', '}', json_str) # 移除末尾多余的逗号 - cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json) # 移除注释 - cleaned_json = re.sub(r'//.*', '', cleaned_json) # 移除行注释 - data = json.loads(cleaned_json) - - video_data = data.get('videoData', {}) - up_data = data.get('upData', {}) - stat = video_data.get('stat', {}) - owner = video_data.get('owner', {}) - - cover_url = video_data.get('pic', '') - if cover_url: - cover_url = cover_url.split('@')[0] - if cover_url.startswith('//'): - cover_url = 'https:' + cover_url - - owner_avatar = owner.get('face', '') - if owner_avatar: - if owner_avatar.startswith('//'): - owner_avatar = 'https:' + owner_avatar - owner_avatar = owner_avatar.split('@')[0] - - return { - "title": video_data.get('title', '未知标题'), - "bvid": video_data.get('bvid', '未知BV号'), - "duration": video_data.get('duration', 0), - "cover_url": cover_url, - "play": stat.get('view', 0), - "like": stat.get('like', 0), - "coin": stat.get('coin', 0), - "favorite": stat.get('favorite', 0), - "share": stat.get('share', 0), - "owner_name": owner.get('name', '未知UP主'), - "owner_avatar": owner_avatar, - "followers": up_data.get('fans', 0), - } - - except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: - logger.error(f"解析视频信息失败: {e}") - logger.debug(f"失败的URL: {video_url}") - except Exception as e: - logger.error(f"解析视频信息时发生未知错误: {e}") - logger.debug(f"失败的URL: {video_url}") - - return None - -async def get_direct_video_url(video_url: str) -> Optional[str]: - """ - 调用第三方API解析B站视频直链 - :param video_url: B站视频的完整URL - :return: 视频直链URL,如果失败则返回None - """ - api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json" - try: - async with aiohttp.ClientSession() as session: - async with session.get(api_url, headers=HEADERS, timeout=10) as response: - response.raise_for_status() - # 使用 content_type=None 来忽略 Content-Type 检查 - # 因为 API 返回 text/json 而不是标准的 application/json - data = await response.json(content_type=None) - if data.get("code") == 200 and data.get("data"): - return data["data"][0].get("video_url") - except (aiohttp.ClientError, json.JSONDecodeError, KeyError, IndexError) as e: - logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}") - return None - -BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)") - - -def extract_url_from_json_segments(segments): - """ - 从消息的JSON段中提取B站链接 - :param segments: 消息段列表 - :return: 提取到的URL或None - """ - for segment in segments: - if segment.type == "json": - logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}") - try: - json_data = json.loads(segment.data.get("data", "{}")) - short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl") - - if short_url and "b23.tv" in short_url: - extracted_url = short_url.split('?')[0] - logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {extracted_url}") - return extracted_url - except (json.JSONDecodeError, KeyError) as e: - logger.error(f"[bili_parser] 解析JSON失败: {e}") - continue - return None - -def extract_url_from_text_segments(segments): - """ - 从消息的文本段中提取B站链接 - :param segments: 消息段列表 - :return: 提取到的URL或None - """ - for segment in segments: - if segment.type == "text": - text_content = segment.data.get("text", "") - match = BILI_URL_PATTERN.search(text_content) - if match: - extracted_url = match.group(0) - logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {extracted_url}") - return extracted_url - return None - -@matcher.on_message() -async def handle_bili_share(event: MessageEvent): - """ - 处理消息,检测B站分享链接(JSON卡片或文本链接)并进行解析。 - :param event: 消息事件对象 - """ - # 消息去重 - if event.message_id in processed_messages: - return - processed_messages[event.message_id] = True - - # 忽略机器人自己发送的消息,防止无限循环 - if event.user_id == event.self_id: - return - - # 1. 优先解析JSON卡片中的短链接 - url_to_process = extract_url_from_json_segments(event.message) - - # 2. 如果未在JSON卡片中找到链接,则在文本消息中查找 - if not url_to_process: - url_to_process = extract_url_from_text_segments(event.message) - - # 3. 如果找到了任何类型的B站链接,则进行处理 - if url_to_process: - await process_bili_link(event, url_to_process) - -async def process_bili_link(event: MessageEvent, url: str): - """ - 处理B站链接(长链接或短链接),获取信息并回复 - :param event: 消息事件对象 - :param url: 待处理的B站链接 - """ - try: - if "b23.tv" in url: - real_url = await get_real_url(url) - if not real_url: - logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。") - await event.reply("无法解析B站短链接。") - return - else: - # 清理URL,移除复杂查询参数,只保留基本的视频URL - real_url = url.split('?')[0] - if '#/' in real_url: - real_url = real_url.split('#/')[0] - - video_info = await parse_video_info(real_url) - if not video_info: - logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") - await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") - return - except Exception as e: - logger.error(f"[bili_parser] 处理B站链接时发生错误: {e}") - await event.reply("处理B站链接时发生错误,请稍后再试。") - return - - # 检查视频时长 - video_message: Union[str, MessageSegment] - if video_info['duration'] > 1200: # 5分钟 = 300秒 - video_message = "视频时长超过5分钟,不进行解析。" - else: - direct_url = await get_direct_video_url(real_url) - if direct_url: - video_message = MessageSegment.video(direct_url) - else: - video_message = "视频解析失败,无法获取直链。" - - text_message = ( - f"BiliBili 视频解析\n" - f"--------------------\n" - f" UP主: {video_info['owner_name']}\n" - f" 粉丝: {format_count(video_info['followers'])}\n" - f"--------------------\n" - f" 标题: {video_info['title']}\n" - f" BV号: {video_info['bvid']}\n" - f" 时长: {format_duration(video_info['duration'])}\n" - f"--------------------\n" - f" 数据:\n" - f" 播放: {format_count(video_info['play'])}\n" - f" 点赞: {format_count(video_info['like'])}\n" - f" 投币: {format_count(video_info['coin'])}\n" - f" 收藏: {format_count(video_info['favorite'])}\n" - f" 转发: {format_count(video_info['share'])}\n" - f" B站链接: {url}" - ) - - image_message_segment = [ - MessageSegment.text("B站封面:"), - MessageSegment.image(video_info['cover_url']) - ] - - up_info_segment = [ - MessageSegment.text("UP主头像:"), - MessageSegment.image(video_info['owner_avatar']) - ] - - nodes = [ - event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=text_message), - event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=image_message_segment), - event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=up_info_segment), - event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=video_message) - ] - - logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}") - # 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊 - await event.bot.send_forwarded_messages(target=event, nodes=nodes) diff --git a/plugins/code_py.py b/plugins/code_py.py index 6119f5e..5f241b2 100644 --- a/plugins/code_py.py +++ b/plugins/code_py.py @@ -3,15 +3,19 @@ import html import textwrap import asyncio from typing import Dict +import datetime +import sys from core.managers.command_manager import matcher from models.events.message import MessageEvent from core.permission import Permission from core.utils.logger import logger +from core.managers.image_manager import image_manager +from models.message import MessageSegment __plugin_meta__ = { "name": "Python 代码执行", - "description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和转发回复。", + "description": "在安全的沙箱环境中执行 Python 代码片段,支持单行、多行和图片输出。", "usage": "/py <单行代码>\n/code_py <单行代码>\n/py (进入多行输入模式)", } @@ -19,48 +23,88 @@ __plugin_meta__ = { # 结构: {(user_id, group_id): asyncio.TimerHandle} multi_line_sessions: Dict[tuple, asyncio.TimerHandle] = {} -async def reply_as_forward(event: MessageEvent, input_code: str, output_result: str): +async def generate_and_send_code_image(event: MessageEvent, input_code: str, output_result: str): """ - 将输入和输出打包成转发消息进行回复。 - 参考 forward_test.py 的实现,兼容私聊和群聊。 + 生成代码执行结果的图片并发送,如果发送失败则降级为文本消息。 + + Args: + event (MessageEvent): 消息事件对象 + input_code (str): 用户输入的代码 + output_result (str): 代码执行结果 """ - bot = event.bot - - # 1. 构建消息节点列表 - nodes = [ - bot.build_forward_node( - user_id=event.user_id, - nickname=event.sender.nickname if event.sender else str(event.user_id), - message=f"--- Your Code ---\n{input_code}" - ), - bot.build_forward_node( - user_id=event.self_id, - nickname="Code Executor", - message=f"--- Execution Result ---\n{output_result}" - ) - ] - try: - # 2. 发送合并转发消息 - await bot.send_forwarded_messages(event, nodes) + # 准备模板数据 + user_nickname = event.sender.nickname if event.sender else str(event.user_id) + user_id = event.user_id + avatar_initial = user_nickname[0] if user_nickname else "U" + + # 构建QQ头像URL + qq_avatar_url = f"https://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640" + + template_data = { + "user_nickname": user_nickname, + "user_id": user_id, + "avatar_initial": avatar_initial, + "qq_avatar_url": qq_avatar_url, + "code": input_code, + "result": output_result, + "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "execution_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}", + "result_title": "执行成功" if "Traceback" not in output_result and "Error" not in output_result else "执行出错", + "result_class": "result-success" if "Traceback" not in output_result and "Error" not in output_result else "result-error" + } + + # 渲染模板为图片 + image_base64 = await image_manager.render_template_to_base64( + template_name="code_execution.html", + data=template_data, + output_name=f"code_execution_{event.user_id}_{int(datetime.datetime.now().timestamp())}.png", + quality=90, + image_type="png" + ) + + if image_base64: + # 发送图片 + await event.reply(MessageSegment.image(image_base64)) + else: + # 如果图片生成失败,降级为文本消息 + await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}") + except Exception as e: - logger.error(f"[code_py] 发送转发消息失败: {e}") - # 降级为普通消息回复 + logger.error(f"[code_py] 生成代码执行图片失败: {e}") + # 降级为文本消息 await event.reply(f"--- 你的代码 ---\n{input_code}\n--- 执行结果 ---\n{output_result}") async def execute_code(event: MessageEvent, code: str): """ 核心代码执行逻辑。 + + Args: + event (MessageEvent): 消息事件对象 + code (str): 要执行的Python代码 """ code_executor = getattr(event.bot, 'code_executor', None) if not code_executor or not code_executor.docker_client: await event.reply("代码执行服务当前不可用,请检查 Docker 连接配置。") return - # 修改 add_task,让它能直接接收回复函数 + # 定义一个包装回调函数,确保正确处理异步操作和异常 + async def callback_wrapper(result): + try: + await generate_and_send_code_image(event, code, result) + except Exception as e: + logger.error(f"[code_py] 执行回调时发生错误: {e}") + # 即使回调失败,也要确保任务被标记为完成 + # 降级为简单文本回复 + try: + await event.reply(f"代码执行结果:\n{result}") + except Exception as reply_error: + logger.error(f"[code_py] 发送降级回复时也失败: {reply_error}") + await code_executor.add_task( code, - lambda result: reply_as_forward(event, code, result) + callback_wrapper ) await event.reply("代码已提交至沙箱执行队列,请稍候...") diff --git a/plugins/douyin_parser.py b/plugins/douyin_parser.py deleted file mode 100644 index 5fe6a88..0000000 --- a/plugins/douyin_parser.py +++ /dev/null @@ -1,391 +0,0 @@ -# -*- coding: utf-8 -*- -import re -import json -import aiohttp -from typing import Optional, Dict, Any, Union -from cachetools import TTLCache - -from core.utils.logger import logger -from core.managers.command_manager import matcher -from models import MessageEvent, MessageSegment - -# 创建一个TTL缓存,最大容量100,缓存时间10秒 -processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) - -# 插件元数据 -__plugin_meta__ = { - "name": "douyin_parser", - "description": "自动解析抖音分享链接,提取视频信息和直链。", - "usage": "(自动触发)当检测到抖音分享链接时,自动发送视频信息。", -} - -# 常量定义 -DOUYIN_NICKNAME = "抖音视频解析" - -HEADERS = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', - 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', - 'Accept-Encoding': 'gzip, deflate, br', # 重新启用br编码支持 - 'Connection': 'keep-alive', - 'Upgrade-Insecure-Requests': '1' -} - -# 全局共享的 ClientSession -_session: Optional[aiohttp.ClientSession] = None - -async def get_session() -> aiohttp.ClientSession: - global _session - if _session is None or _session.closed: - _session = aiohttp.ClientSession(headers=HEADERS) - return _session - - -def format_count(num: Union[int, str]) -> str: - try: - n = int(num) - if n < 10000: - return str(n) - return f"{n / 10000:.1f}万" - except (ValueError, TypeError): - return str(num) - - -DOUYIN_URL_PATTERN = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线 -DOUYIN_SHORT_PATTERN = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线 - - -def extract_url_from_json_segments(segments): - """ - 从消息的JSON段中提取抖音链接 - :param segments: 消息段列表 - :return: 提取到的URL或None - """ - for segment in segments: - if segment.type == "json": - logger.info(f"[douyin_parser] 检测到JSON CQ码: {segment.data}") - try: - json_data = json.loads(segment.data.get("data", "{}")) - # 检查是否是抖音分享卡片 - meta = json_data.get("meta", {}) - if "detail_1" in meta: - detail = meta["detail_1"] - if "qqdocurl" in detail: - url = detail["qqdocurl"] - if "douyin.com" in url or "iesdouyin.com" in url: - logger.success(f"[douyin_parser] 成功从JSON卡片中提取到抖音链接: {url}") - return url - except (json.JSONDecodeError, KeyError) as e: - logger.error(f"[douyin_parser] 解析JSON失败: {e}") - continue - return None - - -def extract_url_from_text_segments(segments): - """ - 从消息的文本段中提取抖音链接 - :param segments: 消息段列表 - :return: 提取到的URL或None - """ - for segment in segments: - if segment.type == "text": - text_content = segment.data.get("text", "") - # 查找抖音链接 - match = DOUYIN_URL_PATTERN.search(text_content) - if match: - extracted_url = match.group(0) - logger.success(f"[douyin_parser] 成功从文本中提取到抖音链接: {extracted_url}") - return extracted_url - # 也检查是否有v.douyin.com格式的链接 - short_match = DOUYIN_SHORT_PATTERN.search(text_content) - if short_match: - extracted_url = short_match.group(0) - logger.success(f"[douyin_parser] 成功从文本中提取到抖音短链接: {extracted_url}") - return extracted_url - return None - - -@matcher.on_message() -async def handle_douyin_share(event: MessageEvent): - """ - 处理消息,检测抖音分享链接(JSON卡片或文本链接)并进行解析。 - :param event: 消息事件对象 - """ - # 消息去重 - if event.message_id in processed_messages: - return - processed_messages[event.message_id] = True - - # 忽略机器人自己发送的消息,防止无限循环 - if event.user_id == event.self_id: - return - - # 1. 优先解析JSON卡片中的链接 - url_to_process = extract_url_from_json_segments(event.message) - - # 2. 如果未在JSON卡片中找到链接,则在文本消息中查找 - if not url_to_process: - url_to_process = extract_url_from_text_segments(event.message) - - # 3. 如果找到了抖音链接,则进行处理 - if url_to_process: - await process_douyin_link(event, url_to_process) - - -async def get_real_url(short_url: str) -> Optional[str]: - """ - 获取抖音短链接的真实URL - :param short_url: 抖音短链接 - :return: 真实URL或None - """ - try: - # 首先尝试获取重定向后的URL - async with aiohttp.ClientSession() as session: - # 添加更多头部信息模拟移动端访问 - mobile_headers = HEADERS.copy() # 使用更新后的完整请求头 - mobile_headers.update({ - 'Sec-Fetch-Dest': 'document', - 'Sec-Fetch-Mode': 'navigate', - 'Sec-Fetch-Site': 'none', - 'Cache-Control': 'max-age=0', - # 模拟移动设备的额外头部 - 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1', - 'X-Requested-With': 'XMLHttpRequest', - 'Referer': 'https://www.douyin.com/' - }) - - async with session.get(short_url, headers=mobile_headers, allow_redirects=True, timeout=10) as response: - redirected_url = str(response.url) - - # 检查重定向后的URL是否包含视频ID - # 抖音视频页通常包含 aweme_id 或 sec_uid 参数 - if 'video/' in redirected_url or '/note/' in redirected_url: - logger.info(f"[douyin_parser] 重定向后的视频URL: {redirected_url}") - return redirected_url - elif 'share_item' in redirected_url: - # 如果URL中有share_item参数,尝试从中提取视频信息 - logger.info(f"[douyin_parser] 重定向后的分享URL: {redirected_url}") - return redirected_url - else: - # 如果重定向到了主页或其他非视频页面,尝试从响应中提取信息 - logger.warning(f"[douyin_parser] 重定向到了非预期页面: {redirected_url}") - return redirected_url - - except Exception as e: - logger.error(f"[douyin_parser] 获取真实URL失败: {e}") - return None - - -async def parse_douyin_video(video_url: str) -> Optional[Dict[str, Any]]: - """ - 解析抖音视频信息 - :param video_url: 抖音视频链接 - :return: 视频信息字典或None - """ - try: - # 使用新的第三方API解析抖音视频 - api_url = f"http://api.xhus.cn/api/douyin?url={video_url}" - - session = await get_session() - async with session.get(api_url, headers=HEADERS, timeout=10) as response: - if response.status != 200: - logger.error(f"[douyin_parser] API请求失败,状态码: {response.status}") - return None - - response_data = await response.json() - - if not isinstance(response_data, dict): - logger.error(f"[douyin_parser] API返回格式错误: {response_data}") - return None - - if response_data.get("code") != 200: - logger.error(f"[douyin_parser] API返回错误: {response_data}") - return None - - data = response_data.get("data", {}) - if not data: - logger.error("[douyin_parser] API返回数据为空") - return None - - # 新API的响应格式转换 - return { - "type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image", - "video_url": data.get("url", ""), # 核心字段:视频播放地址 - "video_url_HQ": data.get("url", ""), # 新API没有HQ字段,使用同一个地址 - "nickname": data.get("author", "未知作者"), - "desc": data.get("title", "无描述"), - "aweme_id": data.get("uid", ""), - "like": data.get("like", 0), - "cover": data.get("cover", ""), - "time": data.get("time", 0), - "author_avatar": data.get("avatar", ""), - "music": data.get("music", {}), - } - except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: - logger.error(f"[douyin_parser] 解析抖音视频信息失败: {e}") - logger.debug(f"失败的URL: {video_url}") - except Exception as e: - logger.error(f"[douyin_parser] 解析抖音视频时发生未知错误: {e}") - logger.debug(f"失败的URL: {video_url}") - - return None - - -async def process_douyin_link(event: MessageEvent, url: str): - """ - 处理抖音链接,获取信息并回复 - :param event: 消息事件对象 - :param url: 待处理的抖音链接 - """ - try: - # 直接将原始链接传递给API,不需要获取真实URL - video_info = await parse_douyin_video(url) - if not video_info: - logger.error(f"[douyin_parser] 无法从 {url} 解析视频信息。") - await event.reply("无法获取视频信息,可能是抖音接口变动或视频不存在。") - return - - # 构建回复消息,包含原分享中的文本内容(如果有) - original_text = "" - for segment in event.message: - if segment.type == "text": - text_content = segment.data.get("text", "") - # 提取除了链接以外的文本内容 - # 移除链接和复制提示 - cleaned_text = re.sub(DOUYIN_URL_PATTERN, '', text_content) - cleaned_text = re.sub(DOUYIN_SHORT_PATTERN, '', cleaned_text) - cleaned_text = re.sub(r'复制此链接,打开Dou音搜索,直接观看视频!', '', cleaned_text) - cleaned_text = cleaned_text.strip() - if cleaned_text: - original_text = cleaned_text - break - - # 构建回复消息 - text_parts = ["抖音视频解析"] - text_parts.append("--------------------") - - if original_text: - text_parts.append(f" 分享内容: {original_text}") - text_parts.append("--------------------") - - text_parts.append(f" 作者: {video_info['nickname']}") - text_parts.append(f" 抖音号: {video_info['aweme_id']}") - text_parts.append(f" 标题: {video_info['desc']}") - text_parts.append(f" 点赞: {format_count(video_info['like'])}") - text_parts.append(f" 类型: {video_info['type']}") - - # 如果是音乐,添加音乐信息 - if video_info.get('music'): - music_info = video_info['music'] - text_parts.append("--------------------") - text_parts.append(" 背景音乐:") - text_parts.append(f" 标题: {music_info.get('title', '')}") - text_parts.append(f" 作者: {music_info.get('author', '')}") - - text_parts.append("--------------------") - text_parts.append(f" 原始链接: {url}") - - text_message = "\n".join(text_parts) - - # 准备转发消息节点 - nodes = [] - - # 添加文本信息节点 - text_node = event.bot.build_forward_node( - user_id=event.self_id, - nickname=DOUYIN_NICKNAME, - message=text_message - ) - nodes.append(text_node) - - # 添加封面图片节点(如果有) - if video_info.get('cover'): - try: - cover_node = event.bot.build_forward_node( - user_id=event.self_id, - nickname=DOUYIN_NICKNAME, - message=[ - MessageSegment.text("抖音视频封面:\n"), - MessageSegment.image(video_info['cover']) - ] - ) - nodes.append(cover_node) - except Exception as e: - logger.warning(f"[douyin_parser] 无法添加封面图片: {e}") - - # 添加作者头像节点(如果有) - if video_info.get('author_avatar'): - try: - avatar_node = event.bot.build_forward_node( - user_id=event.self_id, - nickname=DOUYIN_NICKNAME, - message=[ - MessageSegment.text("作者头像:\n"), - MessageSegment.image(video_info['author_avatar']) - ] - ) - nodes.append(avatar_node) - except Exception as e: - logger.warning(f"[douyin_parser] 无法添加作者头像: {e}") - - # 尝试添加视频直链(单独节点) - video_success = False - try: - if video_info.get('video_url'): - video_url = video_info.get('video_url', '') - # 检查视频类型 - if video_info.get('type') == 'video': - video_message = MessageSegment.video(video_url) - video_type_text = "视频直链:" - else: # image类型 - video_message = MessageSegment.image(video_url) # 单个图片 - video_type_text = "图集首图:" - - # 构建视频/图片节点 - video_node = event.bot.build_forward_node( - user_id=event.self_id, - nickname=DOUYIN_NICKNAME, - message=[ - MessageSegment.text(video_type_text + "\n"), - video_message - ] - ) - nodes.append(video_node) - video_success = True - except Exception as e: - logger.error(f"[douyin_parser] 无法添加视频/图片: {e}") - - # 如果无法添加视频,添加提示信息 - if not video_success: - no_video_node = event.bot.build_forward_node( - user_id=event.self_id, - nickname=DOUYIN_NICKNAME, - message="视频解析成功,但无法获取直链或播放视频。" - ) - nodes.append(no_video_node) - - logger.success(f"[douyin_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['desc'][:20]}...") - - # 发送合并转发消息 - try: - # 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊 - await event.bot.send_forwarded_messages(target=event, nodes=nodes) - except Exception as e: - # 如果发送合并转发失败,尝试单独发送文本信息 - logger.error(f"[douyin_parser] 发送合并转发失败: {e}") - - # 构建替代的简单文本回复,避免电脑端显示问题 - simple_reply = f"抖音视频解析成功\n{text_message}\n\n如果无法查看视频内容,请复制原始链接到浏览器打开:{url}" - await event.reply(simple_reply) - - # 如果有封面,尝试单独发送 - if video_info.get('cover'): - try: - await event.reply(MessageSegment.image(video_info['cover'])) - except Exception: - pass - - except Exception as e: - logger.error(f"[douyin_parser] 处理抖音链接时发生错误: {e}") - await event.reply("处理抖音链接时发生错误,请稍后再试。") - return \ No newline at end of file diff --git a/plugins/web_parser/__init__.py b/plugins/web_parser/__init__.py new file mode 100644 index 0000000..003478f --- /dev/null +++ b/plugins/web_parser/__init__.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- + +from core.managers.command_manager import matcher +from models import MessageEvent +from .parsers.bili import BiliParser +from .parsers.douyin import DouyinParser +from .parsers.github import GitHubParser + +# 插件元信息 +__plugin_meta__ = { + "name": "web_parser", + "description": "自动解析各种Web链接,包括B站、抖音和GitHub仓库", + "usage": "(自动触发)当检测到支持的链接时,自动进行解析" +} + +# 初始化解析器实例 +bili_parser = BiliParser() +douyin_parser = DouyinParser() +github_parser = GitHubParser() + + +@matcher.on_message() +async def handle_web_links(event: MessageEvent): + """ + 处理消息,检测并解析各种Web链接 + + Args: + event (MessageEvent): 消息事件对象 + """ + # 按顺序尝试各个解析器 + # 1. 尝试B站解析器 + await bili_parser.handle_message(event) + + # 2. 尝试抖音解析器 + await douyin_parser.handle_message(event) + + # 3. 尝试GitHub解析器 + await github_parser.handle_message(event) + + +# 注册GitHub仓库查询命令 +@matcher.command("查仓库", "github", "github_repo") +async def handle_github_command(bot, event: MessageEvent): + """ + 处理命令调用:/查仓库 作者/仓库名 + + Args: + bot: 机器人对象 + event (MessageEvent): 消息事件对象 + """ + # 提取命令参数 + command_text = event.raw_message + # 移除命令前缀和命令名 + prefix = command_text.split()[0] if command_text.split() else "" + params = command_text[len(prefix):].strip() + + if not params: + await event.reply("请输入仓库地址,格式:/查仓库 作者/仓库名") + return + + # 解析参数格式 + if "/" in params: + owner, repo = params.split("/", 1) + # 移除可能的.git后缀 + repo = repo.replace(".git", "") + + # 构建仓库URL + repo_url = f"https://github.com/{owner}/{repo}" + # 使用GitHub解析器处理 + await github_parser.process_url(event, repo_url) + else: + await event.reply("参数格式错误,请输入:/查仓库 作者/仓库名") diff --git a/plugins/web_parser/base.py b/plugins/web_parser/base.py new file mode 100644 index 0000000..b8bd2f1 --- /dev/null +++ b/plugins/web_parser/base.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- +import re +import json +import abc +import aiohttp +from typing import Optional, Dict, Any, List, Union +from cachetools import TTLCache + +from core.utils.logger import logger +from models import MessageEvent, MessageSegment + + +class BaseParser(metaclass=abc.ABCMeta): + """ + 解析器基类,定义所有web解析器共有的方法和属性 + """ + + # 插件元信息 + __plugin_meta__ = { + "name": "web_parser", + "description": "Web链接解析插件", + "usage": "自动解析各种Web链接" + } + + + + # 请求头 + HEADERS = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + + # 全局共享的ClientSession + _session: Optional[aiohttp.ClientSession] = None + + def __init__(self): + """ + 初始化解析器 + """ + self.name = "Base Parser" + self.url_pattern = re.compile(r"https?://[^\s]+") + + @classmethod + def get_session(cls) -> aiohttp.ClientSession: + """ + 获取或创建全局的aiohttp ClientSession + + Returns: + aiohttp.ClientSession: 客户端会话对象 + """ + if cls._session is None or cls._session.closed: + cls._session = aiohttp.ClientSession(headers=cls.HEADERS) + return cls._session + + @abc.abstractmethod + async def parse(self, url: str) -> Optional[Dict[str, Any]]: + """ + 解析URL获取信息 + + Args: + url (str): 要解析的URL + + Returns: + Optional[Dict[str, Any]]: 解析结果,如果失败则返回None + """ + pass + + @abc.abstractmethod + async def get_real_url(self, short_url: str) -> Optional[str]: + """ + 获取短链接的真实URL + + Args: + short_url (str): 短链接 + + Returns: + Optional[str]: 真实URL,如果失败则返回None + """ + pass + + @abc.abstractmethod + async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: + """ + 格式化响应消息 + + Args: + event (MessageEvent): 消息事件对象 + data (Dict[str, Any]): 解析结果数据 + + Returns: + List[Any]: 消息段列表 + """ + pass + + def extract_url_from_json_segments(self, segments): + """ + 从消息的JSON段中提取URL + + Args: + segments: 消息段列表 + + Returns: + Optional[str]: 提取到的URL或None + """ + for segment in segments: + if segment.type == "json": + logger.info(f"[{self.name}] 检测到JSON CQ码: {segment.data}") + try: + json_data = json.loads(segment.data.get("data", "{}")) + short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl") + if short_url: + logger.success(f"[{self.name}] 成功从JSON卡片中提取到链接: {short_url}") + return short_url + except (json.JSONDecodeError, KeyError) as e: + logger.error(f"[{self.name}] 解析JSON失败: {e}") + continue + return None + + def extract_url_from_text_segments(self, segments): + """ + 从消息的文本段中提取URL + + Args: + segments: 消息段列表 + + Returns: + Optional[str]: 提取到的URL或None + """ + for segment in segments: + if segment.type == "text": + text_content = segment.data.get("text", "") + match = self.url_pattern.search(text_content) + if match: + extracted_url = match.group(0) + logger.success(f"[{self.name}] 成功从文本中提取到链接: {extracted_url}") + return extracted_url + return None + + async def process_url(self, event: MessageEvent, url: str): + """ + 处理URL,获取信息并回复 + + Args: + event (MessageEvent): 消息事件对象 + url (str): 待处理的URL + """ + try: + # 检查是否是短链接 + if self.is_short_url(url): + real_url = await self.get_real_url(url) + if not real_url: + logger.error(f"[{self.name}] 无法从 {url} 获取真实URL。") + await event.reply("无法解析短链接。") + return + else: + real_url = url + + # 解析URL + data = await self.parse(real_url) + if not data: + logger.error(f"[{self.name}] 无法从 {real_url} 解析信息。") + await event.reply("无法获取链接信息,可能是接口变动或链接不存在。") + return + + # 格式化响应 + response = await self.format_response(event, data) + if response: + # 发送响应 + await event.bot.send_forwarded_messages(target=event, nodes=response) + else: + await event.reply("解析成功,但无法生成响应。") + + except Exception as e: + logger.error(f"[{self.name}] 处理链接时发生错误: {e}") + await event.reply("处理链接时发生错误,请稍后再试。") + + def is_short_url(self, url: str) -> bool: + """ + 判断是否是短链接 + + Args: + url (str): URL + + Returns: + bool: 是否是短链接 + """ + short_domains = ["b23.tv", "v.douyin.com", "t.cn", "url.cn"] + return any(domain in url for domain in short_domains) + + async def handle_message(self, event: MessageEvent): + """ + 处理消息,检测链接并解析 + + Args: + event (MessageEvent): 消息事件对象 + """ + # 消息去重 + if event.message_id in self.processed_messages: + return + self.processed_messages[event.message_id] = True + + # 忽略机器人自己发送的消息 + if event.user_id == event.self_id: + return + + # 1. 优先解析JSON卡片中的链接 + url_to_process = self.extract_url_from_json_segments(event.message) + + # 2. 如果未在JSON卡片中找到链接,则在文本消息中查找 + if not url_to_process: + url_to_process = self.extract_url_from_text_segments(event.message) + + # 3. 如果找到了链接,则进行处理 + if url_to_process and self.should_handle_url(url_to_process): + await self.process_url(event, url_to_process) + + def should_handle_url(self, url: str) -> bool: + """ + 判断是否应该处理该URL + + Args: + url (str): URL + + Returns: + bool: 是否应该处理 + """ + # 基类默认实现,子类应覆盖此方法 + return bool(self.url_pattern.search(url)) + + @staticmethod + def format_count(num: Union[int, str]) -> str: + """ + 格式化数字为易读形式 + + Args: + num (Union[int, str]): 要格式化的数字 + + Returns: + str: 格式化后的字符串 + """ + try: + n = int(num) + if n < 10000: + return str(n) + return f"{n / 10000:.1f}万" + except (ValueError, TypeError): + return str(num) diff --git a/plugins/web_parser/parsers/bili.py b/plugins/web_parser/parsers/bili.py new file mode 100644 index 0000000..6d55fdf --- /dev/null +++ b/plugins/web_parser/parsers/bili.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- +import re +import json +import aiohttp +from typing import Optional, Dict, Any, List +from bs4 import BeautifulSoup + +from core.utils.logger import logger +from models import MessageEvent, MessageSegment +from ..base import BaseParser +from ..utils import format_duration, clean_url + +from cachetools import TTLCache + +class BiliParser(BaseParser): + """ + B站视频解析器 + """ + + def __init__(self): + super().__init__() + self.name = "B站解析器" + self.url_pattern = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)") + self.nickname = "B站视频解析" + # 消息去重缓存 + self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) + + async def parse(self, url: str) -> Optional[Dict[str, Any]]: + """ + 解析B站视频信息 + + Args: + url (str): B站视频URL + + Returns: + Optional[Dict[str, Any]]: 视频信息字典,如果失败则返回None + """ + try: + # 清理URL + clean_url = url.split('?')[0] + if '#/' in clean_url: + clean_url = clean_url.split('#/')[0] + + session = self.get_session() + async with session.get(clean_url, headers=self.HEADERS, timeout=5) as response: + response.raise_for_status() + text = await response.text() + soup = BeautifulSoup(text, 'html.parser') + + # 尝试多种方式获取视频数据 + # 方式1: 尝试获取 __INITIAL_STATE__ + script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) + if not script_tag or not script_tag.string: + # 方式2: 尝试获取 __PLAYINFO__ + script_tag = soup.find('script', text=re.compile('window.__PLAYINFO__')) + + if not script_tag or not script_tag.string: + # 方式3: 尝试获取页面标题和其他信息 + title_tag = soup.find('title') + if title_tag: + title = title_tag.get_text().strip() + # 提取BV号 + bv_match = re.search(r'(BV\w{10})', clean_url) + bvid = bv_match.group(1) if bv_match else '未知BV号' + + return { + "title": title.replace('_哔哩哔哩_bilibili', '').strip(), + "bvid": bvid, + "duration": 0, + "cover_url": '', + "play": 0, + "like": 0, + "coin": 0, + "favorite": 0, + "share": 0, + "owner_name": '未知UP主', + "owner_avatar": '', + "followers": 0, + } + return None + + # 原始解析逻辑 + match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string) + if not match: + # 尝试另一种正则表达式 + match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL) + + if not match: + return None + + json_str = match.group(1) + # 清理JSON字符串中的潜在问题字符 + json_str = json_str.strip().rstrip(';') + + try: + data = json.loads(json_str) + except json.JSONDecodeError: + # 如果直接解析失败,尝试清理JSON字符串 + # 移除可能的注释或无效字符 + cleaned_json = re.sub(r',\s*[}]', '}', json_str) # 移除末尾多余的逗号 + cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json) # 移除注释 + cleaned_json = re.sub(r'//.*', '', cleaned_json) # 移除行注释 + data = json.loads(cleaned_json) + + video_data = data.get('videoData', {}) + up_data = data.get('upData', {}) + stat = video_data.get('stat', {}) + owner = video_data.get('owner', {}) + + cover_url = video_data.get('pic', '') + if cover_url: + cover_url = cover_url.split('@')[0] + if cover_url.startswith('//'): + cover_url = 'https:' + cover_url + + owner_avatar = owner.get('face', '') + if owner_avatar: + if owner_avatar.startswith('//'): + owner_avatar = 'https:' + owner_avatar + owner_avatar = owner_avatar.split('@')[0] + + return { + "title": video_data.get('title', '未知标题'), + "bvid": video_data.get('bvid', '未知BV号'), + "duration": video_data.get('duration', 0), + "cover_url": cover_url, + "play": stat.get('view', 0), + "like": stat.get('like', 0), + "coin": stat.get('coin', 0), + "favorite": stat.get('favorite', 0), + "share": stat.get('share', 0), + "owner_name": owner.get('name', '未知UP主'), + "owner_avatar": owner_avatar, + "followers": up_data.get('fans', 0), + } + + except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: + logger.error(f"[{self.name}] 解析视频信息失败: {e}") + logger.debug(f"失败的URL: {url}") + except Exception as e: + logger.error(f"[{self.name}] 解析视频信息时发生未知错误: {e}") + logger.debug(f"失败的URL: {url}") + + return None + + async def get_real_url(self, short_url: str) -> Optional[str]: + """ + 获取B站短链接的真实URL + + Args: + short_url (str): B站短链接 + + Returns: + Optional[str]: 真实URL,如果失败则返回None + """ + try: + session = self.get_session() + async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response: + if response.status == 302: + return response.headers.get('Location') + except Exception as e: + logger.error(f"[{self.name}] 获取真实URL失败: {e}") + return None + + async def get_direct_video_url(self, video_url: str) -> Optional[str]: + """ + 调用第三方API解析B站视频直链 + + Args: + video_url (str): B站视频的完整URL + + Returns: + Optional[str]: 视频直链URL,如果失败则返回None + """ + api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json" + try: + async with aiohttp.ClientSession() as session: + async with session.get(api_url, headers=self.HEADERS, timeout=10) as response: + response.raise_for_status() + # 使用 content_type=None 来忽略 Content-Type 检查 + data = await response.json(content_type=None) + if data.get("code") == 200 and data.get("data"): + return data["data"][0].get("video_url") + except (aiohttp.ClientError, json.JSONDecodeError, KeyError, IndexError) as e: + logger.error(f"[{self.name}] 调用第三方API解析视频失败: {e}") + return None + + async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: + """ + 格式化B站视频响应消息 + + Args: + event (MessageEvent): 消息事件对象 + data (Dict[str, Any]): 视频信息 + + Returns: + List[Any]: 消息段列表 + """ + # 检查视频时长 + if data['duration'] > 1200: # 20分钟 = 1200秒 + video_message = "视频时长超过20分钟,不进行解析。" + else: + # 构建完整的B站视频URL + video_url = f"https://www.bilibili.com/video/{data.get('bvid', '')}" + direct_url = await self.get_direct_video_url(video_url) + if direct_url: + video_message = MessageSegment.video(direct_url) + else: + video_message = "视频解析失败,无法获取直链。" + + text_message = ( + f"BiliBili 视频解析\n" + f"--------------------\n" + f" UP主: {data['owner_name']}\n" + f" 粉丝: {self.format_count(data['followers'])}\n" + f"--------------------\n" + f" 标题: {data['title']}\n" + f" BV号: {data['bvid']}\n" + f" 时长: {format_duration(data['duration'])}\n" + f"--------------------\n" + f" 数据:\n" + f" 播放: {self.format_count(data['play'])}\n" + f" 点赞: {self.format_count(data['like'])}\n" + f" 投币: {self.format_count(data['coin'])}\n" + f" 收藏: {self.format_count(data['favorite'])}\n" + f" 转发: {self.format_count(data['share'])}\n" + ) + + image_message_segment = [ + MessageSegment.text("B站封面:"), + MessageSegment.image(data['cover_url']) + ] + + up_info_segment = [ + MessageSegment.text("UP主头像:"), + MessageSegment.image(data['owner_avatar']) + ] + + nodes = [ + event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=text_message), + event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=image_message_segment), + event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=up_info_segment), + event.bot.build_forward_node(user_id=event.self_id, nickname=self.nickname, message=video_message) + ] + + return nodes + + def should_handle_url(self, url: str) -> bool: + """ + 判断是否应该处理该URL + + Args: + url (str): URL + + Returns: + bool: 是否应该处理 + """ + # 检查是否是B站相关域名,包括短链接 + return bool(self.url_pattern.search(url)) diff --git a/plugins/web_parser/parsers/douyin.py b/plugins/web_parser/parsers/douyin.py new file mode 100644 index 0000000..4d9a0bf --- /dev/null +++ b/plugins/web_parser/parsers/douyin.py @@ -0,0 +1,261 @@ +# -*- coding: utf-8 -*- +import re +import json +import aiohttp +from typing import Optional, Dict, Any, List + +from core.utils.logger import logger +from models import MessageEvent, MessageSegment +from ..base import BaseParser +from ..utils import extract_original_text +from cachetools import TTLCache + + +class DouyinParser(BaseParser): + """ + 抖音视频解析器 + """ + + def __init__(self): + super().__init__() + self.name = "抖音解析器" + self.url_pattern = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) + self.short_pattern = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) + self.nickname = "抖音视频解析" + # 消息去重缓存 + self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) + + async def parse(self, url: str) -> Optional[Dict[str, Any]]: + """ + 解析抖音视频信息 + + Args: + url (str): 抖音视频URL + + Returns: + Optional[Dict[str, Any]]: 视频信息字典,如果失败则返回None + """ + try: + # 使用第三方API解析抖音视频 + api_url = f"http://api.xhus.cn/api/douyin?url={url}" + + session = self.get_session() + async with session.get(api_url, headers=self.HEADERS, timeout=10) as response: + if response.status != 200: + logger.error(f"[{self.name}] API请求失败,状态码: {response.status}") + return None + + response_data = await response.json() + + if not isinstance(response_data, dict): + logger.error(f"[{self.name}] API返回格式错误: {response_data}") + return None + + if response_data.get("code") != 200: + logger.error(f"[{self.name}] API返回错误: {response_data}") + return None + + data = response_data.get("data", {}) + if not data: + logger.error(f"[{self.name}] API返回数据为空") + return None + + # 转换API响应格式 + return { + "type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image", + "video_url": data.get("url", ""), + "video_url_HQ": data.get("url", ""), + "nickname": data.get("author", "未知作者"), + "desc": data.get("title", "无描述"), + "aweme_id": data.get("uid", ""), + "like": data.get("like", 0), + "cover": data.get("cover", ""), + "time": data.get("time", 0), + "author_avatar": data.get("avatar", ""), + "music": data.get("music", {}), + } + + except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: + logger.error(f"[{self.name}] 解析抖音视频信息失败: {e}") + logger.debug(f"失败的URL: {url}") + except Exception as e: + logger.error(f"[{self.name}] 解析抖音视频时发生未知错误: {e}") + logger.debug(f"失败的URL: {url}") + + return None + + async def get_real_url(self, short_url: str) -> Optional[str]: + """ + 获取抖音短链接的真实URL + + Args: + short_url (str): 抖音短链接 + + Returns: + Optional[str]: 真实URL,如果失败则返回None + """ + try: + # 首先尝试获取重定向后的URL + async with aiohttp.ClientSession() as session: + # 添加更多头部信息模拟移动端访问 + mobile_headers = self.HEADERS.copy() + mobile_headers.update({ + 'Sec-Fetch-Dest': 'document', + 'Sec-Fetch-Mode': 'navigate', + 'Sec-Fetch-Site': 'none', + 'Cache-Control': 'max-age=0', + # 模拟移动设备的额外头部 + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1', + 'X-Requested-With': 'XMLHttpRequest', + 'Referer': 'https://www.douyin.com/' + }) + + async with session.get(short_url, headers=mobile_headers, allow_redirects=True, timeout=10) as response: + redirected_url = str(response.url) + + # 检查重定向后的URL是否包含视频ID + if 'video/' in redirected_url or '/note/' in redirected_url: + logger.info(f"[{self.name}] 重定向后的视频URL: {redirected_url}") + return redirected_url + elif 'share_item' in redirected_url: + logger.info(f"[{self.name}] 重定向后的分享URL: {redirected_url}") + return redirected_url + else: + logger.warning(f"[{self.name}] 重定向到了非预期页面: {redirected_url}") + return redirected_url + + except Exception as e: + logger.error(f"[{self.name}] 获取真实URL失败: {e}") + return None + + async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: + """ + 格式化抖音视频响应消息 + + Args: + event (MessageEvent): 消息事件对象 + data (Dict[str, Any]): 视频信息 + + Returns: + List[Any]: 消息段列表 + """ + # 构建回复消息,包含原分享中的文本内容(如果有) + original_text = extract_original_text(event.message, self.url_pattern) + + # 构建回复消息 + text_parts = ["抖音视频解析"] + text_parts.append("--------------------") + + if original_text: + text_parts.append(f" 分享内容: {original_text}") + text_parts.append("--------------------") + + text_parts.append(f" 作者: {data['nickname']}") + text_parts.append(f" 抖音号: {data['aweme_id']}") + text_parts.append(f" 标题: {data['desc']}") + text_parts.append(f" 点赞: {self.format_count(data['like'])}") + text_parts.append(f" 类型: {data['type']}") + + # 如果是音乐,添加音乐信息 + if data.get('music'): + music_info = data['music'] + text_parts.append("--------------------") + text_parts.append(" 背景音乐:") + text_parts.append(f" 标题: {music_info.get('title', '')}") + text_parts.append(f" 作者: {music_info.get('author', '')}") + + text_parts.append("--------------------") + + text_message = "\n".join(text_parts) + + # 准备转发消息节点 + nodes = [] + + # 添加文本信息节点 + text_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message=text_message + ) + nodes.append(text_node) + + # 添加封面图片节点(如果有) + if data.get('cover'): + try: + cover_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message=[ + MessageSegment.text("抖音视频封面:\n"), + MessageSegment.image(data['cover']) + ] + ) + nodes.append(cover_node) + except Exception as e: + logger.warning(f"[{self.name}] 无法添加封面图片: {e}") + + # 添加作者头像节点(如果有) + if data.get('author_avatar'): + try: + avatar_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message=[ + MessageSegment.text("作者头像:\n"), + MessageSegment.image(data['author_avatar']) + ] + ) + nodes.append(avatar_node) + except Exception as e: + logger.warning(f"[{self.name}] 无法添加作者头像: {e}") + + # 尝试添加视频直链(单独节点) + video_success = False + try: + if data.get('video_url'): + video_url = data.get('video_url', '') + # 检查视频类型 + if data.get('type') == 'video': + video_message = MessageSegment.video(video_url) + video_type_text = "视频直链:" + else: # image类型 + video_message = MessageSegment.image(video_url) # 单个图片 + video_type_text = "图集首图:" + + # 构建视频/图片节点 + video_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message=[ + MessageSegment.text(video_type_text + "\n"), + video_message + ] + ) + nodes.append(video_node) + video_success = True + except Exception as e: + logger.error(f"[{self.name}] 无法添加视频/图片: {e}") + + # 如果无法添加视频,添加提示信息 + if not video_success: + no_video_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message="视频解析成功,但无法获取直链或播放视频。" + ) + nodes.append(no_video_node) + + return nodes + + def should_handle_url(self, url: str) -> bool: + """ + 判断是否应该处理该URL + + Args: + url (str): URL + + Returns: + bool: 是否应该处理 + """ + # 检查是否是抖音相关域名 + return ('douyin.com' in url or bool(self.url_pattern.search(url)) or bool(self.short_pattern.search(url))) diff --git a/plugins/web_parser/parsers/github.py b/plugins/web_parser/parsers/github.py new file mode 100644 index 0000000..4eb631c --- /dev/null +++ b/plugins/web_parser/parsers/github.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +import re +import json +import aiohttp +from typing import Optional, Dict, Any, List +from cachetools import TTLCache + +from core.utils.logger import logger +from core.managers.image_manager import image_manager +from models import MessageEvent, MessageSegment +from ..base import BaseParser + + +class GitHubParser(BaseParser): + """ + GitHub仓库解析器 + """ + + def __init__(self): + super().__init__() + self.name = "GitHub解析器" + self.url_pattern = re.compile(r"https?://(?:www\.)?github\.com/([\w\-]+)/([\w\-\.]+)(?:/[^\s]*)?") + self.nickname = "GitHub仓库信息" + # 消息去重缓存 + self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) + # 缓存GitHub API响应,避免频繁请求 + self.api_cache = TTLCache(maxsize=100, ttl=3600) # 100个缓存项,1小时过期 + + async def parse(self, url: str) -> Optional[Dict[str, Any]]: + """ + 解析GitHub仓库信息 + + Args: + url (str): GitHub仓库URL + + Returns: + Optional[Dict[str, Any]]: 仓库信息字典,如果失败则返回None + """ + # 从URL中提取owner和repo + match = self.url_pattern.search(url) + if not match: + return None + + owner = match.group(1) + repo = match.group(2) + # 移除可能的.git后缀 + repo = repo.replace(".git", "") + + return await self.get_github_repo_info(owner, repo) + + async def get_real_url(self, short_url: str) -> Optional[str]: + """ + 获取短链接的真实URL + + Args: + short_url (str): 短链接 + + Returns: + Optional[str]: 真实URL,如果失败则返回None + """ + try: + session = self.get_session() + async with session.head(short_url, headers=self.HEADERS, allow_redirects=False, timeout=5) as response: + if response.status == 302: + return response.headers.get('Location') + except Exception as e: + logger.error(f"[{self.name}] 获取真实URL失败: {e}") + return None + + async def get_github_repo_info(self, owner: str, repo: str) -> Optional[Dict[str, Any]]: + """ + 通过GitHub API获取仓库信息 + + Args: + owner (str): 仓库所有者用户名 + repo (str): 仓库名称 + + Returns: + Optional[Dict[str, Any]]: 仓库信息字典,如果失败则返回None + """ + cache_key = f"{owner}/{repo}" + if cache_key in self.api_cache: + logger.info(f"[{self.name}] 使用缓存的仓库信息: {cache_key}") + return self.api_cache[cache_key] + + api_url = f"https://api.github.com/repos/{owner}/{repo}" + try: + session = self.get_session() + async with session.get(api_url, timeout=10) as response: + response.raise_for_status() + repo_data = await response.json() + + # 将数据存入缓存 + self.api_cache[cache_key] = repo_data + logger.info(f"[{self.name}] 成功获取仓库信息并缓存: {cache_key}") + return repo_data + + except aiohttp.ClientError as e: + logger.error(f"[{self.name}] GitHub API请求失败: {e}") + except json.JSONDecodeError as e: + logger.error(f"[{self.name}] 解析GitHub API响应失败: {e}") + except Exception as e: + logger.error(f"[{self.name}] 获取仓库信息时发生未知错误: {e}") + + return None + + async def generate_repo_image(self, repo_data: Dict[str, Any]) -> Optional[str]: + """ + 使用Jinja2模板渲染仓库信息为图片 + + Args: + repo_data (Dict[str, Any]): 仓库信息字典 + + Returns: + Optional[str]: 生成的图片Base64编码,如果失败则返回None + """ + try: + # 准备模板数据 + template_data = { + "full_name": repo_data.get("full_name", ""), + "description": repo_data.get("description", "暂无描述"), + "owner_avatar": repo_data.get("owner", {}).get("avatar_url", ""), + "stargazers_count": repo_data.get("stargazers_count", 0), + "forks_count": repo_data.get("forks_count", 0), + "open_issues_count": repo_data.get("open_issues_count", 0), + "watchers_count": repo_data.get("watchers_count", 0), + } + + # 渲染模板为图片,使用高质量设置 + base64_image = await image_manager.render_template_to_base64( + template_name="github_repo.html", + data=template_data, + output_name=f"github_{repo_data.get('name', 'repo')}.png", + quality=100, + image_type="png" + ) + + return base64_image + + except Exception as e: + logger.error(f"[{self.name}] 生成仓库信息图片失败: {e}") + return None + + async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]: + """ + 格式化GitHub仓库响应消息 + + Args: + event (MessageEvent): 消息事件对象 + data (Dict[str, Any]): 仓库信息 + + Returns: + List[Any]: 消息段列表 + """ + nodes = [] + + # 生成图片 + image_base64 = await self.generate_repo_image(data) + if image_base64: + # 发送图片 + image_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message=MessageSegment.image(image_base64) + ) + nodes.append(image_node) + else: + # 如果图片生成失败,发送文本信息 + text_message = ( + f"GitHub 仓库信息\n" + f"--------------------\n" + f"仓库: {data.get('full_name', '')}\n" + f"描述: {data.get('description', '暂无描述')}\n" + f"--------------------\n" + f"数据:\n" + f" 星标: {data.get('stargazers_count', 0)}\n" + f" Fork: {data.get('forks_count', 0)}\n" + f" Issues: {data.get('open_issues_count', 0)}\n" + f" 关注: {data.get('watchers_count', 0)}\n" + ) + text_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=self.nickname, + message=text_message + ) + nodes.append(text_node) + + return nodes + + def should_handle_url(self, url: str) -> bool: + """ + 判断是否应该处理该URL + + Args: + url (str): URL + + Returns: + bool: 是否应该处理 + """ + # 检查是否是GitHub相关域名 + return bool(self.url_pattern.search(url)) and 'github.com' in url diff --git a/plugins/web_parser/utils.py b/plugins/web_parser/utils.py new file mode 100644 index 0000000..7742d43 --- /dev/null +++ b/plugins/web_parser/utils.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +import re +import json +from typing import Optional, Dict, Any, Union, List + +from core.utils.logger import logger +from models import MessageEvent, MessageSegment + + +def format_duration(seconds: int) -> str: + """ + 将秒数格式化为 MM:SS 的形式 + + Args: + seconds (int): 秒数 + + Returns: + str: 格式化后的时间字符串 + """ + if not isinstance(seconds, int) or seconds < 0: + return "00:00" + minutes, seconds = divmod(seconds, 60) + return f"{minutes:02d}:{seconds:02d}" + + +def clean_url(url: str) -> str: + """ + 清理URL,去掉不必要的查询参数 + + Args: + url (str): 原始URL + + Returns: + str: 清理后的URL + """ + clean_url = url.split('?')[0] + if '#/' in clean_url: + clean_url = clean_url.split('#/')[0] + return clean_url + + +def extract_original_text(segments: List[Any], url_pattern: re.Pattern) -> str: + """ + 从消息段中提取原始文本(去除链接) + + Args: + segments (List[Any]): 消息段列表 + url_pattern (re.Pattern): URL正则表达式模式 + + Returns: + str: 提取的原始文本 + """ + for segment in segments: + if segment.type == "text": + text_content = segment.data.get("text", "") + # 移除链接 + cleaned_text = re.sub(url_pattern, '', text_content) + # 移除常见的分享提示 + cleaned_text = re.sub(r'复制此链接.*?打开.*?搜索.*?直接观看视频!', '', cleaned_text) + cleaned_text = cleaned_text.strip() + if cleaned_text: + return cleaned_text + return "" + + +def build_forward_nodes(event: MessageEvent, nickname: str, messages: List[Any]) -> List[Any]: + """ + 构建转发消息节点 + + Args: + event (MessageEvent): 消息事件对象 + nickname (str): 发送者昵称 + messages (List[Any]): 消息内容列表 + + Returns: + List[Any]: 转发消息节点列表 + """ + nodes = [] + for msg in messages: + if isinstance(msg, str): + node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=nickname, + message=msg + ) + nodes.append(node) + elif isinstance(msg, list): + node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=nickname, + message=msg + ) + nodes.append(node) + return nodes + + +def safe_get(data: Dict[str, Any], keys: List[str], default: Any = None) -> Any: + """ + 安全地从嵌套字典中获取值 + + Args: + data (Dict[str, Any]): 嵌套字典 + keys (List[str]): 键路径列表 + default (Any, optional): 默认值. Defaults to None. + + Returns: + Any: 获取的值或默认值 + """ + result = data + for key in keys: + if isinstance(result, dict) and key in result: + result = result[key] + else: + return default + return result + + +def normalize_url(url: str) -> str: + """ + 规范化URL + + Args: + url (str): 原始URL + + Returns: + str: 规范化后的URL + """ + if not url.startswith('http'): + url = 'https://' + url + return url + + +def validate_url(url: str) -> bool: + """ + 验证URL格式是否正确 + + Args: + url (str): URL + + Returns: + bool: URL格式是否正确 + """ + url_pattern = re.compile(r'https?://[^]+') + return bool(url_pattern.match(url)) diff --git a/templates/code_execution.html b/templates/code_execution.html new file mode 100644 index 0000000..41c7ee1 --- /dev/null +++ b/templates/code_execution.html @@ -0,0 +1,379 @@ + + +
+ + +{{ timestamp }}
+