refactor(scripts): 重构并优化脚本文件结构

feat(scripts): 添加Python环境检查脚本
feat(scripts): 增强依赖导出脚本功能
perf(plugins/bili_parser): 优化B站解析器性能和代码结构
style(plugins/bili_parser): 统一代码风格和常量命名
This commit is contained in:
2026-01-18 21:01:15 +08:00
parent 03bcded7ed
commit bd8726b768
8 changed files with 319 additions and 73 deletions

View File

@@ -1,8 +0,0 @@
import subprocess
# 运行pip freeze命令获取所有依赖
result = subprocess.run(['pip', 'freeze'], capture_output=True, text=True)
# 将输出写入requirements.txt文件
with open('requirements.txt', 'w', encoding='utf-8') as f:
f.write(result.stdout)

View File

@@ -1,16 +0,0 @@
import sys
import sysconfig
print(f"Python Version: {sys.version}")
# 检查 GIL 状态
try:
# Python 3.13+ free-threading build 才有这个属性
is_gil_enabled = sys._is_gil_enabled()
print(f"GIL Enabled: {is_gil_enabled}")
except AttributeError:
print("GIL Status: Unknown (sys._is_gil_enabled not found, likely GIL-enabled build)")
# 检查 JIT 状态
# 目前没有直接的 API 检查 JIT 是否开启,通常看性能或启动日志
print("JIT Support: Experimental (Enable with -X jit)")

View File

