feat: 添加抖音视频解析插件并优化代码结构

添加抖音视频解析插件,支持自动解析抖音分享链接并提取视频信息。优化现有代码结构,包括:
- 重构单例模式实现
- 移除未使用的导入和文件
- 修复性能测试脚本中的异步调用
- 优化消息事件模型中的权限常量定义
- 改进编译脚本的错误处理
- 增强B站解析插件的稳定性

同时清理了多个废弃脚本和临时文件,提升代码可维护性。
This commit is contained in:
2026-01-19 01:16:22 +08:00
parent 067d81a07c
commit 9f54a98c17
17 changed files with 680 additions and 519 deletions

2
.gitignore vendored
View File

@@ -146,4 +146,4 @@ build/
# Scratch files # Scratch files
scratch_files/ scratch_files/
config.toml /config.toml

View File

@@ -1,294 +0,0 @@
#!/usr/bin/env python3
"""
跨平台 Python 模块编译脚本
将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。
支持的平台:
- Windows: 生成 .pyd 文件
- Linux: 生成 .so 文件
使用方法:
python compile_machine_code.py [options]
选项:
--compile, -c 编译指定的模块(默认)
--list, -l 列出已编译的模块
--clean, -k 清理编译生成的文件
--help, -h 显示帮助信息
注意:
1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC)
2. 需要安装 mypyc: pip install mypyc
3. 编译后的文件是平台相关的,不能跨平台复制
4. 建议在部署的目标环境上运行此脚本
"""
import os
import sys
import glob
import subprocess
import shutil
import argparse
# 检测当前平台
PLATFORM = sys.platform
if PLATFORM.startswith('win'):
EXTENSION = '.pyd'
BUILD_PREFIX = 'cp314-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-314')
elif PLATFORM.startswith('linux'):
EXTENSION = '.so'
BUILD_PREFIX = 'cp314-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-314')
else:
print(f"不支持的平台: {PLATFORM}")
sys.exit(1)
# 要编译的模块列表
# 注意Mypyc 对动态特性支持有限,只选择计算密集或类型明确的模块
MODULES = [
# 工具模块
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/utils/singleton.py', # 单例模式基类
'core/utils/exceptions.py', # 自定义异常
'core/utils/logger.py', # 日志模块
# 核心管理模块
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/managers/plugin_manager.py', # 插件管理器
'core/managers/redis_manager.py', # Redis 管理器
'core/managers/image_manager.py', # 图片管理器
# 核心基础模块
'core/ws.py', # WebSocket 核心
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
'core/config_models.py', # 配置模型
'core/permission.py', # 权限枚举
# API 模块 - 注意:这些类会被 Bot 类多继承使用
# 因此不适合编译,否则会导致 "multiple bases have instance lay-out conflict" 错误
# 'core/api/base.py', # API 基础类
# 'core/api/account.py', # 账号相关 API
# 'core/api/friend.py', # 好友相关 API
# 'core/api/group.py', # 群组相关 API
# 'core/api/media.py', # 媒体相关 API
# 'core/api/message.py', # 消息相关 API
# 数据模型(适合编译的高频使用数据类)
'models/message.py', # 消息段模型
'models/sender.py', # 发送者模型
'models/objects.py', # API 响应数据模型
# 事件处理相关
'core/handlers/event_handler.py', # 事件处理器
# 注意:以下文件不适合编译
# - 主程序文件main.py
# - 测试文件tests/目录)
# - 插件文件plugins/目录)
# - 编译脚本compile_machine_code.py等
# - 临时文件scratch_files/目录)
# - 抽象基类models/events/base.py
# - 事件工厂models/events/factory.py
# - 包含复杂动态特性的文件
]
def list_compiled_modules():
"""列出已编译的模块"""
print(f"\n已编译的 {PLATFORM} 模块:")
print("=" * 50)
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
size = os.path.getsize(f) // 1024 # KB
print(f"{f} ({size} KB)")
else:
print(f"未找到已编译的 {EXTENSION} 文件")
print(f"\n总计: {len(compiled_files)} 个文件")
def clean_compiled_files():
"""清理编译生成的文件"""
print(f"\n清理编译生成的 {EXTENSION} 文件...")
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
try:
os.remove(f)
print(f"已删除: {f}")
except Exception as e:
print(f"删除失败 {f}: {e}")
# 清理 build 目录
if os.path.exists('build'):
try:
shutil.rmtree('build')
print("已删除 build 目录")
except Exception as e:
print(f"删除 build 目录失败: {e}")
else:
print(f"没有可清理的 {EXTENSION} 文件")
def get_platform_specific_module_name(module_path):
"""获取平台特定的模块文件名"""
module_name = module_path.replace('.py', '')
return f"{module_name}.{BUILD_PREFIX}{EXTENSION}"
def compile_module(module_path):
"""编译单个模块"""
print(f"\n编译: {module_path}")
try:
# 直接调用 mypyc 命令行工具
result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path],
capture_output=True,
text=True,
check=True
)
# 获取平台特定的模块名
platform_module = get_platform_specific_module_name(module_path)
mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}')
# 检查编译产物是否在当前目录
if os.path.exists(platform_module):
print(f" ✓ 编译成功: {platform_module}")
return True
else:
# 检查 build 目录中是否有编译产物
build_module_path = os.path.join(BUILD_PATH, platform_module)
build_mypyc_path = os.path.join(BUILD_PATH, mypyc_platform_module)
if os.path.exists(build_module_path):
# 如果在 build 目录中,复制到正确位置
os.makedirs(os.path.dirname(platform_module), exist_ok=True)
shutil.copy2(build_module_path, platform_module)
shutil.copy2(build_mypyc_path, mypyc_platform_module)
print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}")
return True
else:
print(f" ✗ 编译失败:找不到编译产物")
if result.stdout:
print(f" 编译输出:{result.stdout[:500]}...")
if result.stderr:
print(f" 错误信息:{result.stderr[:500]}...")
return False
except subprocess.CalledProcessError as e:
print(f" ✗ 编译失败,退出码: {e.returncode}")
if e.stdout:
print(f" 编译输出:{e.stdout[:500]}...")
if e.stderr:
print(f" 错误信息:{e.stderr[:500]}...")
return False
except Exception as e:
print(f" ✗ 编译失败,意外错误: {e}")
return False
def should_skip_module(module_path):
"""检查模块是否应该被跳过编译"""
try:
with open(module_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含抽象基类相关代码
if 'from abc import ABC' in content or 'from abc import abstractmethod' in content:
return True, "包含抽象基类,不适合编译"
# 检查是否包含动态特性
if 'eval(' in content or 'exec(' in content or 'getattr(' in content or 'setattr(' in content:
return True, "包含动态特性,不适合编译"
return False, ""
except Exception as e:
return True, f"读取文件时出错: {e}"
def compile_all_modules():
"""编译所有指定的模块"""
print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})")
print("=" * 60)
# 验证模块文件是否存在并检查是否适合编译
valid_modules = []
for module_path in MODULES:
if os.path.exists(module_path):
should_skip, reason = should_skip_module(module_path)
if should_skip:
print(f"跳过: {module_path} ({reason})")
else:
valid_modules.append(module_path)
else:
print(f"警告: 模块 {module_path} 不存在,将被跳过")
if not valid_modules:
print("错误: 没有有效的模块可编译")
return False
# 编译模块
success_count = 0
for module_path in valid_modules:
if compile_module(module_path):
success_count += 1
print(f"\n" + "=" * 60)
print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功")
if success_count == len(valid_modules):
print("✓ 所有模块编译成功")
return True
else:
print("✗ 部分模块编译失败")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()
group.add_argument('--compile', '-c', action='store_true', default=True,
help='编译指定的模块 (默认)')
group.add_argument('--list', '-l', action='store_true',
help='列出已编译的模块')
group.add_argument('--clean', '-k', action='store_true',
help='清理编译生成的文件')
args = parser.parse_args()
# 检查是否安装了 mypyc
try:
import mypyc
except ImportError:
print("错误: 未安装 mypyc请先安装: pip install mypyc")
sys.exit(1)
if args.list:
list_compiled_modules()
elif args.clean:
clean_compiled_files()
else:
compile_all_modules()
print("\n使用 --list 选项查看已编译的模块")
if __name__ == '__main__':
main()

