""" NEO Bot 主程序入口 负责启动 WebSocket 连接,初始化插件系统,并提供热重载功能。 """ import asyncio import os import sys import time from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # 初始化日志系统,必须在其他 core 模块导入之前执行 from core.utils.logger import logger # 核心模块导入 from core.managers.admin_manager import admin_manager from core.ws import WS from core.managers import plugin_manager, matcher from core.managers.redis_manager import redis_manager from core.managers.browser_manager import browser_manager from core.utils.executor import run_in_thread_pool, initialize_executor from core.config_loader import global_config as config # 检查 JIT 编译状态 def check_jit_status(): """ 检查 Python JIT 编译状态 该函数用于检测当前 Python 解释器是否启用了 JIT 编译功能, 并打印相关信息,帮助用户了解运行环境的性能优化状态。 """ print("\n=== Python JIT 编译状态检查 ===") # 检查解释器信息 print(f"Python 版本: {sys.version}") print(f"解释器路径: {sys.executable}") # 检查优化级别 print(f"优化级别 (-O): {sys.flags.optimize}") # 检查 JIT 相关模块和功能 if sys.version_info >= (3, 10): try: # 对于 CPython 3.10+,检查是否启用了 JIT import _opcode if hasattr(_opcode, 'jit'): print("JIT 状态: 已启用 (_opcode.jit)") else: print("JIT 状态: 未启用 (_opcode.jit 不可用)") except ImportError: print("JIT 状态: 未启用 (_opcode 模块不可用)") else: print("JIT 状态: 不可用 (需要 Python 3.10+)") # 检查是否使用了 PyPy if hasattr(sys, 'pypy_version_info'): print(f"PyPy 版本: {sys.pypy_version_info}") print("JIT 状态: 已启用 (PyPy 内置 JIT)") print("==============================\n") # 执行 JIT 状态检查 check_jit_status() # 尝试使用高性能事件循环 try: if sys.platform == 'win32': # winloop 与 Playwright 存在兼容性问题 (不支持 startupinfo),暂时禁用 # import winloop # asyncio.set_event_loop_policy(winloop.EventLoopPolicy()) # print("已启用 winloop 高性能事件循环") print("Windows 平台检测到 Playwright,已自动禁用 winloop 以确保兼容性") else: import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) print("已启用 uvloop 高性能事件循环") except ImportError: print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环") # 将项目根目录添加到 sys.path ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, ROOT_DIR) # 获取插件目录的绝对路径 PLUGIN_DIR = os.path.join(ROOT_DIR, "plugins") async def reload_plugin_and_sync_help(module_name: str): """ 重载插件并同步帮助图片 """ await run_in_thread_pool(plugin_manager.reload_plugin, module_name) # 插件重载后,重新生成帮助图片 await matcher.sync_help_pic() class PluginReloadHandler(FileSystemEventHandler): """ 文件变更处理器,用于热重载插件 继承自 watchdog.events.FileSystemEventHandler, 监听 plugins 目录下的文件变化,并触发插件重载。 """ def __init__(self, loop: asyncio.AbstractEventLoop): """ 初始化处理器 设置冷却时间,并保存主事件循环的引用。 """ self.loop = loop self.last_reload_time = 0 self.cooldown = 1.0 # 冷却时间,防止短时间内多次重载 def on_any_event(self, file_system_event): """ 处理所有文件事件 :param file_system_event: watchdog 事件对象 """ if file_system_event.is_directory: return src_path = file_system_event.src_path # 只监控 py 文件 if not src_path.endswith(".py"): return # 过滤掉一些临时文件和__init__.py文件 if "__pycache__" in src_path or not src_path.startswith(PLUGIN_DIR) or os.path.basename(src_path) == "__init__.py": return # 简单的防抖动 current_time = time.time() if current_time - self.last_reload_time < self.cooldown: return self.last_reload_time = current_time # 从文件路径解析出模块名 # 例如: C:\path\to\project\plugins\bili_parser.py -> plugins.bili_parser relative_path = os.path.relpath(src_path, ROOT_DIR) module_name = os.path.splitext(relative_path.replace(os.sep, '.'))[0] logger.info(f"检测到文件变更: {src_path}") logger.info(f"准备重载插件: {module_name}...") try: # 使用线程安全的方式在主事件循环中运行异步的插件重载函数 asyncio.run_coroutine_threadsafe(run_in_thread_pool(plugin_manager.reload_plugin, module_name), self.loop) logger.success(f"插件 {module_name} 重载任务已提交") except Exception as e: logger.exception(f"重载失败: {e}") @logger.catch async def main(): """ 主函数 1. 启动文件监控(热重载) 2. 初始化 WebSocket 客户端 3. 建立连接并保持运行 """ # 插件加载已移至 core.managers.__init__.py 中自动执行 # 初始化 Redis 连接 await redis_manager.initialize() # 同步帮助图片 await matcher.sync_help_pic() # 初始化管理员管理器 await admin_manager.initialize() # 初始化浏览器管理器 (使用页面池) await browser_manager.init_pool(size=3) # 启动文件监控 # 监控 plugins 目录 plugin_path = os.path.join(os.path.dirname(__file__), "plugins") # 获取当前事件循环并传递给处理器 loop = asyncio.get_running_loop() event_handler = PluginReloadHandler(loop) observer = Observer() if os.path.exists(plugin_path): observer.schedule(event_handler, plugin_path, recursive=True) observer.start() logger.info(f"已启动插件热重载监控: {plugin_path}") else: logger.warning(f"插件目录不存在 {plugin_path}") try: # 初始化代码执行器 code_executor = initialize_executor(config) websocket_client = WS(code_executor=code_executor) # 启动代码执行器的后台 worker logger.debug("[Main] 检查是否需要启动代码执行 Worker...") if code_executor and code_executor.docker_client: logger.info("[Main] Docker 连接成功,正在启动代码执行 Worker...") asyncio.create_task(code_executor.worker()) else: logger.warning("[Main] 未启动代码执行 Worker,因为 Docker 客户端未初始化或连接失败。") await websocket_client.connect() finally: if observer.is_alive(): observer.stop() observer.join() if __name__ == "__main__": """ 程序主入口,添加全局异常捕获和友好提示 """ from core.utils.error_codes import exception_to_error_response from core.utils.logger import ModuleLogger # 创建主程序日志记录器 main_logger = ModuleLogger("Main") try: asyncio.run(main()) except KeyboardInterrupt: main_logger.info("程序已被用户中断") exit(0) except Exception as e: main_logger.exception("程序发生未处理的全局异常") # 生成统一的错误响应 error_response = exception_to_error_response(e) # 打印友好的错误提示 print("\n" + "=" * 60) print("程序发生错误,请检查以下信息:") print("=" * 60) print(f"错误代码: {error_response['code']}") print(f"错误信息: {error_response['message']}") print("=" * 60) print("详细错误信息已记录到日志文件中") print("请检查 logs 目录下的日志文件以获取更多调试信息") print("=" * 60) # 根据错误类型给出不同的建议 if hasattr(e, "original_error") and e.original_error: print(f"\n原始错误: {e.original_error}") if "WebSocket" in str(type(e).__name__): print("\n建议检查:") print("1. WebSocket 服务是否正在运行") print("2. 配置文件中的 WebSocket 地址和令牌是否正确") print("3. 网络连接是否正常") elif "Config" in str(type(e).__name__): print("\n建议检查:") print("1. 配置文件 config.toml 是否存在") print("2. 配置文件格式是否正确") print("3. 所有必填配置项是否都已设置") elif "Plugin" in str(type(e).__name__): print("\n建议检查:") print("1. 插件目录是否存在") print("2. 插件文件是否有语法错误") print("3. 插件是否符合插件开发规范") exit(1)