diff --git a/.gitignore b/.gitignore index ee074a6..72f36b2 100644 --- a/.gitignore +++ b/.gitignore @@ -146,3 +146,5 @@ build/ # Scratch files scratch_files/ +/config.toml +/core/data/TEMP/* \ No newline at end of file diff --git a/compile_machine_code.py b/compile_machine_code.py deleted file mode 100644 index cce551d..0000000 --- a/compile_machine_code.py +++ /dev/null @@ -1,294 +0,0 @@ -#!/usr/bin/env python3 -""" -跨平台 Python 模块编译脚本 - -将核心 Python 模块编译为机器码(.pyd 或 .so)以提升性能。 - -支持的平台: -- Windows: 生成 .pyd 文件 -- Linux: 生成 .so 文件 - -使用方法: - python compile_machine_code.py [options] - -选项: - --compile, -c 编译指定的模块(默认) - --list, -l 列出已编译的模块 - --clean, -k 清理编译生成的文件 - --help, -h 显示帮助信息 - -注意: - 1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC) - 2. 需要安装 mypyc: pip install mypyc - 3. 编译后的文件是平台相关的,不能跨平台复制 - 4. 建议在部署的目标环境上运行此脚本 -""" -import os -import sys -import glob -import subprocess -import shutil -import argparse - -# 检测当前平台 -PLATFORM = sys.platform -if PLATFORM.startswith('win'): - EXTENSION = '.pyd' - BUILD_PREFIX = 'cp314-win_amd64' - BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-314') -elif PLATFORM.startswith('linux'): - EXTENSION = '.so' - BUILD_PREFIX = 'cp314-x86_64-linux-gnu' - BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-314') -else: - print(f"不支持的平台: {PLATFORM}") - sys.exit(1) - -# 要编译的模块列表 -# 注意:Mypyc 对动态特性支持有限,只选择计算密集或类型明确的模块 -MODULES = [ - # 工具模块 - 'core/utils/json_utils.py', # JSON 处理 - 'core/utils/executor.py', # 代码执行引擎 - 'core/utils/singleton.py', # 单例模式基类 - 'core/utils/exceptions.py', # 自定义异常 - 'core/utils/logger.py', # 日志模块 - - # 核心管理模块 - 'core/managers/command_manager.py', # 指令匹配和分发 - 'core/managers/admin_manager.py', # 管理员管理 - 'core/managers/permission_manager.py', # 权限管理 - 'core/managers/plugin_manager.py', # 插件管理器 - 'core/managers/redis_manager.py', # Redis 管理器 - 'core/managers/image_manager.py', # 图片管理器 - - # 核心基础模块 - 'core/ws.py', # WebSocket 核心 - 'core/bot.py', # Bot 核心抽象 - 'core/config_loader.py', # 配置加载 - 'core/config_models.py', # 配置模型 - 'core/permission.py', # 权限枚举 - - # API 模块 - 注意:这些类会被 Bot 类多继承使用 - # 因此不适合编译,否则会导致 "multiple bases have instance lay-out conflict" 错误 - # 'core/api/base.py', # API 基础类 - # 'core/api/account.py', # 账号相关 API - # 'core/api/friend.py', # 好友相关 API - # 'core/api/group.py', # 群组相关 API - # 'core/api/media.py', # 媒体相关 API - # 'core/api/message.py', # 消息相关 API - - # 数据模型(适合编译的高频使用数据类) - 'models/message.py', # 消息段模型 - 'models/sender.py', # 发送者模型 - 'models/objects.py', # API 响应数据模型 - - # 事件处理相关 - 'core/handlers/event_handler.py', # 事件处理器 - - # 注意:以下文件不适合编译 - # - 主程序文件(main.py) - # - 测试文件(tests/目录) - # - 插件文件(plugins/目录) - # - 编译脚本(compile_machine_code.py等) - # - 临时文件(scratch_files/目录) - # - 抽象基类(models/events/base.py) - # - 事件工厂(models/events/factory.py) - # - 包含复杂动态特性的文件 -] - -def list_compiled_modules(): - """列出已编译的模块""" - print(f"\n已编译的 {PLATFORM} 模块:") - print("=" * 50) - - # 查找所有编译后的文件 - compiled_files = [] - for ext in [EXTENSION, f'__mypyc{EXTENSION}']: - compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True)) - - # 过滤掉虚拟环境中的文件 - compiled_files = [f for f in compiled_files if 'venv' not in f] - - if compiled_files: - for f in sorted(compiled_files): - size = os.path.getsize(f) // 1024 # KB - print(f"{f} ({size} KB)") - else: - print(f"未找到已编译的 {EXTENSION} 文件") - - print(f"\n总计: {len(compiled_files)} 个文件") - -def clean_compiled_files(): - """清理编译生成的文件""" - print(f"\n清理编译生成的 {EXTENSION} 文件...") - - # 查找所有编译后的文件 - compiled_files = [] - for ext in [EXTENSION, f'__mypyc{EXTENSION}']: - compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True)) - - # 过滤掉虚拟环境中的文件 - compiled_files = [f for f in compiled_files if 'venv' not in f] - - if compiled_files: - for f in sorted(compiled_files): - try: - os.remove(f) - print(f"已删除: {f}") - except Exception as e: - print(f"删除失败 {f}: {e}") - - # 清理 build 目录 - if os.path.exists('build'): - try: - shutil.rmtree('build') - print("已删除 build 目录") - except Exception as e: - print(f"删除 build 目录失败: {e}") - else: - print(f"没有可清理的 {EXTENSION} 文件") - -def get_platform_specific_module_name(module_path): - """获取平台特定的模块文件名""" - module_name = module_path.replace('.py', '') - return f"{module_name}.{BUILD_PREFIX}{EXTENSION}" - -def compile_module(module_path): - """编译单个模块""" - print(f"\n编译: {module_path}") - - try: - # 直接调用 mypyc 命令行工具 - result = subprocess.run( - [sys.executable, '-m', 'mypyc', module_path], - capture_output=True, - text=True, - check=True - ) - - # 获取平台特定的模块名 - platform_module = get_platform_specific_module_name(module_path) - mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}') - - # 检查编译产物是否在当前目录 - if os.path.exists(platform_module): - print(f" ✓ 编译成功: {platform_module}") - return True - else: - # 检查 build 目录中是否有编译产物 - build_module_path = os.path.join(BUILD_PATH, platform_module) - build_mypyc_path = os.path.join(BUILD_PATH, mypyc_platform_module) - - if os.path.exists(build_module_path): - # 如果在 build 目录中,复制到正确位置 - os.makedirs(os.path.dirname(platform_module), exist_ok=True) - shutil.copy2(build_module_path, platform_module) - shutil.copy2(build_mypyc_path, mypyc_platform_module) - print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}") - return True - else: - print(f" ✗ 编译失败:找不到编译产物") - if result.stdout: - print(f" 编译输出:{result.stdout[:500]}...") - if result.stderr: - print(f" 错误信息:{result.stderr[:500]}...") - return False - - except subprocess.CalledProcessError as e: - print(f" ✗ 编译失败,退出码: {e.returncode}") - if e.stdout: - print(f" 编译输出:{e.stdout[:500]}...") - if e.stderr: - print(f" 错误信息:{e.stderr[:500]}...") - return False - except Exception as e: - print(f" ✗ 编译失败,意外错误: {e}") - return False - -def should_skip_module(module_path): - """检查模块是否应该被跳过编译""" - try: - with open(module_path, 'r', encoding='utf-8') as f: - content = f.read() - - # 检查是否包含抽象基类相关代码 - if 'from abc import ABC' in content or 'from abc import abstractmethod' in content: - return True, "包含抽象基类,不适合编译" - - # 检查是否包含动态特性 - if 'eval(' in content or 'exec(' in content or 'getattr(' in content or 'setattr(' in content: - return True, "包含动态特性,不适合编译" - - return False, "" - except Exception as e: - return True, f"读取文件时出错: {e}" - -def compile_all_modules(): - """编译所有指定的模块""" - print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})") - print("=" * 60) - - # 验证模块文件是否存在并检查是否适合编译 - valid_modules = [] - for module_path in MODULES: - if os.path.exists(module_path): - should_skip, reason = should_skip_module(module_path) - if should_skip: - print(f"跳过: {module_path} ({reason})") - else: - valid_modules.append(module_path) - else: - print(f"警告: 模块 {module_path} 不存在,将被跳过") - - if not valid_modules: - print("错误: 没有有效的模块可编译") - return False - - # 编译模块 - success_count = 0 - for module_path in valid_modules: - if compile_module(module_path): - success_count += 1 - - print(f"\n" + "=" * 60) - print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功") - - if success_count == len(valid_modules): - print("✓ 所有模块编译成功") - return True - else: - print("✗ 部分模块编译失败") - return False - -def main(): - """主函数""" - parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本') - - group = parser.add_mutually_exclusive_group() - group.add_argument('--compile', '-c', action='store_true', default=True, - help='编译指定的模块 (默认)') - group.add_argument('--list', '-l', action='store_true', - help='列出已编译的模块') - group.add_argument('--clean', '-k', action='store_true', - help='清理编译生成的文件') - - args = parser.parse_args() - - # 检查是否安装了 mypyc - try: - import mypyc - except ImportError: - print("错误: 未安装 mypyc,请先安装: pip install mypyc") - sys.exit(1) - - if args.list: - list_compiled_modules() - elif args.clean: - clean_compiled_files() - else: - compile_all_modules() - print("\n使用 --list 选项查看已编译的模块") - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/compile_modules.py b/compile_modules.py deleted file mode 100644 index c8cfc07..0000000 --- a/compile_modules.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python3 -""" -编译模块脚本 - -这个脚本会单独编译每个Python模块,确保每个模块都在正确位置生成独立的.pyd文件。 -""" -import os -import sys -import glob -from mypyc.build import mypycify -from distutils.core import setup - -def compile_module(module_path): - """ - 编译单个模块 - - Args: - module_path: 要编译的Python模块路径 - """ - print(f"\nCompiling {module_path}...") - try: - ext_modules = mypycify([module_path]) - setup(name=f'compiled_{os.path.basename(module_path).replace(".py", "")}', - ext_modules=ext_modules) - return True - except Exception as e: - print(f"Error compiling {module_path}: {e}") - return False - -def main(): - """ - 主函数 - """ - # 要编译的模块列表 - modules = [ - 'core/utils/json_utils.py', # JSON 处理 - 'core/utils/executor.py', # 代码执行引擎 - 'core/managers/command_manager.py', # 指令匹配和分发 - 'core/managers/admin_manager.py', # 管理员管理 - 'core/managers/permission_manager.py', # 权限管理 - 'core/ws.py', # WebSocket 核心 - 'core/managers/plugin_manager.py', # 插件管理器 - 'core/bot.py', # Bot 核心抽象 - 'core/config_loader.py', # 配置加载 - ] - - # 自动添加 events 模型 - event_models = glob.glob('models/events/*.py') - event_models = [m for m in event_models if not m.endswith('__init__.py')] - modules.extend(event_models) - - print(f"Found {len(modules)} modules to compile.") - - success_count = 0 - for module in modules: - if compile_module(module): - success_count += 1 - - print(f"\n--- Compilation Summary ---") - print(f"Total modules: {len(modules)}") - print(f"Successfully compiled: {success_count}") - print(f"Failed: {len(modules) - success_count}") - -if __name__ == '__main__': - main() diff --git a/config.toml b/config.toml deleted file mode 100644 index 2a0d4e4..0000000 --- a/config.toml +++ /dev/null @@ -1,25 +0,0 @@ -[napcat_ws] -uri = "ws://114.66.58.203:3001" -token = "&d_VTfksE%}ul?_Y" -reconnect_interval = 5 - -[bot] -command = ["/"] -ignore_self_message = true #是否忽略自身消息 -permission_denied_message = "权限不足,需要 {permission_name} 权限" - -[redis] -host = "114.66.58.203" -port = 1931 -db = 0 -password = "redis_5dxyJG" - -[docker] -base_url = "tcp://dockertest.k2cro4.my:2375" -sandbox_image = "python-sandbox:latest" -timeout = 10 -concurrency_limit = 5 -tls_verify = true -ca_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/ca.crt" -client_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-cert.pem" -client_key_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-key.pem" diff --git a/core/data/temp/help_menu.png b/core/data/temp/help_menu.png deleted file mode 100644 index f5e82c0..0000000 Binary files a/core/data/temp/help_menu.png and /dev/null differ diff --git a/core/utils/__init__.py b/core/utils/__init__.py index e69de29..6d8b745 100644 --- a/core/utils/__init__.py +++ b/core/utils/__init__.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +""" +工具函数包 +""" + +# 导出核心工具 +from .logger import logger +from .exceptions import * +from .json_utils import * +from .singleton import singleton +from .executor import run_in_thread_pool, initialize_executor +from .performance import ( + timeit, + profile, + aprofile, + memory_profile, + memory_profile_decorator, + performance_monitor, + PerformanceStats, + performance_stats, + global_stats +) + +__all__ = [ + 'logger', + 'timeit', + 'profile', + 'aprofile', + 'memory_profile', + 'memory_profile_decorator', + 'performance_monitor', + 'PerformanceStats', + 'performance_stats', + 'global_stats', + 'run_in_thread_pool', + 'initialize_executor', + 'singleton' +] diff --git a/core/utils/performance.py b/core/utils/performance.py new file mode 100644 index 0000000..7e13b88 --- /dev/null +++ b/core/utils/performance.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +""" +性能分析工具模块 + +提供同步和异步函数的性能分析装饰器、上下文管理器和统计工具。 + +主要功能: +1. 函数执行时间分析(支持同步和异步) +2. 内存使用分析 +3. 性能统计和报告生成 +4. 低开销的生产环境监控 +""" + +import time +import functools +import logging +from typing import Dict, Any, Callable, Optional +import inspect + +# 尝试导入性能分析库 +try: + from pyinstrument import Profiler + from pyinstrument.renderers import HTMLRenderer + PYINSTRUMENT_AVAILABLE = True +except ImportError: + PYINSTRUMENT_AVAILABLE = False + +# 尝试导入内存分析库 +try: + from memory_profiler import memory_usage + MEMORY_PROFILER_AVAILABLE = True +except ImportError: + MEMORY_PROFILER_AVAILABLE = False + +from .logger import logger + + +class PerformanceStats: + """ + 性能统计工具类 + 用于收集和报告函数执行的性能指标 + """ + def __init__(self): + self.stats: Dict[str, Dict[str, Any]] = {} + + def record(self, func_name: str, duration: float, memory_used: Optional[float] = None): + """ + 记录函数执行的性能数据 + + Args: + func_name: 函数名称 + duration: 执行时间(秒) + memory_used: 使用的内存(MB),可选 + """ + if func_name not in self.stats: + self.stats[func_name] = { + "count": 0, + "total_time": 0.0, + "avg_time": 0.0, + "min_time": float('inf'), + "max_time": 0.0, + "total_memory": 0.0, + "avg_memory": 0.0 + } + + stat = self.stats[func_name] + stat["count"] += 1 + stat["total_time"] += duration + stat["avg_time"] = stat["total_time"] / stat["count"] + stat["min_time"] = min(stat["min_time"], duration) + stat["max_time"] = max(stat["max_time"], duration) + + if memory_used is not None: + stat["total_memory"] += memory_used + stat["avg_memory"] = stat["total_memory"] / stat["count"] + + def report(self) -> str: + """ + 生成性能统计报告 + + Returns: + 格式化的性能统计报告字符串 + """ + if not self.stats: + return "暂无性能统计数据" + + report = ["\n=== 性能统计报告 ===\n"] + report.append(f"{'函数名':<40} {'调用次数':<10} {'平均时间(ms)':<15} {'最长时间(ms)':<15} {'内存(MB)':<10}") + report.append("-" * 100) + + for func_name, stat in sorted(self.stats.items(), key=lambda x: x[1]["total_time"], reverse=True): + memory_str = f"{stat['avg_memory']:.2f}" if stat['avg_memory'] > 0 else "-" + report.append( + f"{func_name:<40} {stat['count']:<10} {stat['avg_time']*1000:<15.2f} " + f"{stat['max_time']*1000:<15.2f} {memory_str:<10}" + ) + + report.append("=" * 100) + return "\n".join(report) + + def reset(self): + """ + 重置性能统计数据 + """ + self.stats.clear() + + +# 创建全局性能统计实例 +performance_stats = PerformanceStats() + + +def timeit(func: Callable = None, *, log_level: int = logging.INFO, collect_stats: bool = True): + """ + 函数执行时间分析装饰器(支持同步和异步) + + Args: + func: 要装饰的函数 + log_level: 日志级别 + collect_stats: 是否收集到全局统计中 + + Returns: + 装饰后的函数 + """ + def decorator(func: Callable) -> Callable: + func_name = func.__qualname__ + is_coroutine = inspect.iscoroutinefunction(func) + + if is_coroutine: + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + start_time = time.perf_counter() + try: + result = await func(*args, **kwargs) + finally: + end_time = time.perf_counter() + duration = end_time - start_time + + if collect_stats: + performance_stats.record(func_name, duration) + + logger.log(log_level, f"[性能] {func_name} 执行时间: {duration*1000:.2f} ms") + + return result + + return async_wrapper + else: + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + start_time = time.perf_counter() + try: + result = func(*args, **kwargs) + finally: + end_time = time.perf_counter() + duration = end_time - start_time + + if collect_stats: + performance_stats.record(func_name, duration) + + logger.log(log_level, f"[性能] {func_name} 执行时间: {duration*1000:.2f} ms") + + return result + + return sync_wrapper + + if func is None: + return decorator + return decorator(func) + + +class profile: + """ + 性能分析上下文管理器 + 使用 pyinstrument 进行详细的性能分析 + """ + def __init__(self, enabled: bool = True, output_file: Optional[str] = None): + """ + Args: + enabled: 是否启用分析 + output_file: 分析结果输出文件路径(HTML格式) + """ + self.enabled = enabled + self.output_file = output_file + self.profiler = None + + def __enter__(self): + if self.enabled and PYINSTRUMENT_AVAILABLE: + self.profiler = Profiler() + self.profiler.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.enabled and PYINSTRUMENT_AVAILABLE and self.profiler: + self.profiler.stop() + + # 输出到日志 + logger.info(f"[性能分析] {self.profiler.print()}") + + # 如果指定了输出文件,保存为HTML + if self.output_file: + try: + html = self.profiler.render(HTMLRenderer()) + with open(self.output_file, 'w', encoding='utf-8') as f: + f.write(html) + logger.info(f"[性能分析] 报告已保存到: {self.output_file}") + except Exception as e: + logger.error(f"[性能分析] 保存报告失败: {e}") + + +async def aprofile(func: Callable, *args, **kwargs): + """ + 异步函数性能分析 + + Args: + func: 要分析的异步函数 + *args: 函数参数 + **kwargs: 函数关键字参数 + + Returns: + 函数执行结果 + """ + if not PYINSTRUMENT_AVAILABLE: + logger.warning("[性能分析] pyinstrument 未安装,无法进行详细分析") + return await func(*args, **kwargs) + + profiler = Profiler() + profiler.start() + + try: + result = await func(*args, **kwargs) + finally: + profiler.stop() + logger.info(f"[性能分析] {profiler.print()}") + + return result + + +class memory_profile: + """ + 内存分析上下文管理器 + """ + def __init__(self, interval: float = 0.1, enabled: bool = True): + """ + Args: + interval: 内存采样间隔(秒) + enabled: 是否启用内存分析 + """ + self.interval = interval + self.enabled = enabled + self.memory_start = 0.0 + self.memory_end = 0.0 + + def __enter__(self): + if self.enabled and MEMORY_PROFILER_AVAILABLE: + self.memory_start = memory_usage()[0] + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.enabled and MEMORY_PROFILER_AVAILABLE: + self.memory_end = memory_usage()[0] + memory_used = self.memory_end - self.memory_start + logger.info(f"[内存分析] 使用内存: {memory_used:.2f} MB") + + +def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1): + """ + 内存分析装饰器(支持同步函数) + + Args: + func: 要装饰的函数 + interval: 内存采样间隔 + + Returns: + 装饰后的函数 + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs): + if not MEMORY_PROFILER_AVAILABLE: + return func(*args, **kwargs) + + mem_usage = memory_usage( + (func, args, kwargs), + interval=interval, + timeout=None, + include_children=False + ) + + max_memory = max(mem_usage) + logger.info(f"[内存分析] {func.__qualname__} 最大内存使用: {max_memory:.2f} MB") + return func(*args, **kwargs) + + return wrapper + + if func is None: + return decorator + return decorator(func) + + +def performance_monitor(func: Callable = None, *, threshold: float = 1.0): + """ + 性能监控装饰器 + 仅当函数执行时间超过阈值时记录日志 + 适合生产环境使用 + + Args: + func: 要装饰的函数 + threshold: 时间阈值(秒) + + Returns: + 装饰后的函数 + """ + def decorator(func: Callable) -> Callable: + func_name = func.__qualname__ + is_coroutine = inspect.iscoroutinefunction(func) + + if is_coroutine: + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + start_time = time.perf_counter() + result = await func(*args, **kwargs) + end_time = time.perf_counter() + duration = end_time - start_time + + if duration > threshold: + logger.warning(f"[性能监控] {func_name} 执行时间过长: {duration*1000:.2f} ms (阈值: {threshold*1000:.2f} ms)") + + return result + + return async_wrapper + else: + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + start_time = time.perf_counter() + result = func(*args, **kwargs) + end_time = time.perf_counter() + duration = end_time - start_time + + if duration > threshold: + logger.warning(f"[性能监控] {func_name} 执行时间过长: {duration*1000:.2f} ms (阈值: {threshold*1000:.2f} ms)") + + return result + + return sync_wrapper + + if func is None: + return decorator + return decorator(func) + + +# 全局实例 +global_stats = PerformanceStats() + + +__all__ = [ + 'timeit', + 'profile', + 'aprofile', + 'memory_profile', + 'memory_profile_decorator', + 'performance_monitor', + 'PerformanceStats', + 'performance_stats', + 'global_stats' +] diff --git a/core/utils/singleton.py b/core/utils/singleton.py index 94a7c93..27604e5 100644 --- a/core/utils/singleton.py +++ b/core/utils/singleton.py @@ -1,10 +1,13 @@ """ 通用单例模式基类 """ -from typing import Any, Optional, Type, TypeVar +from typing import Any, Dict, Optional, Type, TypeVar T = TypeVar('T') +# 存储每个类的实例 +_instance_store: Dict[Type, Any] = {} + class Singleton: """ 一个通用的单例基类 @@ -13,7 +16,6 @@ class Singleton: 它通过重写 __new__ 方法来确保每个类只有一个实例。 同时,它处理了重复初始化的问题,确保 __init__ 方法只在第一次实例化时被调用。 """ - _instance: Optional[Any] = None _initialized: bool = False def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T: @@ -27,9 +29,10 @@ class Singleton: Returns: T: 单例实例 """ - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance + # 使用全局字典存储实例,避免类型检查问题 + if cls not in _instance_store: + _instance_store[cls] = super().__new__(cls) + return _instance_store[cls] def __init__(self) -> None: """ @@ -38,3 +41,38 @@ class Singleton: if self._initialized: return self._initialized = True + + +def singleton(cls: Type[T]) -> Type[T]: + """ + 单例装饰器 + + 将普通类转换为单例类,确保整个应用程序中只有一个实例。 + + Args: + cls: 要转换为单例的类 + + Returns: + Type[T]: 单例类 + """ + # 为每个装饰的类创建一个实例存储 + class_instance: Optional[T] = None + + # 创建一个新的类,继承自原始类 + class SingletonClass(cls): + """单例包装类""" + + def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T: + """创建或返回现有的实例""" + nonlocal class_instance + if class_instance is None: + # 使用super()调用原始类的__new__方法 + class_instance = cls(*args, **kwargs) + return class_instance + + # 复制类的元数据 + SingletonClass.__name__ = cls.__name__ + SingletonClass.__doc__ = cls.__doc__ + SingletonClass.__module__ = cls.__module__ + + return SingletonClass diff --git a/export_requirements.py b/export_requirements.py deleted file mode 100644 index a3bb109..0000000 --- a/export_requirements.py +++ /dev/null @@ -1,8 +0,0 @@ -import subprocess - -# 运行pip freeze命令获取所有依赖 -result = subprocess.run(['pip', 'freeze'], capture_output=True, text=True) - -# 将输出写入requirements.txt文件 -with open('requirements.txt', 'w', encoding='utf-8') as f: - f.write(result.stdout) \ No newline at end of file diff --git a/main.py b/main.py index e0aff59..28998d0 100644 --- a/main.py +++ b/main.py @@ -10,6 +10,59 @@ import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler +# 初始化日志系统,必须在其他 core 模块导入之前执行 +from core.utils.logger import logger + +# 核心模块导入 +from core.managers.admin_manager import admin_manager +from core.ws import WS +from core.managers import plugin_manager, matcher +from core.managers.redis_manager import redis_manager +from core.managers.browser_manager import browser_manager +from core.utils.executor import run_in_thread_pool, initialize_executor +from core.config_loader import global_config as config + +# 检查 JIT 编译状态 +def check_jit_status(): + """ + 检查 Python JIT 编译状态 + + 该函数用于检测当前 Python 解释器是否启用了 JIT 编译功能, + 并打印相关信息,帮助用户了解运行环境的性能优化状态。 + """ + print("\n=== Python JIT 编译状态检查 ===") + + # 检查解释器信息 + print(f"Python 版本: {sys.version}") + print(f"解释器路径: {sys.executable}") + + # 检查优化级别 + print(f"优化级别 (-O): {sys.flags.optimize}") + + # 检查 JIT 相关模块和功能 + if sys.version_info >= (3, 10): + try: + # 对于 CPython 3.10+,检查是否启用了 JIT + import _opcode + if hasattr(_opcode, 'jit'): + print("JIT 状态: 已启用 (_opcode.jit)") + else: + print("JIT 状态: 未启用 (_opcode.jit 不可用)") + except ImportError: + print("JIT 状态: 未启用 (_opcode 模块不可用)") + else: + print("JIT 状态: 不可用 (需要 Python 3.10+)") + + # 检查是否使用了 PyPy + if hasattr(sys, 'pypy_version_info'): + print(f"PyPy 版本: {sys.pypy_version_info}") + print("JIT 状态: 已启用 (PyPy 内置 JIT)") + + print("==============================\n") + +# 执行 JIT 状态检查 +check_jit_status() + # 尝试使用高性能事件循环 try: if sys.platform == 'win32': @@ -25,17 +78,6 @@ try: except ImportError: print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环") -# 初始化日志系统,必须在其他 core 模块导入之前执行 -from core.utils.logger import logger - -from core.managers.admin_manager import admin_manager -from core.ws import WS -from core.managers import plugin_manager, matcher -from core.managers.redis_manager import redis_manager -from core.managers.browser_manager import browser_manager -from core.utils.executor import run_in_thread_pool, initialize_executor -from core.config_loader import global_config as config - # 将项目根目录添加到 sys.path ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, ROOT_DIR) diff --git a/models/events/message.py b/models/events/message.py index 421c843..e84381a 100644 --- a/models/events/message.py +++ b/models/events/message.py @@ -4,7 +4,7 @@ 定义了消息相关的事件类,包括 MessageEvent, PrivateMessageEvent, GroupMessageEvent。 """ from dataclasses import dataclass, field -from typing import List, Optional, Union, ClassVar +from typing import List, Optional, Union from core.permission import Permission from models.message import MessageSegment @@ -27,17 +27,19 @@ class Anonymous: """匿名用户 flag""" +# 权限级别常量,用于装饰器参数 +# 定义在类外部,避免 dataclass 参数顺序问题 +MESSAGE_EVENT_ADMIN = Permission.ADMIN +MESSAGE_EVENT_OP = Permission.OP +MESSAGE_EVENT_USER = Permission.USER + + @dataclass(slots=True) class MessageEvent(OneBotEvent): """ 消息事件基类 """ - # 权限级别常量,用于装饰器参数 - ADMIN: ClassVar[Permission] = Permission.ADMIN - OP: ClassVar[Permission] = Permission.OP - USER: ClassVar[Permission] = Permission.USER - message_type: str """消息类型: private (私聊), group (群聊)""" @@ -70,6 +72,21 @@ class MessageEvent(OneBotEvent): def post_type(self) -> str: return EventType.MESSAGE + @property + def ADMIN(self) -> Permission: + """权限级别常量,用于装饰器参数""" + return MESSAGE_EVENT_ADMIN + + @property + def OP(self) -> Permission: + """权限级别常量,用于装饰器参数""" + return MESSAGE_EVENT_OP + + @property + def USER(self) -> Permission: + """权限级别常量,用于装饰器参数""" + return MESSAGE_EVENT_USER + async def reply(self, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False): """ 回复消息(抽象方法,由子类实现) @@ -119,4 +136,4 @@ class GroupMessageEvent(MessageEvent): """ await self.bot.send_group_msg( group_id=self.group_id, message=message, auto_escape=auto_escape - ) + ) \ No newline at end of file diff --git a/performance_config_example.py b/performance_config_example.py new file mode 100644 index 0000000..f9a11f8 --- /dev/null +++ b/performance_config_example.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +""" +性能分析配置示例 + +展示如何在项目中配置和使用性能分析功能。 +""" + +# 配置性能分析的使用方式 +PERFORMANCE_CONFIG = { + # 全局性能分析开关 + 'enabled': True, + + # 详细性能分析开关(使用 pyinstrument) + 'detailed': False, + + # 内存分析开关 + 'memory': False, + + # 性能监控阈值(秒) + 'threshold': 0.5, + + # 性能报告输出文件 + 'output_file': 'performance_report.html', + + # 要监控的核心组件列表 + 'monitored_components': [ + 'core.ws.WS', + 'core.managers.plugin_manager', + 'core.managers.browser_manager', + 'core.utils.executor.CodeExecutor', + 'core.handlers.event_handler', + ] +} + + +def get_performance_config(): + """ + 获取性能分析配置 + + Returns: + dict: 性能分析配置 + """ + import os + import json + + # 从环境变量加载配置 + config = PERFORMANCE_CONFIG.copy() + + if os.environ.get('PERFORMANCE_PROFILE'): + config['detailed'] = os.environ['PERFORMANCE_PROFILE'] == '1' + + if os.environ.get('PERFORMANCE_MEMORY'): + config['memory'] = os.environ['PERFORMANCE_MEMORY'] == '1' + + if os.environ.get('PERFORMANCE_THRESHOLD'): + try: + config['threshold'] = float(os.environ['PERFORMANCE_THRESHOLD']) + except ValueError: + pass + + if os.environ.get('PERFORMANCE_OUTPUT'): + config['output_file'] = os.environ['PERFORMANCE_OUTPUT'] + + if os.environ.get('PERFORMANCE_STATS'): + config['enabled'] = os.environ['PERFORMANCE_STATS'] == '1' + + return config + + +if __name__ == "__main__": + # 打印当前配置 + print("当前性能分析配置:") + print("=" * 50) + config = get_performance_config() + for key, value in config.items(): + print(f"{key}: {value}") diff --git a/plugins/auto_approve.py b/plugins/auto_approve.py index f92254e..105abdf 100644 --- a/plugins/auto_approve.py +++ b/plugins/auto_approve.py @@ -50,4 +50,4 @@ async def handle_group_request(bot: Bot, event: GroupRequestEvent): ) print(f"[自动同意] 已同意加入群聊 {event.group_id} (邀请人: {event.user_id})") except Exception as e: - print(f"[自动同意] 同意群聊邀请失败: {e}") + print(f"[自动同意] 同意群聊邀请失败: {e}") \ No newline at end of file diff --git a/plugins/bili_parser.py b/plugins/bili_parser.py index af37675..5ea5003 100644 --- a/plugins/bili_parser.py +++ b/plugins/bili_parser.py @@ -30,7 +30,7 @@ HEADERS = { # 全局共享的 ClientSession _session: Optional[aiohttp.ClientSession] = None -async def get_session() -> aiohttp.ClientSession: +def get_session() -> aiohttp.ClientSession: global _session if _session is None or _session.closed: _session = aiohttp.ClientSession(headers=HEADERS) @@ -55,7 +55,7 @@ def format_duration(seconds: int) -> str: async def get_real_url(short_url: str) -> Optional[str]: try: - session = await get_session() + session = get_session() async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response: if response.status == 302: return response.headers.get('Location') @@ -65,22 +65,71 @@ async def get_real_url(short_url: str) -> Optional[str]: async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: try: - session = await get_session() - async with session.get(video_url, headers=HEADERS, timeout=5) as response: + # 清理URL,去掉不必要的查询参数,只保留基本的视频URL + clean_url = video_url.split('?')[0] + if '#/' in clean_url: + clean_url = clean_url.split('#/')[0] + + session = get_session() + async with session.get(clean_url, headers=HEADERS, timeout=5) as response: response.raise_for_status() text = await response.text() soup = BeautifulSoup(text, 'html.parser') + # 尝试多种方式获取视频数据 + # 方式1: 尝试获取 __INITIAL_STATE__ script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) if not script_tag or not script_tag.string: + # 方式2: 尝试获取 __PLAYINFO__ + script_tag = soup.find('script', text=re.compile('window.__PLAYINFO__')) + + if not script_tag or not script_tag.string: + # 方式3: 尝试获取页面标题和其他信息 + title_tag = soup.find('title') + if title_tag: + title = title_tag.get_text().strip() + # 提取BV号 + bv_match = re.search(r'(BV\w{10})', clean_url) + bvid = bv_match.group(1) if bv_match else '未知BV号' + + return { + "title": title.replace('_哔哩哔哩_bilibili', '').strip(), + "bvid": bvid, + "duration": 0, + "cover_url": '', + "play": 0, + "like": 0, + "coin": 0, + "favorite": 0, + "share": 0, + "owner_name": '未知UP主', + "owner_avatar": '', + "followers": 0, + } return None - match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^\}]*\});', script_tag.string) + # 原始解析逻辑 + match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string) + if not match: + # 尝试另一种正则表达式 + match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL) + if not match: return None json_str = match.group(1) - data = json.loads(json_str) + # 清理JSON字符串中的潜在问题字符 + json_str = json_str.strip().rstrip(';') + + try: + data = json.loads(json_str) + except json.JSONDecodeError: + # 如果直接解析失败,尝试清理JSON字符串 + # 移除可能的注释或无效字符 + cleaned_json = re.sub(r',\s*[}]', '}', json_str) # 移除末尾多余的逗号 + cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json) # 移除注释 + cleaned_json = re.sub(r'//.*', '', cleaned_json) # 移除行注释 + data = json.loads(cleaned_json) video_data = data.get('videoData', {}) up_data = data.get('upData', {}) @@ -116,6 +165,10 @@ async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: logger.error(f"解析视频信息失败: {e}") + logger.debug(f"失败的URL: {video_url}") + except Exception as e: + logger.error(f"解析视频信息时发生未知错误: {e}") + logger.debug(f"失败的URL: {video_url}") return None @@ -212,24 +265,32 @@ async def process_bili_link(event: MessageEvent, url: str): :param event: 消息事件对象 :param url: 待处理的B站链接 """ - if "b23.tv" in url: - real_url = await get_real_url(url) - if not real_url: - logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。") - await event.reply("无法解析B站短链接。") - return - else: - real_url = url.split('?')[0] + try: + if "b23.tv" in url: + real_url = await get_real_url(url) + if not real_url: + logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。") + await event.reply("无法解析B站短链接。") + return + else: + # 清理URL,移除复杂查询参数,只保留基本的视频URL + real_url = url.split('?')[0] + if '#/' in real_url: + real_url = real_url.split('#/')[0] - video_info = await parse_video_info(real_url) - if not video_info: - logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") - await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") + video_info = await parse_video_info(real_url) + if not video_info: + logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") + await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") + return + except Exception as e: + logger.error(f"[bili_parser] 处理B站链接时发生错误: {e}") + await event.reply("处理B站链接时发生错误,请稍后再试。") return # 检查视频时长 video_message: Union[str, MessageSegment] - if video_info['duration'] > 300: # 5分钟 = 300秒 + if video_info['duration'] > 1200: # 5分钟 = 300秒 video_message = "视频时长超过5分钟,不进行解析。" else: direct_url = await get_direct_video_url(real_url) diff --git a/plugins/douyin_parser.py b/plugins/douyin_parser.py new file mode 100644 index 0000000..5fe6a88 --- /dev/null +++ b/plugins/douyin_parser.py @@ -0,0 +1,391 @@ +# -*- coding: utf-8 -*- +import re +import json +import aiohttp +from typing import Optional, Dict, Any, Union +from cachetools import TTLCache + +from core.utils.logger import logger +from core.managers.command_manager import matcher +from models import MessageEvent, MessageSegment + +# 创建一个TTL缓存,最大容量100,缓存时间10秒 +processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) + +# 插件元数据 +__plugin_meta__ = { + "name": "douyin_parser", + "description": "自动解析抖音分享链接,提取视频信息和直链。", + "usage": "(自动触发)当检测到抖音分享链接时,自动发送视频信息。", +} + +# 常量定义 +DOUYIN_NICKNAME = "抖音视频解析" + +HEADERS = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', + 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', + 'Accept-Encoding': 'gzip, deflate, br', # 重新启用br编码支持 + 'Connection': 'keep-alive', + 'Upgrade-Insecure-Requests': '1' +} + +# 全局共享的 ClientSession +_session: Optional[aiohttp.ClientSession] = None + +async def get_session() -> aiohttp.ClientSession: + global _session + if _session is None or _session.closed: + _session = aiohttp.ClientSession(headers=HEADERS) + return _session + + +def format_count(num: Union[int, str]) -> str: + try: + n = int(num) + if n < 10000: + return str(n) + return f"{n / 10000:.1f}万" + except (ValueError, TypeError): + return str(num) + + +DOUYIN_URL_PATTERN = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线 +DOUYIN_SHORT_PATTERN = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线 + + +def extract_url_from_json_segments(segments): + """ + 从消息的JSON段中提取抖音链接 + :param segments: 消息段列表 + :return: 提取到的URL或None + """ + for segment in segments: + if segment.type == "json": + logger.info(f"[douyin_parser] 检测到JSON CQ码: {segment.data}") + try: + json_data = json.loads(segment.data.get("data", "{}")) + # 检查是否是抖音分享卡片 + meta = json_data.get("meta", {}) + if "detail_1" in meta: + detail = meta["detail_1"] + if "qqdocurl" in detail: + url = detail["qqdocurl"] + if "douyin.com" in url or "iesdouyin.com" in url: + logger.success(f"[douyin_parser] 成功从JSON卡片中提取到抖音链接: {url}") + return url + except (json.JSONDecodeError, KeyError) as e: + logger.error(f"[douyin_parser] 解析JSON失败: {e}") + continue + return None + + +def extract_url_from_text_segments(segments): + """ + 从消息的文本段中提取抖音链接 + :param segments: 消息段列表 + :return: 提取到的URL或None + """ + for segment in segments: + if segment.type == "text": + text_content = segment.data.get("text", "") + # 查找抖音链接 + match = DOUYIN_URL_PATTERN.search(text_content) + if match: + extracted_url = match.group(0) + logger.success(f"[douyin_parser] 成功从文本中提取到抖音链接: {extracted_url}") + return extracted_url + # 也检查是否有v.douyin.com格式的链接 + short_match = DOUYIN_SHORT_PATTERN.search(text_content) + if short_match: + extracted_url = short_match.group(0) + logger.success(f"[douyin_parser] 成功从文本中提取到抖音短链接: {extracted_url}") + return extracted_url + return None + + +@matcher.on_message() +async def handle_douyin_share(event: MessageEvent): + """ + 处理消息,检测抖音分享链接(JSON卡片或文本链接)并进行解析。 + :param event: 消息事件对象 + """ + # 消息去重 + if event.message_id in processed_messages: + return + processed_messages[event.message_id] = True + + # 忽略机器人自己发送的消息,防止无限循环 + if event.user_id == event.self_id: + return + + # 1. 优先解析JSON卡片中的链接 + url_to_process = extract_url_from_json_segments(event.message) + + # 2. 如果未在JSON卡片中找到链接,则在文本消息中查找 + if not url_to_process: + url_to_process = extract_url_from_text_segments(event.message) + + # 3. 如果找到了抖音链接,则进行处理 + if url_to_process: + await process_douyin_link(event, url_to_process) + + +async def get_real_url(short_url: str) -> Optional[str]: + """ + 获取抖音短链接的真实URL + :param short_url: 抖音短链接 + :return: 真实URL或None + """ + try: + # 首先尝试获取重定向后的URL + async with aiohttp.ClientSession() as session: + # 添加更多头部信息模拟移动端访问 + mobile_headers = HEADERS.copy() # 使用更新后的完整请求头 + mobile_headers.update({ + 'Sec-Fetch-Dest': 'document', + 'Sec-Fetch-Mode': 'navigate', + 'Sec-Fetch-Site': 'none', + 'Cache-Control': 'max-age=0', + # 模拟移动设备的额外头部 + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1', + 'X-Requested-With': 'XMLHttpRequest', + 'Referer': 'https://www.douyin.com/' + }) + + async with session.get(short_url, headers=mobile_headers, allow_redirects=True, timeout=10) as response: + redirected_url = str(response.url) + + # 检查重定向后的URL是否包含视频ID + # 抖音视频页通常包含 aweme_id 或 sec_uid 参数 + if 'video/' in redirected_url or '/note/' in redirected_url: + logger.info(f"[douyin_parser] 重定向后的视频URL: {redirected_url}") + return redirected_url + elif 'share_item' in redirected_url: + # 如果URL中有share_item参数,尝试从中提取视频信息 + logger.info(f"[douyin_parser] 重定向后的分享URL: {redirected_url}") + return redirected_url + else: + # 如果重定向到了主页或其他非视频页面,尝试从响应中提取信息 + logger.warning(f"[douyin_parser] 重定向到了非预期页面: {redirected_url}") + return redirected_url + + except Exception as e: + logger.error(f"[douyin_parser] 获取真实URL失败: {e}") + return None + + +async def parse_douyin_video(video_url: str) -> Optional[Dict[str, Any]]: + """ + 解析抖音视频信息 + :param video_url: 抖音视频链接 + :return: 视频信息字典或None + """ + try: + # 使用新的第三方API解析抖音视频 + api_url = f"http://api.xhus.cn/api/douyin?url={video_url}" + + session = await get_session() + async with session.get(api_url, headers=HEADERS, timeout=10) as response: + if response.status != 200: + logger.error(f"[douyin_parser] API请求失败,状态码: {response.status}") + return None + + response_data = await response.json() + + if not isinstance(response_data, dict): + logger.error(f"[douyin_parser] API返回格式错误: {response_data}") + return None + + if response_data.get("code") != 200: + logger.error(f"[douyin_parser] API返回错误: {response_data}") + return None + + data = response_data.get("data", {}) + if not data: + logger.error("[douyin_parser] API返回数据为空") + return None + + # 新API的响应格式转换 + return { + "type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image", + "video_url": data.get("url", ""), # 核心字段:视频播放地址 + "video_url_HQ": data.get("url", ""), # 新API没有HQ字段,使用同一个地址 + "nickname": data.get("author", "未知作者"), + "desc": data.get("title", "无描述"), + "aweme_id": data.get("uid", ""), + "like": data.get("like", 0), + "cover": data.get("cover", ""), + "time": data.get("time", 0), + "author_avatar": data.get("avatar", ""), + "music": data.get("music", {}), + } + except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e: + logger.error(f"[douyin_parser] 解析抖音视频信息失败: {e}") + logger.debug(f"失败的URL: {video_url}") + except Exception as e: + logger.error(f"[douyin_parser] 解析抖音视频时发生未知错误: {e}") + logger.debug(f"失败的URL: {video_url}") + + return None + + +async def process_douyin_link(event: MessageEvent, url: str): + """ + 处理抖音链接,获取信息并回复 + :param event: 消息事件对象 + :param url: 待处理的抖音链接 + """ + try: + # 直接将原始链接传递给API,不需要获取真实URL + video_info = await parse_douyin_video(url) + if not video_info: + logger.error(f"[douyin_parser] 无法从 {url} 解析视频信息。") + await event.reply("无法获取视频信息,可能是抖音接口变动或视频不存在。") + return + + # 构建回复消息,包含原分享中的文本内容(如果有) + original_text = "" + for segment in event.message: + if segment.type == "text": + text_content = segment.data.get("text", "") + # 提取除了链接以外的文本内容 + # 移除链接和复制提示 + cleaned_text = re.sub(DOUYIN_URL_PATTERN, '', text_content) + cleaned_text = re.sub(DOUYIN_SHORT_PATTERN, '', cleaned_text) + cleaned_text = re.sub(r'复制此链接,打开Dou音搜索,直接观看视频!', '', cleaned_text) + cleaned_text = cleaned_text.strip() + if cleaned_text: + original_text = cleaned_text + break + + # 构建回复消息 + text_parts = ["抖音视频解析"] + text_parts.append("--------------------") + + if original_text: + text_parts.append(f" 分享内容: {original_text}") + text_parts.append("--------------------") + + text_parts.append(f" 作者: {video_info['nickname']}") + text_parts.append(f" 抖音号: {video_info['aweme_id']}") + text_parts.append(f" 标题: {video_info['desc']}") + text_parts.append(f" 点赞: {format_count(video_info['like'])}") + text_parts.append(f" 类型: {video_info['type']}") + + # 如果是音乐,添加音乐信息 + if video_info.get('music'): + music_info = video_info['music'] + text_parts.append("--------------------") + text_parts.append(" 背景音乐:") + text_parts.append(f" 标题: {music_info.get('title', '')}") + text_parts.append(f" 作者: {music_info.get('author', '')}") + + text_parts.append("--------------------") + text_parts.append(f" 原始链接: {url}") + + text_message = "\n".join(text_parts) + + # 准备转发消息节点 + nodes = [] + + # 添加文本信息节点 + text_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=DOUYIN_NICKNAME, + message=text_message + ) + nodes.append(text_node) + + # 添加封面图片节点(如果有) + if video_info.get('cover'): + try: + cover_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=DOUYIN_NICKNAME, + message=[ + MessageSegment.text("抖音视频封面:\n"), + MessageSegment.image(video_info['cover']) + ] + ) + nodes.append(cover_node) + except Exception as e: + logger.warning(f"[douyin_parser] 无法添加封面图片: {e}") + + # 添加作者头像节点(如果有) + if video_info.get('author_avatar'): + try: + avatar_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=DOUYIN_NICKNAME, + message=[ + MessageSegment.text("作者头像:\n"), + MessageSegment.image(video_info['author_avatar']) + ] + ) + nodes.append(avatar_node) + except Exception as e: + logger.warning(f"[douyin_parser] 无法添加作者头像: {e}") + + # 尝试添加视频直链(单独节点) + video_success = False + try: + if video_info.get('video_url'): + video_url = video_info.get('video_url', '') + # 检查视频类型 + if video_info.get('type') == 'video': + video_message = MessageSegment.video(video_url) + video_type_text = "视频直链:" + else: # image类型 + video_message = MessageSegment.image(video_url) # 单个图片 + video_type_text = "图集首图:" + + # 构建视频/图片节点 + video_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=DOUYIN_NICKNAME, + message=[ + MessageSegment.text(video_type_text + "\n"), + video_message + ] + ) + nodes.append(video_node) + video_success = True + except Exception as e: + logger.error(f"[douyin_parser] 无法添加视频/图片: {e}") + + # 如果无法添加视频,添加提示信息 + if not video_success: + no_video_node = event.bot.build_forward_node( + user_id=event.self_id, + nickname=DOUYIN_NICKNAME, + message="视频解析成功,但无法获取直链或播放视频。" + ) + nodes.append(no_video_node) + + logger.success(f"[douyin_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['desc'][:20]}...") + + # 发送合并转发消息 + try: + # 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊 + await event.bot.send_forwarded_messages(target=event, nodes=nodes) + except Exception as e: + # 如果发送合并转发失败,尝试单独发送文本信息 + logger.error(f"[douyin_parser] 发送合并转发失败: {e}") + + # 构建替代的简单文本回复,避免电脑端显示问题 + simple_reply = f"抖音视频解析成功\n{text_message}\n\n如果无法查看视频内容,请复制原始链接到浏览器打开:{url}" + await event.reply(simple_reply) + + # 如果有封面,尝试单独发送 + if video_info.get('cover'): + try: + await event.reply(MessageSegment.image(video_info['cover'])) + except Exception: + pass + + except Exception as e: + logger.error(f"[douyin_parser] 处理抖音链接时发生错误: {e}") + await event.reply("处理抖音链接时发生错误,请稍后再试。") + return \ No newline at end of file diff --git a/profile_main.py b/profile_main.py new file mode 100644 index 0000000..075ee82 --- /dev/null +++ b/profile_main.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +""" +性能分析入口文件 + +用于启动带有性能分析功能的应用程序。 + +使用方法: + python profile_main.py [options] + +选项: + -h, --help 显示帮助信息 + --profile, -p 启用详细性能分析(使用 pyinstrument) + --memory, -m 启用内存使用分析 + --output, -o FILE 性能分析报告输出文件(HTML格式) + --threshold, -t SEC 设置性能监控阈值(秒) + --stats, -s 在程序结束时输出性能统计报告 +""" + +import sys +import argparse +import os + +# 将项目根目录添加到 sys.path +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, ROOT_DIR) + +# 解析命令行参数 +parser = argparse.ArgumentParser(description='性能分析入口文件') +parser.add_argument('--profile', '-p', action='store_true', help='启用详细性能分析(使用 pyinstrument)') +parser.add_argument('--memory', '-m', action='store_true', help='启用内存使用分析') +parser.add_argument('--output', '-o', type=str, default='performance_report.html', help='性能分析报告输出文件(HTML格式)') +parser.add_argument('--threshold', '-t', type=float, default=0.5, help='设置性能监控阈值(秒)') +parser.add_argument('--stats', '-s', action='store_true', help='在程序结束时输出性能统计报告') + +args = parser.parse_args() + +# 设置全局性能分析配置 +os.environ['PERFORMANCE_PROFILE'] = '1' if args.profile else '0' +os.environ['PERFORMANCE_MEMORY'] = '1' if args.memory else '0' +os.environ['PERFORMANCE_OUTPUT'] = args.output +os.environ['PERFORMANCE_THRESHOLD'] = str(args.threshold) +os.environ['PERFORMANCE_STATS'] = '1' if args.stats else '0' + +# 导入并运行主程序 +from core.utils.performance import profile, aprofile +from main import main +import asyncio + +async def main_with_profile(): + """ + 带有性能分析的主函数入口 + """ + if args.profile: + # 使用 pyinstrument 进行详细性能分析 + from pyinstrument import Profiler + from pyinstrument.renderers import HTMLRenderer + + profiler = Profiler() + profiler.start() + + try: + await main() + finally: + profiler.stop() + + # 输出分析结果到控制台 + print("\n" + "=" * 80) + print("性能分析结果") + print("=" * 80) + print(profiler.print()) + + # 保存HTML报告 + try: + html = profiler.render(HTMLRenderer()) + with open(args.output, 'w', encoding='utf-8') as f: + f.write(html) + print(f"\n性能分析报告已保存到: {args.output}") + except Exception as e: + print(f"\n保存性能分析报告失败: {e}") + else: + # 不使用详细分析,直接运行 + await main() + +if __name__ == "__main__": + try: + asyncio.run(main_with_profile()) + finally: + # 输出性能统计报告 + if args.stats: + from core.utils.performance import performance_stats + print("\n" + "=" * 80) + print("性能统计报告") + print("=" * 80) + print(performance_stats.report()) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..2352bf4 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,4 @@ +# 开发依赖 +pyinstrument>=4.5.0 # 性能分析工具,支持异步代码 +memory-profiler>=0.61.0 # 内存分析工具 +psutil>=5.9.8 # 系统资源监控 diff --git a/scripts/compile_machine_code.py b/scripts/compile_machine_code.py index 70c6992..7aedd7e 100644 --- a/scripts/compile_machine_code.py +++ b/scripts/compile_machine_code.py @@ -1,5 +1,349 @@ #!/usr/bin/env python3 """ +优化版跨平台 Python 模块编译脚本 + +将核心 Python 模块编译为机器码(.pyd 或 .so)以提升性能。 +此版本基于对项目结构的深入分析,包含了更多高频使用的模块。 + +支持的平台: +- Windows: 生成 .pyd 文件 +- Linux: 生成 .so 文件 + +使用方法: + python compile_machine_code.py [options] + +选项: + --compile, -c 编译指定的模块(默认) + --list, -l 列出已编译的模块 + --clean, -k 清理编译生成的文件 + --help, -h 显示帮助信息 + +注意: + 1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC) + 2. 需要安装 mypyc: pip install mypyc + 3. 编译后的文件是平台相关的,不能跨平台复制 + 4. 建议在部署的目标环境上运行此脚本 + 5. Mypyc 不支持动态特性,如 eval/exec/getattr/setattr 等 +""" +import os +import sys +import glob +import subprocess +import shutil +import argparse + +# 检测当前平台和 Python 版本 +PLATFORM = sys.platform +PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" # 例如 "314" + +if PLATFORM.startswith('win'): + EXTENSION = '.pyd' + BUILD_PREFIX = f'cp{PYTHON_VERSION}-win_amd64' + BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-{PYTHON_VERSION}') +elif PLATFORM.startswith('linux'): + EXTENSION = '.so' + BUILD_PREFIX = f'cp{PYTHON_VERSION}-x86_64-linux-gnu' + BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-{PYTHON_VERSION}') +else: + print(f"不支持的平台: {PLATFORM}") + sys.exit(1) + +# 根据项目分析,优化要编译的模块列表 +# 这些是项目中使用频率最高的模块,编译后能显著提升性能 +MODULES = [ + # 工具模块 - 高频使用 + 'core/utils/json_utils.py', # JSON 处理 - 高频使用 + 'core/utils/executor.py', # 代码执行引擎 - 高频使用 + 'core/utils/exceptions.py', # 自定义异常 - 基础组件 + 'core/utils/performance.py', # 性能监控工具 - 重要组件 + 'core/utils/logger.py', # 日志模块 - 高频使用 + 'core/utils/singleton.py', # 单例模式 - 基础组件 + + # 核心管理模块 - 高频使用 + # 'core/managers/command_manager.py', # 指令匹配和分发 - 包含动态特性,不适合编译 + # 'core/managers/admin_manager.py', # 管理员管理 - 包含动态特性,不适合编译 + # 'core/managers/permission_manager.py', # 权限管理 - 包含动态特性,不适合编译 + # 'core/managers/plugin_manager.py', # 插件管理器 - 包含动态特性,不适合编译 + # 'core/managers/redis_manager.py', # Redis 管理器 - 包含动态特性,不适合编译 + # 'core/managers/image_manager.py', # 图片管理器 - 包含动态特性,不适合编译 + + # 核心基础模块 - 高频使用 + 'core/ws.py', # WebSocket 核心 - 核心通信,被10个文件引用 + # 'core/bot.py', # Bot 核心抽象 - 使用多重继承,不适合编译 + 'core/config_loader.py', # 配置加载 - 启动必需,被7个文件引用 + # 'core/config_models.py', # 配置模型 - 包含复杂类型定义,不适合编译 + # 'core/permission.py', # 权限枚举 - 包含动态属性,不适合编译 + + # 数据模型 - 高频使用 + 'models/message.py', # 消息段模型 - 高频消息处理 + 'models/sender.py', # 发送者模型 - 高频消息处理 + 'models/objects.py', # API 响应数据模型 - 高频数据处理 + + # 事件处理相关 - 高频使用 + 'core/handlers/event_handler.py', # 事件处理器 - 核心事件处理 + + # 事件模型 - 高频使用,但包含dataclass,可能有编译问题,暂时排除 + # 'models/events/message.py', # 消息事件 - 最高频事件类型 + # 'models/events/notice.py', # 通知事件 - 高频事件类型 + # 'models/events/request.py', # 请求事件 - 高频事件类型 + # 'models/events/meta.py', # 元事件 - 高频事件类型 + + # 注意:以下文件不适合编译 + # - 主程序文件(main.py) + # - 测试文件(tests/目录) + # - 插件文件(plugins/目录) + # - 编译(脚本compile_machine_code.py等) + # - 包含复杂动态特性的文件 + # - API 基础类(由于多重继承问题) +] + +def list_compiled_modules(): + """列出已编译的模块""" + print(f"\n已编译的 {PLATFORM} 模块:") + print("=" * 50) + + # 查找所有编译后的文件 + compiled_files = [] + for ext in [EXTENSION, f'__mypyc{EXTENSION}']: + compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True)) + + # 过滤掉虚拟环境中的文件 + compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f] + + if compiled_files: + for f in sorted(compiled_files): + size = os.path.getsize(f) // 1024 # KB + print(f"{f} ({size} KB)") + else: + print(f"未找到已编译的 {EXTENSION} 文件") + + print(f"\n总计: {len(compiled_files)} 个文件") + +def clean_compiled_files(): + """清理编译生成的文件""" + print(f"\n清理编译生成的 {EXTENSION} 文件...") + + # 查找所有编译后的文件 + compiled_files = [] + for ext in [EXTENSION, f'__mypyc{EXTENSION}']: + compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True)) + + # 过滤掉虚拟环境中的文件 + compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f] + + if compiled_files: + for f in sorted(compiled_files): + try: + os.remove(f) + print(f"已删除: {f}") + except Exception as e: + print(f"删除失败 {f}: {e}") + + # 清理 build 目录 + if os.path.exists('build'): + try: + shutil.rmtree('build') + print("已删除 build 目录") + except Exception as e: + print(f"删除 build 目录失败: {e}") + else: + print(f"没有可清理的 {EXTENSION} 文件") + +def get_platform_specific_module_name(module_path): + """获取平台特定的模块文件名""" + module_name = module_path.replace('.py', '') + return f"{module_name}.{BUILD_PREFIX}{EXTENSION}" + +def compile_module(module_path): + """编译单个模块""" + print(f"\n编译: {module_path}") + + try: + # 直接调用 mypyc 命令行工具 + # 使用二进制模式捕获输出以避免编码问题 + result = subprocess.run( + [sys.executable, '-m', 'mypyc', module_path], + capture_output=True, + check=True + ) + + # 解码输出时处理可能的编码错误 + try: + stdout_text = result.stdout.decode('utf-8', errors='replace') + stderr_text = result.stderr.decode('utf-8', errors='replace') + except AttributeError: + # 如果已经是字符串(Python 3.7+),则直接使用 + stdout_text = result.stdout + stderr_text = result.stderr + + # 获取平台特定的模块名 + platform_module = get_platform_specific_module_name(module_path) + mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}') + + # 检查编译产物是否在当前目录 + if os.path.exists(platform_module): + print(f" ✓ 编译成功: {platform_module}") + return True + else: + # 检查 build 目录中是否有编译产物 + build_module_path = os.path.join(BUILD_PATH, platform_module) + build_mypyc_path = os.path.join(BUILD_PATH, mypyc_platform_module) + + if os.path.exists(build_module_path): + # 如果在 build 目录中,复制到正确位置 + os.makedirs(os.path.dirname(platform_module), exist_ok=True) + shutil.copy2(build_module_path, platform_module) + if os.path.exists(build_mypyc_path): + shutil.copy2(build_mypyc_path, mypyc_platform_module) + print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}") + return True + else: + print(" ✗ 编译失败:找不到编译产物") + if result.stdout: + print(f" 编译输出:{stdout_text[:500]}...") + if result.stderr: + print(f" 错误信息:{stderr_text[:500]}...") + return False + + except subprocess.CalledProcessError as e: + print(f" ✗ 编译失败,退出码: {e.returncode}") + if hasattr(e, 'stdout') and e.stdout: + try: + stdout_text = e.stdout.decode('utf-8', errors='replace') if isinstance(e.stdout, bytes) else e.stdout + print(f" 编译输出:{stdout_text[:500]}...") + except Exception: + print(f" 编译输出:{str(e.stdout)[:500]}...") + if hasattr(e, 'stderr') and e.stderr: + try: + stderr_text = e.stderr.decode('utf-8', errors='replace') if isinstance(e.stderr, bytes) else e.stderr + print(f" 错误信息:{stderr_text[:500]}...") + except Exception: + print(f" 错误信息:{str(e.stderr)[:500]}...") + return False + except Exception as e: + print(f" ✗ 编译失败,意外错误: {e}") + import traceback + traceback.print_exc() + return False + +def should_skip_module(module_path): + """检查模块是否应该被跳过编译""" + try: + with open(module_path, 'r', encoding='utf-8') as f: + content = f.read() + + # 检查是否包含抽象基类相关代码 + if 'from abc import ABC' in content or 'from abc import abstractmethod' in content: + return True, "包含抽象基类,不适合编译" + + # 检查是否包含危险的动态特性 + # 注意:我们允许基本的动态特性,如getattr,但对于eval、exec等危险操作仍然阻止 + if ('eval(' in content or 'exec(' in content or + 'compile(' in content): + return True, "包含危险动态特性,不适合编译" + + # 检查是否包含复杂的动态属性访问 + if ('__dict__' in content or '__class__' in content or + '__module__' in content or '__bases__' in content): + return True, "包含复杂动态特性,不适合编译" + + # 检查是否包含复杂的动态属性访问 + if '.__dict__' in content or '.__class__' in content: + return True, "包含复杂动态特性,不适合编译" + + return False, "" + except Exception as e: + return True, f"读取文件时出错: {e}" + +def compile_all_modules(): + """编译所有指定的模块""" + print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})") + print("=" * 60) + + # 验证模块文件是否存在并检查是否适合编译 + valid_modules = [] + skipped_modules = [] + + for module_path in MODULES: + if os.path.exists(module_path): + should_skip, reason = should_skip_module(module_path) + if should_skip: + print(f"跳过: {module_path} ({reason})") + skipped_modules.append((module_path, reason)) + else: + valid_modules.append(module_path) + else: + print(f"警告: 模块 {module_path} 不存在,将被跳过") + + print(f"\n有效模块: {len(valid_modules)}, 跳过模块: {len(skipped_modules)}") + + if not valid_modules: + print("错误: 没有有效的模块可编译") + return False + + # 编译模块 + success_count = 0 + failed_modules = [] + + for module_path in valid_modules: + if compile_module(module_path): + success_count += 1 + else: + failed_modules.append(module_path) + + print("\n" + "=" * 60) + print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功") + + if failed_modules: + print(f"失败模块: {failed_modules}") + + if success_count == len(valid_modules): + print("✓ 所有模块编译成功") + return True + else: + print("✗ 部分模块编译失败") + return False + +def main(): + """主函数""" + # 检查 Python 版本 + if not (sys.version_info.major == 3 and sys.version_info.minor >= 8): + print("警告: 推荐使用 Python 3.8+ 以获得最佳性能") + print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}") + print("继续编译可能导致兼容性问题") + print() + + parser = argparse.ArgumentParser(description='优化版跨平台 Python 模块编译脚本') + + group = parser.add_mutually_exclusive_group() + group.add_argument('--compile', '-c', action='store_true', default=True, + help='编译指定的模块 (默认)') + group.add_argument('--list', '-l', action='store_true', + help='列出已编译的模块') + group.add_argument('--clean', '-k', action='store_true', + help='清理编译生成的文件') + + args = parser.parse_args() + + # 检查是否安装了 mypyc + try: + import mypyc + except ImportError: + print("错误: 未安装 mypyc,请先安装: pip install mypyc") + sys.exit(1) + + if args.list: + list_compiled_modules() + elif args.clean: + clean_compiled_files() + else: + compile_all_modules() + print("\n使用 --list 选项查看已编译的模块") + print("使用 --clean 选项清理编译文件") + +if __name__ == '__main__': + main()#!/usr/bin/env python3 +""" 跨平台 Python 模块编译脚本 将核心 Python 模块编译为机器码(.pyd 或 .so)以提升性能。 @@ -30,18 +374,16 @@ import subprocess import shutil import argparse -# 检测当前平台和 Python 版本 +# 检测当前平台 PLATFORM = sys.platform -PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" # 例如 "314" - if PLATFORM.startswith('win'): EXTENSION = '.pyd' - BUILD_PREFIX = f'cp{PYTHON_VERSION}-win_amd64' - BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-{PYTHON_VERSION}') + BUILD_PREFIX = 'cp314-win_amd64' + BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-314') elif PLATFORM.startswith('linux'): EXTENSION = '.so' - BUILD_PREFIX = f'cp{PYTHON_VERSION}-x86_64-linux-gnu' - BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-{PYTHON_VERSION}') + BUILD_PREFIX = 'cp314-x86_64-linux-gnu' + BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-314') else: print(f"不支持的平台: {PLATFORM}") sys.exit(1) @@ -265,13 +607,6 @@ def compile_all_modules(): def main(): """主函数""" - # 检查 Python 版本 - if not (sys.version_info.major == 3 and sys.version_info.minor == 14): - print("警告: 推荐使用 Python 3.14 以获得最佳性能") - print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}") - print("继续编译可能导致兼容性问题") - print() - parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本') group = parser.add_mutually_exclusive_group() diff --git a/scripts/compile_modules.py b/scripts/compile_modules.py index f869d9e..a47bc03 100644 --- a/scripts/compile_modules.py +++ b/scripts/compile_modules.py @@ -66,7 +66,7 @@ def main(): if compile_module(module): success_count += 1 - print(f"\n--- Compilation Summary ---") + print("\n--- Compilation Summary ---") print(f"Total modules: {len(modules)}") print(f"Successfully compiled: {success_count}") print(f"Failed: {len(modules) - success_count}") diff --git a/setup_mypyc.py b/setup_mypyc.py index 506c533..aed07be 100644 --- a/setup_mypyc.py +++ b/setup_mypyc.py @@ -10,11 +10,8 @@ Mypyc 编译脚本 2. 编译后的文件 (.pyd 或 .so) 是平台相关的,不能跨平台复制。 3. 建议在部署的目标环境 (Linux) 上运行此脚本。 """ -from distutils.core import setup -from mypyc.build import mypycify import os import sys -import glob import subprocess # 基础模块列表 @@ -102,7 +99,7 @@ for module_path in valid_modules: print(f" ✓ Compiled successfully (copied from build directory): {pyd_path}") success_count += 1 else: - print(f" ✗ Compiled but cannot find pyd file") + print(" ✗ Compiled but cannot find pyd file") print(f" Build output:\n{result.stdout[:500]}...") except subprocess.CalledProcessError as e: print(f" ✗ Compilation failed with exit code {e.returncode}") @@ -110,7 +107,7 @@ for module_path in valid_modules: except Exception as e: print(f" ✗ Unexpected error: {e}") -print(f"\n--- Compilation Summary ---") +print("\n--- Compilation Summary ---") print(f"Total modules: {len(valid_modules)}") print(f"Successfully compiled: {success_count}") print(f"Failed: {len(valid_modules) - success_count}") diff --git a/test_performance_simple.py b/test_performance_simple.py new file mode 100644 index 0000000..8c2687a --- /dev/null +++ b/test_performance_simple.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +""" +简单的性能分析功能测试脚本 +""" + +import asyncio +import time +from core.utils.performance import ( + timeit, + profile, + performance_stats +) + + +print("=" * 80) +print("性能分析功能测试") +print("=" * 80) + +# 重置全局性能统计 +performance_stats.reset() + +# 测试1: 同步函数的时间测量 +@timeit +def sync_test(): + """同步测试函数""" + time.sleep(0.1) + return "sync done" + +# 测试2: 异步函数的时间测量 +@timeit +async def async_test(): + """异步测试函数""" + await asyncio.sleep(0.1) + return "async done" + +# 异步主函数 +async def main(): + # 同步函数测试 + print("执行同步函数...") + sync_result = sync_test() + print(f"同步函数结果: {sync_result}") + + # 异步函数测试 + print("\n执行异步函数...") + async_result = await async_test() + print(f"异步函数结果: {async_result}") + + # 测试3: 详细性能分析 + print("\n2. 测试性能分析上下文管理器:") + print("=" * 80) + + with profile(enabled=False): # 禁用实际分析以避免输出太多 + await asyncio.sleep(0.05) + print("性能分析上下文管理器测试完成") + + # 测试4: 性能统计报告 + print("\n3. 测试性能统计报告:") + print("=" * 80) + + # 执行多次函数调用 + for _ in range(3): + sync_test() + await async_test() + + # 生成并打印性能报告 + print("\n性能统计报告:") + print(performance_stats.report()) + + +# 执行测试 +print("\n1. 测试时间测量装饰器:") +print("=" * 80) + +# 使用 asyncio.run() 执行异步主函数 +asyncio.run(main()) + +print("\n" + "=" * 80) +print("所有测试完成!") +print("=" * 80) diff --git a/tests/test_performance.py b/tests/test_performance.py new file mode 100644 index 0000000..5a1537b --- /dev/null +++ b/tests/test_performance.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 +""" +性能分析工具测试 + +测试各种性能分析功能的正确性和可用性。 +""" + +import asyncio +import time +import pytest +from typing import Optional + +# 导入性能分析工具 +from core.utils.performance import ( + timeit, + profile, + aprofile, + memory_profile, + memory_profile_decorator, + performance_monitor, + PerformanceStats, + performance_stats +) + + +# 重置全局性能统计 +def setup_module(): + performance_stats.reset() + + +def teardown_module(): + performance_stats.reset() + + +class TestTimeitDecorator: + """测试 timeit 装饰器""" + + @timeit(log_level=20) # 使用 INFO 级别 + def test_sync_function(self): + """测试同步函数的时间测量""" + time.sleep(0.1) + return "done" + + @timeit(log_level=20) + async def test_async_function(self): + """测试异步函数的时间测量""" + await asyncio.sleep(0.1) + return "done" + + def test_sync_function_works(self): + """验证同步函数能正常执行""" + result = self.test_sync_function() + assert result == "done" + + @pytest.mark.asyncio + async def test_async_function_works(self): + """验证异步函数能正常执行""" + result = await self.test_async_function() + assert result == "done" + + +class TestProfileContextManager: + """测试 profile 上下文管理器""" + + def test_profile_sync_code(self): + """测试同步代码的性能分析""" + # 捕获标准输出 + import io + import sys + from contextlib import redirect_stdout + + f = io.StringIO() + with redirect_stdout(f): + with profile(enabled=False): # 禁用实际分析以提高测试速度 + time.sleep(0.01) + + output = f.getvalue() + # 应该没有输出(因为 enabled=False) + assert "性能分析" not in output + + @pytest.mark.asyncio + async def test_aprofile_async_function(self): + """测试异步函数的性能分析""" + async def async_test(): + await asyncio.sleep(0.01) + return "test" + + result = await aprofile(async_test) + assert result == "test" + + +class TestPerformanceMonitor: + """测试 performance_monitor 装饰器""" + + @performance_monitor(threshold=0.05) + def test_slow_sync_function(self): + """测试慢速同步函数的监控""" + time.sleep(0.1) # 超过阈值 + return "slow" + + @performance_monitor(threshold=0.05) + def test_fast_sync_function(self): + """测试快速同步函数的监控""" + time.sleep(0.01) # 低于阈值 + return "fast" + + @performance_monitor(threshold=0.05) + async def test_slow_async_function(self): + """测试慢速异步函数的监控""" + await asyncio.sleep(0.1) + return "slow_async" + + def test_slow_function_triggers_warning(self): + """验证慢速函数会触发警告""" + result = self.test_slow_sync_function() + assert result == "slow" + + def test_fast_function_no_warning(self): + """验证快速函数不会触发警告""" + result = self.test_fast_sync_function() + assert result == "fast" + + @pytest.mark.asyncio + async def test_slow_async_function_triggers_warning(self): + """验证慢速异步函数会触发警告""" + result = await self.test_slow_async_function() + assert result == "slow_async" + + +class TestMemoryAnalysis: + """测试内存分析功能""" + + def test_memory_profile_context_manager(self): + """测试内存分析上下文管理器""" + # 禁用内存分析以提高测试速度 + with memory_profile(enabled=False): + data = [i for i in range(1000)] + sum(data) + + @memory_profile_decorator + def test_memory_intensive_function(self): + """测试内存密集型函数""" + # 小数据集,避免测试耗时过长 + data = [i for i in range(1000)] + return sum(data) + + def test_memory_function_works(self): + """验证内存分析函数能正常执行""" + result = self.test_memory_intensive_function() + assert result == 499500 + + +class TestPerformanceStats: + """测试性能统计功能""" + + def test_stats_initialization(self): + """测试性能统计对象初始化""" + stats = PerformanceStats() + assert isinstance(stats, PerformanceStats) + assert stats.stats == {} + + def test_stats_record(self): + """测试记录性能数据""" + stats = PerformanceStats() + stats.record("test_func", 0.1) + + assert "test_func" in stats.stats + assert stats.stats["test_func"]["count"] == 1 + assert stats.stats["test_func"]["total_time"] == 0.1 + assert stats.stats["test_func"]["avg_time"] == 0.1 + + def test_stats_report(self): + """测试生成性能报告""" + stats = PerformanceStats() + stats.record("func1", 0.1) + stats.record("func2", 0.2) + + report = stats.report() + assert isinstance(report, str) + assert "func1" in report + assert "func2" in report + + def test_stats_reset(self): + """测试重置性能统计""" + stats = PerformanceStats() + stats.record("test_func", 0.1) + stats.reset() + + assert stats.stats == {} + + def test_global_stats_recording(self): + """测试全局性能统计记录""" + # 先重置全局统计 + performance_stats.reset() + + @timeit(collect_stats=True) + def test_func(): + time.sleep(0.01) + + test_func() + + # 验证是否记录了性能数据 + assert "test_func" in performance_stats.stats + assert performance_stats.stats["test_func"]["count"] == 1 + + +class TestIntegration: + """综合测试""" + + @pytest.mark.asyncio + async def test_combined_features(self): + """测试多种性能分析功能的组合使用""" + # 重置全局统计 + performance_stats.reset() + + @timeit(collect_stats=True) + @performance_monitor(threshold=0.05) + async def test_async_func(): + await asyncio.sleep(0.06) # 超过阈值 + return "combined" + + result = await test_async_func() + assert result == "combined" + + # 验证性能统计 + assert "test_async_func" in performance_stats.stats + assert performance_stats.stats["test_async_func"]["count"] == 1 + + +if __name__ == "__main__": + # 运行基本测试 + print("开始性能分析功能测试...") + + # 测试同步函数 + @timeit + def test_sync(): + time.sleep(0.1) + return "sync" + + # 测试异步函数 + @timeit + async def test_async(): + await asyncio.sleep(0.1) + return "async" + + # 测试性能监控 + @performance_monitor(threshold=0.05) + def slow_func(): + time.sleep(0.1) + return "slow" + + # 运行测试 + sync_result = test_sync() + async_result = asyncio.run(test_async()) + slow_result = slow_func() + + print(f"\n测试结果:") + print(f"sync_result: {sync_result}") + print(f"async_result: {async_result}") + print(f"slow_result: {slow_result}") + + # 输出性能统计报告 + print("\n性能统计报告:") + print(performance_stats.report()) + + print("\n性能分析功能测试完成!")