View File

@@ -1,65 +0,0 @@
#!/usr/bin/env python3
"""
编译模块脚本
这个脚本会单独编译每个Python模块确保每个模块都在正确位置生成独立的.pyd文件。
"""
import os
import sys
import glob
from mypyc.build import mypycify
from distutils.core import setup
def compile_module(module_path):
"""
编译单个模块
Args:
module_path: 要编译的Python模块路径
"""
print(f"\nCompiling {module_path}...")
try:
ext_modules = mypycify([module_path])
setup(name=f'compiled_{os.path.basename(module_path).replace(".py", "")}',
ext_modules=ext_modules)
return True
except Exception as e:
print(f"Error compiling {module_path}: {e}")
return False
def main():
"""
主函数
"""
# 要编译的模块列表
modules = [
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/ws.py', # WebSocket 核心
'core/managers/plugin_manager.py', # 插件管理器
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
]
# 自动添加 events 模型
event_models = glob.glob('models/events/*.py')
event_models = [m for m in event_models if not m.endswith('__init__.py')]
modules.extend(event_models)
print(f"Found {len(modules)} modules to compile.")
success_count = 0
for module in modules:
if compile_module(module):
success_count += 1
print(f"\n--- Compilation Summary ---")
print(f"Total modules: {len(modules)}")
print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(modules) - success_count}")
if __name__ == '__main__':
main()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 147 KiB

After

Width:  |  Height:  |  Size: 159 KiB

View File

@@ -12,7 +12,6 @@
""" """
import time import time
import asyncio
import functools import functools
import logging import logging
from typing import Dict, Any, Callable, Optional from typing import Dict, Any, Callable, Optional

View File

@@ -1,11 +1,13 @@
""" """
通用单例模式基类 通用单例模式基类
""" """
from typing import Any, Optional, Type, TypeVar from typing import Any, Dict, Optional, Type, TypeVar
import functools
T = TypeVar('T') T = TypeVar('T')
# 存储每个类的实例
_instance_store: Dict[Type, Any] = {}
class Singleton: class Singleton:
""" """
一个通用的单例基类 一个通用的单例基类
@@ -14,7 +16,6 @@ class Singleton:
它通过重写 __new__ 方法来确保每个类只有一个实例。 它通过重写 __new__ 方法来确保每个类只有一个实例。
同时,它处理了重复初始化的问题,确保 __init__ 方法只在第一次实例化时被调用。 同时,它处理了重复初始化的问题,确保 __init__ 方法只在第一次实例化时被调用。
""" """
_instance: Optional[Any] = None
_initialized: bool = False _initialized: bool = False
def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T: def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T:
@@ -28,9 +29,10 @@ class Singleton:
Returns: Returns:
T: 单例实例 T: 单例实例
""" """
if cls._instance is None: # 使用全局字典存储实例,避免类型检查问题
cls._instance = super().__new__(cls) if cls not in _instance_store:
return cls._instance _instance_store[cls] = super().__new__(cls)
return _instance_store[cls]
def __init__(self) -> None: def __init__(self) -> None:
""" """
@@ -53,15 +55,24 @@ def singleton(cls: Type[T]) -> Type[T]:
Returns: Returns:
Type[T]: 单例类 Type[T]: 单例类
""" """
_instance: Optional[T] = None # 为每个装饰的类创建一个实例存储
_initialized: bool = False class_instance: Optional[T] = None
@functools.wraps(cls) # 创建一个新的类,继承自原始类
def wrapper(*args: Any, **kwargs: Any) -> T: class SingletonClass(cls):
nonlocal _instance, _initialized """单例包装类"""
if _instance is None: def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T:
_instance = cls(*args, **kwargs) """创建或返回现有的实例"""
return _instance nonlocal class_instance
if class_instance is None:
# 使用super()调用原始类的__new__方法
class_instance = cls(*args, **kwargs)
return class_instance
return wrapper # 复制类的元数据
SingletonClass.__name__ = cls.__name__
SingletonClass.__doc__ = cls.__doc__
SingletonClass.__module__ = cls.__module__
return SingletonClass

View File

@@ -1,8 +0,0 @@
import subprocess
# 运行pip freeze命令获取所有依赖
result = subprocess.run(['pip', 'freeze'], capture_output=True, text=True)
# 将输出写入requirements.txt文件
with open('requirements.txt', 'w', encoding='utf-8') as f:
f.write(result.stdout)

64
main.py
View File

@@ -10,6 +10,59 @@ import time
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
# 初始化日志系统,必须在其他 core 模块导入之前执行
from core.utils.logger import logger
# 核心模块导入
from core.managers.admin_manager import admin_manager
from core.ws import WS
from core.managers import plugin_manager, matcher
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
from core.config_loader import global_config as config
# 检查 JIT 编译状态
def check_jit_status():
"""
检查 Python JIT 编译状态
该函数用于检测当前 Python 解释器是否启用了 JIT 编译功能,
并打印相关信息,帮助用户了解运行环境的性能优化状态。
"""
print("\n=== Python JIT 编译状态检查 ===")
# 检查解释器信息
print(f"Python 版本: {sys.version}")
print(f"解释器路径: {sys.executable}")
# 检查优化级别
print(f"优化级别 (-O): {sys.flags.optimize}")
# 检查 JIT 相关模块和功能
if sys.version_info >= (3, 10):
try:
# 对于 CPython 3.10+,检查是否启用了 JIT
import _opcode
if hasattr(_opcode, 'jit'):
print("JIT 状态: 已启用 (_opcode.jit)")
else:
print("JIT 状态: 未启用 (_opcode.jit 不可用)")
except ImportError:
print("JIT 状态: 未启用 (_opcode 模块不可用)")
else:
print("JIT 状态: 不可用 (需要 Python 3.10+)")
# 检查是否使用了 PyPy
if hasattr(sys, 'pypy_version_info'):
print(f"PyPy 版本: {sys.pypy_version_info}")
print("JIT 状态: 已启用 (PyPy 内置 JIT)")
print("==============================\n")
# 执行 JIT 状态检查
check_jit_status()
# 尝试使用高性能事件循环 # 尝试使用高性能事件循环
try: try:
if sys.platform == 'win32': if sys.platform == 'win32':
@@ -25,17 +78,6 @@ try:
except ImportError: except ImportError:
print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环") print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环")
# 初始化日志系统,必须在其他 core 模块导入之前执行
from core.utils.logger import logger
from core.managers.admin_manager import admin_manager
from core.ws import WS
from core.managers import plugin_manager, matcher
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
from core.config_loader import global_config as config
# 将项目根目录添加到 sys.path # 将项目根目录添加到 sys.path
ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT_DIR) sys.path.insert(0, ROOT_DIR)

