feat: 添加状态监控插件和Redis原子操作支持

- 新增 `/status` 指令,展示机器人运行状态和系统指标
- 实现Redis Lua脚本支持原子化计数器操作
- 添加消息收发统计功能
- 完善文档,包括插件开发和性能优化指南
- 重构WebSocket连接池,增加健康检查机制
- 移除旧版编译脚本,优化项目结构
This commit is contained in:
2026-01-23 15:54:45 +08:00
parent 489dd8c77d
commit d458413e4b
28 changed files with 1529 additions and 1177 deletions

View File

@@ -329,16 +329,24 @@ class WS:
echo_id = str(uuid.uuid4()) echo_id = str(uuid.uuid4())
payload = {"action": action, "params": params or {}, "echo": echo_id} payload = {"action": action, "params": params or {}, "echo": echo_id}
loop = asyncio.get_running_loop()
future = loop.create_future()
self._pending_requests[echo_id] = future
try:
await conn.send(orjson.dumps(payload)) await conn.send(orjson.dumps(payload))
result = await asyncio.wait_for(future, timeout=30.0)
return result # 在当前连接上等待特定 echo 的响应,并设置超时
try:
async def wait_for_response():
async for message in conn.conn:
data = orjson.loads(message)
if data.get("echo") == echo_id:
return data
return await asyncio.wait_for(wait_for_response(), timeout=30.0)
except asyncio.TimeoutError:
raise # 重新抛出超时异常
except Exception as e:
raise WebSocketError(f"在等待API响应时连接出错: {e}")
except asyncio.TimeoutError: except asyncio.TimeoutError:
self._pending_requests.pop(echo_id, None)
self.logger.warning(f"API 调用超时: action={action}, params={params}") self.logger.warning(f"API 调用超时: action={action}, params={params}")
return create_error_response( return create_error_response(
code=ErrorCode.TIMEOUT_ERROR, code=ErrorCode.TIMEOUT_ERROR,
@@ -346,7 +354,6 @@ class WS:
data={"action": action, "params": params} data={"action": action, "params": params}
) )
except Exception as e: except Exception as e:
self._pending_requests.pop(echo_id, None)
self.logger.exception(f"API 调用异常: action={action}, error={str(e)}") self.logger.exception(f"API 调用异常: action={action}, error={str(e)}")
return create_error_response( return create_error_response(
code=ErrorCode.WS_MESSAGE_ERROR, code=ErrorCode.WS_MESSAGE_ERROR,

View File

@@ -5,11 +5,35 @@
状态设置等相关的 OneBot v11 API 封装。 状态设置等相关的 OneBot v11 API 封装。
""" """
import orjson import orjson
from typing import Dict, Any from typing import Dict, Any, Type, TypeVar
from dataclasses import is_dataclass, fields
from .base import BaseAPI from .base import BaseAPI
from models.objects import LoginInfo, VersionInfo, Status from models.objects import LoginInfo, VersionInfo, Status
from ..managers.redis_manager import redis_manager from ..managers.redis_manager import redis_manager
T = TypeVar('T')
def _safe_dataclass_from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
"""
安全地从字典创建 dataclass 实例,忽略多余的键。
"""
if not data:
try:
return cls()
except TypeError:
raise ValueError(f"无法在没有数据的情况下创建 {cls.__name__} 的实例")
# 使用官方的 is_dataclass 进行检查,对 MyPyC 更友好
if not is_dataclass(cls):
raise TypeError(f"{cls.__name__} 不是一个 dataclass")
# 获取 dataclass 的所有字段名
known_fields = {f.name for f in fields(cls)}
# 过滤出 dataclass 认识的键值对
filtered_data = {k: v for k, v in data.items() if k in known_fields}
return cls(**filtered_data)
class AccountAPI(BaseAPI): class AccountAPI(BaseAPI):
""" """
@@ -30,11 +54,11 @@ class AccountAPI(BaseAPI):
if not no_cache: if not no_cache:
cached_data = await redis_manager.get(cache_key) cached_data = await redis_manager.get(cache_key)
if cached_data: if cached_data:
return LoginInfo(**orjson.loads(cached_data)) return _safe_dataclass_from_dict(LoginInfo, orjson.loads(cached_data))
res = await self.call_api("get_login_info") res = await self.call_api("get_login_info")
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return LoginInfo(**res) return _safe_dataclass_from_dict(LoginInfo, res)
async def get_version_info(self) -> VersionInfo: async def get_version_info(self) -> VersionInfo:
""" """
@@ -43,8 +67,8 @@ class AccountAPI(BaseAPI):
Returns: Returns:
VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。 VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。
""" """
res = await self.call_api("get_friend_list") res = await self.call_api("get_version_info")
return VersionInfo(**res) return _safe_dataclass_from_dict(VersionInfo, res)
async def get_status(self) -> Status: async def get_status(self) -> Status:
""" """
@@ -54,7 +78,7 @@ class AccountAPI(BaseAPI):
Status: 包含 OneBot 状态信息的 `Status` 数据对象。 Status: 包含 OneBot 状态信息的 `Status` 数据对象。
""" """
res = await self.call_api("get_status") res = await self.call_api("get_status")
return Status(**res) return _safe_dataclass_from_dict(Status, res)
async def bot_exit(self) -> Dict[str, Any]: async def bot_exit(self) -> Dict[str, Any]:
""" """
@@ -162,56 +186,25 @@ class AccountAPI(BaseAPI):
""" """
return await self.call_api("clean_cache") return await self.call_api("clean_cache")
async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> Any: async def get_profile_like(self) -> Dict[str, Any]:
""" """
获取陌生人信息。 获取个人资料的点赞信息。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_profile_like")
async def nc_get_user_status(self, user_id: int) -> Dict[str, Any]:
"""
获取用户的在线状态 (NapCat 特有 API)。
Args: Args:
user_id (int): 目标用户的 QQ 号。 user_id (int): 目标用户的 QQ 号。
no_cache (bool, optional): 是否不使用缓存。Defaults to False.
Returns: Returns:
Any: 包含陌生人信息的字典或对象 Dict[str, Any]: OneBot API 的响应数据
""" """
return await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) return await self.call_api("nc_get_user_status", {"user_id": user_id})
async def get_friend_list(self, no_cache: bool = False) -> list:
"""
获取好友列表。
Args:
no_cache (bool, optional): 是否不使用缓存。Defaults to False.
Returns:
list: 好友列表。
"""
cache_key = f"neobot:cache:get_friend_list:{self.self_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return orjson.loads(cached_data)
res = await self.call_api("get_friend_list")
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return res
async def get_group_list(self, no_cache: bool = False) -> list:
"""
获取群列表。
Args:
no_cache (bool, optional): 是否不使用缓存。Defaults to False.
Returns:
list: 群列表。
"""
cache_key = f"neobot:cache:get_group_list:{self.self_id}"
if not no_cache:
cached_data = await redis_manager.get(cache_key)
if cached_data:
return orjson.loads(cached_data)
res = await self.call_api("get_group_list")
await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时
return res

View File

@@ -3,6 +3,7 @@ API 基础模块
定义了 API 调用的基础接口和统一处理逻辑。 定义了 API 调用的基础接口和统一处理逻辑。
""" """
import copy
from typing import Any, Dict, Optional, TYPE_CHECKING from typing import Any, Dict, Optional, TYPE_CHECKING
from ..utils.logger import logger from ..utils.logger import logger
@@ -35,7 +36,32 @@ class BaseAPI:
params = {} params = {}
try: try:
logger.debug(f"调用API -> action: {action}, params: {params}") # 日志记录前,对敏感或过长的参数进行处理
log_params = copy.deepcopy(params)
if 'message' in log_params:
if isinstance(log_params['message'], list):
for segment in log_params['message']:
if segment.get('type') == 'image' and 'file' in segment.get('data', {}):
file_data = segment['data']['file']
if file_data.startswith('data:image/'):
segment['data']['file'] = f"{file_data[:50]}... (base64 truncated)"
elif isinstance(log_params['message'], str) and log_params['message'].startswith('data:image/'):
log_params['message'] = f"{log_params['message'][:50]}... (base64 truncated)"
# 如果是发送消息的动作,则原子化地增加发送消息总数
if action in ["send_private_msg", "send_group_msg", "send_msg"]:
from ..managers.redis_manager import redis_manager
try:
lua_script = "return redis.call('INCR', KEYS[1])"
await redis_manager.execute_lua_script(
script=lua_script,
keys=["neobot:stats:messages_sent"],
args=[]
)
except Exception as e:
logger.error(f"发送消息计数失败: {e}")
logger.debug(f"调用API -> action: {action}, params: {log_params}")
response = await self._ws.call_api(action, params) response = await self._ws.call_api(action, params)
logger.debug(f"API响应 <- {response}") logger.debug(f"API响应 <- {response}")

View File

@@ -84,3 +84,76 @@ class FriendAPI(BaseAPI):
""" """
return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark}) return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark})
async def get_friends_with_category(self) -> Dict[str, Any]:
"""
获取带分类的好友列表。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_friends_with_category")
async def get_unidirectional_friend_list(self) -> Dict[str, Any]:
"""
获取单向好友列表。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_unidirectional_friend_list")
async def friend_poke(self, user_id: int) -> Dict[str, Any]:
"""
发送好友戳一戳。
Args:
user_id (int): 目标用户的 QQ 号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("friend_poke", {"user_id": user_id})
async def mark_private_msg_as_read(self, user_id: int, time: int = 0) -> Dict[str, Any]:
"""
标记私聊消息为已读。
Args:
user_id (int): 目标用户的 QQ 号。
time (int, optional): 标记此时间戳之前的消息为已读。Defaults to 0 (全部标记)。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
params = {"user_id": user_id}
if time > 0:
params["time"] = time
return await self.call_api("mark_private_msg_as_read", params)
async def get_friend_msg_history(self, user_id: int, count: int = 20) -> Dict[str, Any]:
"""
获取私聊消息历史记录。
Args:
user_id (int): 目标用户的 QQ 号。
count (int, optional): 要获取的消息数量。Defaults to 20.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_friend_msg_history", {"user_id": user_id, "count": count})
async def forward_friend_single_msg(self, user_id: int, message_id: str) -> Dict[str, Any]:
"""
转发单条好友消息。
Args:
user_id (int): 目标用户的 QQ 号。
message_id (str): 要转发的消息 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("forward_friend_single_msg", {"user_id": user_id, "message_id": message_id})

View File

@@ -282,3 +282,183 @@ class GroupAPI(BaseAPI):
""" """
return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason}) return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason})
async def get_group_info_ex(self, group_id: int) -> Dict[str, Any]:
"""
获取群扩展信息 (NapCat 特有 API)。
Args:
group_id (int): 目标群组的群号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_group_info_ex", {"group_id": group_id})
async def delete_essence_msg(self, message_id: int) -> Dict[str, Any]:
"""
删除精华消息。
Args:
message_id (int): 目标消息的 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("delete_essence_msg", {"message_id": message_id})
async def group_poke(self, group_id: int, user_id: int) -> Dict[str, Any]:
"""
在群内发送 "戳一戳"
Args:
group_id (int): 目标群组的群号。
user_id (int): 目标成员的 QQ 号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("group_poke", {"group_id": group_id, "user_id": user_id})
async def mark_group_msg_as_read(self, group_id: int, time: int = 0) -> Dict[str, Any]:
"""
标记群消息为已读。
Args:
group_id (int): 目标群组的群号。
time (int, optional): 标记此时间戳之前的消息为已读。Defaults to 0 (全部标记)。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
params = {"group_id": group_id}
if time > 0:
params["time"] = time
return await self.call_api("mark_group_msg_as_read", params)
async def forward_group_single_msg(self, group_id: int, message_id: str) -> Dict[str, Any]:
"""
转发单条群消息。
Args:
group_id (int): 目标群组的群号。
message_id (str): 要转发的消息 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("forward_group_single_msg", {"group_id": group_id, "message_id": message_id})
async def set_group_portrait(self, group_id: int, file: str, cache: int = 1) -> Dict[str, Any]:
"""
设置群头像。
Args:
group_id (int): 目标群组的群号。
file (str): 图片文件的路径或 URL 或 Base64。
cache (int, optional): 是否使用缓存 (1: 是, 0: 否)。Defaults to 1.
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_portrait", {"group_id": group_id, "file": file, "cache": cache})
async def _send_group_notice(self, group_id: int, content: str, **kwargs) -> Dict[str, Any]:
"""
发送群公告。
Args:
group_id (int): 目标群组的群号。
content (str): 公告内容。
**kwargs: 其他可选参数 (如 image)。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
params = {"group_id": group_id, "content": content}
params.update(kwargs)
return await self.call_api("_send_group_notice", params)
async def _get_group_notice(self, group_id: int) -> Dict[str, Any]:
"""
获取群公告。
Args:
group_id (int): 目标群组的群号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("_get_group_notice", {"group_id": group_id})
async def _del_group_notice(self, group_id: int, notice_id: str) -> Dict[str, Any]:
"""
删除群公告。
Args:
group_id (int): 目标群组的群号。
notice_id (str): 公告 ID。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("_del_group_notice", {"group_id": group_id, "notice_id": notice_id})
async def get_group_at_all_remain(self, group_id: int) -> Dict[str, Any]:
"""
获取 @全体成员 的剩余次数。
Args:
group_id (int): 目标群组的群号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_group_at_all_remain", {"group_id": group_id})
async def get_group_system_msg(self) -> Dict[str, Any]:
"""
获取群系统消息。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_group_system_msg")
async def get_group_shut_list(self, group_id: int) -> Dict[str, Any]:
"""
获取群禁言列表。
Args:
group_id (int): 目标群组的群号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("get_group_shut_list", {"group_id": group_id})
async def set_group_remark(self, group_id: int, remark: str) -> Dict[str, Any]:
"""
设置群备注。
Args:
group_id (int): 目标群组的群号。
remark (str): 要设置的备注。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_remark", {"group_id": group_id, "remark": remark})
async def set_group_sign(self, group_id: int) -> Dict[str, Any]:
"""
设置群签到。
Args:
group_id (int): 目标群组的群号。
Returns:
Dict[str, Any]: OneBot API 的响应数据。
"""
return await self.call_api("set_group_sign", {"group_id": group_id})

View File

@@ -37,3 +37,13 @@ class MediaAPI(BaseAPI):
:return: OneBot v11标准响应 :return: OneBot v11标准响应
""" """
return await self.call_api(action="get_image", params={"file": file}) return await self.call_api(action="get_image", params={"file": file})
async def get_file(self, file_id: str) -> Dict[str, Any]:
"""
获取文件信息
:param file_id: 文件ID
:return: OneBot v11标准响应
"""
return await self.call_api(action="get_file", params={"file_id": file_id})

View File

@@ -127,6 +127,19 @@ class MessageHandler(BaseHandler):
""" """
处理消息事件,分发给命令处理器或通用消息处理器 处理消息事件,分发给命令处理器或通用消息处理器
""" """
# 原子化地增加接收消息总数
from ..managers.redis_manager import redis_manager
from ..utils.logger import logger
try:
lua_script = "return redis.call('INCR', KEYS[1])"
await redis_manager.execute_lua_script(
script=lua_script,
keys=["neobot:stats:messages_received"],
args=[]
)
except Exception as e:
logger.error(f"接收消息计数失败: {e}")
from ..managers import permission_manager from ..managers import permission_manager
for handler_info in self.message_handlers: for handler_info in self.message_handlers:
consumed = await self._run_handler(handler_info["func"], bot, event) consumed = await self._run_handler(handler_info["func"], bot, event)
@@ -165,6 +178,19 @@ class MessageHandler(BaseHandler):
await bot.send(event, message_template.format(permission_name=permission_name)) await bot.send(event, message_template.format(permission_name=permission_name))
return return
# 在执行指令前,原子化地增加指令调用次数
from ..managers.redis_manager import redis_manager
from ..utils.logger import logger
try:
lua_script = "return redis.call('HINCRBY', KEYS[1], ARGV[1], 1)"
await redis_manager.execute_lua_script(
script=lua_script,
keys=["neobot:command_stats"],
args=[command_name]
)
except Exception as e:
logger.error(f"指令 /{command_name} 调用次数统计失败: {e}")
await self._run_handler( await self._run_handler(
func, func,
bot, bot,

View File

@@ -122,7 +122,13 @@ class ImageManager(Singleton):
content = f.read() content = f.read()
mime_type = "image/jpeg" if image_type == "jpeg" else "image/png" mime_type = "image/jpeg" if image_type == "jpeg" else "image/png"
return f"data:{mime_type};base64," + base64.b64encode(content).decode("utf-8") base64_str = base64.b64encode(content).decode("utf-8")
# 记录摘要日志,避免刷屏
log_message = f"Base64 图片已生成 (MIME: {mime_type}, Size: {len(base64_str)/1024:.2f} KB, Preview: {base64_str[:30]}...{base64_str[-30:]})"
logger.debug(log_message)
return f"data:{mime_type};base64," + base64_str
except Exception as e: except Exception as e:
logger.error(f"读取图片文件失败: {e}") logger.error(f"读取图片文件失败: {e}")
return None return None

View File

@@ -412,7 +412,7 @@ class PermissionManager(Singleton):
""" """
try: try:
# 创建空的权限数据 # 创建空的权限数据
empty_data = {"users": {}} empty_data: Dict[str, Dict] = {"users": {}}
# 原子性写入文件 # 原子性写入文件
temp_file = self.data_file + ".tmp" temp_file = self.data_file + ".tmp"

View File

@@ -67,5 +67,27 @@ class RedisManager(Singleton):
""" """
return await self.redis.set(name, value, ex=ex) return await self.redis.set(name, value, ex=ex)
async def execute_lua_script(self, script: str, keys: list, args: list):
"""
以原子方式执行 Lua 脚本
Args:
script (str): 要执行的 Lua 脚本字符串
keys (list): 脚本中使用的 Redis 键 (KEYS[1], KEYS[2], ...)
args (list): 传递给脚本的参数 (ARGV[1], ARGV[2], ...)
Returns:
Any: 脚本的返回值
"""
try:
# redis-py 内部会自动处理脚本的缓存 (EVAL/EVALSHA)
lua_script = self.redis.register_script(script)
return await lua_script(keys=keys, args=args)
except Exception as e:
logger.error(f"执行 Lua 脚本失败: {e}")
logger.debug(f"脚本内容: {script}")
raise
# 全局 Redis 管理器实例 # 全局 Redis 管理器实例
redis_manager = RedisManager() redis_manager = RedisManager()

View File

@@ -7,7 +7,7 @@ WebSocket 连接池模块
import asyncio import asyncio
import websockets import websockets
from websockets.legacy.client import WebSocketClientProtocol from websockets.legacy.client import WebSocketClientProtocol
from typing import Optional, Dict, Any, cast from typing import Optional, Dict, Any, cast, Union
import uuid import uuid
from loguru import logger from loguru import logger
@@ -28,7 +28,7 @@ class WSConnection:
self.is_active = True self.is_active = True
self._pending_requests: Dict[str, asyncio.Future] = {} self._pending_requests: Dict[str, asyncio.Future] = {}
async def send(self, data: dict): async def send(self, data: Union[Dict[Any, Any], bytes]):
""" """
发送数据到 WebSocket 连接 发送数据到 WebSocket 连接
""" """
@@ -57,6 +57,19 @@ class WSConnection:
self.is_active = False self.is_active = False
raise WebSocketError(f"接收数据失败: {e}") raise WebSocketError(f"接收数据失败: {e}")
async def ping(self, timeout: int = 5) -> bool:
"""
对 WebSocket 连接执行 ping-pong 健康检查
"""
if not self.is_active:
return False
try:
await asyncio.wait_for(self.conn.ping(), timeout=timeout)
return True
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
self.is_active = False
return False
async def close(self): async def close(self):
""" """
关闭 WebSocket 连接 关闭 WebSocket 连接
@@ -132,7 +145,7 @@ class WSConnectionPool:
async def get_connection(self) -> WSConnection: async def get_connection(self) -> WSConnection:
""" """
从连接池获取一个连接 从连接池获取一个健康的连接,包含健康检查。
""" """
if self._closed: if self._closed:
raise WebSocketError("连接池已关闭") raise WebSocketError("连接池已关闭")
@@ -141,18 +154,21 @@ class WSConnectionPool:
# 尝试从连接池获取连接 # 尝试从连接池获取连接
conn = await asyncio.wait_for(self.pool.get(), timeout=5) conn = await asyncio.wait_for(self.pool.get(), timeout=5)
# 检查连接是否活跃 # 健康检查
if not conn.is_active: if await conn.ping():
logger.warning(f"连接 {conn.conn_id} 已失效,重新创建") logger.debug(f"连接 {conn.conn_id} 健康检查通过")
return await self._create_connection()
return conn return conn
else:
logger.warning(f"连接 {conn.conn_id} 健康检查失败,丢弃并获取新连接")
await conn.close()
return await self.get_connection() # 递归获取下一个
except asyncio.TimeoutError: except asyncio.TimeoutError:
# 连接池为空,创建新连接 # 连接池为空,创建新连接
logger.warning("连接池为空,创建临时连接") logger.warning("连接池在5秒内无可用连接,创建连接")
return await self._create_connection() return await self._create_connection()
except Exception as e: except Exception as e:
raise WebSocketError(f"获取连接失败: {e}") raise WebSocketError(f"获取连接时发生未知错误: {e}")
async def release_connection(self, conn: WSConnection): async def release_connection(self, conn: WSConnection):
""" """

View File

@@ -77,6 +77,32 @@ print(f"正常: {status.good}")
- `status`: 状态描述 - `status`: 状态描述
- `good`: 运行是否正常 - `good`: 运行是否正常
### `get_profile_like` - 获取资料点赞信息
```python
async def get_profile_like(self) -> Dict[str, Any]
```
获取个人资料的点赞信息。
**返回值:**
- 包含点赞信息的字典
### `nc_get_user_status` - 获取用户在线状态 (NapCat)
```python
async def nc_get_user_status(self, user_id: int) -> Dict[str, Any]
```
获取指定用户的在线状态NapCatQQ 特有 API
**参数:**
- `user_id`: 目标用户的 QQ 号
**返回值:**
- 包含用户状态信息的字典
## 状态设置 ## 状态设置
### `set_self_longnick` - 设置个性签名 ### `set_self_longnick` - 设置个性签名
@@ -381,16 +407,6 @@ async def handle_restore_profile(event: MessageEvent, args: str):
3. **客户端支持**: 不是所有 OneBot 客户端都支持全部 API使用前最好测试一下。 3. **客户端支持**: 不是所有 OneBot 客户端都支持全部 API使用前最好测试一下。
4. **谨慎操作**: `bot_exit` 会让机器人下线,谨慎使用。 4. **谨慎操作**: `bot_exit` 会让机器人下线,谨慎使用。
## 重复的方法
`AccountAPI` 中还包含了一些与好友、群组相关的方法,这些方法在其他模块中也有定义:
- `get_stranger_info()`: 同 [好友 API](./friend.md#get_stranger_info---获取陌生人信息)
- `get_friend_list()`: 同 [好友 API](./friend.md#get_friend_list---获取好友列表)
- `get_group_list()`: 同 [群组 API](./group.md#get_group_list---获取群列表)
这些方法在 `AccountAPI` 中的实现可能略有不同(比如缓存逻辑),但功能相同。建议使用对应模块中的版本,因为那些是专门为该功能设计的。
## 下一步 ## 下一步
- [好友 API](./friend.md): 管理好友相关功能 - [好友 API](./friend.md): 管理好友相关功能

View File

@@ -81,6 +81,28 @@ async def handle_who(event: MessageEvent, args: str):
- `level`: QQ 等级 - `level`: QQ 等级
- 其他可能的信息字段 - 其他可能的信息字段
### `get_friends_with_category` - 获取分类好友列表
```python
async def get_friends_with_category(self) -> Dict[str, Any]
```
获取带分组信息的好友列表。
**返回值:**
- 包含分组和好友信息的字典
### `get_unidirectional_friend_list` - 获取单向好友列表
```python
async def get_unidirectional_friend_list(self) -> Dict[str, Any]
```
获取单向好友(你加了对方,对方没加你)的列表。
**返回值:**
- 单向好友列表
## 互动功能 ## 互动功能
### `send_like` - 发送点赞(戳一戳) ### `send_like` - 发送点赞(戳一戳)
@@ -119,6 +141,55 @@ async def handle_like(event: MessageEvent, args: str):
- 有每日次数限制,不要滥用 - 有每日次数限制,不要滥用
- 对方可能关闭了"戳一戳"功能,这时会失败 - 对方可能关闭了"戳一戳"功能,这时会失败
### `friend_poke` - 发送好友戳一戳 (新)
```python
async def friend_poke(self, user_id: int) -> Dict[str, Any]
```
对指定好友发送"戳一戳"(比 `send_like` 更通用的接口)。
**参数:**
- `user_id`: 目标用户的 QQ 号
## 消息历史与状态
### `mark_private_msg_as_read` - 标记私聊已读
```python
async def mark_private_msg_as_read(self, user_id: int, time: int = 0) -> Dict[str, Any]
```
将与指定用户的私聊消息标记为已读。
**参数:**
- `user_id`: 目标用户的 QQ 号
- `time`: 将此时间戳(秒)之前的消息标记为已读,传 `0` 表示全部标记
### `get_friend_msg_history` - 获取私聊历史
```python
async def get_friend_msg_history(self, user_id: int, count: int = 20) -> Dict[str, Any]
```
获取与指定用户的私聊历史记录。
**参数:**
- `user_id`: 目标用户的 QQ 号
- `count`: 要获取的消息数量,默认 20
### `forward_friend_single_msg` - 转发单条消息
```python
async def forward_friend_single_msg(self, user_id: int, message_id: str) -> Dict[str, Any]
```
将一条消息转发给指定好友。
**参数:**
- `user_id`: 接收消息的好友 QQ 号
- `message_id`: 要转发的消息的 ID
## 加好友请求处理 ## 加好友请求处理
### `set_friend_add_request` - 处理加好友请求 ### `set_friend_add_request` - 处理加好友请求

View File

@@ -408,6 +408,181 @@ honor = await bot.get_group_honor_info(123456, "talkative")
print(f"本周龙王: {honor.current_talkative.user_id}") print(f"本周龙王: {honor.current_talkative.user_id}")
``` ```
### `get_group_info_ex` - 获取群扩展信息 (NapCat)
```python
async def get_group_info_ex(self, group_id: int) -> Dict[str, Any]
```
获取群的扩展信息NapCatQQ 特有 API
**参数:**
- `group_id`: 群号
**返回值:**
- 包含群扩展信息的字典
## 精华消息
### `delete_essence_msg` - 删除精华消息
```python
async def delete_essence_msg(self, message_id: int) -> Dict[str, Any]
```
删除一条精华消息。
**参数:**
- `message_id`: 目标消息的 ID
## 互动与状态
### `group_poke` - 群内戳一戳
```python
async def group_poke(self, group_id: int, user_id: int) -> Dict[str, Any]
```
在群内对指定成员发送"戳一戳"。
**参数:**
- `group_id`: 群号
- `user_id`: 目标成员的 QQ 号
### `mark_group_msg_as_read` - 标记群消息已读
```python
async def mark_group_msg_as_read(self, group_id: int, time: int = 0) -> Dict[str, Any]
```
将指定群聊的消息标记为已读。
**参数:**
- `group_id`: 群号
- `time`: 将此时间戳(秒)之前的消息标记为已读,传 `0` 表示全部标记
## 消息转发
### `forward_group_single_msg` - 转发单条群消息
```python
async def forward_group_single_msg(self, group_id: int, message_id: str) -> Dict[str, Any]
```
将一条群消息转发到当前群聊。
**参数:**
- `group_id`: 群号
- `message_id`: 要转发的消息的 ID
## 群设置 (高级)
### `set_group_portrait` - 设置群头像
```python
async def set_group_portrait(self, group_id: int, file: str, cache: int = 1) -> Dict[str, Any]
```
设置群头像。
**参数:**
- `group_id`: 群号
- `file`: 图片文件的路径、URL 或 Base64 字符串
- `cache`: 是否使用缓存(`1` 是,`0` 否)
### `set_group_remark` - 设置群备注
```python
async def set_group_remark(self, group_id: int, remark: str) -> Dict[str, Any]
```
设置群备注NapCatQQ 特有 API
**参数:**
- `group_id`: 群号
- `remark`: 要设置的备注
### `set_group_sign` - 群签到
```python
async def set_group_sign(self, group_id: int) -> Dict[str, Any]
```
在指定群聊中进行签到。
**参数:**
- `group_id`: 群号
## 群公告
### `_send_group_notice` - 发送群公告
```python
async def _send_group_notice(self, group_id: int, content: str, **kwargs) -> Dict[str, Any]
```
发送群公告。
**参数:**
- `group_id`: 群号
- `content`: 公告内容
- `**kwargs`: 其他可选参数,如 `image`
### `_get_group_notice` - 获取群公告
```python
async def _get_group_notice(self, group_id: int) -> Dict[str, Any]
```
获取群公告列表。
**参数:**
- `group_id`: 群号
### `_del_group_notice` - 删除群公告
```python
async def _del_group_notice(self, group_id: int, notice_id: str) -> Dict[str, Any]
```
删除指定的群公告。
**参数:**
- `group_id`: 群号
- `notice_id`: 要删除的公告的 ID
## 其他信息获取
### `get_group_at_all_remain` - 获取@全体剩余次数
```python
async def get_group_at_all_remain(self, group_id: int) -> Dict[str, Any]
```
获取当天在指定群聊中 @全体成员 的剩余次数。
**参数:**
- `group_id`: 群号
### `get_group_system_msg` - 获取群系统消息
```python
async def get_group_system_msg(self) -> Dict[str, Any]
```
获取群系统消息(如加群请求、退群通知等)。
### `get_group_shut_list` - 获取群禁言列表
```python
async def get_group_shut_list(self, group_id: int) -> Dict[str, Any]
```
获取被禁言的群成员列表。
**参数:**
- `group_id`: 群号
## 加群请求处理 ## 加群请求处理
### `set_group_add_request` - 处理加群请求/邀请 ### `set_group_add_request` - 处理加群请求/邀请

View File

@@ -85,6 +85,20 @@ async def handle_imageinfo(event: MessageEvent):
await event.reply("消息中没有图片") await event.reply("消息中没有图片")
``` ```
### `get_file` - 获取文件信息
```python
async def get_file(self, file_id: str) -> Dict[str, Any]
```
获取文件的详细信息比如文件名、大小、URL 等。
**参数:**
- `file_id`: 文件 ID通常从群文件上传事件中获取
**返回值:**
- 包含文件信息的字典
## 实际应用示例 ## 实际应用示例
### 图片转发器 ### 图片转发器

View File

@@ -119,4 +119,22 @@ python setup_mypyc.py
- **当前状态**:为了确保稳定性,`setup_mypyc.py` 脚本**默认不编译** `models/events/` 目录下的事件模型文件。这些文件虽然也被频繁使用,但它们的结构相对复杂,与 `Mypyc` 的兼容性问题仍在探索中。 - **当前状态**:为了确保稳定性,`setup_mypyc.py` 脚本**默认不编译** `models/events/` 目录下的事件模型文件。这些文件虽然也被频繁使用,但它们的结构相对复杂,与 `Mypyc` 的兼容性问题仍在探索中。
- **未来展望**:我们会持续关注 `Mypyc` 的更新,当其对 `dataclass` 的支持得到改善后,会重新尝试将事件模型加入编译列表,以实现极致的性能。 - **未来展望**:我们会持续关注 `Mypyc` 的更新,当其对 `dataclass` 的支持得到改善后,会重新尝试将事件模型加入编译列表,以实现极致的性能。
## 7. 健壮的 WebSocket 连接池
### 痛点
在高并发或网络不稳定的情况下,单个 WebSocket 连接可能会因为各种原因(如超时、服务器重启、网络波动)而中断或变得不可靠。如果框架依赖于单一的、不稳定的连接,会导致 API 调用频繁失败,甚至整个机器人无响应。
### 解决方案
`NeoBot` 实现了一个健壮的 `WebSocket 连接池` (`core/ws_pool.py`),它不仅管理多个连接,还具备智能的健康检查和恢复机制。
- **多连接管理**: 启动时会建立一个包含多个 WebSocket 连接的池API 调用会被分发到这些连接上,实现负载均衡。
- **自动健康检查**: 连接池会定期对池中的每个连接进行健康检查(发送 `get_status` 心跳包)。如果一个连接在规定时间内没有响应,它会被标记为“不健康”。
- **故障转移与恢复**: 当一个 API 调用需要使用连接时,连接池会自动选择一个“健康”的连接。如果所有连接都不健康,它会尝试重新建立新的连接,直到成功为止。
- **无感切换**: 对于上层调用者(如插件开发者)来说,这一切都是透明的。你只需要正常调用 `bot.call_api()`,连接池会在底层处理好所有的连接问题。
### 收益
- **高可用性**: 即使部分连接失效,机器人依然可以通过健康的连接继续提供服务,大大减少了因网络问题导致的停机时间。
- **高并发性能**: 通过连接池,多个 API 请求可以并行地通过不同的连接发送,提高了在高并发场景下的吞吐量。
- **自动恢复**: 无需手动重启机器人,连接池能够自动从网络故障中恢复,增强了系统的稳定性和无人值守能力。
通过这种方式,我们在保证核心模块性能的同时,也维持了项目的稳定性和可维护性。 通过这种方式,我们在保证核心模块性能的同时,也维持了项目的稳定性和可维护性。

View File

@@ -133,3 +133,42 @@ if await permission_manager.check_permission(user_id, Permission.ADMIN):
## 总结 ## 总结
通过以文件为权威数据源、Redis 为缓存层的设计结合原子操作机制NEO Bot 的权限管理系统在保证数据可靠性的同时,提供了高性能的访问能力。这种设计既满足了数据一致性的要求,又兼顾了系统性能的需求。 通过以文件为权威数据源、Redis 为缓存层的设计结合原子操作机制NEO Bot 的权限管理系统在保证数据可靠性的同时,提供了高性能的访问能力。这种设计既满足了数据一致性的要求,又兼顾了系统性能的需求。
## 扩展应用:指令调用统计
除了权限管理,原子操作的思想也应用在了指令调用统计中,但实现方式更为高效。
### 痛点
如果每次调用指令都执行 `GET` -> `(本地+1)` -> `SET` 的流程在高并发下会产生“竞争条件”Race Condition导致计数不准确。例如两个请求同时读取到计数值 10各自加一后都写回 11而正确的结果应该是 12。呵呵其实是看到zmd事件紧急添加的功能
### 解决方案Lua 脚本
`NeoBot` 使用 Redis 的 `EVAL` 命令执行一个 Lua 脚本来实现原子化的计数器。
```lua
-- Lua 脚本 (简化版)
local current = redis.call('HGET', KEYS[1], ARGV[1])
local count = tonumber(current) or 0
count = count + 1
redis.call('HSET', KEYS[1], ARGV[1], count)
return count
```
- **原子性**: Redis 会保证整个 Lua 脚本的执行是原子性的,执行期间不会被其他命令打断。
- **高效性**: 将多个操作(读取、计算、写入)在 Redis 服务器端一次性完成,减少了网络往返的开销。
### 核心实现
`RedisManager` 中,我们封装了 `execute_lua_script` 方法,使得在 Python 中调用 Lua 脚本变得非常简单。
```python
# Python 调用示例
await redis_manager.execute_lua_script(
"atomic_hincrby.lua",
keys=["neobot:stats:command_usage"],
args=[command_name]
)
```
### 收益
- **数据准确性**: 彻底杜绝了高并发下的计数错误问题。
- **高性能**: 相比于传统的“读取-修改-写入”模式,使用 Lua 脚本能显著提升性能,特别是在指令调用这种高频场景下。
- **可扩展性**: 这种模式可以轻松应用于其他需要原子操作的场景,如频率限制、资源池管理等。

View File

@@ -1,357 +1,144 @@
# NEO Bot 开发规范与公约 # NeoBot 开发规范
写代码很简单,但写出**高性能、不炸裂、好维护**的代码需要遵守规矩 本文档为 `NeoBot` 项目的官方开发规范,旨在确保代码的高性能、高可读性和高可维护性。所有贡献者都应遵循这些规范
本文档定义了 NEO Bot 项目的开发守则、编码公约、注意事项和代码规范。所有贡献者和插件开发者都**必须**遵循这些规范,确保机器人稳定运行、代码质量统一 本文档以 [PEP 8 -- Style Guide for Python Code](https://peps.python.org/pep-0008/) 为基础,并在此之上补充了针对本项目的特定约定
> 如果你觉得规范太麻烦,可以问问镀铬酸钾,他会给你一对一教学。。。但最好还是遵守规矩。 ## 核心开发原则
**补充阅读** ### 1. 异步优先
- [插件开发最佳实践](./plugin-development/best-practices.md) - 必读!写插件的基本规矩 **永远不要阻塞事件循环**。任何同步阻塞操作(如 `time.sleep()`、同步网络请求、大规模文件读写)都会导致整个机器人框架卡死。
- [项目结构](./project-structure.md) - 了解代码组织
- [核心概念](./core-concepts/architecture.md) - 理解框架设计
## 1. 开发守则(基本原则) - **应当**: 使用 `asyncio.sleep()`、异步库(如 `aiohttp`),并通过 `asyncio.to_thread``run_in_executor` 将同步代码移出主事件循环。
- **禁止**: 直接在异步函数中使用任何可能阻塞的同步调用。
### 1.1 异步优先原则 ### 2. 资源管理
- **绝对不要阻塞事件循环**NeoBot 采用单线程异步架构,任何同步阻塞操作都会导致整个机器人卡死 **复用优于重建**。频繁创建和销毁资源(如网络连接、浏览器页面)会严重影响性能
- **禁止**`time.sleep()`、同步 `requests`、密集 CPU 计算
- **必须**:使用 `await asyncio.sleep()`、异步 HTTP 客户端、线程池执行同步任务
- **异步任务处理**:长时间运行的任务应使用 `run_in_thread_pool``asyncio.create_task` 执行,避免阻塞主循环 - **应当**: 通过框架提供的单例管理器(如 `redis_manager`, `browser_manager`)获取和管理资源
- **禁止**: 自行实例化管理器或在插件中创建独立的资源实例(如 `aiohttp.ClientSession`)。
### 1.2 资源管理原则 ### 3. 错误处理
- **连接复用**:禁止重复创建连接和资源实例 **健壮性是第一要务**。插件的异常不应影响框架的稳定运行
- HTTP 请求:使用全局 `aiohttp` session 或插件提供的 `get_session()`
- 浏览器操作:必须通过 `browser_manager.get_page()` 获取页面实例
- Redis 连接:通过 `redis_manager` 单例访问
- **资源池化**:浏览器页面、数据库连接等资源必须使用框架提供的池化机制 - **应当**: 在插件和业务逻辑中进行充分的 `try...except` 异常捕获,并向用户返回友好的错误提示
- **禁止**: 抛出未被捕获的异常,或向用户暴露原始的错误堆栈信息。
### 1.3 性能优化原则 ### 4. 跨平台兼容性
- **缓存策略**:频繁访问的外部数据必须添加缓存 代码必须同时兼容 **Windows开发环境****Linux生产环境**
- 短期缓存(<1小时使用 Redis 或内存缓存
- 长期缓存:考虑持久化存储
- **懒加载**:大型资源或初始化成本高的组件应延迟加载 - **应当**: 使用 `pathlib.Path` 处理文件路径,它能自动处理不同操作系统的路径分隔符
- **禁止**: 硬编码路径分隔符(如 `"data\\temp"``"data/temp"`)。
### 1.4 错误处理原则 ## 代码风格规范
- **异常捕获**:所有插件代码都应妥善处理异常,避免插件崩溃影响机器人运行。
- **友好提示**:向用户返回清晰、友好的错误信息,避免暴露内部细节。
- **日志记录**:所有重要操作和错误都应记录日志,使用 `ModuleLogger` 进行结构化日志记录。
### 1.5 安全性原则 ### 1. 命名规范 (PEP 8)
- **输入验证**:所有用户输入都必须验证和清理,防止注入攻击。 - **模块 (Module)**: `lower_case_with_underscores.py`
- **代码执行安全**:使用沙箱环境执行用户代码,隔离系统资源。 - **包 (Package)**: `lower_case_with_underscores`
- **权限控制**:严格遵循权限管理系统,禁止越权操作。 - **类 (Class)**: `PascalCase`
- **函数 (Function) / 方法 (Method) / 变量 (Variable)**: `snake_case`
- **常量 (Constant)**: `UPPER_SNAKE_CASE`
- **私有成员**: 以单下划线 `_` 开头。
### 1.6 跨平台兼容性原则 ### 2. 类型提示 (PEP 484)
NEO Bot 需要在 **Windows 开发环境**和 **Linux 生产环境**中都能正常运行 **所有函数和方法的签名都必须包含类型提示**。这是强制性要求,因为它对 `Mypyc` 编译和代码可读性至关重要
- **路径处理** - **应当**: 明确指定所有参数和返回值的类型。对于可能返回 `None` 的情况,使用 `Optional[...]`
- 使用 `pathlib.Path` 处理文件路径,避免手动拼接字符串。 - **示例**:
- 使用 `/` 作为路径分隔符Python 会自动转换)。
- 禁止使用硬编码的路径分隔符(如 `\\``/`)。
- **系统依赖**
- 避免使用平台特定的系统调用。
- 如果必须使用,通过 `sys.platform` 检测平台并提供备选方案。
- **环境变量**
- 通过 `global_config` 获取配置,而不是直接读取环境变量。
- 敏感信息(如 API 密钥)必须通过配置管理。
- **文件权限**
- 在 Linux 上注意文件权限设置,确保 Bot 有读写权限。
- 临时文件应放在系统临时目录(`tempfile.gettempdir()`)。
## 2. 公约(编码约定)
### 2.1 项目结构公约
- **插件位置**:所有插件必须放置在 `plugins/` 目录下,单个 `.py` 文件或包含 `__init__.py` 的目录。
- **模块导入**:遵循标准导入顺序:标准库 → 第三方库 → 本地模块。
- **配置访问**:通过 `global_config` 单例访问配置,禁止硬编码配置值。
### 2.2 单例管理器使用公约
NEO Bot 的核心是**单例管理器**`core/managers/` 目录下的类)。所有全局资源都必须通过管理器访问。
- **禁止重复创建**:严禁自己实例化管理器类,必须通过导入的单例对象访问。
-`from core.managers.redis_manager import redis_manager`
-`RedisManager()` (错误!会创建新实例)
- **资源池化**:浏览器页面、数据库连接等资源必须使用管理器提供的池化接口。
-`await browser_manager.get_page()`
-`playwright.chromium.launch()` (错误!会创建新浏览器进程)
- **数据一致性**:单例管理器确保全局数据一致性,不要绕过管理器直接操作底层资源。
### 2.2.1 单例模式实现机制
NEO Bot 提供了两种单例模式实现方式,位于 `core/utils/singleton.py`
#### 1. Singleton 基类(继承方式)
```python
from core.utils.singleton import Singleton
class MyManager(Singleton):
"""通过继承 Singleton 基类实现单例"""
def __init__(self, config: dict):
"""
初始化管理器
Args:
config: 配置字典
"""
# 调用父类 __init__ 确保单例初始化
super().__init__()
# 检查是否已经初始化(防止 __init__ 被多次调用)
if hasattr(self, '_my_initialized') and self._my_initialized:
return
# 执行一次性初始化逻辑
self.config = config
self.resource = None
self._initialize_resource()
# 标记为已初始化
self._my_initialized = True
def _initialize_resource(self):
"""初始化资源(只执行一次)"""
self.resource = initialize_resource(self.config)
async def cleanup(self):
"""清理资源(单例管理器应实现清理方法)"""
if self.resource:
await self.resource.close()
```
**特性**
- 通过重写 `__new__` 方法确保每个类只有一个实例
- 自动处理重复初始化问题,但建议子类添加额外的初始化检查
- 使用全局字典存储实例,避免类型检查问题
- 支持带参数的 `__init__` 方法
#### 2. @singleton 装饰器(装饰器方式)
```python
from core.utils.singleton import singleton
@singleton
class MyManager:
"""通过装饰器实现单例"""
def __init__(self, config):
self.config = config
self.resource = None
async def initialize(self):
self.resource = await load_resource()
```
**特性**
- 将普通类转换为单例类,无需修改类继承关系
- 保持原始类的元数据(名称、文档字符串等)
- 适用于无法修改基类的现有类
#### 3. 使用建议
- **新管理器类**:优先使用 **Singleton 基类继承方式**,结构更清晰
- **现有类转换**:使用 **@singleton 装饰器**,无需重构
- **线程安全**:两种方式都假设在单线程异步环境中使用,如需线程安全请自行加锁
- **导入方式**:单例类应该通过模块级别的实例变量导出,如:
```python ```python
# redis_manager.py async def get_user_data(user_id: int) -> Optional[Dict[str, Any]]:
class RedisManager(Singleton): # ...
...
redis_manager = RedisManager() # 创建并导出单例实例
``` ```
#### 4. 重要注意事项 ### 3. 文档字符串 (PEP 257)
- **避免循环导入**:单例类的导入应谨慎处理,避免循环依赖 **所有公开的模块、类、函数和方法都必须拥有文档字符串**。
- **初始化时机**:单例在第一次导入时创建,确保所需依赖已就绪
- **__init__ 调用语义**:虽然实例是单例,但 `__init__` 方法可能被多次调用(如重新导入时)。应添加额外检查确保一次性逻辑只执行一次。
- **资源清理**:单例管理器应在程序退出时清理资源,实现 `cleanup()` 方法
### 2.3 命名公约 - **格式**: 遵循 Google Python Style Guide 的文档字符串格式。它清晰、简洁且易于阅读。
- **文件命名**:使用小写字母和下划线,例如 `my_plugin.py`。 - **内容**:
- **类命名**:使用 `PascalCase`,例如 `CommandManager` - **模块/类**: 简要描述其职责和功能
- **函数/方法命名**:使用 `snake_case`,例如 `handle_message`。 - **函数/方法**:
- **常量命名**:使用 `UPPER_SNAKE_CASE`,例如 `MAX_RETRY_COUNT` - 一行总结其功能
- **变量命名**:使用 `snake_case`,具有描述性,避免单字母变量(循环变量除外) - `Args:`: 描述每个参数的类型和含义
- `Returns:`: 描述返回值的类型和含义。
### 2.4 类型提示公约 - `Raises:`: (可选) 描述可能抛出的主要异常。
- **全面使用**:所有函数、方法、类属性都应提供类型提示。**这是强制要求**,因为框架开启了 Mypyc 编译。 - **示例**:
- **性能优化**:类型提示不仅帮助发现 Bug还能让 Mypyc 生成更高效的机器码。 ```python
- **返回类型**:明确指定返回类型,包括 `None`。 async def fetch_data(url: str, timeout: int = 10) -> str:
- **复杂类型**:使用 `typing` 模块中的泛型,如 `List[str]`、`Dict[str, Any]`。 """Fetches content from a URL.
- **可选参数**:使用 `Optional[...]` 或默认值 `= None`。
**示例**
```python
# 好的写法
async def handle(event: MessageEvent, args: list[str]) -> None:
...
# 不好写法(会导致编译警告)
async def handle(event, args):
...
```
### 2.5 异常处理公约
- **自定义异常**:使用框架提供的自定义异常类,避免抛出通用的 `Exception`。
- **异常链**:保留原始异常信息,使用 `raise CustomError(...) from e`。
- **资源清理**:使用 `try...finally` 或上下文管理器确保资源释放。
### 2.6 日志记录公约
- **模块化日志**:每个模块使用 `ModuleLogger("ModuleName")` 创建专用日志记录器。
- **日志级别**
- `DEBUG`:调试信息,详细操作记录
- `INFO`:常规操作记录
- `WARNING`:预期内的异常或潜在问题
- `ERROR`:操作失败但可恢复的错误
- `CRITICAL`:系统级错误,需要立即关注
## 3. 注意事项(常见陷阱)
### 3.1 异步编程陷阱
- **忘记 await**:异步函数调用必须使用 `await`,否则任务不会执行。
- **阻塞循环**:在异步函数中执行同步阻塞操作会冻结整个事件循环。
- **任务泄漏**:创建的异步任务必须被妥善管理,避免内存泄漏。
### 3.2 资源管理陷阱
- **连接泄漏**:未关闭的 HTTP 连接、数据库连接会导致资源耗尽。
- **文件句柄泄漏**:打开的文件必须显式关闭或使用上下文管理器。
- **缓存雪崩**:大量缓存同时过期可能导致系统负载激增。
### 3.3 性能陷阱
- **N+1 查询**:避免在循环中执行数据库或 API 查询,使用批量操作。
- **内存泄漏**:大型数据结构长时间驻留内存,应定期清理。
- **重复计算**:相同的计算结果应缓存,避免重复计算。
### 3.4 安全性陷阱
- **SQL 注入**:使用参数化查询或 ORM禁止拼接 SQL 字符串。
- **XSS 攻击**:渲染用户输入时必须进行 HTML 转义。
- **路径遍历**:用户提供的文件路径必须进行规范化验证。
## 4. 代码规范(详细指南)
### 4.1 文档字符串规范(强制要求)
**所有代码必须包含完整的文档字符串**,这是项目质量保证的基础。缺少文档字符串的代码将在审查中被拒绝。
- **模块级文档**:每个模块顶部应有文档字符串,描述模块功能和主要接口。
- **类级文档**:每个类应有文档字符串,描述类的职责、使用方法和示例。
- **函数/方法级文档**:每个公共函数和方法必须有文档字符串,包含参数说明、返回值和异常信息。
**参数注释要求**
1. 每个参数都必须有类型提示和简要说明
2. 返回值必须明确说明类型和含义
3. 可能抛出的异常必须列出
4. 复杂的函数应提供使用示例
**标准格式示例:**
```python
def process_data(data: List[str], timeout: int = 30) -> Dict[str, Any]:
"""
处理数据并返回结果。
Args: Args:
data: 待处理的数据列表 url: The URL to fetch from.
timeout: 操作超时时间,单位秒 timeout: The request timeout in seconds.
Returns: Returns:
处理结果的字典,包含状态和详情 The content of the response as a string.
Raises: Raises:
TimeoutError: 处理超时时抛出 asyncio.TimeoutError: If the request times out.
ValueError: 数据格式错误时抛出
Example:
>>> result = process_data(["item1", "item2"])
>>> print(result["status"])
""" """
``` # ...
### 4.2 函数设计规范
- **单一职责**:每个函数只做一件事,保持功能简洁。
- **参数数量**:函数参数不宜过多(建议 ≤5过多时考虑使用 `dataclass` 或 `TypedDict`。
- **默认参数**:避免使用可变对象作为默认参数,使用 `None` 代替。
### 4.3 类设计规范
- **单一职责**:每个类应有明确的单一职责。
- **组合优于继承**:优先使用组合而非继承来复用功能。
- **属性访问控制**:使用 `@property` 装饰器控制属性访问,隐藏内部实现。
### 4.4 错误处理规范
- **错误码统一**:使用框架定义的 `ErrorCode` 枚举,避免自定义魔法数字。
- **错误响应格式**:使用 `exception_to_error_response` 生成统一错误响应。
- **用户友好消息**:错误消息应同时包含技术细节(日志)和用户友好提示(界面)。
### 4.5 测试规范
- **测试覆盖率**:核心功能应达到 80% 以上的测试覆盖率。
- **异步测试**:使用 `pytest-asyncio` 进行异步测试。
- **测试隔离**:测试用例之间应相互独立,避免依赖执行顺序。
## 5. 提交与协作规范
### 5.1 Git 提交规范
- **提交信息格式**:遵循 Conventional Commits 规范
``` ```
<type>(<scope>): <subject>
<body> ### 4. 导入规范
- **顺序**: 遵循 PEP 8 的建议,将导入分为三组,每组按字母顺序排列:
1. **标准库** (e.g., `asyncio`, `sys`)
2. **第三方库** (e.g., `aiohttp`, `loguru`)
3. **本项目模块** (e.g., `from core.managers import ...`)
- **绝对导入**: 优先使用绝对导入路径(`from core.utils import ...`),避免使用相对导入(`from ..utils import ...`),以增强代码清晰度。
<footer> ### 5. 日志记录
``` - **应当**: 使用 `from core.utils.logger import logger` 获取全局日志记录器实例。在需要区分模块来源时,可以使用 `ModuleLogger("MyModule")`
- **type**feat、fix、docs、style、refactor、test、chore - **日志级别**:
- **scope**:影响的模块或功能区域 - `DEBUG`: 用于详细的诊断信息。
- **subject**简洁的描述50字符以内 - `INFO`: 用于记录常规的操作流程。
- **body**:详细说明(可选) - `WARNING`: 用于表示发生了预期内的小问题,或提示潜在风险。
- **footer**Breaking Changes 或 Issue 引用 - `ERROR`: 用于记录影响功能但程序仍可运行的错误。
- `CRITICAL`: 用于记录导致程序崩溃的严重错误。
### 5.2 代码审查规范 ## 项目特定约定
- **审查重点**:功能正确性、代码规范、性能影响、安全性。
- **审查态度**:建设性反馈,避免人身攻击。
- **审查时效**24小时内响应审查请求。
### 5.3 分支管理规范 ### 1. 单例管理器
- **主分支**`main` 分支始终保持可部署状态 框架的核心功能由 `core/managers/` 下的单例管理器提供
- **功能分支**:从 `main` 创建,命名格式 `feature/简短描述`。
- **修复分支**:从 `main` 创建,命名格式 `fix/问题描述`。
### 5.4 发布规范 - **获取方式**: 必须通过导入模块级别的实例来使用,例如 `from core.managers.redis_manager import redis_manager`。
- **版本号**遵循语义化版本控制SemVer`主版本.次版本.修订版本` - **核心职责**: 这些管理器负责维护全局状态和资源池,是确保性能和数据一致性的关键。
- **更新日志**:每次发布都应更新 `CHANGELOG.md`。
- **向后兼容**:非主版本更新应保持 API 向后兼容。
## 6. 插件开发特别规范 ### 2. 配置管理
- **访问方式**: 所有配置项都应通过 `from core.config_loader import global_config` 来访问。
- **禁止**: 在代码中硬编码任何配置值(如 API 地址、端口、文件路径等)。
### 3. 插件元信息
每个插件文件都应在顶部定义 `__plugin_meta__` 字典,以供帮助系统使用。
### 6.1 插件元数据
每个插件必须在文件顶部定义 `__plugin_meta__` 字典:
```python ```python
__plugin_meta__ = { __plugin_meta__ = {
"name": "插件名称", "name": "插件名称",
"description": "插件功能描述", "description": "插件功能的简要描述",
"usage": "使用说明,包括命令格式和示例", "usage": "插件的使用方法,例如 `/command [args]`。"
"author": "作者名(可选)",
"version": "版本号(可选)",
} }
``` ```
### 6.2 命令注册 ## Git 提交约定
- **命令前缀**:使用配置中定义的前缀,不要硬编码。
- **权限控制**:使用 `Permission` 枚举指定命令权限级别。
- **参数解析**:利用框架的自动参数解析功能,避免手动解析。
### 6.3 插件生命周期 为了保持提交历史的清晰,我们采用一种简化的提交信息格式:
- **初始化**:避免在模块级别执行初始化操作,使用函数包装。
- **资源清理**:提供清理函数或使用上下文管理器管理资源。
- **错误恢复**:插件崩溃后应能优雅恢复,不影响其他插件。
## 7. 总结 `<type>: <subject>`
遵循这些规范将确保 NeoBot 项目保持高质量、高性能和高可维护性。所有贡献者都应阅读并理解这些规范,并在代码审查中互相监督执行。 - **`<type>`**:
- `feat`: 新功能
- `fix`: Bug 修复
- `docs`: 文档变更
- `style`: 代码格式调整(不影响逻辑)
- `refactor`: 代码重构
- `test`: 添加或修改测试
- `chore`: 构建过程或辅助工具的变动
- **`<subject>`**:
- 对本次提交的简明扼要的描述。
- 使用祈使句,例如 `add user authentication` 而不是 `added user authentication`。
**记住:规范不是束缚,而是高效协作的基础。** **示例**:
```
feat: Add /status command to show bot health
fix: Correctly handle empty messages in parser
docs: Update development standards with new guidelines
```

View File

@@ -44,6 +44,8 @@ source venv/bin/activate
pip install -r requirements.txt pip install -r requirements.txt
``` ```
这个文件里包含了所有需要的 Python 库,比如 `aiohttp` (HTTP 请求), `orjson` (JSON 解析), `jinja2` (模板渲染), `psutil` (系统监控) 等等。
### d. 安装 Playwright 依赖 ### d. 安装 Playwright 依赖
我们用 Playwright 来截图画画,它需要一个浏览器核心。 我们用 Playwright 来截图画画,它需要一个浏览器核心。

View File

@@ -72,3 +72,9 @@ Bot 应该会回复你:“你好,[你的昵称]!”
* `args: str`: 如果命令有参数(比如 `/echo hello world``args` 就是 `hello world` 这部分字符串。 * `args: str`: 如果命令有参数(比如 `/echo hello world``args` 就是 `hello world` 这部分字符串。
就这么简单,一个最基础的插件就写完了。 就这么简单,一个最基础的插件就写完了。
## 进阶阅读
- [指令处理](./command-handling.md): 了解如何处理参数、获取用户输入。
- [最佳实践](./best-practices.md): 学习如何编写更健壮、更高效的插件。
- [插件详解:/status 状态监控](./status-plugin.md): 深入了解内置的状态监控插件是如何实现的。

View File

@@ -0,0 +1,82 @@
# 插件详解:`/status` 状态监控
`/status``NeoBot` 内置的一个强大插件,它能让你实时了解机器人的运行状态、性能指标和指令调用情况。这不仅是一个酷炫的功能,更是一个重要的运维工具。
## 功能概览
发送 `/status` 指令后,机器人会返回一张精心设计的状态图,包含以下核心信息:
1. **系统信息**:
* **CPU 使用率**: 当前服务器的 CPU 负载情况。
* **内存占用**: 机器人进程占用了多少物理内存。
* **磁盘空间**: 服务器磁盘的使用情况。
2. **机器人核心指标**:
* **启动时间**: 机器人本次运行了多久。
* **连接状态**: 与 OneBot 客户端的连接是否正常。
* **消息收发**: 接收和发送了多少条消息。
3. **指令调用统计**:
* **总调用次数**: 所有指令一共被调用了多少次。
* **热门指令**: 哪些指令被使用的频率最高。
4. **版本信息**:
* **框架版本**: `NeoBot` 的版本号。
* **客户端信息**: 连接的 OneBot 客户端名称和版本(如 NapCatQQ
## 实现技术
这个插件综合运用了 `NeoBot` 框架的多种核心能力:
- **系统监控 (`psutil`)**: 通过 `psutil` 库获取实时的系统性能数据。
- **原子化统计 (`Redis + Lua`)**: 指令调用次数通过 Redis 的 Lua 脚本进行原子化递增,保证高并发下的数据准确性。
- **异步任务**: 启动时间、消息计数等信息在后台通过异步任务持续更新。
- **动态 HTML 渲染 (`Jinja2`)**: 状态信息被注入到一个 HTML 模板中。
- **网页截图 (`Playwright`)**: 渲染好的 HTML 页面通过 Playwright 的页面池进行截图,生成最终的状态图片。
## 如何使用
直接在与机器人聊天的任何地方(私聊或群聊)发送:
```
/status
```
机器人会处理几秒钟(主要是截图耗时),然后将状态图片发送给你。
## 自定义与扩展
想在状态图中添加你自己的信息?很简单!
1. **找到插件文件**: `plugins/bot_status.py`
2. **修改 `get_bot_status` 函数**: 这个函数负责收集所有需要展示的数据。你可以在这里添加新的数据源。
```python
# plugins/bot_status.py
async def get_bot_status() -> Dict[str, Any]:
# ... 已有的代码 ...
# 添加你自己的数据
my_plugin_data = {
"custom_metric": await get_my_metric(),
"plugin_version": "1.2.3"
}
status_data.update(my_plugin_data)
return status_data
```
3. **修改 HTML 模板**: `templates/status.html`。
在这个文件中,你可以用 Jinja2 的语法把你刚刚添加的数据展示出来。
```html
<!-- templates/status.html -->
<!-- ... 已有的代码 ... -->
<div class="card">
<h2>我的插件状态</h2>
<p>自定义指标: {{ custom_metric }}</p>
<p>插件版本: {{ plugin_version }}</p>
</div>
```
通过这种方式,你可以轻松地将 `/status` 打造成一个专属于你的、功能更加丰富的机器人仪表盘。

143
plugins/bot_status.py Normal file
View File

@@ -0,0 +1,143 @@
"""
Bot 状态查询插件
提供 /status 指令,以图片形式展示机器人当前的综合运行状态。
"""
import os
import psutil
import time
from datetime import datetime, timedelta
from core.bot import Bot
from core.managers.command_manager import matcher
from core.managers.image_manager import image_manager
from core.managers.redis_manager import redis_manager
from core.utils.executor import run_in_thread_pool
from core.utils.logger import logger
from models.events.message import MessageEvent, MessageSegment
from models.objects import LoginInfo, Status, VersionInfo
__plugin_meta__ = {
"name": "bot_status",
"description": "以图片形式展示机器人当前的综合运行状态",
"usage": "/status 或 /状态",
}
# 记录机器人启动时间
START_TIME = time.time()
# 获取当前进程
PROCESS = psutil.Process(os.getpid())
def _get_system_info():
"""
同步函数:使用 psutil 获取系统信息,避免阻塞事件循环。
"""
# interval=1 会阻塞1秒必须在线程池中运行
cpu_percent = psutil.cpu_percent(interval=1)
mem_info = psutil.virtual_memory()
bot_mem_mb = PROCESS.memory_info().rss / (1024 * 1024)
return {
"cpu_percent": f"{cpu_percent:.1f}",
"mem_percent": f"{mem_info.percent:.1f}",
"bot_mem_mb": f"{bot_mem_mb:.2f}",
}
@matcher.command("status", "状态")
async def handle_status(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理 status 指令,生成并回复机器人状态图片。
"""
logger.info(f"收到用户 {event.user_id} 的状态查询指令,开始生成状态图...")
try:
# 1. 获取API信息 (增加独立错误处理)
try:
# 优先使用 get_stranger_info 获取自身信息,比 get_login_info 更轻量
stranger_info = await bot.get_stranger_info(user_id=bot.self_id)
nickname = stranger_info.nickname
except Exception as e:
logger.warning(f"获取 stranger_info 失败: {e}, 将回退到 login_info")
try:
login_info = await bot.get_login_info()
nickname = login_info.nickname
except Exception as e2:
logger.warning(f"获取 login_info 也失败了: {e2}")
nickname = "获取失败"
# 状态信息:如果能响应此命令,说明机器人必然在线且状态良好
# 这避免了依赖可能超时或未实现的 get_status API
logger.debug("正在推断机器人状态...")
status_info = Status(online=True, good=True)
logger.debug(f"推断状态成功: online={status_info.online}, good={status_info.good}")
try:
version_info = await bot.get_version_info()
except Exception as e:
logger.warning(f"获取 version_info 失败: {e}")
version_info = VersionInfo(app_name="获取失败", app_version="N/A", protocol_version="N/A")
# 2. 计算运行时长
uptime_seconds = int(time.time() - START_TIME)
uptime_delta = timedelta(seconds=uptime_seconds)
days = uptime_delta.days
hours, remainder = divmod(uptime_delta.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
uptime_str = f"{days}{hours:02}:{minutes:02}:{seconds:02}"
bot_info_data = {
"user_id": bot.self_id,
"nickname": nickname,
"avatar_url": f"https://q1.qlogo.cn/g?b=qq&nk={bot.self_id}&s=640",
"start_time": datetime.fromtimestamp(START_TIME).strftime("%Y-%m-%d %H:%M:%S"),
"uptime": uptime_str,
}
# 3. 获取统计数据
msgs_recv = await redis_manager.get("neobot:stats:messages_received") or 0
msgs_sent = await redis_manager.get("neobot:stats:messages_sent") or 0
command_stats_raw = await redis_manager.redis.hgetall("neobot:command_stats")
total_commands = sum(int(v) for v in command_stats_raw.values())
stats_data = {
"messages_received": int(msgs_recv),
"messages_sent": int(msgs_sent),
"total_commands": total_commands,
}
command_stats_data = sorted(
[{"name": k, "count": int(v)} for k, v in command_stats_raw.items()],
key=lambda x: x["count"],
reverse=True
)
# 4. 异步获取系统信息
system_data = await run_in_thread_pool(_get_system_info)
# 5. 准备模板所需的所有数据
template_data = {
"bot_info": bot_info_data,
"status_info": status_info,
"version_info": version_info,
"stats": stats_data,
"system": system_data,
"command_stats": command_stats_data,
}
# 6. 渲染图片
base64_str = await image_manager.render_template_to_base64(
template_name="status.html",
data=template_data,
output_name="status.png",
image_type="png"
)
if base64_str:
await event.reply(MessageSegment.image(base64_str))
else:
# 如果渲染失败image_manager 内部会记录错误,这里给用户一个通用提示
await event.reply("状态图片生成失败,可能是渲染服务出现问题,请联系管理员。")
except Exception as e:
logger.exception(f"生成状态图时发生意外错误, 用户: {event.user_id}")
await event.reply(f"获取状态信息时发生未知错误,请稍后再试或联系管理员。")

View File

@@ -9,6 +9,7 @@ from datetime import datetime
from core.bot import Bot from core.bot import Bot
from core.managers.command_manager import matcher from core.managers.command_manager import matcher
from core.managers.redis_manager import redis_manager
from core.utils.executor import run_in_thread_pool from core.utils.executor import run_in_thread_pool
from models.events.message import MessageEvent, MessageSegment from models.events.message import MessageEvent, MessageSegment
@@ -91,6 +92,36 @@ async def handle_jrcd(bot: Bot, event: MessageEvent, args: list[str]):
reply_segments = [MessageSegment.at(user_id), MessageSegment.from_text(msg_text)] reply_segments = [MessageSegment.at(user_id), MessageSegment.from_text(msg_text)]
await event.reply(reply_segments) await event.reply(reply_segments)
# 使用 Lua 脚本原子化地增加总调用次数
lua_script = "return redis.call('INCR', KEYS[1])"
try:
total_calls = await redis_manager.execute_lua_script(
script=lua_script,
keys=["neobot:jrcd:total_calls"],
args=[]
)
if total_calls:
logger.info(f"jrcd 总调用次数: {total_calls}")
except Exception as e:
logger.error(f"jrcd 插件增加调用次数失败: {e}")
@matcher.command("jrcd_stats")
async def handle_jrcd_stats(bot: Bot, event: MessageEvent, args: list[str]):
"""
处理 jrcd_stats 指令,查询 /jrcd 的总调用次数。
:param bot: Bot 实例。
:param event: 消息事件对象。
:param args: 指令参数列表(未使用)。
"""
total_calls = await redis_manager.get("neobot:jrcd:total_calls")
if not total_calls:
total_calls = 0
reply_text = f"/jrcd 指令已被大家调用了 {total_calls} 次啦!"
await event.reply(reply_text)
@matcher.command("bbcd") @matcher.command("bbcd")
async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]): async def handle_bbcd(bot: Bot, event: MessageEvent, args: list[str]):

View File

@@ -42,6 +42,7 @@ platformdirs==4.5.1
playwright==1.57.0 playwright==1.57.0
pluggy==1.6.0 pluggy==1.6.0
propcache==0.4.1 propcache==0.4.1
psutil==5.9.8
pycparser==2.23 pycparser==2.23
pydantic==2.12.5 pydantic==2.12.5
pydantic_core==2.41.5 pydantic_core==2.41.5

View File

@@ -1,40 +1,38 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
优化版跨平台 Python 模块编译脚本 自动化跨平台 Python 模块编译脚本 (v2.2)
将核心 Python 模块编译为机器码.pyd 或 .so以提升性能。 将核心 Python 模块编译为机器码 (.pyd 或 .so) 以提升性能。
此版本基于对项目结构的深入分析,包含了更多高频使用的模块 此版本实现了全自动清理和保守的编译范围,以确保稳定性
支持的平台: 支持的平台:
- Windows: 生成 .pyd 文件 - Windows: 生成 .pyd 文件
- Linux: 生成 .so 文件 - Linux: 生成 .so 文件
使用方法: 使用方法:
python compile_machine_code.py [options] python scripts/compile_machine_code.py
选项: 脚本会自动执行以下步骤:
--compile, -c 编译指定的模块(默认) 1. 清理所有旧的编译文件 (.pyd, .so) 和 build 目录。
--list, -l 列出已编译的模块 2. 编译模块列表中所有兼容的模块
--clean, -k 清理编译生成的文件 3. 列出所有成功编译的模块。
--help, -h 显示帮助信息
注意: 注意:
1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC) 1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC)
2. 需要安装 mypyc: pip install mypyc 2. 需要安装 mypyc: pip install mypyc
3. 编译后的文件是平台相关的,不能跨平台复制 3. 编译后的文件是平台相关的,不能跨平台复制
4. 建议在部署的目标环境上运行此脚本
5. Mypyc 不支持动态特性,如 eval/exec/getattr/setattr 等
""" """
import os import os
import sys import sys
import glob import glob
import subprocess import subprocess
import shutil import shutil
import argparse
# --- 配置区 ---
# 检测当前平台和 Python 版本 # 检测当前平台和 Python 版本
PLATFORM = sys.platform PLATFORM = sys.platform
PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}" # 例如 "314" PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}"
if PLATFORM.startswith('win'): if PLATFORM.startswith('win'):
EXTENSION = '.pyd' EXTENSION = '.pyd'
@@ -48,637 +46,159 @@ else:
print(f"不支持的平台: {PLATFORM}") print(f"不支持的平台: {PLATFORM}")
sys.exit(1) sys.exit(1)
# 根据项目分析,优化要编译模块列表 # 经过测试的稳定编译模块列表
# 这些是项目中使用频率最高的模块,编译后能显著提升性能
MODULES = [ MODULES = [
# 工具模块 - 高频使用 # 工具模块
'core/utils/json_utils.py', # JSON 处理 - 高频使用 'core/utils/executor.py',
'core/utils/executor.py', # 代码执行引擎 - 高频使用 'core/utils/exceptions.py',
'core/utils/exceptions.py', # 自定义异常 - 基础组件 'core/utils/logger.py',
'core/utils/performance.py', # 性能监控工具 - 重要组件
'core/utils/logger.py', # 日志模块 - 高频使用
'core/utils/singleton.py', # 单例模式 - 基础组件
# 核心管理模块 - 高频使用 # 核心基础模块
# 'core/managers/command_manager.py', # 指令匹配和分发 - 包含动态特性,不适合编译 'core/ws.py',
# 'core/managers/admin_manager.py', # 管理员管理 - 包含动态特性,不适合编译 'core/config_loader.py',
# 'core/managers/permission_manager.py', # 权限管理 - 包含动态特性,不适合编译
# 'core/managers/plugin_manager.py', # 插件管理器 - 包含动态特性,不适合编译
# 'core/managers/redis_manager.py', # Redis 管理器 - 包含动态特性,不适合编译
# 'core/managers/image_manager.py', # 图片管理器 - 包含动态特性,不适合编译
# 核心基础模块 - 高频使用 # 数据模型 (仅包含安全的、无复杂元类的部分)
'core/ws.py', # WebSocket 核心 - 核心通信被10个文件引用 'models/message.py',
# 'core/bot.py', # Bot 核心抽象 - 使用多重继承,不适合编译 'models/sender.py',
'core/config_loader.py', # 配置加载 - 启动必需被7个文件引用 'models/objects.py',
# 'core/config_models.py', # 配置模型 - 包含复杂类型定义,不适合编译
# 'core/permission.py', # 权限枚举 - 包含动态属性,不适合编译
# 数据模型 - 高频使用
'models/message.py', # 消息段模型 - 高频消息处理
'models/sender.py', # 发送者模型 - 高频消息处理
'models/objects.py', # API 响应数据模型 - 高频数据处理
# 事件处理相关 - 高频使用
'core/handlers/event_handler.py', # 事件处理器 - 核心事件处理
# 事件模型 - 高频使用但包含dataclass可能有编译问题暂时排除
# 'models/events/message.py', # 消息事件 - 最高频事件类型
# 'models/events/notice.py', # 通知事件 - 高频事件类型
# 'models/events/request.py', # 请求事件 - 高频事件类型
# 'models/events/meta.py', # 元事件 - 高频事件类型
# 注意:以下文件不适合编译
# - 主程序文件main.py
# - 测试文件tests/目录)
# - 插件文件plugins/目录)
# - 编译脚本compile_machine_code.py等
# - 包含复杂动态特性的文件
# - API 基础类(由于多重继承问题)
] ]
# --- 功能函数 ---
def list_compiled_modules(): def list_compiled_modules():
"""列出已编译的模块""" """列出已编译的模块"""
print(f"\n已编译的 {PLATFORM} 模块:") print(f"\n已编译的 {PLATFORM} 模块:")
print("=" * 50) print("=" * 50)
# 查找所有编译后的文件 compiled_files = glob.glob(f'**/*{EXTENSION}', recursive=True)
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f] compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files: if compiled_files:
for f in sorted(compiled_files): for f in sorted(compiled_files):
size = os.path.getsize(f) // 1024 # KB try:
size = os.path.getsize(f) // 1024
print(f"{f} ({size} KB)") print(f"{f} ({size} KB)")
except FileNotFoundError:
continue
else: else:
print(f"未找到已编译的 {EXTENSION} 文件") print(f"未找到已编译的 {EXTENSION} 文件")
print(f"\n总计: {len(compiled_files)} 个文件") print(f"\n总计: {len(compiled_files)} 个文件")
def clean_compiled_files(): def clean_compiled_files():
"""清理编译生成的文件""" """清理所有编译生成的文件和目录"""
print(f"\n清理编译生成的 {EXTENSION} 文件...") print("--- 步骤 1: 清理旧的编译文件 ---")
# 查找所有编译后的文件 compiled_files = glob.glob(f'**/*{EXTENSION}', recursive=True)
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f] compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files: if compiled_files:
for f in sorted(compiled_files): for f in sorted(compiled_files):
try: try:
os.remove(f) os.remove(f)
print(f"已删除: {f}") print(f" 已删除: {f}")
except Exception as e: except Exception as e:
print(f"删除失败 {f}: {e}") print(f" 删除失败 {f}: {e}")
else:
print(" 没有可清理的文件")
# 清理 build 目录
if os.path.exists('build'): if os.path.exists('build'):
try: try:
shutil.rmtree('build') shutil.rmtree('build')
print("已删除 build 目录") print(" 已删除 build 目录")
except Exception as e: except Exception as e:
print(f"删除 build 目录失败: {e}") print(f" 删除 build 目录失败: {e}")
else: print("-" * 35)
print(f"没有可清理的 {EXTENSION} 文件")
def get_platform_specific_module_name(module_path):
"""获取平台特定的模块文件名"""
module_name = module_path.replace('.py', '')
return f"{module_name}.{BUILD_PREFIX}{EXTENSION}"
def compile_module(module_path): def compile_module(module_path):
"""编译单个模块""" """编译单个模块"""
print(f"\n编译: {module_path}") print(f"\n编译: {module_path}")
try: try:
# 直接调用 mypyc 命令行工具
# 使用二进制模式捕获输出以避免编码问题
result = subprocess.run( result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path], [sys.executable, '-m', 'mypyc', module_path],
capture_output=True, capture_output=True,
check=True check=True,
text=True,
encoding='utf-8',
errors='replace'
) )
# 解码输出时处理可能的编码错误 # 智能查找编译产物
try: base_name = os.path.basename(module_path).replace('.py', '')
stdout_text = result.stdout.decode('utf-8', errors='replace') search_pattern = os.path.join(BUILD_PATH, '**', f'{base_name}.*{EXTENSION}')
stderr_text = result.stderr.decode('utf-8', errors='replace')
except AttributeError:
# 如果已经是字符串Python 3.7+),则直接使用
stdout_text = result.stdout
stderr_text = result.stderr
# 获取平台特定的模块名 found_files = glob.glob(search_pattern, recursive=True)
platform_module = get_platform_specific_module_name(module_path)
mypyc_platform_module = platform_module.replace(EXTENSION, f'__mypyc{EXTENSION}')
# 检查编译产物是否在当前目录 if found_files:
if os.path.exists(platform_module): build_module_path = found_files[0]
print(f" ✓ 编译成功: {platform_module}") dest_path = os.path.join(os.path.dirname(module_path), os.path.basename(build_module_path))
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
shutil.copy2(build_module_path, dest_path)
print(f" ✓ 编译成功: {dest_path}")
return True return True
else: else:
# 检查 build 目录中是否有编译产物 print(f" ✗ 编译失败:在 {BUILD_PATH} 中找不到编译产物")
build_module_path = os.path.join(BUILD_PATH, platform_module) if result.stdout: print(f" 输出: {result.stdout[:500]}...")
build_mypyc_path = os.path.join(BUILD_PATH, mypyc_platform_module) if result.stderr: print(f" 错误: {result.stderr[:500]}...")
if os.path.exists(build_module_path):
# 如果在 build 目录中,复制到正确位置
os.makedirs(os.path.dirname(platform_module), exist_ok=True)
shutil.copy2(build_module_path, platform_module)
if os.path.exists(build_mypyc_path):
shutil.copy2(build_mypyc_path, mypyc_platform_module)
print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}")
return True
else:
print(" ✗ 编译失败:找不到编译产物")
if result.stdout:
print(f" 编译输出:{stdout_text[:500]}...")
if result.stderr:
print(f" 错误信息:{stderr_text[:500]}...")
return False return False
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f" ✗ 编译失败,退出码: {e.returncode}") print(f" ✗ 编译失败 (Exit Code: {e.returncode})")
if hasattr(e, 'stdout') and e.stdout: if e.stdout: print(f" 输出: {e.stdout[:500]}...")
try: if e.stderr: print(f" 错误: {e.stderr[:500]}...")
stdout_text = e.stdout.decode('utf-8', errors='replace') if isinstance(e.stdout, bytes) else e.stdout
print(f" 编译输出:{stdout_text[:500]}...")
except Exception:
print(f" 编译输出:{str(e.stdout)[:500]}...")
if hasattr(e, 'stderr') and e.stderr:
try:
stderr_text = e.stderr.decode('utf-8', errors='replace') if isinstance(e.stderr, bytes) else e.stderr
print(f" 错误信息:{stderr_text[:500]}...")
except Exception:
print(f" 错误信息:{str(e.stderr)[:500]}...")
return False return False
except Exception as e: except Exception as e:
print(f" ✗ 编译失败,意外错误: {e}") print(f" ✗ 编译时发生意外错误: {e}")
import traceback
traceback.print_exc()
return False return False
def should_skip_module(module_path):
"""检查模块是否应该被跳过编译"""
try:
with open(module_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含抽象基类相关代码
if 'from abc import ABC' in content or 'from abc import abstractmethod' in content:
return True, "包含抽象基类,不适合编译"
# 检查是否包含危险的动态特性
# 注意我们允许基本的动态特性如getattr但对于eval、exec等危险操作仍然阻止
if ('eval(' in content or 'exec(' in content or
'compile(' in content):
return True, "包含危险动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if ('__dict__' in content or '__class__' in content or
'__module__' in content or '__bases__' in content):
return True, "包含复杂动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if '.__dict__' in content or '.__class__' in content:
return True, "包含复杂动态特性,不适合编译"
return False, ""
except Exception as e:
return True, f"读取文件时出错: {e}"
def compile_all_modules(): def compile_all_modules():
"""编译所有指定的模块""" """编译所有指定的模块"""
print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})") print("\n--- 步骤 2: 开始编译模块 ---")
print("=" * 60)
# 验证模块文件是否存在并检查是否适合编译
valid_modules = [] valid_modules = []
skipped_modules = [] for module in MODULES:
if os.path.exists(module):
for module_path in MODULES: valid_modules.append(module)
if os.path.exists(module_path):
should_skip, reason = should_skip_module(module_path)
if should_skip:
print(f"跳过: {module_path} ({reason})")
skipped_modules.append((module_path, reason))
else: else:
valid_modules.append(module_path) print(f"警告: 模块 {module} 不存在,已跳过")
else:
print(f"警告: 模块 {module_path} 不存在,将被跳过")
print(f"\n有效模块: {len(valid_modules)}, 跳过模块: {len(skipped_modules)}") print(f"\n找到 {len(valid_modules)} 个有效模块进行编译。")
if not valid_modules:
print("错误: 没有有效的模块可编译")
return False
# 编译模块
success_count = 0 success_count = 0
failed_modules = [] failed_modules = []
for module_path in valid_modules: for module in valid_modules:
if compile_module(module_path): if compile_module(module):
success_count += 1 success_count += 1
else: else:
failed_modules.append(module_path) failed_modules.append(module)
print("\n" + "=" * 60) print("\n" + "=" * 50)
print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功") print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功")
if failed_modules: if failed_modules:
print(f"失败模块: {failed_modules}") print(f"失败模块: {failed_modules}")
if success_count == len(valid_modules):
print("✓ 所有模块编译成功")
return True
else:
print("✗ 部分模块编译失败") print("✗ 部分模块编译失败")
return False else:
print("✓ 所有模块编译成功")
print("=" * 50)
def main(): def main():
"""主函数""" """主函数:执行清理、编译和列出结果的全过程"""
# 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor >= 8): if not (sys.version_info.major == 3 and sys.version_info.minor >= 8):
print("警告: 推荐使用 Python 3.8+ 以获得最佳性能") print("警告: 推荐使用 Python 3.8+ 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题")
print()
parser = argparse.ArgumentParser(description='优化版跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()
group.add_argument('--compile', '-c', action='store_true', default=True,
help='编译指定的模块 (默认)')
group.add_argument('--list', '-l', action='store_true',
help='列出已编译的模块')
group.add_argument('--clean', '-k', action='store_true',
help='清理编译生成的文件')
args = parser.parse_args()
# 检查是否安装了 mypyc
try: try:
import mypyc import mypyc
except ImportError: except ImportError:
print("错误: 未安装 mypyc先安装: pip install mypyc") print("错误: 未安装 mypyc运行: pip install mypyc")
sys.exit(1) sys.exit(1)
if args.list:
list_compiled_modules()
elif args.clean:
clean_compiled_files() clean_compiled_files()
else:
compile_all_modules() compile_all_modules()
print("\n使用 --list 选项查看已编译的模块")
print("使用 --clean 选项清理编译文件")
if __name__ == '__main__':
main()#!/usr/bin/env python3
"""
跨平台 Python 模块编译脚本
将核心 Python 模块编译为机器码(.pyd 或 .so以提升性能。
支持的平台:
- Windows: 生成 .pyd 文件
- Linux: 生成 .so 文件
使用方法:
python compile_machine_code.py [options]
选项:
--compile, -c 编译指定的模块(默认)
--list, -l 列出已编译的模块
--clean, -k 清理编译生成的文件
--help, -h 显示帮助信息
注意:
1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC)
2. 需要安装 mypyc: pip install mypyc
3. 编译后的文件是平台相关的,不能跨平台复制
4. 建议在部署的目标环境上运行此脚本
"""
import os
import sys
# 检测当前平台
PLATFORM = sys.platform
if PLATFORM.startswith('win'):
EXTENSION = '.pyd'
BUILD_PREFIX = 'cp314-win_amd64'
BUILD_PATH = os.path.join('build', 'lib.win-amd64-cpython-314')
elif PLATFORM.startswith('linux'):
EXTENSION = '.so'
BUILD_PREFIX = 'cp314-x86_64-linux-gnu'
BUILD_PATH = os.path.join('build', 'lib.linux-x86_64-cpython-314')
else:
print(f"不支持的平台: {PLATFORM}")
sys.exit(1)
# 要编译的模块列表
# 注意Mypyc 对动态特性支持有限,只选择计算密集或类型明确的模块
MODULES = [
# 工具模块 - 高频使用
'core/utils/json_utils.py', # JSON 处理 - 高频使用
'core/utils/executor.py', # 代码执行引擎 - 高频使用
'core/utils/exceptions.py', # 自定义异常 - 基础组件
'core/utils/performance.py', # 性能监控工具 - 重要组件
'core/utils/logger.py', # 日志模块 - 高频使用
'core/utils/singleton.py', # 单例模式 - 基础组件
# 核心管理模块 - 高频使用
# 'core/managers/command_manager.py', # 指令匹配和分发 - 包含动态特性,不适合编译
# 'core/managers/admin_manager.py', # 管理员管理 - 包含动态特性,不适合编译
# 'core/managers/permission_manager.py', # 权限管理 - 包含动态特性,不适合编译
# 'core/managers/plugin_manager.py', # 插件管理器 - 包含动态特性,不适合编译
# 'core/managers/redis_manager.py', # Redis 管理器 - 包含动态特性,不适合编译
# 'core/managers/image_manager.py', # 图片管理器 - 包含动态特性,不适合编译
# 核心基础模块 - 高频使用
'core/ws.py', # WebSocket 核心 - 核心通信被10个文件引用
# 'core/bot.py', # Bot 核心抽象 - 使用多重继承,不适合编译
'core/config_loader.py', # 配置加载 - 启动必需被7个文件引用
# 'core/config_models.py', # 配置模型 - 包含复杂类型定义,不适合编译
# 'core/permission.py', # 权限枚举 - 包含动态属性,不适合编译
# 数据模型 - 高频使用
'models/message.py', # 消息段模型 - 高频消息处理
'models/sender.py', # 发送者模型 - 高频消息处理
'models/objects.py', # API 响应数据模型 - 高频数据处理
# 事件处理相关 - 高频使用
'core/handlers/event_handler.py', # 事件处理器 - 核心事件处理
# 事件模型 - 高频使用但包含dataclass可能有编译问题暂时排除
# 'models/events/message.py', # 消息事件 - 最高频事件类型
# 'models/events/notice.py', # 通知事件 - 高频事件类型
# 'models/events/request.py', # 请求事件 - 高频事件类型
# 'models/events/meta.py', # 元事件 - 高频事件类型
# 注意:以下文件不适合编译
# - 主程序文件main.py
# - 测试文件tests/目录)
# - 插件文件plugins/目录)
# - 编译脚本compile_machine_code.py等
# - 包含复杂动态特性的文件
# - API 基础类(由于多重继承问题)
]
def list_compiled_modules():
"""列出已编译的模块"""
print(f"\n已编译的 {PLATFORM} 模块:")
print("=" * 50)
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
size = os.path.getsize(f) // 1024 # KB
print(f"{f} ({size} KB)")
else:
print(f"未找到已编译的 {EXTENSION} 文件")
print(f"\n总计: {len(compiled_files)} 个文件")
def clean_compiled_files():
"""清理编译生成的文件"""
print(f"\n清理编译生成的 {EXTENSION} 文件...")
# 查找所有编译后的文件
compiled_files = []
for ext in [EXTENSION, f'__mypyc{EXTENSION}']:
compiled_files.extend(glob.glob(f'**/*{ext}', recursive=True))
# 过滤掉虚拟环境中的文件
compiled_files = [f for f in compiled_files if 'venv' not in f and '.venv' not in f]
if compiled_files:
for f in sorted(compiled_files):
try:
os.remove(f)
print(f"已删除: {f}")
except Exception as e:
print(f"删除失败 {f}: {e}")
# 清理 build 目录
if os.path.exists('build'):
try:
shutil.rmtree('build')
print("已删除 build 目录")
except Exception as e:
print(f"删除 build 目录失败: {e}")
else:
print(f"没有可清理的 {EXTENSION} 文件")
def get_platform_specific_module_name(module_path):
"""获取平台特定的模块文件名"""
# 只获取模块名,不包含路径
module_name = os.path.basename(module_path).replace('.py', '')
return f"{module_name}.{BUILD_PREFIX}{EXTENSION}"
def compile_module(module_path):
"""编译单个模块"""
print(f"\n编译: {module_path}")
try:
# 直接调用 mypyc 命令行工具
# 使用二进制模式捕获输出以避免编码问题
result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path],
capture_output=True,
check=True
)
# 解码输出时处理可能的编码错误
try:
stdout_text = result.stdout.decode('utf-8', errors='replace')
stderr_text = result.stderr.decode('utf-8', errors='replace')
except AttributeError:
# 如果已经是字符串Python 3.7+),则直接使用
stdout_text = result.stdout
stderr_text = result.stderr
# 获取平台特定的模块名
# 获取模块名和目录
module_dir = os.path.dirname(module_path)
module_basename = os.path.basename(module_path).replace('.py', '')
# 生成平台特定的模块文件名(仅文件名,不含路径)
platform_module_name = f"{module_basename}.{BUILD_PREFIX}{EXTENSION}"
mypyc_platform_module_name = f"{module_basename}__mypyc.{BUILD_PREFIX}{EXTENSION}"
# 完整路径构造
platform_module = os.path.join(module_dir, platform_module_name)
mypyc_platform_module = os.path.join(module_dir, mypyc_platform_module_name)
# 检查编译产物是否在当前目录
if os.path.exists(platform_module):
print(f" ✓ 编译成功: {platform_module}")
return True
else:
# 检查 build 目录中是否有编译产物
build_module_path = os.path.join(BUILD_PATH, module_dir, platform_module_name)
build_mypyc_path = os.path.join(BUILD_PATH, module_dir, mypyc_platform_module_name)
if os.path.exists(build_module_path):
# 如果在 build 目录中,复制到正确位置
os.makedirs(module_dir, exist_ok=True)
shutil.copy2(build_module_path, platform_module)
if os.path.exists(build_mypyc_path):
shutil.copy2(build_mypyc_path, mypyc_platform_module)
print(f" ✓ 编译成功(已从 build 目录复制): {platform_module}")
return True
else:
print(" ✗ 编译失败:找不到编译产物")
if result.stdout:
print(f" 编译输出:{stdout_text[:500]}...")
if result.stderr:
print(f" 错误信息:{stderr_text[:500]}...")
return False
except subprocess.CalledProcessError as e:
print(f" ✗ 编译失败,退出码: {e.returncode}")
if hasattr(e, 'stdout') and e.stdout:
try:
stdout_text = e.stdout.decode('utf-8', errors='replace') if isinstance(e.stdout, bytes) else e.stdout
print(f" 编译输出:{stdout_text[:500]}...")
except Exception:
print(f" 编译输出:{str(e.stdout)[:500]}...")
if hasattr(e, 'stderr') and e.stderr:
try:
stderr_text = e.stderr.decode('utf-8', errors='replace') if isinstance(e.stderr, bytes) else e.stderr
print(f" 错误信息:{stderr_text[:500]}...")
except Exception:
print(f" 错误信息:{str(e.stderr)[:500]}...")
return False
except Exception as e:
print(f" ✗ 编译失败,意外错误: {e}")
import traceback
traceback.print_exc()
return False
def should_skip_module(module_path):
"""检查模块是否应该被跳过编译"""
try:
with open(module_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否包含抽象基类相关代码
if 'from abc import ABC' in content or 'from abc import abstractmethod' in content:
return True, "包含抽象基类,不适合编译"
# 检查是否包含危险的动态特性
# 注意我们允许基本的动态特性如getattr但对于eval、exec等危险操作仍然阻止
if ('eval(' in content or 'exec(' in content or
'compile(' in content):
return True, "包含危险动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if ('__dict__' in content or '__class__' in content or
'__module__' in content or '__bases__' in content):
return True, "包含复杂动态特性,不适合编译"
# 检查是否包含复杂的动态属性访问
if '.__dict__' in content or '.__class__' in content:
return True, "包含复杂动态特性,不适合编译"
return False, ""
except Exception as e:
return True, f"读取文件时出错: {e}"
def compile_all_modules():
"""编译所有指定的模块"""
print(f"\n开始编译 {len(MODULES)} 个模块 (平台: {PLATFORM})")
print("=" * 60)
# 验证模块文件是否存在并检查是否适合编译
valid_modules = []
skipped_modules = []
for module_path in MODULES:
if os.path.exists(module_path):
should_skip, reason = should_skip_module(module_path)
if should_skip:
print(f"跳过: {module_path} ({reason})")
skipped_modules.append((module_path, reason))
else:
valid_modules.append(module_path)
else:
print(f"警告: 模块 {module_path} 不存在,将被跳过")
print(f"\n有效模块: {len(valid_modules)}, 跳过模块: {len(skipped_modules)}")
if not valid_modules:
print("错误: 没有有效的模块可编译")
return False
# 编译模块
success_count = 0
failed_modules = []
for module_path in valid_modules:
if compile_module(module_path):
success_count += 1
else:
failed_modules.append(module_path)
print("\n" + "=" * 60)
print(f"编译完成: {success_count}/{len(valid_modules)} 个模块成功")
if failed_modules:
print(f"失败模块: {failed_modules}")
if success_count == len(valid_modules):
print("✓ 所有模块编译成功")
return True
else:
print("✗ 部分模块编译失败")
return False
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='跨平台 Python 模块编译脚本')
group = parser.add_mutually_exclusive_group()
group.add_argument('--compile', '-c', action='store_true', default=True,
help='编译指定的模块 (默认)')
group.add_argument('--list', '-l', action='store_true',
help='列出已编译的模块')
group.add_argument('--clean', '-k', action='store_true',
help='清理编译生成的文件')
args = parser.parse_args()
# 检查是否安装了 mypyc
try:
import mypyc
except ImportError:
print("错误: 未安装 mypyc请先安装: pip install mypyc")
sys.exit(1)
if args.list:
list_compiled_modules() list_compiled_modules()
elif args.clean:
clean_compiled_files()
else:
compile_all_modules()
print("\n使用 --list 选项查看已编译的模块")
print("使用 --clean 选项清理编译文件")
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,74 +0,0 @@
#!/usr/bin/env python3
"""
编译模块脚本
这个脚本会单独编译每个Python模块确保每个模块都在正确位置生成独立的.pyd文件。
"""
import os
import sys
import glob
from mypyc.build import mypycify
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
def compile_module(module_path):
"""
编译单个模块
Args:
module_path: 要编译的Python模块路径
"""
print(f"\nCompiling {module_path}...")
try:
ext_modules = mypycify([module_path])
setup(name=f'compiled_{os.path.basename(module_path).replace(".py", "")}',
ext_modules=ext_modules)
return True
except Exception as e:
print(f"Error compiling {module_path}: {e}")
return False
def main():
"""
主函数
"""
# 检查 Python 版本
if not (sys.version_info.major == 3 and sys.version_info.minor == 14):
print("警告: 推荐使用 Python 3.14 以获得最佳性能")
print(f"当前版本: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print("继续编译可能导致兼容性问题")
print()
# 要编译的模块列表
modules = [
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/permission_manager.py', # 权限管理(包含管理员管理功能)
'core/ws.py', # WebSocket 核心
'core/managers/plugin_manager.py', # 插件管理器
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
]
# 自动添加 events 模型
event_models = glob.glob('models/events/*.py')
event_models = [m for m in event_models if not m.endswith('__init__.py')]
modules.extend(event_models)
print(f"Found {len(modules)} modules to compile.")
success_count = 0
for module in modules:
if compile_module(module):
success_count += 1
print("\n--- Compilation Summary ---")
print(f"Total modules: {len(modules)}")
print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(modules) - success_count}")
if __name__ == '__main__':
main()

