Files
NeoBot/main.py
K2cr2O1 9f54a98c17 feat: 添加抖音视频解析插件并优化代码结构
添加抖音视频解析插件,支持自动解析抖音分享链接并提取视频信息。优化现有代码结构,包括:
- 重构单例模式实现
- 移除未使用的导入和文件
- 修复性能测试脚本中的异步调用
- 优化消息事件模型中的权限常量定义
- 改进编译脚本的错误处理
- 增强B站解析插件的稳定性

同时清理了多个废弃脚本和临时文件,提升代码可维护性。
2026-01-19 01:16:22 +08:00

220 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
NEO Bot 主程序入口
负责启动 WebSocket 连接,初始化插件系统,并提供热重载功能。
"""
import asyncio
import os
import sys
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 初始化日志系统,必须在其他 core 模块导入之前执行
from core.utils.logger import logger
# 核心模块导入
from core.managers.admin_manager import admin_manager
from core.ws import WS
from core.managers import plugin_manager, matcher
from core.managers.redis_manager import redis_manager
from core.managers.browser_manager import browser_manager
from core.utils.executor import run_in_thread_pool, initialize_executor
from core.config_loader import global_config as config
# 检查 JIT 编译状态
def check_jit_status():
"""
检查 Python JIT 编译状态
该函数用于检测当前 Python 解释器是否启用了 JIT 编译功能,
并打印相关信息,帮助用户了解运行环境的性能优化状态。
"""
print("\n=== Python JIT 编译状态检查 ===")
# 检查解释器信息
print(f"Python 版本: {sys.version}")
print(f"解释器路径: {sys.executable}")
# 检查优化级别
print(f"优化级别 (-O): {sys.flags.optimize}")
# 检查 JIT 相关模块和功能
if sys.version_info >= (3, 10):
try:
# 对于 CPython 3.10+,检查是否启用了 JIT
import _opcode
if hasattr(_opcode, 'jit'):
print("JIT 状态: 已启用 (_opcode.jit)")
else:
print("JIT 状态: 未启用 (_opcode.jit 不可用)")
except ImportError:
print("JIT 状态: 未启用 (_opcode 模块不可用)")
else:
print("JIT 状态: 不可用 (需要 Python 3.10+)")
# 检查是否使用了 PyPy
if hasattr(sys, 'pypy_version_info'):
print(f"PyPy 版本: {sys.pypy_version_info}")
print("JIT 状态: 已启用 (PyPy 内置 JIT)")
print("==============================\n")
# 执行 JIT 状态检查
check_jit_status()
# 尝试使用高性能事件循环
try:
if sys.platform == 'win32':
# winloop 与 Playwright 存在兼容性问题 (不支持 startupinfo),暂时禁用
# import winloop
# asyncio.set_event_loop_policy(winloop.EventLoopPolicy())
# print("已启用 winloop 高性能事件循环")
print("Windows 平台检测到 Playwright已自动禁用 winloop 以确保兼容性")
else:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
print("已启用 uvloop 高性能事件循环")
except ImportError:
print("未检测到高性能事件循环库 (uvloop/winloop),将使用默认事件循环")
# 将项目根目录添加到 sys.path
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT_DIR)
# 获取插件目录的绝对路径
PLUGIN_DIR = os.path.join(ROOT_DIR, "plugins")
async def reload_plugin_and_sync_help(module_name: str):
"""
重载插件并同步帮助图片
"""
await run_in_thread_pool(plugin_manager.reload_plugin, module_name)
# 插件重载后,重新生成帮助图片
await matcher.sync_help_pic()
class PluginReloadHandler(FileSystemEventHandler):
"""
文件变更处理器,用于热重载插件
继承自 watchdog.events.FileSystemEventHandler
监听 plugins 目录下的文件变化,并触发插件重载。
"""
def __init__(self, loop: asyncio.AbstractEventLoop):
"""
初始化处理器
设置冷却时间,并保存主事件循环的引用。
"""
self.loop = loop
self.last_reload_time = 0
self.cooldown = 1.0 # 冷却时间,防止短时间内多次重载
def on_any_event(self, file_system_event):
"""
处理所有文件事件
:param file_system_event: watchdog 事件对象
"""
if file_system_event.is_directory:
return
src_path = file_system_event.src_path
# 只监控 py 文件
if not src_path.endswith(".py"):
return
# 过滤掉一些临时文件
if "__pycache__" in src_path or not src_path.startswith(PLUGIN_DIR):
return
# 简单的防抖动
current_time = time.time()
if current_time - self.last_reload_time < self.cooldown:
return
self.last_reload_time = current_time
# 从文件路径解析出模块名
# 例如: C:\path\to\project\plugins\bili_parser.py -> plugins.bili_parser
relative_path = os.path.relpath(src_path, ROOT_DIR)
module_name = os.path.splitext(relative_path.replace(os.sep, '.'))[0]
logger.info(f"检测到文件变更: {src_path}")
logger.info(f"准备重载插件: {module_name}...")
try:
# 使用线程安全的方式在主事件循环中运行异步的插件重载函数
asyncio.run_coroutine_threadsafe(run_in_thread_pool(plugin_manager.reload_plugin, module_name), self.loop)
logger.success(f"插件 {module_name} 重载任务已提交")
except Exception as e:
logger.exception(f"重载失败: {e}")
@logger.catch
async def main():
"""
主函数
1. 启动文件监控(热重载)
2. 初始化 WebSocket 客户端
3. 建立连接并保持运行
"""
# 插件加载已移至 core.managers.__init__.py 中自动执行
# 初始化 Redis 连接
await redis_manager.initialize()
# 同步帮助图片
await matcher.sync_help_pic()
# 初始化管理员管理器
await admin_manager.initialize()
# 初始化浏览器管理器 (使用页面池)
await browser_manager.init_pool(size=3)
# 启动文件监控
# 监控 plugins 目录
plugin_path = os.path.join(os.path.dirname(__file__), "plugins")
# 获取当前事件循环并传递给处理器
loop = asyncio.get_running_loop()
event_handler = PluginReloadHandler(loop)
observer = Observer()
if os.path.exists(plugin_path):
observer.schedule(event_handler, plugin_path, recursive=True)
observer.start()
logger.info(f"已启动插件热重载监控: {plugin_path}")
else:
logger.warning(f"插件目录不存在 {plugin_path}")
try:
# 初始化代码执行器
code_executor = initialize_executor(config)
websocket_client = WS(code_executor=code_executor)
# 启动代码执行器的后台 worker
logger.debug("[Main] 检查是否需要启动代码执行 Worker...")
if code_executor and code_executor.docker_client:
logger.info("[Main] Docker 连接成功,正在启动代码执行 Worker...")
asyncio.create_task(code_executor.worker())
else:
logger.warning("[Main] 未启动代码执行 Worker因为 Docker 客户端未初始化或连接失败。")
await websocket_client.connect()
finally:
if observer.is_alive():
observer.stop()
observer.join()
if __name__ == "__main__":
asyncio.run(main())