View File

@@ -4,7 +4,7 @@
定义了消息相关的事件类,包括 MessageEvent, PrivateMessageEvent, GroupMessageEvent。 定义了消息相关的事件类,包括 MessageEvent, PrivateMessageEvent, GroupMessageEvent。
""" """
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import List, Optional, Union, ClassVar from typing import List, Optional, Union
from core.permission import Permission from core.permission import Permission
from models.message import MessageSegment from models.message import MessageSegment
@@ -27,17 +27,19 @@ class Anonymous:
"""匿名用户 flag""" """匿名用户 flag"""
# 权限级别常量,用于装饰器参数
# 定义在类外部,避免 dataclass 参数顺序问题
MESSAGE_EVENT_ADMIN = Permission.ADMIN
MESSAGE_EVENT_OP = Permission.OP
MESSAGE_EVENT_USER = Permission.USER
@dataclass(slots=True) @dataclass(slots=True)
class MessageEvent(OneBotEvent): class MessageEvent(OneBotEvent):
""" """
消息事件基类 消息事件基类
""" """
# 权限级别常量,用于装饰器参数
ADMIN: ClassVar[Permission] = Permission.ADMIN
OP: ClassVar[Permission] = Permission.OP
USER: ClassVar[Permission] = Permission.USER
message_type: str message_type: str
"""消息类型: private (私聊), group (群聊)""" """消息类型: private (私聊), group (群聊)"""
@@ -70,6 +72,21 @@ class MessageEvent(OneBotEvent):
def post_type(self) -> str: def post_type(self) -> str:
return EventType.MESSAGE return EventType.MESSAGE
@property
def ADMIN(self) -> Permission:
"""权限级别常量,用于装饰器参数"""
return MESSAGE_EVENT_ADMIN
@property
def OP(self) -> Permission:
"""权限级别常量,用于装饰器参数"""
return MESSAGE_EVENT_OP
@property
def USER(self) -> Permission:
"""权限级别常量,用于装饰器参数"""
return MESSAGE_EVENT_USER
async def reply(self, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False): async def reply(self, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False):
""" """
回复消息(抽象方法,由子类实现) 回复消息(抽象方法,由子类实现)

View File

@@ -30,7 +30,7 @@ HEADERS = {
# 全局共享的 ClientSession # 全局共享的 ClientSession
_session: Optional[aiohttp.ClientSession] = None _session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession: def get_session() -> aiohttp.ClientSession:
global _session global _session
if _session is None or _session.closed: if _session is None or _session.closed:
_session = aiohttp.ClientSession(headers=HEADERS) _session = aiohttp.ClientSession(headers=HEADERS)
@@ -55,7 +55,7 @@ def format_duration(seconds: int) -> str:
async def get_real_url(short_url: str) -> Optional[str]: async def get_real_url(short_url: str) -> Optional[str]:
try: try:
session = await get_session() session = get_session()
async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response: async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response:
if response.status == 302: if response.status == 302:
return response.headers.get('Location') return response.headers.get('Location')
@@ -65,22 +65,71 @@ async def get_real_url(short_url: str) -> Optional[str]:
async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
try: try:
session = await get_session() # 清理URL去掉不必要的查询参数只保留基本的视频URL
async with session.get(video_url, headers=HEADERS, timeout=5) as response: clean_url = video_url.split('?')[0]
if '#/' in clean_url:
clean_url = clean_url.split('#/')[0]
session = get_session()
async with session.get(clean_url, headers=HEADERS, timeout=5) as response:
response.raise_for_status() response.raise_for_status()
text = await response.text() text = await response.text()
soup = BeautifulSoup(text, 'html.parser') soup = BeautifulSoup(text, 'html.parser')
# 尝试多种方式获取视频数据
# 方式1: 尝试获取 __INITIAL_STATE__
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
if not script_tag or not script_tag.string: if not script_tag or not script_tag.string:
# 方式2: 尝试获取 __PLAYINFO__
script_tag = soup.find('script', text=re.compile('window.__PLAYINFO__'))
if not script_tag or not script_tag.string:
# 方式3: 尝试获取页面标题和其他信息
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text().strip()
# 提取BV号
bv_match = re.search(r'(BV\w{10})', clean_url)
bvid = bv_match.group(1) if bv_match else '未知BV号'
return {
"title": title.replace('_哔哩哔哩_bilibili', '').strip(),
"bvid": bvid,
"duration": 0,
"cover_url": '',
"play": 0,
"like": 0,
"coin": 0,
"favorite": 0,
"share": 0,
"owner_name": '未知UP主',
"owner_avatar": '',
"followers": 0,
}
return None return None
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^\}]*\});', script_tag.string) # 原始解析逻辑
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string)
if not match:
# 尝试另一种正则表达式
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL)
if not match: if not match:
return None return None
json_str = match.group(1) json_str = match.group(1)
data = json.loads(json_str) # 清理JSON字符串中的潜在问题字符
json_str = json_str.strip().rstrip(';')
try:
data = json.loads(json_str)
except json.JSONDecodeError:
# 如果直接解析失败尝试清理JSON字符串
# 移除可能的注释或无效字符
cleaned_json = re.sub(r',\s*[}]', '}', json_str) # 移除末尾多余的逗号
cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json) # 移除注释
cleaned_json = re.sub(r'//.*', '', cleaned_json) # 移除行注释
data = json.loads(cleaned_json)
video_data = data.get('videoData', {}) video_data = data.get('videoData', {})
up_data = data.get('upData', {}) up_data = data.get('upData', {})
@@ -116,6 +165,10 @@ async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
logger.error(f"解析视频信息失败: {e}") logger.error(f"解析视频信息失败: {e}")
logger.debug(f"失败的URL: {video_url}")
except Exception as e:
logger.error(f"解析视频信息时发生未知错误: {e}")
logger.debug(f"失败的URL: {video_url}")
return None return None
@@ -212,24 +265,32 @@ async def process_bili_link(event: MessageEvent, url: str):
:param event: 消息事件对象 :param event: 消息事件对象
:param url: 待处理的B站链接 :param url: 待处理的B站链接
""" """
if "b23.tv" in url: try:
real_url = await get_real_url(url) if "b23.tv" in url:
if not real_url: real_url = await get_real_url(url)
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。") if not real_url:
await event.reply("无法解析B站短链接") logger.error(f"[bili_parser] 无法从 {url} 获取真实URL")
return await event.reply("无法解析B站短链接。")
else: return
real_url = url.split('?')[0] else:
# 清理URL移除复杂查询参数只保留基本的视频URL
real_url = url.split('?')[0]
if '#/' in real_url:
real_url = real_url.split('#/')[0]
video_info = await parse_video_info(real_url) video_info = await parse_video_info(real_url)
if not video_info: if not video_info:
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。") await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
return
except Exception as e:
logger.error(f"[bili_parser] 处理B站链接时发生错误: {e}")
await event.reply("处理B站链接时发生错误请稍后再试。")
return return
# 检查视频时长 # 检查视频时长
video_message: Union[str, MessageSegment] video_message: Union[str, MessageSegment]
if video_info['duration'] > 300: # 5分钟 = 300秒 if video_info['duration'] > 1200: # 5分钟 = 300秒
video_message = "视频时长超过5分钟不进行解析。" video_message = "视频时长超过5分钟不进行解析。"
else: else:
direct_url = await get_direct_video_url(real_url) direct_url = await get_direct_video_url(real_url)

