Dev至main (#21)

* feat: 整合开发历史

* codepy安全性升级

* 优化一些东西

* 再次优化
This commit is contained in:
镀铬酸钾
2026-01-04 23:58:56 +08:00
committed by GitHub
parent a733d3dc4b
commit d7fbc5bb70
5 changed files with 167 additions and 9 deletions

13
main.py
View File

@@ -27,12 +27,13 @@ class PluginReloadHandler(FileSystemEventHandler):
继承自 watchdog.events.FileSystemEventHandler
监听 base_plugins 目录下的文件变化,并触发插件重载。
"""
def __init__(self):
def __init__(self, loop: asyncio.AbstractEventLoop):
"""
初始化处理器
设置冷却时间,防止短时间内多次触发重载
设置冷却时间,并保存主事件循环的引用
"""
self.loop = loop
self.last_reload_time = 0
self.cooldown = 1.0 # 冷却时间,防止短时间内多次重载
@@ -64,8 +65,8 @@ class PluginReloadHandler(FileSystemEventHandler):
logger.info("正在重载插件...")
try:
# 重新扫描并加载插件
run_in_thread_pool(load_all_plugins)
# 使用线程安全的方式在主事件循环中运行异步的插件加载函数
asyncio.run_coroutine_threadsafe(run_in_thread_pool(load_all_plugins), self.loop)
logger.success("插件重载完成")
except Exception as e:
logger.exception(f"重载失败: {e}")
@@ -93,7 +94,9 @@ async def main():
# 监控 plugins 目录
plugin_path = os.path.join(os.path.dirname(__file__), "plugins")
event_handler = PluginReloadHandler()
# 获取当前事件循环并传递给处理器
loop = asyncio.get_running_loop()
event_handler = PluginReloadHandler(loop)
observer = Observer()
if os.path.exists(plugin_path):

141
plugins/bili_parser.py Normal file
View File

@@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-
import re
import json
import html
import requests
from bs4 import BeautifulSoup
from typing import Optional, Tuple, Dict, Any
from core.logger import logger
from core.command_manager import matcher
from models import MessageEvent, MessageSegment
__plugin_meta__ = {
"name": "bili_parser",
"description": "自动解析B站分享卡片提取视频封面和播放量等信息。",
"usage": "自动触发当检测到B站小程序分享卡片时自动发送视频信息。",
}
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def format_count(num: int) -> str:
if not isinstance(num, int):
return str(num)
if num < 10000:
return str(num)
return f"{num / 10000:.1f}"
def get_real_url(short_url: str) -> Optional[str]:
try:
response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5)
if response.status_code == 302:
return response.headers.get('Location')
except requests.RequestException as e:
print(f"获取真实URL失败: {e}")
return None
def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
try:
response = requests.get(video_url, headers=HEADERS, timeout=5)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
if not script_tag:
return None
json_str = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string).group(1)
data = json.loads(json_str)
video_data = data.get('videoData', {})
stat = video_data.get('stat', {})
cover_url = video_data.get('pic', '')
if cover_url:
cover_url = cover_url.split('@')[0]
if cover_url.startswith('//'):
cover_url = 'https:' + cover_url
return {
"title": video_data.get('title', '未知标题'),
"bvid": video_data.get('bvid', '未知BV号'),
"cover_url": cover_url,
"play": stat.get('view', 0),
"like": stat.get('like', 0),
"coin": stat.get('coin', 0),
"favorite": stat.get('favorite', 0),
"share": stat.get('share', 0),
}
except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e:
print(f"解析视频信息失败: {e}")
return None
@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
if not event.raw_message.startswith('[CQ:json,data='):
return
logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}")
try:
json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']')
json_str_decoded = html.unescape(json_str_raw)
data = json.loads(json_str_decoded)
short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if not short_url or "b23.tv" not in short_url:
logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。")
return
short_url = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}")
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
return
real_url = get_real_url(short_url)
if not real_url:
logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。")
await event.reply("无法解析B站短链接。")
return
video_info = parse_video_info(real_url)
if not video_info:
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
return
text_message = (
f"BiliBili 视频解析\n"
f"--------------------\n"
f" 标题: {video_info['title']}\n"
f" BV号: {video_info['bvid']}\n"
f"--------------------\n"
f" 数据:\n"
f" 播放: {format_count(video_info['play'])}\n"
f" 点赞: {format_count(video_info['like'])}\n"
f" 投币: {format_count(video_info['coin'])}\n"
f" 收藏: {format_count(video_info['favorite'])}\n"
f" 转发: {format_count(video_info['share'])}\n"
f" B站链接: {short_url}"
)
image_message_segment = [MessageSegment.text("B站封面"),MessageSegment.image(video_info['cover_url'])]
nodes = [
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment)
]
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes)

View File

@@ -22,20 +22,31 @@ __plugin_meta__ = {
"usage": "/code_py - 进入交互模式,等待输入代码块\n/code_py [单行代码] - 快速执行单行代码",
}
# --- 安全配置:危险模块黑名单 ---
# --- 安全配置:危险模块和内置函数黑名单 ---
DANGEROUS_MODULES = [
"os", "sys", "subprocess", "shutil", "socket", "requests", "urllib",
"http", "ftplib", "telnetlib", "ctypes", "_thread", "multiprocessing",
"asyncio",
]
DANGEROUS_BUILTINS = [
"__import__", "open", "exec", "eval", "compile", "input", "breakpoint"
]
# 编译后的正则表达式,用于分割语句
STATEMENT_SPLIT_PATTERN = re.compile(r'[;\n]')
# 编译后的正则表达式,用于查找危险的内置函数调用
BUILTIN_CALL_PATTERN = re.compile(r'\b(' + '|'.join(DANGEROUS_BUILTINS) + r')\s*\(')
def is_code_safe(code: str) -> Tuple[bool, str]:
"""
检查代码中是否包含危险的模块导入。
检查代码中是否包含危险的模块导入或内置函数调用
"""
# 1. 检查危险的内置函数
found_builtins = BUILTIN_CALL_PATTERN.search(code)
if found_builtins:
return False, f"检测到不允许的内置函数调用:'{found_builtins.group(1)}'"
# 2. 检查危险的模块导入
statements = STATEMENT_SPLIT_PATTERN.split(code)
for statement in statements:
statement = statement.strip()

View File

@@ -13,7 +13,7 @@ __plugin_meta__ = {
"usage": "/echo [内容] - 复读内容\n/赞我 - 让机器人给你点赞",
}
@matcher.command("echo")
@matcher.command("echo",permission=MessageEvent.ADMIN)
async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理 echo 指令,原样回复用户输入的内容

View File

@@ -25,4 +25,7 @@ async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
:param event: 消息事件对象。
:param args: 指令参数列表(未使用)。
"""
try:
await event.reply(MessageSegment.image("https://img.paulzzh.com/touhou/random"))
except Exception as e:
await event.reply("报错了。。。" + e)