Squash merge dev branch: Implement performance monitoring, auto-approve plugin, and fix various warnings

This commit is contained in:
2026-01-19 01:45:10 +08:00
parent ad8f7e761f
commit 698240b1a2
22 changed files with 1867 additions and 455 deletions

2
.gitignore vendored
View File

@@ -146,3 +146,5 @@ build/
# Scratch files
scratch_files/
/config.toml
/core/data/TEMP/*

View File

@@ -1,294 +0,0 @@
#!/usr/bin/env python3
"""
跨平台 Python 模块编译脚本
将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。
支持的平台:
- Windows: 生成 .pyd 文件
- Linux: 生成 .so 文件
使用方法:
python compile_machine_code.py [options]
选项:
--compile, -c 编译指定的模块(默认)
--list, -l 列出已编译的模块
--clean, -k 清理编译生成的文件
--help, -h 显示帮助信息
注意:
1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC)
2. 需要安装 mypyc: pip install mypyc
3. 编译后的文件是平台相关的,不能跨平台复制
4. 建议在部署的目标环境上运行此脚本
"""
import os
import sys
import glob
import subprocess
import shutil
import argparse
# 检测当前平台
PLATFORM = sys.platform
if PLATFORM.startswith('win'):
EXTENSION = '.pyd'
BUILD_PREFIX = 'cp314-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-314')
elif PLATFORM.startswith('linux'):
EXTENSION = '.so'
BUILD_PREFIX = 'cp314-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-314')
else:
print(f"不支持的平台: {PLATFORM}")
sys.exit(1)
# 要编译的模块列表
# 注意Mypyc 对动态特性支持有限,只选择计算密集或类型明确的模块
MODULES = [
# 工具模块
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/utils/singleton.py', # 单例模式基类
'core/utils/exceptions.py', # 自定义异常
'core/utils/logger.py', # 日志模块
# 核心管理模块
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/managers/plugin_manager.py', # 插件管理器
'core/managers/redis_manager.py', # Redis 管理器
'core/managers/image_manager.py', # 图片管理器
# 核心基础模块
'core/ws.py', # WebSocket 核心
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
'core/config_models.py', # 配置模型
'core/permission.py', # 权限枚举
# API 模块 - 注意:这些类会被 Bot 类多继承使用
# 因此不适合编译,否则会导致 "multiple bases have instance lay-out conflict" 错误
# 'core/api/base.py', # API 基础类
# 'core/api/account.py', # 账号相关 API
# 'core/api/friend.py', # 好友相关 API
# 'core/api/group.py', # 群组相关 API
# 'core/api/media.py', # 媒体相关 API
# 'core/api/message.py', # 消息相关 API
# 数据模型(适合编译的高频使用数据类)
'models/message.py', # 消息段模型
'models/sender.py', # 发送者模型
'models/objects.py', # API 响应数据模型
# 事件处理相关
'core/handlers/event_handler.py', # 事件处理器
# 注意:以下文件不适合编译
# - 主程序文件main.py
# - 测试文件tests/目录)
# - 插件文件plugins/目录)
# - 编译脚本compile_machine_code.py等
# - 临时文件scratch_files/目录)
# - 抽象基类models/events/base.py
# - 事件工厂models/events/factory.py
# - 包含复杂动态特性的文件
]
def list_compiled_modules():
"""列出已编译的模块"""
print(f"\n已编译的 {PLATFORM} 模块:")
print("=" * 50)
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
size = os.path.getsize(f) // 1024 # KB
print(f"{f} ({size} KB)")
else:
print(f"未找到已编译的 {EXTENSION} 文件")
print(f"\n总计: {len(compiled_files)} 个文件")
def clean_compiled_files():
"""清理编译生成的文件"""
print(f"\n清理编译生成的 {EXTENSION} 文件...")
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
try:
os.remove(f)
print(f"已删除: {f}")
except Exception as e:
print(f"删除失败 {f}: {e}")
# 清理 build 目录
if os.path.exists('build'):
try:
shutil.rmtree('build')
print("已删除 build 目录")
except Exception as e:
print(f"删除 build 目录失败: {e}")
else:
print(f"没有可清理的 {EXTENSION} 文件")
def get_platform_specific_module_name(module_path):
"""获取平台特定的模块文件名"""
module_name = module_path.replace('.py', '')
return f"{module_name}.{BUILD_PREFIX}{EXTENSION}"
def compile_module(module_path):
"""编译单个模块"""
print(f"\n编译: {module_path}")
try:
# 直接调用 mypyc 命令行工具
result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path],
capture_output=True,
text=True,
check=True
)
# 获取平台特定的模块名
platform_module = get_platform_specific_module_name(module_path)
mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}')
# 检查编译产物是否在当前目录
if os.path.exists(platform_module):
print(f" ✓ 编译成功: {platform_module}")
return True
else:
# 检查 build 目录中是否有编译产物
build_module_path = os.path.join(BUILD_PATH, platform_module)
build_mypyc_path = os.path.join(BUILD_PATH, mypyc_platform_module)
if os.path.exists(build_module_path):
# 如果在 build 目录中,复制到正确位置
os.makedirs(os.path.dirname(platform_module), exist_ok=True)
shutil.copy2(build_module_path, platform_module)
shutil.copy2(build_mypyc_path, mypyc_platform_module)
print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}")
return True
else:
print(f" ✗ 编译失败:找不到编译产物")
if result.stdout:
print(f" 编译输出:{result.stdout[:500]}...")
if result.stderr:
print(f" 错误信息:{result.stderr[:500]}...")
return False
except subprocess.CalledProcessError as e:
print(f" ✗ 编译失败,退出码: {e.returncode}")
if e.stdout:
print(f" 编译输出:{e.stdout[:500]}...")
if e.stderr:
print(f" 错误信息:{e.stderr[:500]}...")
return False
except Exception as e:
print(f" ✗ 编译失败,意外错误: {e}")
return False
def should_skip_module(module_path):
"""检查模块是否应该被跳过编译"""
try:
with open(module_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含抽象基类相关代码
if 'from abc import ABC' in content or 'from abc import abstractmethod' in content:
return True, "包含抽象基类,不适合编译"
# 检查是否包含动态特性
if 'eval(' in content or 'exec(' in content or 'getattr(' in content or 'setattr(' in content:
return True, "包含动态特性,不适合编译"
return False, ""
except Exception as e:
return True, f"读取文件时出错: {e}"
def compile_all_modules():
"""编译所有指定的模块"""
print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})")
print("=" * 60)
# 验证模块文件是否存在并检查是否适合编译
valid_modules = []
for module_path in MODULES:
if os.path.exists(module_path):
should_skip, reason = should_skip_module(module_path)
if should_skip:
print(f"跳过: {module_path} ({reason})")
else:
valid_modules.append(module_path)
else:
print(f"警告: 模块 {module_path} 不存在,将被跳过")
if not valid_modules:
print("错误: 没有有效的模块可编译")
return False
# 编译模块
success_count = 0
for module_path in valid_modules:
if compile_module(module_path):
success_count += 1
print(f"\n" + "=" * 60)
print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功")
if success_count == len(valid_modules):
print("✓ 所有模块编译成功")
return True
else:
print("✗ 部分模块编译失败")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()
group.add_argument('--compile', '-c', action='store_true', default=True,
help='编译指定的模块 (默认)')
group.add_argument('--list', '-l', action='store_true',
help='列出已编译的模块')
group.add_argument('--clean', '-k', action='store_true',
help='清理编译生成的文件')
args = parser.parse_args()
# 检查是否安装了 mypyc
try:
import mypyc
except ImportError:
print("错误: 未安装 mypyc请先安装: pip install mypyc")
sys.exit(1)
if args.list:
list_compiled_modules()
elif args.clean:
clean_compiled_files()
else:
compile_all_modules()
print("\n使用 --list 选项查看已编译的模块")
if __name__ == '__main__':
main()

View File

@@ -1,65 +0,0 @@
#!/usr/bin/env python3
"""
编译模块脚本
这个脚本会单独编译每个Python模块确保每个模块都在正确位置生成独立的.pyd文件。
"""
import os
import sys
import glob
from mypyc.build import mypycify
from distutils.core import setup
def compile_module(module_path):
"""
编译单个模块
Args:
module_path: 要编译的Python模块路径
"""
print(f"\nCompiling {module_path}...")
try:
ext_modules = mypycify([module_path])
setup(name=f'compiled_{os.path.basename(module_path).replace(".py", "")}',
ext_modules=ext_modules)
return True
except Exception as e:
print(f"Error compiling {module_path}: {e}")
return False
def main():
"""
主函数
"""
# 要编译的模块列表
modules = [
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/admin_manager.py', # 管理员管理
'core/managers/permission_manager.py', # 权限管理
'core/ws.py', # WebSocket 核心
'core/managers/plugin_manager.py', # 插件管理器
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
]
# 自动添加 events 模型
event_models = glob.glob('models/events/*.py')
event_models = [m for m in event_models if not m.endswith('__init__.py')]
modules.extend(event_models)
print(f"Found {len(modules)} modules to compile.")
success_count = 0
for module in modules:
if compile_module(module):
success_count += 1
print(f"\n--- Compilation Summary ---")
print(f"Total modules: {len(modules)}")
print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(modules) - success_count}")
if __name__ == '__main__':
main()

View File

@@ -1,25 +0,0 @@
[napcat_ws]
uri = "ws://114.66.58.203:3001"
token = "&d_VTfksE%}ul?_Y"
reconnect_interval = 5
[bot]
command = ["/"]
ignore_self_message = true #是否忽略自身消息
permission_denied_message = "权限不足,需要 {permission_name} 权限"
[redis]
host = "114.66.58.203"
port = 1931
db = 0
password = "redis_5dxyJG"
[docker]
base_url = "tcp://dockertest.k2cro4.my:2375"
sandbox_image = "python-sandbox:latest"
timeout = 10
concurrency_limit = 5
tls_verify = true
ca_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/ca.crt"
client_cert_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-cert.pem"
client_key_path = "c:/Users/镀铬酸钾/Documents/NeoBot/ca/client-key.pem"

Binary file not shown.

Before

Width:  |  Height:  |  Size: 147 KiB

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
工具函数包
"""
# 导出核心工具
from .logger import logger
from .exceptions import *
from .json_utils import *
from .singleton import singleton
from .executor import run_in_thread_pool, initialize_executor
from .performance import (
timeit,
profile,
aprofile,
memory_profile,
memory_profile_decorator,
performance_monitor,
PerformanceStats,
performance_stats,
global_stats
)
__all__ = [
'logger',
'timeit',
'profile',
'aprofile',
'memory_profile',
'memory_profile_decorator',
'performance_monitor',
'PerformanceStats',
'performance_stats',
'global_stats',
'run_in_thread_pool',
'initialize_executor',
'singleton'
]

364
core/utils/performance.py Normal file
View File

@@ -0,0 +1,364 @@
#!/usr/bin/env python3
"""
性能分析工具模块
提供同步和异步函数的性能分析装饰器、上下文管理器和统计工具。
主要功能:
1. 函数执行时间分析(支持同步和异步)
2. 内存使用分析
3. 性能统计和报告生成
4. 低开销的生产环境监控
"""
import time
import functools
import logging
from typing import Dict, Any, Callable, Optional
import inspect
# 尝试导入性能分析库
try:
from pyinstrument import Profiler
from pyinstrument.renderers import HTMLRenderer
PYINSTRUMENT_AVAILABLE = True
except ImportError:
PYINSTRUMENT_AVAILABLE = False
# 尝试导入内存分析库
try:
from memory_profiler import memory_usage
MEMORY_PROFILER_AVAILABLE = True
except ImportError:
MEMORY_PROFILER_AVAILABLE = False
from .logger import logger
class PerformanceStats:
"""
性能统计工具类
用于收集和报告函数执行的性能指标
"""
def __init__(self):
self.stats: Dict[str, Dict[str, Any]] = {}
def record(self, func_name: str, duration: float, memory_used: Optional[float] = None):
"""
记录函数执行的性能数据
Args:
func_name: 函数名称
duration: 执行时间(秒)
memory_used: 使用的内存MB可选
"""
if func_name not in self.stats:
self.stats[func_name] = {
"count": 0,
"total_time": 0.0,
"avg_time": 0.0,
"min_time": float('inf'),
"max_time": 0.0,
"total_memory": 0.0,
"avg_memory": 0.0
}
stat = self.stats[func_name]
stat["count"] += 1
stat["total_time"] += duration
stat["avg_time"] = stat["total_time"] / stat["count"]
stat["min_time"] = min(stat["min_time"], duration)
stat["max_time"] = max(stat["max_time"], duration)
if memory_used is not None:
stat["total_memory"] += memory_used
stat["avg_memory"] = stat["total_memory"] / stat["count"]
def report(self) -> str:
"""
生成性能统计报告
Returns:
格式化的性能统计报告字符串
"""
if not self.stats:
return "暂无性能统计数据"
report = ["\n=== 性能统计报告 ===\n"]
report.append(f"{'函数名':<40} {'调用次数':<10} {'平均时间(ms)':<15} {'最长时间(ms)':<15} {'内存(MB)':<10}")
report.append("-" * 100)
for func_name, stat in sorted(self.stats.items(), key=lambda x: x[1]["total_time"], reverse=True):
memory_str = f"{stat['avg_memory']:.2f}" if stat['avg_memory'] > 0 else "-"
report.append(
f"{func_name:<40} {stat['count']:<10} {stat['avg_time']*1000:<15.2f} "
f"{stat['max_time']*1000:<15.2f} {memory_str:<10}"
)
report.append("=" * 100)
return "\n".join(report)
def reset(self):
"""
重置性能统计数据
"""
self.stats.clear()
# 创建全局性能统计实例
performance_stats = PerformanceStats()
def timeit(func: Callable = None, *, log_level: int = logging.INFO, collect_stats: bool = True):
"""
函数执行时间分析装饰器(支持同步和异步)
Args:
func: 要装饰的函数
log_level: 日志级别
collect_stats: 是否收集到全局统计中
Returns:
装饰后的函数
"""
def decorator(func: Callable) -> Callable:
func_name = func.__qualname__
is_coroutine = inspect.iscoroutinefunction(func)
if is_coroutine:
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.perf_counter()
try:
result = await func(*args, **kwargs)
finally:
end_time = time.perf_counter()
duration = end_time - start_time
if collect_stats:
performance_stats.record(func_name, duration)
logger.log(log_level, f"[性能] {func_name} 执行时间: {duration*1000:.2f} ms")
return result
return async_wrapper
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.perf_counter()
try:
result = func(*args, **kwargs)
finally:
end_time = time.perf_counter()
duration = end_time - start_time
if collect_stats:
performance_stats.record(func_name, duration)
logger.log(log_level, f"[性能] {func_name} 执行时间: {duration*1000:.2f} ms")
return result
return sync_wrapper
if func is None:
return decorator
return decorator(func)
class profile:
"""
性能分析上下文管理器
使用 pyinstrument 进行详细的性能分析
"""
def __init__(self, enabled: bool = True, output_file: Optional[str] = None):
"""
Args:
enabled: 是否启用分析
output_file: 分析结果输出文件路径HTML格式
"""
self.enabled = enabled
self.output_file = output_file
self.profiler = None
def __enter__(self):
if self.enabled and PYINSTRUMENT_AVAILABLE:
self.profiler = Profiler()
self.profiler.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.enabled and PYINSTRUMENT_AVAILABLE and self.profiler:
self.profiler.stop()
# 输出到日志
logger.info(f"[性能分析] {self.profiler.print()}")
# 如果指定了输出文件保存为HTML
if self.output_file:
try:
html = self.profiler.render(HTMLRenderer())
with open(self.output_file, 'w', encoding='utf-8') as f:
f.write(html)
logger.info(f"[性能分析] 报告已保存到: {self.output_file}")
except Exception as e:
logger.error(f"[性能分析] 保存报告失败: {e}")
async def aprofile(func: Callable, *args, **kwargs):
"""
异步函数性能分析
Args:
func: 要分析的异步函数
*args: 函数参数
**kwargs: 函数关键字参数
Returns:
函数执行结果
"""
if not PYINSTRUMENT_AVAILABLE:
logger.warning("[性能分析] pyinstrument 未安装,无法进行详细分析")
return await func(*args, **kwargs)
profiler = Profiler()
profiler.start()
try:
result = await func(*args, **kwargs)
finally:
profiler.stop()
logger.info(f"[性能分析] {profiler.print()}")
return result
class memory_profile:
"""
内存分析上下文管理器
"""
def __init__(self, interval: float = 0.1, enabled: bool = True):
"""
Args:
interval: 内存采样间隔(秒)
enabled: 是否启用内存分析
"""
self.interval = interval
self.enabled = enabled
self.memory_start = 0.0
self.memory_end = 0.0
def __enter__(self):
if self.enabled and MEMORY_PROFILER_AVAILABLE:
self.memory_start = memory_usage()[0]
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.enabled and MEMORY_PROFILER_AVAILABLE:
self.memory_end = memory_usage()[0]
memory_used = self.memory_end - self.memory_start
logger.info(f"[内存分析] 使用内存: {memory_used:.2f} MB")
def memory_profile_decorator(func: Callable = None, *, interval: float = 0.1):
"""
内存分析装饰器(支持同步函数)
Args:
func: 要装饰的函数
interval: 内存采样间隔
Returns:
装饰后的函数
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not MEMORY_PROFILER_AVAILABLE:
return func(*args, **kwargs)
mem_usage = memory_usage(
(func, args, kwargs),
interval=interval,
timeout=None,
include_children=False
)
max_memory = max(mem_usage)
logger.info(f"[内存分析] {func.__qualname__} 最大内存使用: {max_memory:.2f} MB")
return func(*args, **kwargs)
return wrapper
if func is None:
return decorator
return decorator(func)
def performance_monitor(func: Callable = None, *, threshold: float = 1.0):
"""
性能监控装饰器
仅当函数执行时间超过阈值时记录日志
适合生产环境使用
Args:
func: 要装饰的函数
threshold: 时间阈值(秒)
Returns:
装饰后的函数
"""
def decorator(func: Callable) -> Callable:
func_name = func.__qualname__
is_coroutine = inspect.iscoroutinefunction(func)
if is_coroutine:
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = await func(*args, **kwargs)
end_time = time.perf_counter()
duration = end_time - start_time
if duration > threshold:
logger.warning(f"[性能监控] {func_name} 执行时间过长: {duration*1000:.2f} ms (阈值: {threshold*1000:.2f} ms)")
return result
return async_wrapper
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
duration = end_time - start_time
if duration > threshold:
logger.warning(f"[性能监控] {func_name} 执行时间过长: {duration*1000:.2f} ms (阈值: {threshold*1000:.2f} ms)")
return result
return sync_wrapper
if func is None:
return decorator
return decorator(func)
# 全局实例
global_stats = PerformanceStats()
__all__ = [
'timeit',
'profile',
'aprofile',
'memory_profile',
'memory_profile_decorator',
'performance_monitor',
'PerformanceStats',
'performance_stats',
'global_stats'
]

View File

@@ -1,10 +1,13 @@
"""
通用单例模式基类
"""
from typing import Any, Optional, Type, TypeVar
from typing import Any, Dict, Optional, Type, TypeVar
T = TypeVar('T')
# 存储每个类的实例
_instance_store: Dict[Type, Any] = {}
class Singleton:
"""
一个通用的单例基类
@@ -13,7 +16,6 @@ class Singleton:
它通过重写 __new__ 方法来确保每个类只有一个实例。
同时,它处理了重复初始化的问题,确保 __init__ 方法只在第一次实例化时被调用。
"""
_instance: Optional[Any] = None
_initialized: bool = False
def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T:
@@ -27,9 +29,10 @@ class Singleton:
Returns:
T: 单例实例
"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
# 使用全局字典存储实例,避免类型检查问题
if cls not in _instance_store:
_instance_store[cls] = super().__new__(cls)
return _instance_store[cls]
def __init__(self) -> None:
"""
@@ -38,3 +41,38 @@ class Singleton:
if self._initialized:
return
self._initialized = True
def singleton(cls: Type[T]) -> Type[T]:
"""
单例装饰器
将普通类转换为单例类,确保整个应用程序中只有一个实例。
Args:
cls: 要转换为单例的类
Returns:
Type[T]: 单例类
"""
# 为每个装饰的类创建一个实例存储
class_instance: Optional[T] = None
# 创建一个新的类,继承自原始类
class SingletonClass(cls):
"""单例包装类"""
def __new__(cls: Type[T], *args: Any, **kwargs: Any) -> T:
"""创建或返回现有的实例"""
nonlocal class_instance
if class_instance is None:
# 使用super()调用原始类的__new__方法
class_instance = cls(*args, **kwargs)
return class_instance
# 复制类的元数据
SingletonClass.__name__ = cls.__name__
SingletonClass.__doc__ = cls.__doc__
SingletonClass.__module__ = cls.__module__
return SingletonClass

View File

@@ -1,8 +0,0 @@
import subprocess
# 运行pip freeze命令获取所有依赖
result = subprocess.run(['pip', 'freeze'], capture_output=True, text=True)
# 将输出写入requirements.txt文件
with open('requirements.txt', 'w', encoding='utf-8') as f:
f.write(result.stdout)

64
main.py
View File

@@ -10,6 +10,59 @@ import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 初始化日志系统,必须在其他 core 模块导入之前执行
from core.utils.logger import logger
# 核心模块导入
from core.managers.admin_manager import admin_manager
from core.ws import WS
from core.managers import plugin_manager, matcher
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
from core.config_loader import global_config as config
# 检查 JIT 编译状态
def check_jit_status():
"""
检查 Python JIT 编译状态
该函数用于检测当前 Python 解释器是否启用了 JIT 编译功能,
并打印相关信息,帮助用户了解运行环境的性能优化状态。
"""
print("\n=== Python JIT 编译状态检查 ===")
# 检查解释器信息
print(f"Python 版本: {sys.version}")
print(f"解释器路径: {sys.executable}")
# 检查优化级别
print(f"优化级别 (-O): {sys.flags.optimize}")
# 检查 JIT 相关模块和功能
if sys.version_info >= (3, 10):
try:
# 对于 CPython 3.10+,检查是否启用了 JIT
import _opcode
if hasattr(_opcode, 'jit'):
print("JIT 状态: 已启用 (_opcode.jit)")
else:
print("JIT 状态: 未启用 (_opcode.jit 不可用)")
except ImportError:
print("JIT 状态: 未启用 (_opcode 模块不可用)")
else:
print("JIT 状态: 不可用 (需要 Python 3.10+)")
# 检查是否使用了 PyPy
if hasattr(sys, 'pypy_version_info'):
print(f"PyPy 版本: {sys.pypy_version_info}")
print("JIT 状态: 已启用 (PyPy 内置 JIT)")
print("==============================\n")
# 执行 JIT 状态检查
check_jit_status()
# 尝试使用高性能事件循环
try:
if sys.platform == 'win32':
@@ -25,17 +78,6 @@ try:
except ImportError:
print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环")
# 初始化日志系统,必须在其他 core 模块导入之前执行
from core.utils.logger import logger
from core.managers.admin_manager import admin_manager
from core.ws import WS
from core.managers import plugin_manager, matcher
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
from core.config_loader import global_config as config
# 将项目根目录添加到 sys.path
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT_DIR)

View File

@@ -4,7 +4,7 @@
定义了消息相关的事件类,包括 MessageEvent, PrivateMessageEvent, GroupMessageEvent。
"""
from dataclasses import dataclass, field
from typing import List, Optional, Union, ClassVar
from typing import List, Optional, Union
from core.permission import Permission
from models.message import MessageSegment
@@ -27,17 +27,19 @@ class Anonymous:
"""匿名用户 flag"""
# 权限级别常量,用于装饰器参数
# 定义在类外部,避免 dataclass 参数顺序问题
MESSAGE_EVENT_ADMIN = Permission.ADMIN
MESSAGE_EVENT_OP = Permission.OP
MESSAGE_EVENT_USER = Permission.USER
@dataclass(slots=True)
class MessageEvent(OneBotEvent):
"""
消息事件基类
"""
# 权限级别常量,用于装饰器参数
ADMIN: ClassVar[Permission] = Permission.ADMIN
OP: ClassVar[Permission] = Permission.OP
USER: ClassVar[Permission] = Permission.USER
message_type: str
"""消息类型: private (私聊), group (群聊)"""
@@ -70,6 +72,21 @@ class MessageEvent(OneBotEvent):
def post_type(self) -> str:
return EventType.MESSAGE
@property
def ADMIN(self) -> Permission:
"""权限级别常量,用于装饰器参数"""
return MESSAGE_EVENT_ADMIN
@property
def OP(self) -> Permission:
"""权限级别常量,用于装饰器参数"""
return MESSAGE_EVENT_OP
@property
def USER(self) -> Permission:
"""权限级别常量,用于装饰器参数"""
return MESSAGE_EVENT_USER
async def reply(self, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False):
"""
回复消息(抽象方法,由子类实现)

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
"""
性能分析配置示例
展示如何在项目中配置和使用性能分析功能。
"""
# 配置性能分析的使用方式
PERFORMANCE_CONFIG = {
# 全局性能分析开关
'enabled': True,
# 详细性能分析开关(使用 pyinstrument
'detailed': False,
# 内存分析开关
'memory': False,
# 性能监控阈值(秒)
'threshold': 0.5,
# 性能报告输出文件
'output_file': 'performance_report.html',
# 要监控的核心组件列表
'monitored_components': [
'core.ws.WS',
'core.managers.plugin_manager',
'core.managers.browser_manager',
'core.utils.executor.CodeExecutor',
'core.handlers.event_handler',
]
}
def get_performance_config():
"""
获取性能分析配置
Returns:
dict: 性能分析配置
"""
import os
import json
# 从环境变量加载配置
config = PERFORMANCE_CONFIG.copy()
if os.environ.get('PERFORMANCE_PROFILE'):
config['detailed'] = os.environ['PERFORMANCE_PROFILE'] == '1'
if os.environ.get('PERFORMANCE_MEMORY'):
config['memory'] = os.environ['PERFORMANCE_MEMORY'] == '1'
if os.environ.get('PERFORMANCE_THRESHOLD'):
try:
config['threshold'] = float(os.environ['PERFORMANCE_THRESHOLD'])
except ValueError:
pass
if os.environ.get('PERFORMANCE_OUTPUT'):
config['output_file'] = os.environ['PERFORMANCE_OUTPUT']
if os.environ.get('PERFORMANCE_STATS'):
config['enabled'] = os.environ['PERFORMANCE_STATS'] == '1'
return config
if __name__ == "__main__":
# 打印当前配置
print("当前性能分析配置:")
print("=" * 50)
config = get_performance_config()
for key, value in config.items():
print(f"{key}: {value}")

View File

@@ -30,7 +30,7 @@ HEADERS = {
# 全局共享的 ClientSession
_session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession(headers=HEADERS)
@@ -55,7 +55,7 @@ def format_duration(seconds: int) -> str:
async def get_real_url(short_url: str) -> Optional[str]:
try:
session = await get_session()
session = get_session()
async with session.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) as response:
if response.status == 302:
return response.headers.get('Location')
@@ -65,22 +65,71 @@ async def get_real_url(short_url: str) -> Optional[str]:
async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
try:
session = await get_session()
async with session.get(video_url, headers=HEADERS, timeout=5) as response:
# 清理URL去掉不必要的查询参数只保留基本的视频URL
clean_url = video_url.split('?')[0]
if '#/' in clean_url:
clean_url = clean_url.split('#/')[0]
session = get_session()
async with session.get(clean_url, headers=HEADERS, timeout=5) as response:
response.raise_for_status()
text = await response.text()
soup = BeautifulSoup(text, 'html.parser')
# 尝试多种方式获取视频数据
# 方式1: 尝试获取 __INITIAL_STATE__
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
if not script_tag or not script_tag.string:
# 方式2: 尝试获取 __PLAYINFO__
script_tag = soup.find('script', text=re.compile('window.__PLAYINFO__'))
if not script_tag or not script_tag.string:
# 方式3: 尝试获取页面标题和其他信息
title_tag = soup.find('title')
if title_tag:
title = title_tag.get_text().strip()
# 提取BV号
bv_match = re.search(r'(BV\w{10})', clean_url)
bvid = bv_match.group(1) if bv_match else '未知BV号'
return {
"title": title.replace('_哔哩哔哩_bilibili', '').strip(),
"bvid": bvid,
"duration": 0,
"cover_url": '',
"play": 0,
"like": 0,
"coin": 0,
"favorite": 0,
"share": 0,
"owner_name": '未知UP主',
"owner_avatar": '',
"followers": 0,
}
return None
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^\}]*\});', script_tag.string)
# 原始解析逻辑
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{[^}]*\});', script_tag.string)
if not match:
# 尝试另一种正则表达式
match = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string, re.DOTALL)
if not match:
return None
json_str = match.group(1)
data = json.loads(json_str)
# 清理JSON字符串中的潜在问题字符
json_str = json_str.strip().rstrip(';')
try:
data = json.loads(json_str)
except json.JSONDecodeError:
# 如果直接解析失败尝试清理JSON字符串
# 移除可能的注释或无效字符
cleaned_json = re.sub(r',\s*[}]', '}', json_str) # 移除末尾多余的逗号
cleaned_json = re.sub(r'/\*.*?\*/', '', cleaned_json) # 移除注释
cleaned_json = re.sub(r'//.*', '', cleaned_json) # 移除行注释
data = json.loads(cleaned_json)
video_data = data.get('videoData', {})
up_data = data.get('upData', {})
@@ -116,6 +165,10 @@ async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
logger.error(f"解析视频信息失败: {e}")
logger.debug(f"失败的URL: {video_url}")
except Exception as e:
logger.error(f"解析视频信息时发生未知错误: {e}")
logger.debug(f"失败的URL: {video_url}")
return None
@@ -212,24 +265,32 @@ async def process_bili_link(event: MessageEvent, url: str):
:param event: 消息事件对象
:param url: 待处理的B站链接
"""
if "b23.tv" in url:
real_url = await get_real_url(url)
if not real_url:
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。")
await event.reply("无法解析B站短链接")
return
else:
real_url = url.split('?')[0]
try:
if "b23.tv" in url:
real_url = await get_real_url(url)
if not real_url:
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL")
await event.reply("无法解析B站短链接。")
return
else:
# 清理URL移除复杂查询参数只保留基本的视频URL
real_url = url.split('?')[0]
if '#/' in real_url:
real_url = real_url.split('#/')[0]
video_info = await parse_video_info(real_url)
if not video_info:
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
video_info = await parse_video_info(real_url)
if not video_info:
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
return
except Exception as e:
logger.error(f"[bili_parser] 处理B站链接时发生错误: {e}")
await event.reply("处理B站链接时发生错误请稍后再试。")
return
# 检查视频时长
video_message: Union[str, MessageSegment]
if video_info['duration'] > 300: # 5分钟 = 300秒
if video_info['duration'] > 1200: # 5分钟 = 300秒
video_message = "视频时长超过5分钟不进行解析。"
else:
direct_url = await get_direct_video_url(real_url)

391
plugins/douyin_parser.py Normal file
View File

@@ -0,0 +1,391 @@
# -*- coding: utf-8 -*-
import re
import json
import aiohttp
from typing import Optional, Dict, Any, Union
from cachetools import TTLCache
from core.utils.logger import logger
from core.managers.command_manager import matcher
from models import MessageEvent, MessageSegment
# 创建一个TTL缓存最大容量100缓存时间10秒
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
# 插件元数据
__plugin_meta__ = {
"name": "douyin_parser",
"description": "自动解析抖音分享链接,提取视频信息和直链。",
"usage": "(自动触发)当检测到抖音分享链接时,自动发送视频信息。",
}
# 常量定义
DOUYIN_NICKNAME = "抖音视频解析"
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate, br', # 重新启用br编码支持
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
# 全局共享的 ClientSession
_session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession(headers=HEADERS)
return _session
def format_count(num: Union[int, str]) -> str:
try:
n = int(num)
if n < 10000:
return str(n)
return f"{n / 10000:.1f}"
except (ValueError, TypeError):
return str(num)
DOUYIN_URL_PATTERN = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线
DOUYIN_SHORT_PATTERN = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_]+/?", re.IGNORECASE) # 包含下划线
def extract_url_from_json_segments(segments):
"""
从消息的JSON段中提取抖音链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "json":
logger.info(f"[douyin_parser] 检测到JSON CQ码: {segment.data}")
try:
json_data = json.loads(segment.data.get("data", "{}"))
# 检查是否是抖音分享卡片
meta = json_data.get("meta", {})
if "detail_1" in meta:
detail = meta["detail_1"]
if "qqdocurl" in detail:
url = detail["qqdocurl"]
if "douyin.com" in url or "iesdouyin.com" in url:
logger.success(f"[douyin_parser] 成功从JSON卡片中提取到抖音链接: {url}")
return url
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[douyin_parser] 解析JSON失败: {e}")
continue
return None
def extract_url_from_text_segments(segments):
"""
从消息的文本段中提取抖音链接
:param segments: 消息段列表
:return: 提取到的URL或None
"""
for segment in segments:
if segment.type == "text":
text_content = segment.data.get("text", "")
# 查找抖音链接
match = DOUYIN_URL_PATTERN.search(text_content)
if match:
extracted_url = match.group(0)
logger.success(f"[douyin_parser] 成功从文本中提取到抖音链接: {extracted_url}")
return extracted_url
# 也检查是否有v.douyin.com格式的链接
short_match = DOUYIN_SHORT_PATTERN.search(text_content)
if short_match:
extracted_url = short_match.group(0)
logger.success(f"[douyin_parser] 成功从文本中提取到抖音短链接: {extracted_url}")
return extracted_url
return None
@matcher.on_message()
async def handle_douyin_share(event: MessageEvent):
"""
处理消息检测抖音分享链接JSON卡片或文本链接并进行解析。
:param event: 消息事件对象
"""
# 消息去重
if event.message_id in processed_messages:
return
processed_messages[event.message_id] = True
# 忽略机器人自己发送的消息,防止无限循环
if event.user_id == event.self_id:
return
# 1. 优先解析JSON卡片中的链接
url_to_process = extract_url_from_json_segments(event.message)
# 2. 如果未在JSON卡片中找到链接则在文本消息中查找
if not url_to_process:
url_to_process = extract_url_from_text_segments(event.message)
# 3. 如果找到了抖音链接,则进行处理
if url_to_process:
await process_douyin_link(event, url_to_process)
async def get_real_url(short_url: str) -> Optional[str]:
"""
获取抖音短链接的真实URL
:param short_url: 抖音短链接
:return: 真实URL或None
"""
try:
# 首先尝试获取重定向后的URL
async with aiohttp.ClientSession() as session:
# 添加更多头部信息模拟移动端访问
mobile_headers = HEADERS.copy() # 使用更新后的完整请求头
mobile_headers.update({
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
# 模拟移动设备的额外头部
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1',
'X-Requested-With': 'XMLHttpRequest',
'Referer': 'https://www.douyin.com/'
})
async with session.get(short_url, headers=mobile_headers, allow_redirects=True, timeout=10) as response:
redirected_url = str(response.url)
# 检查重定向后的URL是否包含视频ID
# 抖音视频页通常包含 aweme_id 或 sec_uid 参数
if 'video/' in redirected_url or '/note/' in redirected_url:
logger.info(f"[douyin_parser] 重定向后的视频URL: {redirected_url}")
return redirected_url
elif 'share_item' in redirected_url:
# 如果URL中有share_item参数尝试从中提取视频信息
logger.info(f"[douyin_parser] 重定向后的分享URL: {redirected_url}")
return redirected_url
else:
# 如果重定向到了主页或其他非视频页面,尝试从响应中提取信息
logger.warning(f"[douyin_parser] 重定向到了非预期页面: {redirected_url}")
return redirected_url
except Exception as e:
logger.error(f"[douyin_parser] 获取真实URL失败: {e}")
return None
async def parse_douyin_video(video_url: str) -> Optional[Dict[str, Any]]:
"""
解析抖音视频信息
:param video_url: 抖音视频链接
:return: 视频信息字典或None
"""
try:
# 使用新的第三方API解析抖音视频
api_url = f"http://api.xhus.cn/api/douyin?url={video_url}"
session = await get_session()
async with session.get(api_url, headers=HEADERS, timeout=10) as response:
if response.status != 200:
logger.error(f"[douyin_parser] API请求失败状态码: {response.status}")
return None
response_data = await response.json()
if not isinstance(response_data, dict):
logger.error(f"[douyin_parser] API返回格式错误: {response_data}")
return None
if response_data.get("code") != 200:
logger.error(f"[douyin_parser] API返回错误: {response_data}")
return None
data = response_data.get("data", {})
if not data:
logger.error("[douyin_parser] API返回数据为空")
return None
# 新API的响应格式转换
return {
"type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image",
"video_url": data.get("url", ""), # 核心字段:视频播放地址
"video_url_HQ": data.get("url", ""), # 新API没有HQ字段使用同一个地址
"nickname": data.get("author", "未知作者"),
"desc": data.get("title", "无描述"),
"aweme_id": data.get("uid", ""),
"like": data.get("like", 0),
"cover": data.get("cover", ""),
"time": data.get("time", 0),
"author_avatar": data.get("avatar", ""),
"music": data.get("music", {}),
}
except (aiohttp.ClientError, KeyError, AttributeError, json.JSONDecodeError) as e:
logger.error(f"[douyin_parser] 解析抖音视频信息失败: {e}")
logger.debug(f"失败的URL: {video_url}")
except Exception as e:
logger.error(f"[douyin_parser] 解析抖音视频时发生未知错误: {e}")
logger.debug(f"失败的URL: {video_url}")
return None
async def process_douyin_link(event: MessageEvent, url: str):
"""
处理抖音链接,获取信息并回复
:param event: 消息事件对象
:param url: 待处理的抖音链接
"""
try:
# 直接将原始链接传递给API不需要获取真实URL
video_info = await parse_douyin_video(url)
if not video_info:
logger.error(f"[douyin_parser] 无法从 {url} 解析视频信息。")
await event.reply("无法获取视频信息,可能是抖音接口变动或视频不存在。")
return
# 构建回复消息,包含原分享中的文本内容(如果有)
original_text = ""
for segment in event.message:
if segment.type == "text":
text_content = segment.data.get("text", "")
# 提取除了链接以外的文本内容
# 移除链接和复制提示
cleaned_text = re.sub(DOUYIN_URL_PATTERN, '', text_content)
cleaned_text = re.sub(DOUYIN_SHORT_PATTERN, '', cleaned_text)
cleaned_text = re.sub(r'复制此链接打开Dou音搜索直接观看视频', '', cleaned_text)
cleaned_text = cleaned_text.strip()
if cleaned_text:
original_text = cleaned_text
break
# 构建回复消息
text_parts = ["抖音视频解析"]
text_parts.append("--------------------")
if original_text:
text_parts.append(f" 分享内容: {original_text}")
text_parts.append("--------------------")
text_parts.append(f" 作者: {video_info['nickname']}")
text_parts.append(f" 抖音号: {video_info['aweme_id']}")
text_parts.append(f" 标题: {video_info['desc']}")
text_parts.append(f" 点赞: {format_count(video_info['like'])}")
text_parts.append(f" 类型: {video_info['type']}")
# 如果是音乐,添加音乐信息
if video_info.get('music'):
music_info = video_info['music']
text_parts.append("--------------------")
text_parts.append(" 背景音乐:")
text_parts.append(f" 标题: {music_info.get('title', '')}")
text_parts.append(f" 作者: {music_info.get('author', '')}")
text_parts.append("--------------------")
text_parts.append(f" 原始链接: {url}")
text_message = "\n".join(text_parts)
# 准备转发消息节点
nodes = []
# 添加文本信息节点
text_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=text_message
)
nodes.append(text_node)
# 添加封面图片节点(如果有)
if video_info.get('cover'):
try:
cover_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=[
MessageSegment.text("抖音视频封面:\n"),
MessageSegment.image(video_info['cover'])
]
)
nodes.append(cover_node)
except Exception as e:
logger.warning(f"[douyin_parser] 无法添加封面图片: {e}")
# 添加作者头像节点(如果有)
if video_info.get('author_avatar'):
try:
avatar_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=[
MessageSegment.text("作者头像:\n"),
MessageSegment.image(video_info['author_avatar'])
]
)
nodes.append(avatar_node)
except Exception as e:
logger.warning(f"[douyin_parser] 无法添加作者头像: {e}")
# 尝试添加视频直链(单独节点)
video_success = False
try:
if video_info.get('video_url'):
video_url = video_info.get('video_url', '')
# 检查视频类型
if video_info.get('type') == 'video':
video_message = MessageSegment.video(video_url)
video_type_text = "视频直链:"
else: # image类型
video_message = MessageSegment.image(video_url) # 单个图片
video_type_text = "图集首图:"
# 构建视频/图片节点
video_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message=[
MessageSegment.text(video_type_text + "\n"),
video_message
]
)
nodes.append(video_node)
video_success = True
except Exception as e:
logger.error(f"[douyin_parser] 无法添加视频/图片: {e}")
# 如果无法添加视频,添加提示信息
if not video_success:
no_video_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=DOUYIN_NICKNAME,
message="视频解析成功,但无法获取直链或播放视频。"
)
nodes.append(no_video_node)
logger.success(f"[douyin_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['desc'][:20]}...")
# 发送合并转发消息
try:
# 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊
await event.bot.send_forwarded_messages(target=event, nodes=nodes)
except Exception as e:
# 如果发送合并转发失败,尝试单独发送文本信息
logger.error(f"[douyin_parser] 发送合并转发失败: {e}")
# 构建替代的简单文本回复,避免电脑端显示问题
simple_reply = f"抖音视频解析成功\n{text_message}\n\n如果无法查看视频内容,请复制原始链接到浏览器打开:{url}"
await event.reply(simple_reply)
# 如果有封面,尝试单独发送
if video_info.get('cover'):
try:
await event.reply(MessageSegment.image(video_info['cover']))
except Exception:
pass
except Exception as e:
logger.error(f"[douyin_parser] 处理抖音链接时发生错误: {e}")
await event.reply("处理抖音链接时发生错误,请稍后再试。")
return