391
plugins/douyin_parser.py Normal file
View File

@@ -0,0 +1,391 @@
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from typing import Optional, Dict, Any, Union
from cachetools import TTLCache
from core.utils.logger import logger
from core.managers.command_manager import matcher
from models import MessageEvent, MessageSegment
# 创建一个TTL缓存最大容量100缓存时间10秒
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
# 插件元数据
__plugin_meta__ = {
"name": "douyin_parser",
"description": "自动解析抖音分享链接,提取视频信息和直链。",
"usage": "(自动触发)当检测到抖音分享链接时,自动发送视频信息。",
}
# 常量定义
DOUYIN_NICKNAME = "抖音视频解析"
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate, br', # 重新启用br编码支持
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
# 全局共享的 ClientSession
_session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession(headers=HEADERS)
return _session
def format_count(num: Union[int, str]) -> str:
try:
n = int(num)
if n < 10000:
return str(n)
return f"{n / 10000:.1f}"
except (ValueError, TypeError):
return str(num)
DOUYIN_URL_PATTERN = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线
DOUYIN_SHORT_PATTERN = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线
def extract_url_from_json_segments(segments):
"""
从消息的JSON段中提取抖音链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "json":
logger.info(f"[douyin_parser] 检测到JSON CQ码: {segment.data}")
try:
json_data = json.loads(segment.data.get("data", "{}"))
# 检查是否是抖音分享卡片
meta = json_data.get("meta", {})
if "detail_1" in meta:
detail = meta["detail_1"]
if "qqdocurl" in detail:
url = detail["qqdocurl"]
if "douyin.com" in url or "iesdouyin.com" in url:
logger.success(f"[douyin_parser] 成功从JSON卡片中提取到抖音链接: {url}")
return url
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[douyin_parser] 解析JSON失败: {e}")
continue
return None
def extract_url_from_text_segments(segments):
"""
从消息的文本段中提取抖音链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "text":
text_content = segment.data.get("text", "")
# 查找抖音链接
match = DOUYIN_URL_PATTERN.search(text_content)
if match:
extracted_url = match.group(0)
logger.success(f"[douyin_parser] 成功从文本中提取到抖音链接: {extracted_url}")
return extracted_url
# 也检查是否有v.douyin.com格式的链接
short_match = DOUYIN_SHORT_PATTERN.search(text_content)
if short_match:
extracted_url = short_match.group(0)
logger.success(f"[douyin_parser] 成功从文本中提取到抖音短链接: {extracted_url}")
return extracted_url
return None
@matcher.on_message()
async def handle_douyin_share(event: MessageEvent):
"""
处理消息检测抖音分享链接JSON卡片或文本链接并进行解析。
:param event: 消息事件对象
"""
# 消息去重
if event.message_id in processed_messages:
return
processed_messages[event.message_id] = True
# 忽略机器人自己发送的消息,防止无限循环
if event.user_id == event.self_id:
return
# 1. 优先解析JSON卡片中的链接
url_to_process = extract_url_from_json_segments(event.message)
# 2. 如果未在JSON卡片中找到链接则在文本消息中查找
if not url_to_process:
url_to_process = extract_url_from_text_segments(event.message)
# 3. 如果找到了抖音链接,则进行处理
if url_to_process:
await process_douyin_link(event, url_to_process)
async def get_real_url(short_url: str) -> Optional[str]:
"""
获取抖音短链接的真实URL
:param short_url: 抖音短链接
:return: 真实URL或None
"""
try:
# 首先尝试获取重定向后的URL
async with aiohttp.ClientSession() as session:
# 添加更多头部信息模拟移动端访问
mobile_headers = HEADERS.copy() # 使用更新后的完整请求头
mobile_headers.update({
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
# 模拟移动设备的额外头部
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1',
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'https://www.douyin.com/'
})
async with session.get(short_url, headers=mobile_headers, allow_redirects=True, timeout=10) as response:
redirected_url = str(response.url)
# 检查重定向后的URL是否包含视频ID
# 抖音视频页通常包含 aweme_id 或 sec_uid 参数
if 'video/' in redirected_url or '/note/' in redirected_url:
logger.info(f"[douyin_parser] 重定向后的视频URL: {redirected_url}")
return redirected_url
elif 'share_item' in redirected_url:
# 如果URL中有share_item参数尝试从中提取视频信息
logger.info(f"[douyin_parser] 重定向后的分享URL: {redirected_url}")
return redirected_url
else:
# 如果重定向到了主页或其他非视频页面,尝试从响应中提取信息
logger.warning(f"[douyin_parser] 重定向到了非预期页面: {redirected_url}")
return redirected_url
except Exception as e:
logger.error(f"[douyin_parser] 获取真实URL失败: {e}")
return None
async def parse_douyin_video(video_url: str) -> Optional[Dict[str, Any]]:
"""
解析抖音视频信息
:param video_url: 抖音视频链接
:return: 视频信息字典或None
"""
try:
# 使用新的第三方API解析抖音视频
api_url = f"http://api.xhus.cn/api/douyin?url={video_url}"
session = await get_session()
async with session.get(api_url, headers=HEADERS, timeout=10) as response:
if response.status != 200:
logger.error(f"[douyin_parser] API请求失败状态码: {response.status}")
return None
response_data = await response.json()
if not isinstance(response_data, dict):
logger.error(f"[douyin_parser] API返回格式错误: {response_data}")
return None
if response_data.get("code") != 200:
logger.error(f"[douyin_parser] API返回错误: {response_data}")
return None
data = response_data.get("data", {})
if not data:
logger.error("[douyin_parser] API返回数据为空")
return None
# 新API的响应格式转换
return {
"type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image",
"video_url": data.get("url", ""), # 核心字段:视频播放地址
"video_url_HQ": data.get("url", ""), # 新API没有HQ字段使用同一个地址
"nickname": data.get("author", "未知作者"),
"desc": data.get("title", "无描述"),
"aweme_id": data.get("uid", ""),
"like": data.get("like", 0),
"cover": data.get("cover", ""),
"time": data.get("time", 0),
"author_avatar": data.get("avatar", ""),
"music": data.get("music", {}),
}
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
logger.error(f"[douyin_parser] 解析抖音视频信息失败: {e}")
logger.debug(f"失败的URL: {video_url}")
except Exception as e:
logger.error(f"[douyin_parser] 解析抖音视频时发生未知错误: {e}")
logger.debug(f"失败的URL: {video_url}")
return None
async def process_douyin_link(event: MessageEvent, url: str):
"""
处理抖音链接,获取信息并回复
:param event: 消息事件对象
:param url: 待处理的抖音链接
"""
try:
# 直接将原始链接传递给API不需要获取真实URL
video_info = await parse_douyin_video(url)
if not video_info:
logger.error(f"[douyin_parser] 无法从 {url} 解析视频信息。")
await event.reply("无法获取视频信息,可能是抖音接口变动或视频不存在。")
return
# 构建回复消息,包含原分享中的文本内容(如果有)
original_text = ""
for segment in event.message:
if segment.type == "text":
text_content = segment.data.get("text", "")
# 提取除了链接以外的文本内容
# 移除链接和复制提示
cleaned_text = re.sub(DOUYIN_URL_PATTERN, '', text_content)
cleaned_text = re.sub(DOUYIN_SHORT_PATTERN, '', cleaned_text)
cleaned_text = re.sub(r'复制此链接打开Dou音搜索直接观看视频', '', cleaned_text)
cleaned_text = cleaned_text.strip()
if cleaned_text:
original_text = cleaned_text
break
# 构建回复消息
text_parts = ["抖音视频解析"]
text_parts.append("--------------------")
if original_text:
text_parts.append(f" 分享内容: {original_text}")
text_parts.append("--------------------")
text_parts.append(f" 作者: {video_info['nickname']}")
text_parts.append(f" 抖音号: {video_info['aweme_id']}")
text_parts.append(f" 标题: {video_info['desc']}")
text_parts.append(f" 点赞: {format_count(video_info['like'])}")
text_parts.append(f" 类型: {video_info['type']}")
# 如果是音乐,添加音乐信息
if video_info.get('music'):
music_info = video_info['music']
text_parts.append("--------------------")
text_parts.append(" 背景音乐:")
text_parts.append(f" 标题: {music_info.get('title', '')}")
text_parts.append(f" 作者: {music_info.get('author', '')}")
text_parts.append("--------------------")
text_parts.append(f" 原始链接: {url}")
text_message = "\n".join(text_parts)
# 准备转发消息节点
nodes = []
# 添加文本信息节点
text_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=text_message
)
nodes.append(text_node)
# 添加封面图片节点(如果有)
if video_info.get('cover'):
try:
cover_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=[
MessageSegment.text("抖音视频封面:\n"),
MessageSegment.image(video_info['cover'])
]
)
nodes.append(cover_node)
except Exception as e:
logger.warning(f"[douyin_parser] 无法添加封面图片: {e}")
# 添加作者头像节点(如果有)
if video_info.get('author_avatar'):
try:
avatar_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=[
MessageSegment.text("作者头像:\n"),
MessageSegment.image(video_info['author_avatar'])
]
)
nodes.append(avatar_node)
except Exception as e:
logger.warning(f"[douyin_parser] 无法添加作者头像: {e}")
# 尝试添加视频直链(单独节点)
video_success = False
try:
if video_info.get('video_url'):
video_url = video_info.get('video_url', '')
# 检查视频类型
if video_info.get('type') == 'video':
video_message = MessageSegment.video(video_url)
video_type_text = "视频直链:"
else: # image类型
video_message = MessageSegment.image(video_url) # 单个图片
video_type_text = "图集首图:"
# 构建视频/图片节点
video_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=[
MessageSegment.text(video_type_text + "\n"),
video_message
]
)
nodes.append(video_node)
video_success = True
except Exception as e:
logger.error(f"[douyin_parser] 无法添加视频/图片: {e}")
# 如果无法添加视频,添加提示信息
if not video_success:
no_video_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message="视频解析成功,但无法获取直链或播放视频。"
)
nodes.append(no_video_node)
logger.success(f"[douyin_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['desc'][:20]}...")
# 发送合并转发消息
try:
# 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊
await event.bot.send_forwarded_messages(target=event, nodes=nodes)
except Exception as e:
# 如果发送合并转发失败,尝试单独发送文本信息
logger.error(f"[douyin_parser] 发送合并转发失败: {e}")
# 构建替代的简单文本回复,避免电脑端显示问题
simple_reply = f"抖音视频解析成功\n{text_message}\n\n如果无法查看视频内容,请复制原始链接到浏览器打开:{url}"
await event.reply(simple_reply)
# 如果有封面,尝试单独发送
if video_info.get('cover'):
try:
await event.reply(MessageSegment.image(video_info['cover']))
except Exception:
pass
except Exception as e:
logger.error(f"[douyin_parser] 处理抖音链接时发生错误: {e}")
await event.reply("处理抖音链接时发生错误,请稍后再试。")
return

