From e84f59e875e8d172f4968a455314a17b4840de8f Mon Sep 17 00:00:00 2001 From: K2cr2O1 <2221577113@qq.com> Date: Sun, 4 Jan 2026 23:57:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=86=8D=E6=AC=A1=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 13 ++-- plugins/bili_parser.py | 141 +++++++++++++++++++++++++++++++++++++++++ plugins/thpic.py | 5 +- 3 files changed, 153 insertions(+), 6 deletions(-) create mode 100644 plugins/bili_parser.py diff --git a/main.py b/main.py index 5b15394..1445f1f 100644 --- a/main.py +++ b/main.py @@ -27,12 +27,13 @@ class PluginReloadHandler(FileSystemEventHandler): 继承自 watchdog.events.FileSystemEventHandler, 监听 base_plugins 目录下的文件变化,并触发插件重载。 """ - def __init__(self): + def __init__(self, loop: asyncio.AbstractEventLoop): """ 初始化处理器 - 设置冷却时间,防止短时间内多次触发重载。 + 设置冷却时间,并保存主事件循环的引用。 """ + self.loop = loop self.last_reload_time = 0 self.cooldown = 1.0 # 冷却时间,防止短时间内多次重载 @@ -64,8 +65,8 @@ class PluginReloadHandler(FileSystemEventHandler): logger.info("正在重载插件...") try: - # 重新扫描并加载插件 - run_in_thread_pool(load_all_plugins) + # 使用线程安全的方式在主事件循环中运行异步的插件加载函数 + asyncio.run_coroutine_threadsafe(run_in_thread_pool(load_all_plugins), self.loop) logger.success("插件重载完成") except Exception as e: logger.exception(f"重载失败: {e}") @@ -93,7 +94,9 @@ async def main(): # 监控 plugins 目录 plugin_path = os.path.join(os.path.dirname(__file__), "plugins") - event_handler = PluginReloadHandler() + # 获取当前事件循环并传递给处理器 + loop = asyncio.get_running_loop() + event_handler = PluginReloadHandler(loop) observer = Observer() if os.path.exists(plugin_path): diff --git a/plugins/bili_parser.py b/plugins/bili_parser.py new file mode 100644 index 0000000..8c08865 --- /dev/null +++ b/plugins/bili_parser.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +import re +import json +import html +import requests +from bs4 import BeautifulSoup +from typing import Optional, Tuple, Dict, Any + +from core.logger import logger +from core.command_manager import matcher +from models import MessageEvent, MessageSegment + +__plugin_meta__ = { + "name": "bili_parser", + "description": "自动解析B站分享卡片,提取视频封面和播放量等信息。", + "usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。", +} + +HEADERS = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' +} + + +def format_count(num: int) -> str: + if not isinstance(num, int): + return str(num) + if num < 10000: + return str(num) + return f"{num / 10000:.1f}万" + + +def get_real_url(short_url: str) -> Optional[str]: + try: + response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) + if response.status_code == 302: + return response.headers.get('Location') + except requests.RequestException as e: + print(f"获取真实URL失败: {e}") + return None + +def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: + try: + response = requests.get(video_url, headers=HEADERS, timeout=5) + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') + + script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) + if not script_tag: + return None + + json_str = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string).group(1) + data = json.loads(json_str) + + video_data = data.get('videoData', {}) + stat = video_data.get('stat', {}) + + cover_url = video_data.get('pic', '') + if cover_url: + cover_url = cover_url.split('@')[0] + if cover_url.startswith('//'): + cover_url = 'https:' + cover_url + + return { + "title": video_data.get('title', '未知标题'), + "bvid": video_data.get('bvid', '未知BV号'), + "cover_url": cover_url, + "play": stat.get('view', 0), + "like": stat.get('like', 0), + "coin": stat.get('coin', 0), + "favorite": stat.get('favorite', 0), + "share": stat.get('share', 0), + } + + except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e: + print(f"解析视频信息失败: {e}") + + return None + +@matcher.on_message() +async def handle_bili_share(event: MessageEvent): + if not event.raw_message.startswith('[CQ:json,data='): + return + + logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}") + + try: + json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']') + + json_str_decoded = html.unescape(json_str_raw) + + data = json.loads(json_str_decoded) + + short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl") + + if not short_url or "b23.tv" not in short_url: + logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。") + return + + short_url = short_url.split('?')[0] + logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}") + + except (json.JSONDecodeError, KeyError) as e: + logger.error(f"[bili_parser] 解析JSON失败: {e}") + return + + real_url = get_real_url(short_url) + if not real_url: + logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。") + await event.reply("无法解析B站短链接。") + return + + video_info = parse_video_info(real_url) + if not video_info: + logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") + await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") + return + + text_message = ( + f"BiliBili 视频解析\n" + f"--------------------\n" + f" 标题: {video_info['title']}\n" + f" BV号: {video_info['bvid']}\n" + f"--------------------\n" + f" 数据:\n" + f" 播放: {format_count(video_info['play'])}\n" + f" 点赞: {format_count(video_info['like'])}\n" + f" 投币: {format_count(video_info['coin'])}\n" + f" 收藏: {format_count(video_info['favorite'])}\n" + f" 转发: {format_count(video_info['share'])}\n" + f" B站链接: {short_url}" + ) + + image_message_segment = [MessageSegment.text("B站封面:"),MessageSegment.image(video_info['cover_url'])] + + nodes = [ + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message), + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment) + ] + + logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}") + await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes) diff --git a/plugins/thpic.py b/plugins/thpic.py index da0784d..1a3dfc8 100644 --- a/plugins/thpic.py +++ b/plugins/thpic.py @@ -25,4 +25,7 @@ async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): :param event: 消息事件对象。 :param args: 指令参数列表(未使用)。 """ - await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random")) + try: + await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random")) + except Exception as e: + await event.reply("报错了。。。" + e)