From d7fbc5bb7064e65fec468272add8d01ef69eb211 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=95=80=E9=93=AC=E9=85=B8=E9=92=BE?= <148796996+K2cr2O1@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:58:56 +0800 Subject: [PATCH 1/3] =?UTF-8?q?Dev=E8=87=B3main=20(#21)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 整合开发历史 * codepy安全性升级 * 优化一些东西 * 再次优化 --- main.py | 13 ++-- plugins/bili_parser.py | 141 +++++++++++++++++++++++++++++++++++++++++ plugins/code_py.py | 15 ++++- plugins/echo.py | 2 +- plugins/thpic.py | 5 +- 5 files changed, 167 insertions(+), 9 deletions(-) create mode 100644 plugins/bili_parser.py diff --git a/main.py b/main.py index 5b15394..1445f1f 100644 --- a/main.py +++ b/main.py @@ -27,12 +27,13 @@ class PluginReloadHandler(FileSystemEventHandler): 继承自 watchdog.events.FileSystemEventHandler, 监听 base_plugins 目录下的文件变化,并触发插件重载。 """ - def __init__(self): + def __init__(self, loop: asyncio.AbstractEventLoop): """ 初始化处理器 - 设置冷却时间,防止短时间内多次触发重载。 + 设置冷却时间,并保存主事件循环的引用。 """ + self.loop = loop self.last_reload_time = 0 self.cooldown = 1.0 # 冷却时间,防止短时间内多次重载 @@ -64,8 +65,8 @@ class PluginReloadHandler(FileSystemEventHandler): logger.info("正在重载插件...") try: - # 重新扫描并加载插件 - run_in_thread_pool(load_all_plugins) + # 使用线程安全的方式在主事件循环中运行异步的插件加载函数 + asyncio.run_coroutine_threadsafe(run_in_thread_pool(load_all_plugins), self.loop) logger.success("插件重载完成") except Exception as e: logger.exception(f"重载失败: {e}") @@ -93,7 +94,9 @@ async def main(): # 监控 plugins 目录 plugin_path = os.path.join(os.path.dirname(__file__), "plugins") - event_handler = PluginReloadHandler() + # 获取当前事件循环并传递给处理器 + loop = asyncio.get_running_loop() + event_handler = PluginReloadHandler(loop) observer = Observer() if os.path.exists(plugin_path): diff --git a/plugins/bili_parser.py b/plugins/bili_parser.py new file mode 100644 index 0000000..8c08865 --- /dev/null +++ b/plugins/bili_parser.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +import re +import json +import html +import requests +from bs4 import BeautifulSoup +from typing import Optional, Tuple, Dict, Any + +from core.logger import logger +from core.command_manager import matcher +from models import MessageEvent, MessageSegment + +__plugin_meta__ = { + "name": "bili_parser", + "description": "自动解析B站分享卡片,提取视频封面和播放量等信息。", + "usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。", +} + +HEADERS = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' +} + + +def format_count(num: int) -> str: + if not isinstance(num, int): + return str(num) + if num < 10000: + return str(num) + return f"{num / 10000:.1f}万" + + +def get_real_url(short_url: str) -> Optional[str]: + try: + response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) + if response.status_code == 302: + return response.headers.get('Location') + except requests.RequestException as e: + print(f"获取真实URL失败: {e}") + return None + +def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: + try: + response = requests.get(video_url, headers=HEADERS, timeout=5) + response.raise_for_status() + soup = BeautifulSoup(response.text, 'html.parser') + + script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) + if not script_tag: + return None + + json_str = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string).group(1) + data = json.loads(json_str) + + video_data = data.get('videoData', {}) + stat = video_data.get('stat', {}) + + cover_url = video_data.get('pic', '') + if cover_url: + cover_url = cover_url.split('@')[0] + if cover_url.startswith('//'): + cover_url = 'https:' + cover_url + + return { + "title": video_data.get('title', '未知标题'), + "bvid": video_data.get('bvid', '未知BV号'), + "cover_url": cover_url, + "play": stat.get('view', 0), + "like": stat.get('like', 0), + "coin": stat.get('coin', 0), + "favorite": stat.get('favorite', 0), + "share": stat.get('share', 0), + } + + except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e: + print(f"解析视频信息失败: {e}") + + return None + +@matcher.on_message() +async def handle_bili_share(event: MessageEvent): + if not event.raw_message.startswith('[CQ:json,data='): + return + + logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}") + + try: + json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']') + + json_str_decoded = html.unescape(json_str_raw) + + data = json.loads(json_str_decoded) + + short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl") + + if not short_url or "b23.tv" not in short_url: + logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。") + return + + short_url = short_url.split('?')[0] + logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}") + + except (json.JSONDecodeError, KeyError) as e: + logger.error(f"[bili_parser] 解析JSON失败: {e}") + return + + real_url = get_real_url(short_url) + if not real_url: + logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。") + await event.reply("无法解析B站短链接。") + return + + video_info = parse_video_info(real_url) + if not video_info: + logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") + await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") + return + + text_message = ( + f"BiliBili 视频解析\n" + f"--------------------\n" + f" 标题: {video_info['title']}\n" + f" BV号: {video_info['bvid']}\n" + f"--------------------\n" + f" 数据:\n" + f" 播放: {format_count(video_info['play'])}\n" + f" 点赞: {format_count(video_info['like'])}\n" + f" 投币: {format_count(video_info['coin'])}\n" + f" 收藏: {format_count(video_info['favorite'])}\n" + f" 转发: {format_count(video_info['share'])}\n" + f" B站链接: {short_url}" + ) + + image_message_segment = [MessageSegment.text("B站封面:"),MessageSegment.image(video_info['cover_url'])] + + nodes = [ + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message), + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment) + ] + + logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}") + await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes) diff --git a/plugins/code_py.py b/plugins/code_py.py index b595354..2fa2988 100644 --- a/plugins/code_py.py +++ b/plugins/code_py.py @@ -22,20 +22,31 @@ __plugin_meta__ = { "usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码", } -# --- 安全配置:危险模块黑名单 --- +# --- 安全配置:危险模块和内置函数黑名单 --- DANGEROUS_MODULES = [ "os", "sys", "subprocess", "shutil", "socket", "requests", "urllib", "http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing", "asyncio", ] +DANGEROUS_BUILTINS = [ + "__import__", "open", "exec", "eval", "compile", "input", "breakpoint" +] # 编译后的正则表达式,用于分割语句 STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]') +# 编译后的正则表达式,用于查找危险的内置函数调用 +BUILTIN_CALL_PATTERN = re.compile(r'\b(' + '|'.join(DANGEROUS_BUILTINS) + r')\s*\(') def is_code_safe(code: str) -> Tuple[bool, str]: """ - 检查代码中是否包含危险的模块导入。 + 检查代码中是否包含危险的模块导入或内置函数调用。 """ + # 1. 检查危险的内置函数 + found_builtins = BUILTIN_CALL_PATTERN.search(code) + if found_builtins: + return False, f"检测到不允许的内置函数调用:'{found_builtins.group(1)}'" + + # 2. 检查危险的模块导入 statements = STATEMENT_SPLIT_PATTERN.split(code) for statement in statements: statement = statement.strip() diff --git a/plugins/echo.py b/plugins/echo.py index 34a7f22..ff743ea 100644 --- a/plugins/echo.py +++ b/plugins/echo.py @@ -13,7 +13,7 @@ __plugin_meta__ = { "usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞", } -@matcher.command("echo") +@matcher.command("echo",permission=MessageEvent.ADMIN) async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): """ 处理 echo 指令,原样回复用户输入的内容 diff --git a/plugins/thpic.py b/plugins/thpic.py index da0784d..1a3dfc8 100644 --- a/plugins/thpic.py +++ b/plugins/thpic.py @@ -25,4 +25,7 @@ async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): :param event: 消息事件对象。 :param args: 指令参数列表(未使用)。 """ - await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random")) + try: + await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random")) + except Exception as e: + await event.reply("报错了。。。" + e) From 15dd4e0592aeffd7dccd301cd3eebb9175de6bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=95=80=E9=93=AC=E9=85=B8=E9=92=BE?= <148796996+K2cr2O1@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:59:26 +0800 Subject: [PATCH 2/3] Update echo.py --- plugins/echo.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugins/echo.py b/plugins/echo.py index ff743ea..e47ebb0 100644 --- a/plugins/echo.py +++ b/plugins/echo.py @@ -13,7 +13,7 @@ __plugin_meta__ = { "usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞", } -@matcher.command("echo",permission=MessageEvent.ADMIN) +@matcher.command("echo") async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): """ 处理 echo 指令,原样回复用户输入的内容 @@ -31,7 +31,6 @@ async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): @matcher.command( "赞我", - permission=MessageEvent.ADMIN, override_permission_check=True ) async def handle_poke(bot: Bot, event: MessageEvent, permission_granted: bool): @@ -51,4 +50,4 @@ async def handle_poke(bot: Bot, event: MessageEvent, permission_granted: bool): await bot.send_like(event.user_id, times=10) await event.reply("好感度+10!(〃'▽'〃)") except Exception as e: - await event.reply(f"点赞失败了 >_<: {str(e)}") \ No newline at end of file + await event.reply(f"点赞失败了 >_<: {str(e)}") From 4a18909c4fb8070a2d4479ef29e43e0b0202e77b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=95=80=E9=93=AC=E9=85=B8=E9=92=BE?= <148796996+K2cr2O1@users.noreply.github.com> Date: Mon, 5 Jan 2026 00:02:40 +0800 Subject: [PATCH 3/3] Dev to main (#22) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 整合开发历史 * codepy安全性升级 * 优化一些东西 * 再次优化 * 更新一下 requirements.txt --- plugins/echo.py | 2 +- requirements.txt | Bin 231 -> 750 bytes 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/echo.py b/plugins/echo.py index e47ebb0..407510a 100644 --- a/plugins/echo.py +++ b/plugins/echo.py @@ -13,7 +13,7 @@ __plugin_meta__ = { "usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞", } -@matcher.command("echo") +@matcher.command("echo",permission=MessageEvent.ADMIN) async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): """ 处理 echo 指令,原样回复用户输入的内容 diff --git a/requirements.txt b/requirements.txt index f63028280ccb38fbda8a61c0c780e87489a04d78..17a9c33e3864882cb7217cb519675a201e0d8f3d 100644 GIT binary patch literal 750 zcmYk4+fKq@5QO*I#7B|PgW!$NLQ1L72)49E#D`bE+3g=p^Hb89ox`@jKU-_8vWrdD z+8g(yjn;ARt+ZFJ(iWEaZ()!2U|aA&mLNL0Kd;dlW|h__bI{zLeLnNw#5~|QpwqyA zvJdO+cPoQ!INzrR}=ZhbWh@**~ZTFN%OI@P!P6*6{xnM>9u>u+y%ynOE7ZK zg3Jpxr+n#cjA%6W$&rij7fz&@=x`!q^3U8OdQWyEVN3VD<>RQYu*3= literal 231 zcmW-aNfN^#3HFoOlcYVpG9}u$0SreMF9l?d64|8&JZ3KctjWIpc&GXAzfEDD-&p{cX?Ays4-1!q$6R1^ LkSoNr_0r)74=qE0