View File

@@ -1,27 +0,0 @@
└ <BrowserType name=chromium executable_path=/root/.cache/ms-playwright/chromium-1200/chrome-linux64/chrome>
File "/usr/local/lib/python3.14/site-packages/playwright/_impl/_connection.py", line 69, in send
return await self._connection.wrap_api_call(
│ │ └ <function Connection.wrap_api_call at 0x7f1802831010>
│ └ <playwright._impl._connection.Connection object at 0x7f18019d2d50>
└ <playwright._impl._connection.Channel object at 0x7f1801b99950>
File "/usr/local/lib/python3.14/site-packages/playwright/_impl/_connection.py", line 559, in wrap_api_call
raise rewrite_error(error, f"{parsed_st['apiName']}: {error}") from None
│ └ {'frames': [{'file': '/app/core/managers/browser_manager.py', 'line': 35, 'column': 0, 'function': 'BrowserManager.initialize...
└ <function rewrite_error at 0x7f18029b1bc0>
playwright._impl._errors.TargetClosedError: BrowserType.launch: Target page, context or browser has been closedBrowser logs:
<launching> /root/.cache/ms-playwright/chromium_headless_shell-1200/chrome-headless-shell-linux64/chrome-headless-shell --disable-field-trial-config --disable-background-networking --disable-background-timer-throttling --disable-backgrounding-occluded-windows --disable-back-forward-cache --disable-breakpad --disable-client-side-phishing-detection --disable-component-extensions-with-background-pages --disable-component-update --no-default-browser-check --disable-default-apps --disable-dev-shm-usage --disable-extensions --disable-features=AcceptCHFrame,AvoidUnnecessaryBeforeUnloadCheckSync,DestroyProfileOnBrowserClose,DialMediaRouteProvider,GlobalMediaControls,HttpsUpgrades,LensOverlay,MediaRouter,PaintHolding,ThirdPartyStoragePartitioning,Translate,AutoDeElevate,RenderDocument,OptimizationHints --enable-features=CDPScreenshotNewSurface --allow-pre-commit-input --disable-hang-monitor --disable-ipc-flooding-protection --disable-popup-blocking --disable-prompt-on-repost --disable-renderer-backgrounding --force-color-profile=srgb --metrics-recording-only --no-first-run --password-store=basic --use-mock-keychain --no-service-autorun --export-tagged-pdf --disable-search-engine-choice-screen --unsafely-disable-devtools-self-xss-warnings --edge-skip-compat-layer-relaunch --enable-automation --disable-infobars --disable-search-engine-choice-screen --disable-sync --headless --hide-scrollbars --mute-audio --blink-settings=primaryHoverType=2,availableHoverTypes=2,primaryPointerType=4,availablePointerTypes=4 --no-sandbox --user-data-dir=/tmp/playwright_chromiumdev_profile-XViexK --remote-debugging-pipe --no-startup-window
<launched> pid=336
[pid=336][err] /root/.cache/ms-playwright/chromium_headless_shell-1200/chrome-headless-shell-linux64/chrome-headless-shell: error while loading shared libraries: libnspr4.so: cannot open shared object file: No such file or directory
Call log:
- <launching> /root/.cache/ms-playwright/chromium_headless_shell-1200/chrome-headless-shell-linux64/chrome-headless-shell --disable-field-trial-config --disable-background-networking --disable-background-timer-throttling --disable-backgrounding-occluded-windows --disable-back-forward-cache --disable-breakpad --disable-client-side-phishing-detection --disable-component-extensions-with-background-pages --disable-component-update --no-default-browser-check --disable-default-apps --disable-dev-shm-usage --disable-extensions --disable-features=AcceptCHFrame,AvoidUnnecessaryBeforeUnloadCheckSync,DestroyProfileOnBrowserClose,DialMediaRouteProvider,GlobalMediaControls,HttpsUpgrades,LensOverlay,MediaRouter,PaintHolding,ThirdPartyStoragePartitioning,Translate,AutoDeElevate,RenderDocument,OptimizationHints --enable-features=CDPScreenshotNewSurface --allow-pre-commit-input --disable-hang-monitor --disable-ipc-flooding-protection --disable-popup-blocking --disable-prompt-on-repost --disable-renderer-backgrounding --force-color-profile=srgb --metrics-recording-only --no-first-run --password-store=basic --use-mock-keychain --no-service-autorun --export-tagged-pdf --disable-search-engine-choice-screen --unsafely-disable-devtools-self-xss-warnings --edge-skip-compat-layer-relaunch --enable-automation --disable-infobars --disable-search-engine-choice-screen --disable-sync --headless --hide-scrollbars --mute-audio --blink-settings=primaryHoverType=2,availableHoverTypes=2,primaryPointerType=4,availablePointerTypes=4 --no-sandbox --user-data-dir=/tmp/playwright_chromiumdev_profile-XViexK --remote-debugging-pipe --no-startup-window
- <launched> pid=336
- [pid=336][err] /root/.cache/ms-playwright/chromium_headless_shell-1200/chrome-headless-shell-linux64/chrome-headless-shell: error while loading shared libraries: libnspr4.so: cannot open shared object file: No such file or directory
- [pid=336] <gracefully close start>
- [pid=336] <kill>
- [pid=336] <will force kill>
- [pid=336] exception while trying to kill process: Error: kill ESRCH
- [pid=336] <process did exit: exitCode=127, signal=null>
- [pid=336] starting temporary directories cleanup
- [pid=336] finished temporary directories cleanup
- [pid=336] <gracefully close end>
2026-01-18 13:26:32.984 | ERROR | core.managers.browser_manager:init_pool:49 - 浏览器初始化失败无法创建页面池2026-01-18 13:26:32.987 | INFO | __main__:main:151 - 已启动插件热重载监控: /app/plugins2026-01-18 13:26:32.987 | INFO | __main__:main:157 - [CodeExecutor] 初始化 Docker 客户端...2026-01-18 13:26:32.989 | ERROR | __main__:main:157 - 无法连接到 Docker 服务,请检查 Docker 是否正在运行: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))2026-01-18 13:26:32.990 | WARNING | __main__:main:167 - [Main] 未启动代码执行 Worker因为 Docker 客户端未初始化或连接失败。2026-01-18 13:26:32.990 | INFO | core.ws:connect:65 - 正在尝试连接至 NapCat: ws://127.0.0.1:30012026-01-18 13:26:32.998 | SUCCESS | core.ws:connect:71 - 连接成功2026-01-18 13:26:33.000 | SUCCESS | core.ws:on_event:139 - Bot 实例初始化完成: self_id=28703925662026-01-18 13:26:33.000 | INFO | core.ws:on_event:145 - 代码执行器已成功注入 Bot 实例。2026-01-18 13:26:35.630 | INFO | core.ws:on_event:160 - [消息] group | 2221577113(DOGSOHA): [CQ:image,summ

View File

@@ -1,8 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
跨平台 Python 模块编译脚本 优化版跨平台 Python 模块编译脚本
将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。 将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。
此版本基于对项目结构的深入分析,包含了更多高频使用的模块。
支持的平台: 支持的平台:
- Windows: 生成 .pyd 文件 - Windows: 生成 .pyd 文件
@@ -22,6 +23,7 @@
2. 需要安装 mypyc: pip install mypyc 2. 需要安装 mypyc: pip install mypyc
3. 编译后的文件是平台相关的,不能跨平台复制 3. 编译后的文件是平台相关的,不能跨平台复制
4. 建议在部署的目标环境上运行此脚本 4. 建议在部署的目标环境上运行此脚本
5. Mypyc 不支持动态特性,如 eval/exec/getattr/setattr 等
""" """
import os import os
import sys import sys
@@ -46,57 +48,53 @@ else:
print(f"不支持的平台: {PLATFORM}") print(f"不支持的平台: {PLATFORM}")
sys.exit(1) sys.exit(1)
# 要编译的模块列表 # 根据项目分析,优化要编译的模块列表
# 注意Mypyc 对动态特性支持有限,只选择计算密集或类型明确的模块 # 这些是项目中使用频率最高的模块,编译后能显著提升性能
MODULES = [ MODULES = [
# 工具模块 # 工具模块 - 高频使用
'core/utils/json_utils.py', # JSON 处理 'core/utils/json_utils.py', # JSON 处理 - 高频使用
'core/utils/executor.py', # 代码执行引擎 'core/utils/executor.py', # 代码执行引擎 - 高频使用
'core/utils/singleton.py', # 单例模式基类 'core/utils/exceptions.py', # 自定义异常 - 基础组件
'core/utils/exceptions.py', # 自定义异常 'core/utils/performance.py', # 性能监控工具 - 重要组件
'core/utils/logger.py', # 日志模块 'core/utils/logger.py', # 日志模块 - 高频使用
'core/utils/singleton.py', # 单例模式 - 基础组件
# 核心管理模块 # 核心管理模块 - 高频使用
'core/managers/command_manager.py', # 指令匹配和分发 # 'core/managers/command_manager.py', # 指令匹配和分发 - 包含动态特性,不适合编译
'core/managers/admin_manager.py', # 管理员管理 # 'core/managers/admin_manager.py', # 管理员管理 - 包含动态特性,不适合编译
'core/managers/permission_manager.py', # 权限管理 # 'core/managers/permission_manager.py', # 权限管理 - 包含动态特性,不适合编译
'core/managers/plugin_manager.py', # 插件管理器 # 'core/managers/plugin_manager.py', # 插件管理器 - 包含动态特性,不适合编译
'core/managers/redis_manager.py', # Redis 管理器 # 'core/managers/redis_manager.py', # Redis 管理器 - 包含动态特性,不适合编译
'core/managers/image_manager.py', # 图片管理器 # 'core/managers/image_manager.py', # 图片管理器 - 包含动态特性,不适合编译
# 核心基础模块 # 核心基础模块 - 高频使用
'core/ws.py', # WebSocket 核心 'core/ws.py', # WebSocket 核心 - 核心通信被10个文件引用
'core/bot.py', # Bot 核心抽象 # 'core/bot.py', # Bot 核心抽象 - 使用多重继承,不适合编译
'core/config_loader.py', # 配置加载 'core/config_loader.py', # 配置加载 - 启动必需被7个文件引用
'core/config_models.py', # 配置模型 # 'core/config_models.py', # 配置模型 - 包含复杂类型定义,不适合编译
'core/permission.py', # 权限枚举 # 'core/permission.py', # 权限枚举 - 包含动态属性,不适合编译
# API 模块 - 注意:这些类会被 Bot 类多继承使用 # 数据模型 - 高频使用
# 因此不适合编译,否则会导致 "multiple bases have instance lay-out conflict" 错误 'models/message.py', # 消息段模型 - 高频消息处理
# 'core/api/base.py', # API 基础类 'models/sender.py', # 发送者模型 - 高频消息处理
# 'core/api/account.py', # 账号相关 API 'models/objects.py', # API 响应数据模型 - 高频数据处理
# 'core/api/friend.py', # 好友相关 API
# 'core/api/group.py', # 群组相关 API
# 'core/api/media.py', # 媒体相关 API
# 'core/api/message.py', # 消息相关 API
# 数据模型(适合编译的高频使用数据类) # 事件处理相关 - 高频使用
'models/message.py', # 消息段模型 'core/handlers/event_handler.py', # 事件处理器 - 核心事件处理
'models/sender.py', # 发送者模型
'models/objects.py', # API 响应数据模型
# 事件处理相关 # 事件模型 - 高频使用但包含dataclass可能有编译问题暂时排除
'core/handlers/event_handler.py', # 事件处理器 # 'models/events/message.py', # 消息事件 - 最高频事件类型
# 'models/events/notice.py', # 通知事件 - 高频事件类型
# 'models/events/request.py', # 请求事件 - 高频事件类型
# 'models/events/meta.py', # 元事件 - 高频事件类型
# 注意:以下文件不适合编译 # 注意:以下文件不适合编译
# - 主程序文件main.py # - 主程序文件main.py
# - 测试文件tests/目录) # - 测试文件tests/目录)
# - 插件文件plugins/目录) # - 插件文件plugins/目录)
# - 编译脚本compile_machine_code.py等 # - 编译脚本compile_machine_code.py等
# - 临时文件scratch_files/目录)
# - 抽象基类models/events/base.py
# - 事件工厂models/events/factory.py
# - 包含复杂动态特性的文件 # - 包含复杂动态特性的文件
# - API 基础类(由于多重继承问题)
] ]
def list_compiled_modules(): def list_compiled_modules():
@@ -110,7 +108,7 @@ def list_compiled_modules():
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True)) compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件 # 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f] compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files: if compiled_files:
for f in sorted(compiled_files): for f in sorted(compiled_files):
@@ -131,7 +129,7 @@ def clean_compiled_files():
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True)) compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件 # 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f] compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files: if compiled_files:
for f in sorted(compiled_files): for f in sorted(compiled_files):
@@ -162,14 +160,22 @@ def compile_module(module_path):
try: try:
# 直接调用 mypyc 命令行工具 # 直接调用 mypyc 命令行工具
# 使用二进制模式捕获输出以避免编码问题
result = subprocess.run( result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path], [sys.executable, '-m', 'mypyc', module_path],
capture_output=True, capture_output=True,
text=True, check=True
check=True,
encoding='utf-8' # 设置正确的编码
) )
# 解码输出时处理可能的编码错误
try:
stdout_text = result.stdout.decode('utf-8', errors='replace')
stderr_text = result.stderr.decode('utf-8', errors='replace')
except AttributeError:
# 如果已经是字符串Python 3.7+),则直接使用
stdout_text = result.stdout
stderr_text = result.stderr
# 获取平台特定的模块名 # 获取平台特定的模块名
platform_module = get_platform_specific_module_name(module_path) platform_module = get_platform_specific_module_name(module_path)
mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}') mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}')
@@ -187,23 +193,32 @@ def compile_module(module_path):
# 如果在 build 目录中,复制到正确位置 # 如果在 build 目录中,复制到正确位置
os.makedirs(os.path.dirname(platform_module), exist_ok=True) os.makedirs(os.path.dirname(platform_module), exist_ok=True)
shutil.copy2(build_module_path, platform_module) shutil.copy2(build_module_path, platform_module)
shutil.copy2(build_mypyc_path, mypyc_platform_module) if os.path.exists(build_mypyc_path):
shutil.copy2(build_mypyc_path, mypyc_platform_module)
print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}") print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}")
return True return True
else: else:
print(f" ✗ 编译失败:找不到编译产物") print(" ✗ 编译失败:找不到编译产物")
if result.stdout: if result.stdout:
print(f" 编译输出:{result.stdout[:500]}...") print(f" 编译输出:{stdout_text[:500]}...")
if result.stderr: if result.stderr:
print(f" 错误信息:{result.stderr[:500]}...") print(f" 错误信息:{stderr_text[:500]}...")
return False return False
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f" ✗ 编译失败,退出码: {e.returncode}") print(f" ✗ 编译失败,退出码: {e.returncode}")
if e.stdout: if hasattr(e, 'stdout') and e.stdout:
print(f" 编译输出:{e.stdout[:500]}...") try:
if e.stderr: stdout_text = e.stdout.decode('utf-8', errors='replace') if isinstance(e.stdout, bytes) else e.stdout
print(f" 错误信息{e.stderr[:500]}...") print(f" 编译输出{stdout_text[:500]}...")
except Exception:
print(f" 编译输出:{str(e.stdout)[:500]}...")
if hasattr(e, 'stderr') and e.stderr:
try:
stderr_text = e.stderr.decode('utf-8', errors='replace') if isinstance(e.stderr, bytes) else e.stderr
print(f" 错误信息:{stderr_text[:500]}...")
except Exception:
print(f" 错误信息:{str(e.stderr)[:500]}...")
return False return False
except Exception as e: except Exception as e:
print(f" ✗ 编译失败,意外错误: {e}") print(f" ✗ 编译失败,意外错误: {e}")
@@ -221,9 +236,20 @@ def should_skip_module(module_path):
if 'from abc import ABC' in content or 'from abc import abstractmethod' in content: if 'from abc import ABC' in content or 'from abc import abstractmethod' in content:
return True, "包含抽象基类,不适合编译" return True, "包含抽象基类,不适合编译"
# 检查是否包含动态特性 # 检查是否包含危险的动态特性
if 'eval(' in content or 'exec(' in content or 'getattr(' in content or 'setattr(' in content: # 注意我们允许基本的动态特性如getattr但对于eval、exec等危险操作仍然阻止
return True, "包含动态特性,不适合编译" if ('eval(' in content or 'exec(' in content or
'compile(' in content):
return True, "包含危险动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if ('__dict__' in content or '__class__' in content or
'__module__' in content or '__bases__' in content):
return True, "包含复杂动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if '.__dict__' in content or '.__class__' in content:
return True, "包含复杂动态特性,不适合编译"
return False, "" return False, ""
except Exception as e: except Exception as e:
@@ -236,29 +262,41 @@ def compile_all_modules():
# 验证模块文件是否存在并检查是否适合编译 # 验证模块文件是否存在并检查是否适合编译
valid_modules = [] valid_modules = []
skipped_modules = []
for module_path in MODULES: for module_path in MODULES:
if os.path.exists(module_path): if os.path.exists(module_path):
should_skip, reason = should_skip_module(module_path) should_skip, reason = should_skip_module(module_path)
if should_skip: if should_skip:
print(f"跳过: {module_path} ({reason})") print(f"跳过: {module_path} ({reason})")
skipped_modules.append((module_path, reason))
else: else:
valid_modules.append(module_path) valid_modules.append(module_path)
else: else:
print(f"警告: 模块 {module_path} 不存在,将被跳过") print(f"警告: 模块 {module_path} 不存在,将被跳过")
print(f"\n有效模块: {len(valid_modules)}, 跳过模块: {len(skipped_modules)}")
if not valid_modules: if not valid_modules:
print("错误: 没有有效的模块可编译") print("错误: 没有有效的模块可编译")
return False return False
# 编译模块 # 编译模块
success_count = 0 success_count = 0
failed_modules = []
for module_path in valid_modules: for module_path in valid_modules:
if compile_module(module_path): if compile_module(module_path):
success_count += 1 success_count += 1
else:
failed_modules.append(module_path)
print(f"\n" + "=" * 60) print("\n" + "=" * 60)
print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功") print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功")
if failed_modules:
print(f"失败模块: {failed_modules}")
if success_count == len(valid_modules): if success_count == len(valid_modules):
print("✓ 所有模块编译成功") print("✓ 所有模块编译成功")
return True return True
@@ -269,13 +307,13 @@ def compile_all_modules():
def main(): def main():
"""主函数""" """主函数"""
# 检查 Python 版本 # 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor == 14): if not (sys.version_info.major == 3 and sys.version_info.minor >= 8):
print("警告: 推荐使用 Python 3.14 以获得最佳性能") print("警告: 推荐使用 Python 3.8+ 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}") print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题") print("继续编译可能导致兼容性问题")
print() print()
parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本') parser = argparse.ArgumentParser(description='优化版跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group() group = parser.add_mutually_exclusive_group()
group.add_argument('--compile', '-c', action='store_true', default=True, group.add_argument('--compile', '-c', action='store_true', default=True,
@@ -301,6 +339,7 @@ def main():
else: else:
compile_all_modules() compile_all_modules()
print("\n使用 --list 选项查看已编译的模块") print("\n使用 --list 选项查看已编译的模块")
print("使用 --clean 选项清理编译文件")
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -66,7 +66,7 @@ def main():
if compile_module(module): if compile_module(module):
success_count += 1 success_count += 1
print(f"\n--- Compilation Summary ---") print("\n--- Compilation Summary ---")
print(f"Total modules: {len(modules)}") print(f"Total modules: {len(modules)}")
print(f"Successfully compiled: {success_count}") print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(modules) - success_count}") print(f"Failed: {len(modules) - success_count}")

View File

@@ -10,11 +10,8 @@ Mypyc 编译脚本
2. 编译后的文件 (.pyd 或 .so) 是平台相关的,不能跨平台复制。 2. 编译后的文件 (.pyd 或 .so) 是平台相关的,不能跨平台复制。
3. 建议在部署的目标环境 (Linux) 上运行此脚本。 3. 建议在部署的目标环境 (Linux) 上运行此脚本。
""" """
from distutils.core import setup
from mypyc.build import mypycify
import os import os
import sys import sys
import glob
import subprocess import subprocess
# 基础模块列表 # 基础模块列表
@@ -102,7 +99,7 @@ for module_path in valid_modules:
print(f" ✓ Compiled successfully (copied from build directory): {pyd_path}") print(f" ✓ Compiled successfully (copied from build directory): {pyd_path}")
success_count += 1 success_count += 1
else: else:
print(f" ✗ Compiled but cannot find pyd file") print(" ✗ Compiled but cannot find pyd file")
print(f" Build output:\n{result.stdout[:500]}...") print(f" Build output:\n{result.stdout[:500]}...")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f" ✗ Compilation failed with exit code {e.returncode}") print(f" ✗ Compilation failed with exit code {e.returncode}")
@@ -110,7 +107,7 @@ for module_path in valid_modules:
except Exception as e: except Exception as e:
print(f" ✗ Unexpected error: {e}") print(f" ✗ Unexpected error: {e}")
print(f"\n--- Compilation Summary ---") print("\n--- Compilation Summary ---")
print(f"Total modules: {len(valid_modules)}") print(f"Total modules: {len(valid_modules)}")
print(f"Successfully compiled: {success_count}") print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(valid_modules) - success_count}") print(f"Failed: {len(valid_modules) - success_count}")

View File

@@ -8,8 +8,6 @@ import time
from core.utils.performance import ( from core.utils.performance import (
timeit, timeit,
profile, profile,
aprofile,
PerformanceStats,
performance_stats performance_stats
) )
@@ -52,7 +50,7 @@ async def main():
print("=" * 80) print("=" * 80)
with profile(enabled=False): # 禁用实际分析以避免输出太多 with profile(enabled=False): # 禁用实际分析以避免输出太多
time.sleep(0.05) await asyncio.sleep(0.05)
print("性能分析上下文管理器测试完成") print("性能分析上下文管理器测试完成")
# 测试4: 性能统计报告 # 测试4: 性能统计报告
@@ -60,7 +58,7 @@ async def main():
print("=" * 80) print("=" * 80)
# 执行多次函数调用 # 执行多次函数调用
for i in range(3): for _ in range(3):
sync_test() sync_test()
await async_test() await async_test()