@@ -13,12 +13,16 @@ from models import MessageEvent, MessageSegment
# 创建一个TTL缓存最大容量100缓存时间10秒
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
# 插件元数据
__plugin_meta__ = {
"name": "bili_parser",
"description": "自动解析B站分享卡片提取视频封面和播放量等信息。",
"usage": "自动触发当检测到B站小程序分享卡片时自动发送视频信息。",
}
# 常量定义
BILI_NICKNAME = "B站视频解析"
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
@@ -29,7 +33,7 @@ _session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession()
_session = aiohttp.ClientSession(headers=HEADERS)
return _session
@@ -71,7 +75,7 @@ async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
if not script_tag or not script_tag.string:
return None
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string)
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^\}]*\});', script_tag.string)
if not match:
return None
@@ -135,9 +139,47 @@ async def get_direct_video_url(video_url: str) -> Optional[str]:
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
return None
BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/[a-zA-Z0-9_]+|b23\.tv/[a-zA-Z0-9]+)")
BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/\w+|b23\.tv/[a-zA-Z0-9]+)")
def extract_url_from_json_segments(segments):
"""
从消息的JSON段中提取B站链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "json":
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
try:
json_data = json.loads(segment.data.get("data", "{}"))
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if short_url and "b23.tv" in short_url:
extracted_url = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {extracted_url}")
return extracted_url
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
continue
return None
def extract_url_from_text_segments(segments):
"""
从消息的文本段中提取B站链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "text":
text_content = segment.data.get("text", "")
match = BILI_URL_PATTERN.search(text_content)
if match:
extracted_url = match.group(0)
logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {extracted_url}")
return extracted_url
return None
@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
"""
@@ -153,34 +195,12 @@ async def handle_bili_share(event: MessageEvent):
if event.user_id == event.self_id:
return
url_to_process = None
# 1. 优先解析JSON卡片中的短链接
for segment in event.message:
if segment.type == "json":
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
try:
json_data = json.loads(segment.data.get("data", "{}"))
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if short_url and "b23.tv" in short_url:
url_to_process = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {url_to_process}")
break # 找到后立即跳出循环
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
continue
url_to_process = extract_url_from_json_segments(event.message)
# 2. 如果未在JSON卡片中找到链接则在文本消息中查找
if not url_to_process:
for segment in event.message:
if segment.type == "text":
text_content = segment.data.get("text", "")
match = BILI_URL_PATTERN.search(text_content)
if match:
url_to_process = match.group(0)
logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {url_to_process}")
break # 找到后立即跳出循环
url_to_process = extract_url_from_text_segments(event.message)
# 3. 如果找到了任何类型的B站链接则进行处理
if url_to_process:
@@ -248,10 +268,10 @@ async def process_bili_link(event: MessageEvent, url: str):
]
nodes = [
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=up_info_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=video_message)
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=text_message),
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=image_message_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=up_info_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname=BILI_NICKNAME, message=video_message)
]
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")

104
scripts/check_python_env.py Normal file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
Python 环境检查脚本
检查当前 Python 环境是否符合 NEO Bot 要求包括版本、GIL 状态、JIT 支持等。
"""
import sys
import platform
def main():
"""主函数"""
print("=" * 60)
print("NEO Bot Python 环境检查")
print("=" * 60)
# 1. Python 版本信息
version_info = sys.version_info
print("\n[1] Python 版本:")
print(f" 版本号: {sys.version}")
print(f" 主版本: {version_info.major}.{version_info.minor}.{version_info.micro}")
print(f" 发布日期: {version_info.releaselevel} {version_info.serial}")
# 检查是否为 Python 3.14
if version_info.major == 3 and version_info.minor == 14:
print(" ✓ 符合要求: Python 3.14")
else:
print(f" ⚠ 警告: 推荐使用 Python 3.14,当前为 {version_info.major}.{version_info.minor}")
# 2. 平台信息
print("\n[2] 平台信息:")
print(f" 操作系统: {platform.system()} {platform.release()}")
print(f" 处理器: {platform.processor()}")
print(f" 架构: {platform.machine()}")
# 3. GIL 状态
print("\n[3] GIL (全局解释器锁) 状态:")
try:
# Python 3.13+ free-threading build 才有这个属性
is_gil_enabled = sys._is_gil_enabled()
if is_gil_enabled:
print(" GIL 已启用 (传统模式)")
else:
print(" GIL 已禁用 (自由线程模式)")
except AttributeError:
print(" GIL 状态: 未知 (sys._is_gil_enabled 未找到,可能是传统 GIL 构建)")
# 4. JIT 状态
print("\n[4] JIT (即时编译) 状态:")
# 检查是否启用了 JIT
jit_enabled = False
jit_details = "未知"
# 方法1: 检查启动标志
if hasattr(sys, 'flags'):
# Python 3.14 的 JIT 通过 -X jit 启用
# 但 sys.flags 中没有直接的 JIT 标志
pass
# 方法2: 检查是否有 JIT 相关属性
try:
# 尝试导入 _jit 模块(如果存在)
import _jit
jit_enabled = True
jit_details = "检测到 _jit 模块"
del _jit # 避免未使用的导入警告
except ImportError:
# 检查 sys 模块中是否有 JIT 相关属性
if hasattr(sys, '_jit_enabled'):
jit_enabled = sys._jit_enabled
jit_details = f"sys._jit_enabled = {jit_enabled}"
else:
jit_details = "未检测到 JIT 模块或属性"
if jit_enabled:
print(" ✓ JIT 已启用")
print(f" 详情: {jit_details}")
else:
print(" ⚠ JIT 未启用或不可用")
print(f" 详情: {jit_details}")
print(" 建议: 启动时使用 -X jit 参数启用 JIT例如: python -X jit main.py")
# 5. 其他信息
print("\n[5] 其他信息:")
print(f" 实现: {platform.python_implementation()}")
print(f" 构建: {platform.python_build()}")
print(f" 编译器: {platform.python_compiler()}")
# 6. 路径信息
print("\n[6] 路径信息:")
print(f" 执行文件: {sys.executable}")
print(f" 前缀: {sys.prefix}")
print(" 路径:")
for i, path in enumerate(sys.path[:5], 1): # 只显示前5个
print(f" {i}. {path}")
if len(sys.path) > 5:
print(f" ... 还有 {len(sys.path) - 5} 个路径")
print("\n" + "=" * 60)
print("检查完成")
print("=" * 60)
if __name__ == '__main__':
main()

View File

@@ -30,16 +30,18 @@ import subprocess
import shutil
import argparse
# 检测当前平台
# 检测当前平台和 Python 版本
PLATFORM = sys.platform
PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" # 例如 "314"
if PLATFORM.startswith('win'):
EXTENSION = '.pyd'
BUILD_PREFIX = 'cp314-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-314')
BUILD_PREFIX = f'cp{PYTHON_VERSION}-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-{PYTHON_VERSION}')
elif PLATFORM.startswith('linux'):
EXTENSION = '.so'
BUILD_PREFIX = 'cp314-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-314')
BUILD_PREFIX = f'cp{PYTHON_VERSION}-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-{PYTHON_VERSION}')
else:
print(f"不支持的平台: {PLATFORM}")
sys.exit(1)
@@ -263,6 +265,13 @@ def compile_all_modules():
def main():
"""主函数"""
# 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor == 14):
print("警告: 推荐使用 Python 3.14 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题")
print()
parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()