94
profile_main.py Normal file
View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
性能分析入口文件
用于启动带有性能分析功能的应用程序。
使用方法:
python profile_main.py [options]
选项:
-h, --help 显示帮助信息
--profile, -p 启用详细性能分析(使用 pyinstrument
--memory, -m 启用内存使用分析
--output, -o FILE 性能分析报告输出文件HTML格式
--threshold, -t SEC 设置性能监控阈值(秒)
--stats, -s 在程序结束时输出性能统计报告
"""
import sys
import argparse
import os
# 将项目根目录添加到 sys.path
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT_DIR)
# 解析命令行参数
parser = argparse.ArgumentParser(description='性能分析入口文件')
parser.add_argument('--profile', '-p', action='store_true', help='启用详细性能分析(使用 pyinstrument')
parser.add_argument('--memory', '-m', action='store_true', help='启用内存使用分析')
parser.add_argument('--output', '-o', type=str, default='performance_report.html', help='性能分析报告输出文件HTML格式')
parser.add_argument('--threshold', '-t', type=float, default=0.5, help='设置性能监控阈值(秒)')
parser.add_argument('--stats', '-s', action='store_true', help='在程序结束时输出性能统计报告')
args = parser.parse_args()
# 设置全局性能分析配置
os.environ['PERFORMANCE_PROFILE'] = '1' if args.profile else '0'
os.environ['PERFORMANCE_MEMORY'] = '1' if args.memory else '0'
os.environ['PERFORMANCE_OUTPUT'] = args.output
os.environ['PERFORMANCE_THRESHOLD'] = str(args.threshold)
os.environ['PERFORMANCE_STATS'] = '1' if args.stats else '0'
# 导入并运行主程序
from core.utils.performance import profile, aprofile
from main import main
import asyncio
async def main_with_profile():
"""
带有性能分析的主函数入口
"""
if args.profile:
# 使用 pyinstrument 进行详细性能分析
from pyinstrument import Profiler
from pyinstrument.renderers import HTMLRenderer
profiler = Profiler()
profiler.start()
try:
await main()
finally:
profiler.stop()
# 输出分析结果到控制台
print("\n" + "=" * 80)
print("性能分析结果")
print("=" * 80)
print(profiler.print())
# 保存HTML报告
try:
html = profiler.render(HTMLRenderer())
with open(args.output, 'w', encoding='utf-8') as f:
f.write(html)
print(f"\n性能分析报告已保存到: {args.output}")
except Exception as e:
print(f"\n保存性能分析报告失败: {e}")
else:
# 不使用详细分析,直接运行
await main()
if __name__ == "__main__":
try:
asyncio.run(main_with_profile())
finally:
# 输出性能统计报告
if args.stats:
from core.utils.performance import performance_stats
print("\n" + "=" * 80)
print("性能统计报告")
print("=" * 80)
print(performance_stats.report())

4
requirements-dev.txt Normal file
View File

@@ -0,0 +1,4 @@
# 开发依赖
pyinstrument>=4.5.0 # 性能分析工具,支持异步代码
memory-profiler>=0.61.0 # 内存分析工具
psutil>=5.9.8 # 系统资源监控

View File

@@ -1,5 +1,349 @@
#!/usr/bin/env python3
"""
优化版跨平台 Python 模块编译脚本
将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。
此版本基于对项目结构的深入分析,包含了更多高频使用的模块。
支持的平台:
- Windows: 生成 .pyd 文件
- Linux: 生成 .so 文件
使用方法:
python compile_machine_code.py [options]
选项:
--compile, -c 编译指定的模块(默认)
--list, -l 列出已编译的模块
--clean, -k 清理编译生成的文件
--help, -h 显示帮助信息
注意:
1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC)
2. 需要安装 mypyc: pip install mypyc
3. 编译后的文件是平台相关的,不能跨平台复制
4. 建议在部署的目标环境上运行此脚本
5. Mypyc 不支持动态特性,如 eval/exec/getattr/setattr 等
"""
import os
import sys
import glob
import subprocess
import shutil
import argparse
# 检测当前平台和 Python 版本
PLATFORM = sys.platform
PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" # 例如 "314"
if PLATFORM.startswith('win'):
EXTENSION = '.pyd'
BUILD_PREFIX = f'cp{PYTHON_VERSION}-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-{PYTHON_VERSION}')
elif PLATFORM.startswith('linux'):
EXTENSION = '.so'
BUILD_PREFIX = f'cp{PYTHON_VERSION}-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-{PYTHON_VERSION}')
else:
print(f"不支持的平台: {PLATFORM}")
sys.exit(1)
# 根据项目分析,优化要编译的模块列表
# 这些是项目中使用频率最高的模块,编译后能显著提升性能
MODULES = [
# 工具模块 - 高频使用
'core/utils/json_utils.py', # JSON 处理 - 高频使用
'core/utils/executor.py', # 代码执行引擎 - 高频使用
'core/utils/exceptions.py', # 自定义异常 - 基础组件
'core/utils/performance.py', # 性能监控工具 - 重要组件
'core/utils/logger.py', # 日志模块 - 高频使用
'core/utils/singleton.py', # 单例模式 - 基础组件
# 核心管理模块 - 高频使用
# 'core/managers/command_manager.py', # 指令匹配和分发 - 包含动态特性,不适合编译
# 'core/managers/admin_manager.py', # 管理员管理 - 包含动态特性,不适合编译
# 'core/managers/permission_manager.py', # 权限管理 - 包含动态特性,不适合编译
# 'core/managers/plugin_manager.py', # 插件管理器 - 包含动态特性,不适合编译
# 'core/managers/redis_manager.py', # Redis 管理器 - 包含动态特性,不适合编译
# 'core/managers/image_manager.py', # 图片管理器 - 包含动态特性,不适合编译
# 核心基础模块 - 高频使用
'core/ws.py', # WebSocket 核心 - 核心通信被10个文件引用
# 'core/bot.py', # Bot 核心抽象 - 使用多重继承,不适合编译
'core/config_loader.py', # 配置加载 - 启动必需被7个文件引用
# 'core/config_models.py', # 配置模型 - 包含复杂类型定义,不适合编译
# 'core/permission.py', # 权限枚举 - 包含动态属性,不适合编译
# 数据模型 - 高频使用
'models/message.py', # 消息段模型 - 高频消息处理
'models/sender.py', # 发送者模型 - 高频消息处理
'models/objects.py', # API 响应数据模型 - 高频数据处理
# 事件处理相关 - 高频使用
'core/handlers/event_handler.py', # 事件处理器 - 核心事件处理
# 事件模型 - 高频使用但包含dataclass可能有编译问题暂时排除
# 'models/events/message.py', # 消息事件 - 最高频事件类型
# 'models/events/notice.py', # 通知事件 - 高频事件类型
# 'models/events/request.py', # 请求事件 - 高频事件类型
# 'models/events/meta.py', # 元事件 - 高频事件类型
# 注意:以下文件不适合编译
# - 主程序文件main.py
# - 测试文件tests/目录)
# - 插件文件plugins/目录)
# - 编译脚本compile_machine_code.py等
# - 包含复杂动态特性的文件
# - API 基础类(由于多重继承问题)
]
def list_compiled_modules():
"""列出已编译的模块"""
print(f"\n已编译的 {PLATFORM} 模块:")
print("=" * 50)
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
size = os.path.getsize(f) // 1024 # KB
print(f"{f} ({size} KB)")
else:
print(f"未找到已编译的 {EXTENSION} 文件")
print(f"\n总计: {len(compiled_files)} 个文件")
def clean_compiled_files():
"""清理编译生成的文件"""
print(f"\n清理编译生成的 {EXTENSION} 文件...")
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
try:
os.remove(f)
print(f"已删除: {f}")
except Exception as e:
print(f"删除失败 {f}: {e}")
# 清理 build 目录
if os.path.exists('build'):
try:
shutil.rmtree('build')
print("已删除 build 目录")
except Exception as e:
print(f"删除 build 目录失败: {e}")
else:
print(f"没有可清理的 {EXTENSION} 文件")
def get_platform_specific_module_name(module_path):
"""获取平台特定的模块文件名"""
module_name = module_path.replace('.py', '')
return f"{module_name}.{BUILD_PREFIX}{EXTENSION}"
def compile_module(module_path):
"""编译单个模块"""
print(f"\n编译: {module_path}")
try:
# 直接调用 mypyc 命令行工具
# 使用二进制模式捕获输出以避免编码问题
result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path],
capture_output=True,
check=True
)
# 解码输出时处理可能的编码错误
try:
stdout_text = result.stdout.decode('utf-8', errors='replace')
stderr_text = result.stderr.decode('utf-8', errors='replace')
except AttributeError:
# 如果已经是字符串Python 3.7+),则直接使用
stdout_text = result.stdout
stderr_text = result.stderr
# 获取平台特定的模块名
platform_module = get_platform_specific_module_name(module_path)
mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}')
# 检查编译产物是否在当前目录
if os.path.exists(platform_module):
print(f" ✓ 编译成功: {platform_module}")
return True
else:
# 检查 build 目录中是否有编译产物
build_module_path = os.path.join(BUILD_PATH, platform_module)
build_mypyc_path = os.path.join(BUILD_PATH, mypyc_platform_module)
if os.path.exists(build_module_path):
# 如果在 build 目录中,复制到正确位置
os.makedirs(os.path.dirname(platform_module), exist_ok=True)
shutil.copy2(build_module_path, platform_module)
if os.path.exists(build_mypyc_path):
shutil.copy2(build_mypyc_path, mypyc_platform_module)
print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}")
return True
else:
print(" ✗ 编译失败:找不到编译产物")
if result.stdout:
print(f" 编译输出:{stdout_text[:500]}...")
if result.stderr:
print(f" 错误信息:{stderr_text[:500]}...")
return False
except subprocess.CalledProcessError as e:
print(f" ✗ 编译失败,退出码: {e.returncode}")
if hasattr(e, 'stdout') and e.stdout:
try:
stdout_text = e.stdout.decode('utf-8', errors='replace') if isinstance(e.stdout, bytes) else e.stdout
print(f" 编译输出:{stdout_text[:500]}...")
except Exception:
print(f" 编译输出:{str(e.stdout)[:500]}...")
if hasattr(e, 'stderr') and e.stderr:
try:
stderr_text = e.stderr.decode('utf-8', errors='replace') if isinstance(e.stderr, bytes) else e.stderr
print(f" 错误信息:{stderr_text[:500]}...")
except Exception:
print(f" 错误信息:{str(e.stderr)[:500]}...")
return False
except Exception as e:
print(f" ✗ 编译失败,意外错误: {e}")
import traceback
traceback.print_exc()
return False
def should_skip_module(module_path):
"""检查模块是否应该被跳过编译"""
try:
with open(module_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含抽象基类相关代码
if 'from abc import ABC' in content or 'from abc import abstractmethod' in content:
return True, "包含抽象基类,不适合编译"
# 检查是否包含危险的动态特性
# 注意我们允许基本的动态特性如getattr但对于eval、exec等危险操作仍然阻止
if ('eval(' in content or 'exec(' in content or
'compile(' in content):
return True, "包含危险动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if ('__dict__' in content or '__class__' in content or
'__module__' in content or '__bases__' in content):
return True, "包含复杂动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if '.__dict__' in content or '.__class__' in content:
return True, "包含复杂动态特性,不适合编译"
return False, ""
except Exception as e:
return True, f"读取文件时出错: {e}"
def compile_all_modules():
"""编译所有指定的模块"""
print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})")
print("=" * 60)
# 验证模块文件是否存在并检查是否适合编译
valid_modules = []
skipped_modules = []
for module_path in MODULES:
if os.path.exists(module_path):
should_skip, reason = should_skip_module(module_path)
if should_skip:
print(f"跳过: {module_path} ({reason})")
skipped_modules.append((module_path, reason))
else:
valid_modules.append(module_path)
else:
print(f"警告: 模块 {module_path} 不存在,将被跳过")
print(f"\n有效模块: {len(valid_modules)}, 跳过模块: {len(skipped_modules)}")
if not valid_modules:
print("错误: 没有有效的模块可编译")
return False
# 编译模块
success_count = 0
failed_modules = []
for module_path in valid_modules:
if compile_module(module_path):
success_count += 1
else:
failed_modules.append(module_path)
print("\n" + "=" * 60)
print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功")
if failed_modules:
print(f"失败模块: {failed_modules}")
if success_count == len(valid_modules):
print("✓ 所有模块编译成功")
return True
else:
print("✗ 部分模块编译失败")
return False
def main():
"""主函数"""
# 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor >= 8):
print("警告: 推荐使用 Python 3.8+ 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题")
print()
parser = argparse.ArgumentParser(description='优化版跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()
group.add_argument('--compile', '-c', action='store_true', default=True,
help='编译指定的模块 (默认)')
group.add_argument('--list', '-l', action='store_true',
help='列出已编译的模块')
group.add_argument('--clean', '-k', action='store_true',
help='清理编译生成的文件')
args = parser.parse_args()
# 检查是否安装了 mypyc
try:
import mypyc
except ImportError:
print("错误: 未安装 mypyc请先安装: pip install mypyc")
sys.exit(1)
if args.list:
list_compiled_modules()
elif args.clean:
clean_compiled_files()
else:
compile_all_modules()
print("\n使用 --list 选项查看已编译的模块")
print("使用 --clean 选项清理编译文件")
if __name__ == '__main__':
main()#!/usr/bin/env python3
"""
跨平台 Python 模块编译脚本
将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。
@@ -30,18 +374,16 @@ import subprocess
import shutil
import argparse
# 检测当前平台和 Python 版本
# 检测当前平台
PLATFORM = sys.platform
PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" # 例如 "314"
if PLATFORM.startswith('win'):
EXTENSION = '.pyd'
BUILD_PREFIX = f'cp{PYTHON_VERSION}-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-{PYTHON_VERSION}')
BUILD_PREFIX = 'cp314-win_amd64'
BUILD_PATH = os.path.join('build', f'lib.win-amd64-cpython-314')
elif PLATFORM.startswith('linux'):
EXTENSION = '.so'
BUILD_PREFIX = f'cp{PYTHON_VERSION}-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-{PYTHON_VERSION}')
BUILD_PREFIX = 'cp314-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', f'lib.linux-x86_64-cpython-314')
else:
print(f"不支持的平台: {PLATFORM}")
sys.exit(1)
@@ -265,13 +607,6 @@ def compile_all_modules():
def main():
"""主函数"""
# 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor == 14):
print("警告: 推荐使用 Python 3.14 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题")
print()
parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()