View File

@@ -1,117 +0,0 @@
"""
Mypyc 编译脚本
用于将核心 Python 模块编译为 C 扩展,以提升性能。
使用方法:
python setup_mypyc.py build_ext --inplace
注意:
1. 需要安装 C 编译器 (Windows 上需要 Visual Studio Build Tools, Linux 上需要 GCC)。
2. 编译后的文件 (.pyd 或 .so) 是平台相关的,不能跨平台复制。
3. 建议在部署的目标环境 (Linux) 上运行此脚本。
"""
import os
import sys
import subprocess
# 基础模块列表
# 注意Mypyc 对动态特性支持有限,只选择计算密集或类型明确的模块
modules = [
# 工具模块
'core/utils/json_utils.py', # JSON 处理
'core/utils/executor.py', # 代码执行引擎
'core/utils/singleton.py', # 单例模式基类
'core/utils/exceptions.py', # 自定义异常
'core/utils/logger.py', # 日志模块
# 核心管理模块
'core/managers/command_manager.py', # 指令匹配和分发
'core/managers/permission_manager.py', # 权限管理(包含管理员管理功能)
'core/managers/plugin_manager.py', # 插件管理器
# 核心基础模块
'core/ws.py', # WebSocket 核心
'core/bot.py', # Bot 核心抽象
'core/config_loader.py', # 配置加载
'core/config_models.py', # 配置模型
'core/permission.py', # 权限枚举
# API 基础模块
'core/api/base.py', # API 基础类
# 数据模型(适合编译的高频使用数据类)
'models/message.py', # 消息段模型
'models/sender.py', # 发送者模型
'models/objects.py', # API 响应数据模型
]
# 注意:事件模型文件暂时不编译,因为它们与 mypyc 存在兼容性问题
# mypyc 对某些数据类特性和继承结构的支持有限,会导致运行时错误
# event_models = glob.glob('models/events/*.py')
# event_models = [m for m in event_models if not m.endswith('__init__.py')]
# modules.extend(event_models)
# 确保文件存在
valid_modules = []
for m in modules:
if os.path.exists(m):
valid_modules.append(m)
else:
print(f"Warning: Module {m} not found, skipping.")
if not valid_modules:
print("No valid modules found to compile.")
sys.exit(1)
print(f"Compiling the following modules with mypyc: {valid_modules}")
# 使用 mypyc 命令行工具单独编译每个模块,确保位置正确
success_count = 0
for module_path in valid_modules:
print(f"\nCompiling {module_path}...")
try:
# 直接调用 mypyc 命令行工具
result = subprocess.run(
[sys.executable, '-m', 'mypyc', module_path],
capture_output=True,
text=True,
check=True
)
# 验证编译产物是否在正确位置
module_name = module_path.replace('.py', '')
pyd_path = module_name + '.cp314-win_amd64.pyd'
mypyc_path = module_name + '__mypyc.cp314-win_amd64.pyd'
if os.path.exists(pyd_path):
print(f" ✓ Compiled successfully: {pyd_path}")
success_count += 1
else:
# 检查 build 目录中是否有编译产物
build_pyd_path = os.path.join('build', 'lib.win-amd64-cpython-314', pyd_path)
if os.path.exists(build_pyd_path):
# 如果在 build 目录中,复制到正确位置
os.makedirs(os.path.dirname(pyd_path), exist_ok=True)
import shutil
shutil.copy2(build_pyd_path, pyd_path)
shutil.copy2(os.path.join('build', 'lib.win-amd64-cpython-314', mypyc_path), mypyc_path)
print(f" ✓ Compiled successfully (copied from build directory): {pyd_path}")
success_count += 1
else:
print(" ✗ Compiled but cannot find pyd file")
print(f" Build output:\n{result.stdout[:500]}...")
except subprocess.CalledProcessError as e:
print(f" ✗ Compilation failed with exit code {e.returncode}")
print(f" Error:\n{e.stderr[:500]}...")
except Exception as e:
print(f" ✗ Unexpected error: {e}")
print("\n--- Compilation Summary ---")
print(f"Total modules: {len(valid_modules)}")
print(f"Successfully compiled: {success_count}")
print(f"Failed: {len(valid_modules) - success_count}")
if success_count == 0:
print("No modules were compiled successfully. Exiting with error.")
sys.exit(1)