View File

@@ -8,7 +8,10 @@ import os
import sys
import glob
from mypyc.build import mypycify
from distutils.core import setup
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
def compile_module(module_path):
"""
@@ -31,6 +34,13 @@ def main():
"""
主函数
"""
# 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor == 14):
print("警告: 推荐使用 Python 3.14 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题")
print()
# 要编译的模块列表
modules = [
'core/utils/json_utils.py', # JSON 处理
@@ -39,7 +49,7 @@ def main():
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/ws.py', # WebSocket 核心
'core/managers/plugin_manager.py', # 插件管理器
'core/managers/plugin_manager.py', # 插件管理器
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
]

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
导出项目依赖到 requirements.txt 文件
支持两种模式:
1. 默认模式导出当前虚拟环境中的所有包pip freeze
2. 本地模式只导出当前项目的依赖pip freeze --local
使用方法:
python export_requirements.py [options]
选项:
--local, -l 只导出当前项目的依赖(推荐)
--output, -o 指定输出文件路径(默认为 requirements.txt
--help, -h 显示帮助信息
"""
import subprocess
import sys
import argparse
def run_pip_freeze(local_mode=False):
"""
运行 pip freeze 命令
Args:
local_mode: 是否只导出当前项目依赖
Returns:
(success, output): 成功标志和输出内容
"""
cmd = ['pip', 'freeze']
if local_mode:
cmd.append('--local')
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
encoding='utf-8'
)
return True, result.stdout
except subprocess.CalledProcessError as e:
error_msg = f"pip freeze 命令失败,退出码: {e.returncode}\n"
if e.stderr:
error_msg += f"错误信息: {e.stderr}"
return False, error_msg
except FileNotFoundError:
return False, "错误: 未找到 pip 命令,请确保 Python 环境已正确安装"
except Exception as e:
return False, f"未知错误: {e}"
def write_requirements_file(output_path, content):
"""
将依赖内容写入文件
Args:
output_path: 输出文件路径
content: 依赖内容
Returns:
success: 是否成功
"""
try:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(content)
# 统计行数(忽略空行)
lines = [line.strip() for line in content.split('\n') if line.strip()]
return True, len(lines)
except IOError as e:
return False, f"写入文件失败: {e}"
except Exception as e:
return False, f"未知错误: {e}"
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='导出项目依赖到 requirements.txt 文件')
parser.add_argument('--local', '-l', action='store_true',
help='只导出当前项目的依赖(推荐)')
parser.add_argument('--output', '-o', default='requirements.txt',
help='指定输出文件路径(默认为 requirements.txt')
args = parser.parse_args()
print("=" * 60)
print("NEO Bot 依赖导出工具")
print("=" * 60)
# 显示模式信息
if args.local:
print("模式: 本地模式(只导出当前项目依赖)")
else:
print("模式: 全局模式(导出所有已安装包)")
print("提示: 建议使用 --local 选项只导出当前项目依赖")
print(f"输出文件: {args.output}")
print()
# 运行 pip freeze
print("正在收集依赖信息...")
success, output = run_pip_freeze(args.local)
if not success:
print(f"错误: {output}")
sys.exit(1)
# 写入文件
print("正在写入文件...")
success, result = write_requirements_file(args.output, output)
if not success:
print(f"错误: {result}")
sys.exit(1)
line_count = result
print("✓ 依赖导出完成")
print(f" 文件: {args.output}")
print(f" 依赖数量: {line_count} 个包")
# 显示前几个依赖(如果有)
lines = [line.strip() for line in output.split('\n') if line.strip()]
if lines:
print("\n前5个依赖:")
for i, line in enumerate(lines[:5], 1):
print(f" {i}. {line}")
if len(lines) > 5:
print(f" ... 还有 {len(lines) - 5} 个依赖")
print("\n" + "=" * 60)
print("提示: 可以使用 pip install -r requirements.txt 安装这些依赖")
print("=" * 60)
if __name__ == '__main__':
main()

View File

@@ -1,10 +0,0 @@
x = 5
# 它有自己的身份
print(id(x))
# 它有自己的类型
print(type(x))
# 它甚至有自己的工具!
print(x.bit_length())