View File

@@ -66,7 +66,7 @@ def main():
if compile_module(module):
success_count += 1
print(f"\n--- Compilation Summary ---")
print("\n--- Compilation Summary ---")
print(f"Total modules: {len(modules)}")
print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(modules) - success_count}")

View File

@@ -10,11 +10,8 @@ Mypyc 编译脚本
2. 编译后的文件 (.pyd 或 .so) 是平台相关的,不能跨平台复制。
3. 建议在部署的目标环境 (Linux) 上运行此脚本。
"""
from distutils.core import setup
from mypyc.build import mypycify
import os
import sys
import glob
import subprocess
# 基础模块列表
@@ -102,7 +99,7 @@ for module_path in valid_modules:
print(f" ✓ Compiled successfully (copied from build directory): {pyd_path}")
success_count += 1
else:
print(f" ✗ Compiled but cannot find pyd file")
print(" ✗ Compiled but cannot find pyd file")
print(f" Build output:\n{result.stdout[:500]}...")
except subprocess.CalledProcessError as e:
print(f" ✗ Compilation failed with exit code {e.returncode}")
@@ -110,7 +107,7 @@ for module_path in valid_modules:
except Exception as e:
print(f" ✗ Unexpected error: {e}")
print(f"\n--- Compilation Summary ---")
print("\n--- Compilation Summary ---")
print(f"Total modules: {len(valid_modules)}")
print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(valid_modules) - success_count}")

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
简单的性能分析功能测试脚本
"""
import asyncio
import time
from core.utils.performance import (
timeit,
profile,
performance_stats
)
print("=" * 80)
print("性能分析功能测试")
print("=" * 80)
# 重置全局性能统计
performance_stats.reset()
# 测试1: 同步函数的时间测量
@timeit
def sync_test():
"""同步测试函数"""
time.sleep(0.1)
return "sync done"
# 测试2: 异步函数的时间测量
@timeit
async def async_test():
"""异步测试函数"""
await asyncio.sleep(0.1)
return "async done"
# 异步主函数
async def main():
# 同步函数测试
print("执行同步函数...")
sync_result = sync_test()
print(f"同步函数结果: {sync_result}")
# 异步函数测试
print("\n执行异步函数...")
async_result = await async_test()
print(f"异步函数结果: {async_result}")
# 测试3: 详细性能分析
print("\n2. 测试性能分析上下文管理器:")
print("=" * 80)
with profile(enabled=False): # 禁用实际分析以避免输出太多
await asyncio.sleep(0.05)
print("性能分析上下文管理器测试完成")
# 测试4: 性能统计报告
print("\n3. 测试性能统计报告:")
print("=" * 80)
# 执行多次函数调用
for _ in range(3):
sync_test()
await async_test()
# 生成并打印性能报告
print("\n性能统计报告:")
print(performance_stats.report())
# 执行测试
print("\n1. 测试时间测量装饰器:")
print("=" * 80)
# 使用 asyncio.run() 执行异步主函数
asyncio.run(main())
print("\n" + "=" * 80)
print("所有测试完成!")
print("=" * 80)