279
templates/status.html Normal file
View File

@@ -0,0 +1,279 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Bot Status</title>
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500&family=Noto+Sans+SC:wght@400;500;700&display=swap" rel="stylesheet">
<style>
:root {
--bg-color: #0f172a;
--window-bg: rgba(30, 41, 59, 0.85);
--border-color: rgba(255, 255, 255, 0.08);
--accent: #6366f1;
--text-title: #f8fafc;
--text-desc: #94a3b8;
--text-cmd: #a5f3fc;
--card-bg: rgba(0, 0, 0, 0.2);
--cmd-bg: #0b1120;
}
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: 'Noto Sans SC', system-ui, sans-serif;
background-color: var(--bg-color);
color: var(--text-title);
display: flex;
justify-content: center;
padding: 0;
min-height: 100vh;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
.window {
width: 100%;
min-height: 100vh;
background: var(--window-bg);
backdrop-filter: blur(20px);
-webkit-backdrop-filter: blur(20px);
display: flex;
flex-direction: column;
}
.header {
padding: 32px 40px;
border-bottom: 1px solid var(--border-color);
background: rgba(255, 255, 255, 0.02);
display: flex;
justify-content: space-between;
align-items: center;
}
.dots { display: flex; gap: 8px; }
.dot { width: 12px; height: 12px; border-radius: 50%; }
.red { background: #ef4444; }
.yellow { background: #f59e0b; }
.green { background: #10b981; }
.title {
font-size: 18px;
font-weight: 700;
letter-spacing: 1.5px;
color: var(--text-desc);
text-transform: uppercase;
}
.content {
padding: 40px;
display: flex;
flex-direction: column;
gap: 32px;
}
.page-title {
display: flex;
align-items: center;
gap: 24px;
margin-bottom: 10px;
}
.bot-avatar {
width: 80px;
height: 80px;
border-radius: 50%;
border: 3px solid var(--accent);
}
.page-title h1 {
font-size: 48px;
background: linear-gradient(to right, #fff, #94a3b8);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
}
.page-title p {
color: var(--text-desc);
font-size: 20px;
margin-top: 12px;
}
.status-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(350px, 1fr));
gap: 24px;
}
.status-card {
background: var(--card-bg);
border: 1px solid var(--border-color);
border-radius: 14px;
padding: 24px;
position: relative;
overflow: hidden;
}
.status-card::before {
content: '';
position: absolute;
left: 0; top: 0; bottom: 0;
width: 4px;
background: var(--accent);
opacity: 0.6;
}
.card-title {
font-size: 20px;
font-weight: 700;
color: #fff;
margin-bottom: 16px;
}
.info-list {
list-style: none;
}
.info-item {
display: flex;
justify-content: space-between;
padding: 12px 0;
border-bottom: 1px solid var(--border-color);
font-size: 16px;
}
.info-item:last-child {
border-bottom: none;
}
.info-label {
color: var(--text-desc);
}
.info-value {
font-family: 'JetBrains Mono', monospace;
color: var(--text-title);
font-weight: 500;
}
.info-value.online { color: #22c55e; }
.info-value.offline { color: #ef4444; }
.command-stats {
column-count: 2;
column-gap: 24px;
}
.footer {
margin-top: auto;
padding: 32px 40px;
border-top: 1px solid var(--border-color);
background: rgba(0, 0, 0, 0.1);
color: var(--text-desc);
font-size: 14px;
text-align: center;
letter-spacing: 1px;
}
</style>
</head>
<body>
<div class="window">
<div class="header">
<div class="dots">
<div class="dot red"></div>
<div class="dot yellow"></div>
<div class="dot green"></div>
</div>
<div class="title">NeoBot System</div>
</div>
<div class="content">
<div class="page-title">
<img src="{{ bot_info.avatar_url }}" alt="Bot Avatar" class="bot-avatar">
<div>
<h1>{{ bot_info.nickname }} - 运行状态</h1>
<p>System Status & Information</p>
</div>
</div>
<div class="status-grid">
<div class="status-card">
<h2 class="card-title">核心指标</h2>
<ul class="info-list">
<li class="info-item">
<span class="info-label">机器人 QQ</span>
<span class="info-value">{{ bot_info.user_id }}</span>
</li>
<li class="info-item">
<span class="info-label">运行状态</span>
<span class="info-value {{ 'online' if status_info.online else 'offline' }}">{{ '在线' if status_info.online else '离线' }}</span>
</li>
<li class="info-item">
<span class="info-label">启动于</span>
<span class="info-value">{{ bot_info.start_time }}</span>
</li>
<li class="info-item">
<span class="info-label">已运行时长</span>
<span class="info-value">{{ bot_info.uptime }}</span>
</li>
</ul>
</div>
<div class="status-card">
<h2 class="card-title">消息与指令统计</h2>
<ul class="info-list">
<li class="info-item">
<span class="info-label">接收消息总数</span>
<span class="info-value">{{ stats.messages_received }}</span>
</li>
<li class="info-item">
<span class="info-label">发送消息总数</span>
<span class="info-value">{{ stats.messages_sent }}</span>
</li>
<li class="info-item">
<span class="info-label">指令调用总数</span>
<span class="info-value">{{ stats.total_commands }}</span>
</li>
</ul>
</div>
<div class="status-card">
<h2 class="card-title">系统资源 (System)</h2>
<ul class="info-list">
<li class="info-item">
<span class="info-label">CPU 占用</span>
<span class="info-value">{{ system.cpu_percent }}%</span>
</li>
<li class="info-item">
<span class="info-label">内存占用</span>
<span class="info-value">{{ system.mem_percent }}%</span>
</li>
<li class="info-item">
<span class="info-label">Bot 进程内存</span>
<span class="info-value">{{ system.bot_mem_mb }} MB</span>
</li>
</ul>
</div>
<div class="status-card">
<h2 class="card-title">调试信息 (Debug)</h2>
<ul class="info-list">
<li class="info-item">
<span class="info-label">客户端</span>
<span class="info-value">{{ version_info.app_name }}</span>
</li>
<li class="info-item">
<span class="info-label">客户端版本</span>
<span class="info-value">v{{ version_info.app_version }}</span>
</li>
<li class="info-item">
<span class="info-label">OneBot 协议</span>
<span class="info-value">{{ version_info.protocol_version }}</span>
</li>
</ul>
</div>
</div>
<div class="status-card">
<h2 class="card-title">指令调用排行榜</h2>
<div class="command-stats">
<ul class="info-list">
{% for command in command_stats %}
<li class="info-item">
<span class="info-label">/{{ command.name }}</span>
<span class="info-value">{{ command.count }} 次</span>
</li>
{% else %}
<li class="info-item">
<span class="info-label">暂无指令调用记录</span>
<span class="info-value"></span>
</li>
{% endfor %}
</ul>
</div>
</div>
</div>
<div class="footer">
NeoBot System - 运行状态面板
</div>
</div>
</body>
</html>