266
tests/test_performance.py Normal file
View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
性能分析工具测试
测试各种性能分析功能的正确性和可用性。
"""
import asyncio
import time
import pytest
from typing import Optional
# 导入性能分析工具
from core.utils.performance import (
timeit,
profile,
aprofile,
memory_profile,
memory_profile_decorator,
performance_monitor,
PerformanceStats,
performance_stats
)
# 重置全局性能统计
def setup_module():
performance_stats.reset()
def teardown_module():
performance_stats.reset()
class TestTimeitDecorator:
"""测试 timeit 装饰器"""
@timeit(log_level=20) # 使用 INFO 级别
def test_sync_function(self):
"""测试同步函数的时间测量"""
time.sleep(0.1)
return "done"
@timeit(log_level=20)
async def test_async_function(self):
"""测试异步函数的时间测量"""
await asyncio.sleep(0.1)
return "done"
def test_sync_function_works(self):
"""验证同步函数能正常执行"""
result = self.test_sync_function()
assert result == "done"
@pytest.mark.asyncio
async def test_async_function_works(self):
"""验证异步函数能正常执行"""
result = await self.test_async_function()
assert result == "done"
class TestProfileContextManager:
"""测试 profile 上下文管理器"""
def test_profile_sync_code(self):
"""测试同步代码的性能分析"""
# 捕获标准输出
import io
import sys
from contextlib import redirect_stdout
f = io.StringIO()
with redirect_stdout(f):
with profile(enabled=False): # 禁用实际分析以提高测试速度
time.sleep(0.01)
output = f.getvalue()
# 应该没有输出(因为 enabled=False
assert "性能分析" not in output
@pytest.mark.asyncio
async def test_aprofile_async_function(self):
"""测试异步函数的性能分析"""
async def async_test():
await asyncio.sleep(0.01)
return "test"
result = await aprofile(async_test)
assert result == "test"
class TestPerformanceMonitor:
"""测试 performance_monitor 装饰器"""
@performance_monitor(threshold=0.05)
def test_slow_sync_function(self):
"""测试慢速同步函数的监控"""
time.sleep(0.1) # 超过阈值
return "slow"
@performance_monitor(threshold=0.05)
def test_fast_sync_function(self):
"""测试快速同步函数的监控"""
time.sleep(0.01) # 低于阈值
return "fast"
@performance_monitor(threshold=0.05)
async def test_slow_async_function(self):
"""测试慢速异步函数的监控"""
await asyncio.sleep(0.1)
return "slow_async"
def test_slow_function_triggers_warning(self):
"""验证慢速函数会触发警告"""
result = self.test_slow_sync_function()
assert result == "slow"
def test_fast_function_no_warning(self):
"""验证快速函数不会触发警告"""
result = self.test_fast_sync_function()
assert result == "fast"
@pytest.mark.asyncio
async def test_slow_async_function_triggers_warning(self):
"""验证慢速异步函数会触发警告"""
result = await self.test_slow_async_function()
assert result == "slow_async"
class TestMemoryAnalysis:
"""测试内存分析功能"""
def test_memory_profile_context_manager(self):
"""测试内存分析上下文管理器"""
# 禁用内存分析以提高测试速度
with memory_profile(enabled=False):
data = [i for i in range(1000)]
sum(data)
@memory_profile_decorator
def test_memory_intensive_function(self):
"""测试内存密集型函数"""
# 小数据集,避免测试耗时过长
data = [i for i in range(1000)]
return sum(data)
def test_memory_function_works(self):
"""验证内存分析函数能正常执行"""
result = self.test_memory_intensive_function()
assert result == 499500
class TestPerformanceStats:
"""测试性能统计功能"""
def test_stats_initialization(self):
"""测试性能统计对象初始化"""
stats = PerformanceStats()
assert isinstance(stats, PerformanceStats)
assert stats.stats == {}
def test_stats_record(self):
"""测试记录性能数据"""
stats = PerformanceStats()
stats.record("test_func", 0.1)
assert "test_func" in stats.stats
assert stats.stats["test_func"]["count"] == 1
assert stats.stats["test_func"]["total_time"] == 0.1
assert stats.stats["test_func"]["avg_time"] == 0.1
def test_stats_report(self):
"""测试生成性能报告"""
stats = PerformanceStats()
stats.record("func1", 0.1)
stats.record("func2", 0.2)
report = stats.report()
assert isinstance(report, str)
assert "func1" in report
assert "func2" in report
def test_stats_reset(self):
"""测试重置性能统计"""
stats = PerformanceStats()
stats.record("test_func", 0.1)
stats.reset()
assert stats.stats == {}
def test_global_stats_recording(self):
"""测试全局性能统计记录"""
# 先重置全局统计
performance_stats.reset()
@timeit(collect_stats=True)
def test_func():
time.sleep(0.01)
test_func()
# 验证是否记录了性能数据
assert "test_func" in performance_stats.stats
assert performance_stats.stats["test_func"]["count"] == 1
class TestIntegration:
"""综合测试"""
@pytest.mark.asyncio
async def test_combined_features(self):
"""测试多种性能分析功能的组合使用"""
# 重置全局统计
performance_stats.reset()
@timeit(collect_stats=True)
@performance_monitor(threshold=0.05)
async def test_async_func():
await asyncio.sleep(0.06) # 超过阈值
return "combined"
result = await test_async_func()
assert result == "combined"
# 验证性能统计
assert "test_async_func" in performance_stats.stats
assert performance_stats.stats["test_async_func"]["count"] == 1
if __name__ == "__main__":
# 运行基本测试
print("开始性能分析功能测试...")
# 测试同步函数
@timeit
def test_sync():
time.sleep(0.1)
return "sync"
# 测试异步函数
@timeit
async def test_async():
await asyncio.sleep(0.1)
return "async"
# 测试性能监控
@performance_monitor(threshold=0.05)
def slow_func():
time.sleep(0.1)
return "slow"
# 运行测试
sync_result = test_sync()
async_result = asyncio.run(test_async())
slow_result = slow_func()
print(f"\n测试结果:")
print(f"sync_result: {sync_result}")
print(f"async_result: {async_result}")
print(f"slow_result: {slow_result}")
# 输出性能统计报告
print("\n性能统计报告:")
print(performance_stats.report())
print("\n性能分析功能测试完成!")