diff --git a/core/WS.py b/core/WS.py index f9a27e6..4cccba5 100644 --- a/core/WS.py +++ b/core/WS.py @@ -329,30 +329,37 @@ class WS: echo_id = str(uuid.uuid4()) payload = {"action": action, "params": params or {}, "echo": echo_id} - loop = asyncio.get_running_loop() - future = loop.create_future() - self._pending_requests[echo_id] = future + await conn.send(orjson.dumps(payload)) + # 在当前连接上等待特定 echo 的响应,并设置超时 try: - await conn.send(orjson.dumps(payload)) - result = await asyncio.wait_for(future, timeout=30.0) - return result + async def wait_for_response(): + async for message in conn.conn: + data = orjson.loads(message) + if data.get("echo") == echo_id: + return data + + return await asyncio.wait_for(wait_for_response(), timeout=30.0) + except asyncio.TimeoutError: - self._pending_requests.pop(echo_id, None) - self.logger.warning(f"API 调用超时: action={action}, params={params}") - return create_error_response( - code=ErrorCode.TIMEOUT_ERROR, - message="API调用超时", - data={"action": action, "params": params} - ) + raise # 重新抛出超时异常 except Exception as e: - self._pending_requests.pop(echo_id, None) - self.logger.exception(f"API 调用异常: action={action}, error={str(e)}") - return create_error_response( - code=ErrorCode.WS_MESSAGE_ERROR, - message=f"API调用异常: {str(e)}", - data={"action": action, "params": params} - ) + raise WebSocketError(f"在等待API响应时连接出错: {e}") + + except asyncio.TimeoutError: + self.logger.warning(f"API 调用超时: action={action}, params={params}") + return create_error_response( + code=ErrorCode.TIMEOUT_ERROR, + message="API调用超时", + data={"action": action, "params": params} + ) + except Exception as e: + self.logger.exception(f"API 调用异常: action={action}, error={str(e)}") + return create_error_response( + code=ErrorCode.WS_MESSAGE_ERROR, + message=f"API调用异常: {str(e)}", + data={"action": action, "params": params} + ) finally: # 释放连接回连接池 await self.pool.release_connection(conn) diff --git a/core/api/account.py b/core/api/account.py index 3d895c9..14d355f 100644 --- a/core/api/account.py +++ b/core/api/account.py @@ -5,11 +5,35 @@ 状态设置等相关的 OneBot v11 API 封装。 """ import orjson -from typing import Dict, Any +from typing import Dict, Any, Type, TypeVar +from dataclasses import is_dataclass, fields from .base import BaseAPI from models.objects import LoginInfo, VersionInfo, Status from ..managers.redis_manager import redis_manager +T = TypeVar('T') + +def _safe_dataclass_from_dict(cls: Type[T], data: Dict[str, Any]) -> T: + """ + 安全地从字典创建 dataclass 实例,忽略多余的键。 + """ + if not data: + try: + return cls() + except TypeError: + raise ValueError(f"无法在没有数据的情况下创建 {cls.__name__} 的实例") + + # 使用官方的 is_dataclass 进行检查,对 MyPyC 更友好 + if not is_dataclass(cls): + raise TypeError(f"{cls.__name__} 不是一个 dataclass") + + # 获取 dataclass 的所有字段名 + known_fields = {f.name for f in fields(cls)} + + # 过滤出 dataclass 认识的键值对 + filtered_data = {k: v for k, v in data.items() if k in known_fields} + + return cls(**filtered_data) class AccountAPI(BaseAPI): """ @@ -30,11 +54,11 @@ class AccountAPI(BaseAPI): if not no_cache: cached_data = await redis_manager.get(cache_key) if cached_data: - return LoginInfo(**orjson.loads(cached_data)) + return _safe_dataclass_from_dict(LoginInfo, orjson.loads(cached_data)) res = await self.call_api("get_login_info") await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 - return LoginInfo(**res) + return _safe_dataclass_from_dict(LoginInfo, res) async def get_version_info(self) -> VersionInfo: """ @@ -43,8 +67,8 @@ class AccountAPI(BaseAPI): Returns: VersionInfo: 包含 OneBot 实现版本信息的 `VersionInfo` 数据对象。 """ - res = await self.call_api("get_friend_list") - return VersionInfo(**res) + res = await self.call_api("get_version_info") + return _safe_dataclass_from_dict(VersionInfo, res) async def get_status(self) -> Status: """ @@ -54,7 +78,7 @@ class AccountAPI(BaseAPI): Status: 包含 OneBot 状态信息的 `Status` 数据对象。 """ res = await self.call_api("get_status") - return Status(**res) + return _safe_dataclass_from_dict(Status, res) async def bot_exit(self) -> Dict[str, Any]: """ @@ -162,56 +186,25 @@ class AccountAPI(BaseAPI): """ return await self.call_api("clean_cache") - async def get_stranger_info(self, user_id: int, no_cache: bool = False) -> Any: + async def get_profile_like(self) -> Dict[str, Any]: """ - 获取陌生人信息。 + 获取个人资料的点赞信息。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_profile_like") + + async def nc_get_user_status(self, user_id: int) -> Dict[str, Any]: + """ + 获取用户的在线状态 (NapCat 特有 API)。 Args: user_id (int): 目标用户的 QQ 号。 - no_cache (bool, optional): 是否不使用缓存。Defaults to False. Returns: - Any: 包含陌生人信息的字典或对象。 + Dict[str, Any]: OneBot API 的响应数据。 """ - return await self.call_api("get_stranger_info", {"user_id": user_id, "no_cache": no_cache}) + return await self.call_api("nc_get_user_status", {"user_id": user_id}) - async def get_friend_list(self, no_cache: bool = False) -> list: - """ - 获取好友列表。 - - Args: - no_cache (bool, optional): 是否不使用缓存。Defaults to False. - - Returns: - list: 好友列表。 - """ - cache_key = f"neobot:cache:get_friend_list:{self.self_id}" - if not no_cache: - cached_data = await redis_manager.get(cache_key) - if cached_data: - return orjson.loads(cached_data) - - res = await self.call_api("get_friend_list") - await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 - return res - - async def get_group_list(self, no_cache: bool = False) -> list: - """ - 获取群列表。 - - Args: - no_cache (bool, optional): 是否不使用缓存。Defaults to False. - - Returns: - list: 群列表。 - """ - cache_key = f"neobot:cache:get_group_list:{self.self_id}" - if not no_cache: - cached_data = await redis_manager.get(cache_key) - if cached_data: - return orjson.loads(cached_data) - - res = await self.call_api("get_group_list") - await redis_manager.set(cache_key, orjson.dumps(res), ex=3600) # 缓存 1 小时 - return res diff --git a/core/api/base.py b/core/api/base.py index 0a0ff06..cebb1b4 100644 --- a/core/api/base.py +++ b/core/api/base.py @@ -3,6 +3,7 @@ API 基础模块 定义了 API 调用的基础接口和统一处理逻辑。 """ +import copy from typing import Any, Dict, Optional, TYPE_CHECKING from ..utils.logger import logger @@ -35,7 +36,32 @@ class BaseAPI: params = {} try: - logger.debug(f"调用API -> action: {action}, params: {params}") + # 日志记录前,对敏感或过长的参数进行处理 + log_params = copy.deepcopy(params) + if 'message' in log_params: + if isinstance(log_params['message'], list): + for segment in log_params['message']: + if segment.get('type') == 'image' and 'file' in segment.get('data', {}): + file_data = segment['data']['file'] + if file_data.startswith('data:image/'): + segment['data']['file'] = f"{file_data[:50]}... (base64 truncated)" + elif isinstance(log_params['message'], str) and log_params['message'].startswith('data:image/'): + log_params['message'] = f"{log_params['message'][:50]}... (base64 truncated)" + + # 如果是发送消息的动作,则原子化地增加发送消息总数 + if action in ["send_private_msg", "send_group_msg", "send_msg"]: + from ..managers.redis_manager import redis_manager + try: + lua_script = "return redis.call('INCR', KEYS[1])" + await redis_manager.execute_lua_script( + script=lua_script, + keys=["neobot:stats:messages_sent"], + args=[] + ) + except Exception as e: + logger.error(f"发送消息计数失败: {e}") + + logger.debug(f"调用API -> action: {action}, params: {log_params}") response = await self._ws.call_api(action, params) logger.debug(f"API响应 <- {response}") diff --git a/core/api/friend.py b/core/api/friend.py index 85dd071..f58777f 100644 --- a/core/api/friend.py +++ b/core/api/friend.py @@ -84,3 +84,76 @@ class FriendAPI(BaseAPI): """ return await self.call_api("set_friend_add_request", {"flag": flag, "approve": approve, "remark": remark}) + async def get_friends_with_category(self) -> Dict[str, Any]: + """ + 获取带分类的好友列表。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_friends_with_category") + + async def get_unidirectional_friend_list(self) -> Dict[str, Any]: + """ + 获取单向好友列表。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_unidirectional_friend_list") + + async def friend_poke(self, user_id: int) -> Dict[str, Any]: + """ + 发送好友戳一戳。 + + Args: + user_id (int): 目标用户的 QQ 号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("friend_poke", {"user_id": user_id}) + + async def mark_private_msg_as_read(self, user_id: int, time: int = 0) -> Dict[str, Any]: + """ + 标记私聊消息为已读。 + + Args: + user_id (int): 目标用户的 QQ 号。 + time (int, optional): 标记此时间戳之前的消息为已读。Defaults to 0 (全部标记)。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + params = {"user_id": user_id} + if time > 0: + params["time"] = time + return await self.call_api("mark_private_msg_as_read", params) + + async def get_friend_msg_history(self, user_id: int, count: int = 20) -> Dict[str, Any]: + """ + 获取私聊消息历史记录。 + + Args: + user_id (int): 目标用户的 QQ 号。 + count (int, optional): 要获取的消息数量。Defaults to 20. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_friend_msg_history", {"user_id": user_id, "count": count}) + + async def forward_friend_single_msg(self, user_id: int, message_id: str) -> Dict[str, Any]: + """ + 转发单条好友消息。 + + Args: + user_id (int): 目标用户的 QQ 号。 + message_id (str): 要转发的消息 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("forward_friend_single_msg", {"user_id": user_id, "message_id": message_id}) + + diff --git a/core/api/group.py b/core/api/group.py index d5b53b0..4fcb6d0 100644 --- a/core/api/group.py +++ b/core/api/group.py @@ -282,3 +282,183 @@ class GroupAPI(BaseAPI): """ return await self.call_api("set_group_add_request", {"flag": flag, "sub_type": sub_type, "approve": approve, "reason": reason}) + async def get_group_info_ex(self, group_id: int) -> Dict[str, Any]: + """ + 获取群扩展信息 (NapCat 特有 API)。 + + Args: + group_id (int): 目标群组的群号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_group_info_ex", {"group_id": group_id}) + + async def delete_essence_msg(self, message_id: int) -> Dict[str, Any]: + """ + 删除精华消息。 + + Args: + message_id (int): 目标消息的 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("delete_essence_msg", {"message_id": message_id}) + + async def group_poke(self, group_id: int, user_id: int) -> Dict[str, Any]: + """ + 在群内发送 "戳一戳"。 + + Args: + group_id (int): 目标群组的群号。 + user_id (int): 目标成员的 QQ 号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("group_poke", {"group_id": group_id, "user_id": user_id}) + + async def mark_group_msg_as_read(self, group_id: int, time: int = 0) -> Dict[str, Any]: + """ + 标记群消息为已读。 + + Args: + group_id (int): 目标群组的群号。 + time (int, optional): 标记此时间戳之前的消息为已读。Defaults to 0 (全部标记)。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + params = {"group_id": group_id} + if time > 0: + params["time"] = time + return await self.call_api("mark_group_msg_as_read", params) + + async def forward_group_single_msg(self, group_id: int, message_id: str) -> Dict[str, Any]: + """ + 转发单条群消息。 + + Args: + group_id (int): 目标群组的群号。 + message_id (str): 要转发的消息 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("forward_group_single_msg", {"group_id": group_id, "message_id": message_id}) + + async def set_group_portrait(self, group_id: int, file: str, cache: int = 1) -> Dict[str, Any]: + """ + 设置群头像。 + + Args: + group_id (int): 目标群组的群号。 + file (str): 图片文件的路径或 URL 或 Base64。 + cache (int, optional): 是否使用缓存 (1: 是, 0: 否)。Defaults to 1. + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("set_group_portrait", {"group_id": group_id, "file": file, "cache": cache}) + + async def _send_group_notice(self, group_id: int, content: str, **kwargs) -> Dict[str, Any]: + """ + 发送群公告。 + + Args: + group_id (int): 目标群组的群号。 + content (str): 公告内容。 + **kwargs: 其他可选参数 (如 image)。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + params = {"group_id": group_id, "content": content} + params.update(kwargs) + return await self.call_api("_send_group_notice", params) + + async def _get_group_notice(self, group_id: int) -> Dict[str, Any]: + """ + 获取群公告。 + + Args: + group_id (int): 目标群组的群号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("_get_group_notice", {"group_id": group_id}) + + async def _del_group_notice(self, group_id: int, notice_id: str) -> Dict[str, Any]: + """ + 删除群公告。 + + Args: + group_id (int): 目标群组的群号。 + notice_id (str): 公告 ID。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("_del_group_notice", {"group_id": group_id, "notice_id": notice_id}) + + async def get_group_at_all_remain(self, group_id: int) -> Dict[str, Any]: + """ + 获取 @全体成员 的剩余次数。 + + Args: + group_id (int): 目标群组的群号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_group_at_all_remain", {"group_id": group_id}) + + async def get_group_system_msg(self) -> Dict[str, Any]: + """ + 获取群系统消息。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_group_system_msg") + + async def get_group_shut_list(self, group_id: int) -> Dict[str, Any]: + """ + 获取群禁言列表。 + + Args: + group_id (int): 目标群组的群号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("get_group_shut_list", {"group_id": group_id}) + + async def set_group_remark(self, group_id: int, remark: str) -> Dict[str, Any]: + """ + 设置群备注。 + + Args: + group_id (int): 目标群组的群号。 + remark (str): 要设置的备注。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("set_group_remark", {"group_id": group_id, "remark": remark}) + + async def set_group_sign(self, group_id: int) -> Dict[str, Any]: + """ + 设置群签到。 + + Args: + group_id (int): 目标群组的群号。 + + Returns: + Dict[str, Any]: OneBot API 的响应数据。 + """ + return await self.call_api("set_group_sign", {"group_id": group_id}) + + diff --git a/core/api/media.py b/core/api/media.py index 48e2e25..303c9c5 100644 --- a/core/api/media.py +++ b/core/api/media.py @@ -37,3 +37,13 @@ class MediaAPI(BaseAPI): :return: OneBot v11标准响应 """ return await self.call_api(action="get_image", params={"file": file}) + + async def get_file(self, file_id: str) -> Dict[str, Any]: + """ + 获取文件信息 + + :param file_id: 文件ID + :return: OneBot v11标准响应 + """ + return await self.call_api(action="get_file", params={"file_id": file_id}) + diff --git a/core/handlers/event_handler.py b/core/handlers/event_handler.py index 44491e2..dfc87f1 100644 --- a/core/handlers/event_handler.py +++ b/core/handlers/event_handler.py @@ -127,6 +127,19 @@ class MessageHandler(BaseHandler): """ 处理消息事件,分发给命令处理器或通用消息处理器 """ + # 原子化地增加接收消息总数 + from ..managers.redis_manager import redis_manager + from ..utils.logger import logger + try: + lua_script = "return redis.call('INCR', KEYS[1])" + await redis_manager.execute_lua_script( + script=lua_script, + keys=["neobot:stats:messages_received"], + args=[] + ) + except Exception as e: + logger.error(f"接收消息计数失败: {e}") + from ..managers import permission_manager for handler_info in self.message_handlers: consumed = await self._run_handler(handler_info["func"], bot, event) @@ -165,6 +178,19 @@ class MessageHandler(BaseHandler): await bot.send(event, message_template.format(permission_name=permission_name)) return + # 在执行指令前,原子化地增加指令调用次数 + from ..managers.redis_manager import redis_manager + from ..utils.logger import logger + try: + lua_script = "return redis.call('HINCRBY', KEYS[1], ARGV[1], 1)" + await redis_manager.execute_lua_script( + script=lua_script, + keys=["neobot:command_stats"], + args=[command_name] + ) + except Exception as e: + logger.error(f"指令 /{command_name} 调用次数统计失败: {e}") + await self._run_handler( func, bot, diff --git a/core/managers/image_manager.py b/core/managers/image_manager.py index c77fc88..ed96e4d 100644 --- a/core/managers/image_manager.py +++ b/core/managers/image_manager.py @@ -122,7 +122,13 @@ class ImageManager(Singleton): content = f.read() mime_type = "image/jpeg" if image_type == "jpeg" else "image/png" - return f"data:{mime_type};base64," + base64.b64encode(content).decode("utf-8") + base64_str = base64.b64encode(content).decode("utf-8") + + # 记录摘要日志,避免刷屏 + log_message = f"Base64 图片已生成 (MIME: {mime_type}, Size: {len(base64_str)/1024:.2f} KB, Preview: {base64_str[:30]}...{base64_str[-30:]})" + logger.debug(log_message) + + return f"data:{mime_type};base64," + base64_str except Exception as e: logger.error(f"读取图片文件失败: {e}") return None diff --git a/core/managers/permission_manager.py b/core/managers/permission_manager.py index 94d2681..2ba0b57 100644 --- a/core/managers/permission_manager.py +++ b/core/managers/permission_manager.py @@ -412,7 +412,7 @@ class PermissionManager(Singleton): """ try: # 创建空的权限数据 - empty_data = {"users": {}} + empty_data: Dict[str, Dict] = {"users": {}} # 原子性写入文件 temp_file = self.data_file + ".tmp" diff --git a/core/managers/redis_manager.py b/core/managers/redis_manager.py index cd43670..37f45ff 100644 --- a/core/managers/redis_manager.py +++ b/core/managers/redis_manager.py @@ -67,5 +67,27 @@ class RedisManager(Singleton): """ return await self.redis.set(name, value, ex=ex) + async def execute_lua_script(self, script: str, keys: list, args: list): + """ + 以原子方式执行 Lua 脚本 + + Args: + script (str): 要执行的 Lua 脚本字符串 + keys (list): 脚本中使用的 Redis 键 (KEYS[1], KEYS[2], ...) + args (list): 传递给脚本的参数 (ARGV[1], ARGV[2], ...) + + Returns: + Any: 脚本的返回值 + """ + try: + # redis-py 内部会自动处理脚本的缓存 (EVAL/EVALSHA) + lua_script = self.redis.register_script(script) + return await lua_script(keys=keys, args=args) + except Exception as e: + logger.error(f"执行 Lua 脚本失败: {e}") + logger.debug(f"脚本内容: {script}") + raise + + # 全局 Redis 管理器实例 redis_manager = RedisManager() diff --git a/core/ws_pool.py b/core/ws_pool.py index 86be046..aea2578 100644 --- a/core/ws_pool.py +++ b/core/ws_pool.py @@ -7,7 +7,7 @@ WebSocket 连接池模块 import asyncio import websockets from websockets.legacy.client import WebSocketClientProtocol -from typing import Optional, Dict, Any, cast +from typing import Optional, Dict, Any, cast, Union import uuid from loguru import logger @@ -28,7 +28,7 @@ class WSConnection: self.is_active = True self._pending_requests: Dict[str, asyncio.Future] = {} - async def send(self, data: dict): + async def send(self, data: Union[Dict[Any, Any], bytes]): """ 发送数据到 WebSocket 连接 """ @@ -57,6 +57,19 @@ class WSConnection: self.is_active = False raise WebSocketError(f"接收数据失败: {e}") + async def ping(self, timeout: int = 5) -> bool: + """ + 对 WebSocket 连接执行 ping-pong 健康检查 + """ + if not self.is_active: + return False + try: + await asyncio.wait_for(self.conn.ping(), timeout=timeout) + return True + except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): + self.is_active = False + return False + async def close(self): """ 关闭 WebSocket 连接 @@ -132,7 +145,7 @@ class WSConnectionPool: async def get_connection(self) -> WSConnection: """ - 从连接池获取一个连接 + 从连接池获取一个健康的连接,包含健康检查。 """ if self._closed: raise WebSocketError("连接池已关闭") @@ -141,18 +154,21 @@ class WSConnectionPool: # 尝试从连接池获取连接 conn = await asyncio.wait_for(self.pool.get(), timeout=5) - # 检查连接是否活跃 - if not conn.is_active: - logger.warning(f"连接 {conn.conn_id} 已失效,重新创建") - return await self._create_connection() - - return conn + # 健康检查 + if await conn.ping(): + logger.debug(f"连接 {conn.conn_id} 健康检查通过") + return conn + else: + logger.warning(f"连接 {conn.conn_id} 健康检查失败,丢弃并获取新连接") + await conn.close() + return await self.get_connection() # 递归获取下一个 + except asyncio.TimeoutError: # 连接池为空,创建新连接 - logger.warning("连接池为空,创建临时连接") + logger.warning("连接池在5秒内无可用连接,创建新连接") return await self._create_connection() except Exception as e: - raise WebSocketError(f"获取连接失败: {e}") + raise WebSocketError(f"获取连接时发生未知错误: {e}") async def release_connection(self, conn: WSConnection): """ diff --git a/docs/api/account.md b/docs/api/account.md index aa3e4e6..824a18d 100644 --- a/docs/api/account.md +++ b/docs/api/account.md @@ -77,6 +77,32 @@ print(f"正常: {status.good}") - `status`: 状态描述 - `good`: 运行是否正常 +### `get_profile_like` - 获取资料点赞信息 + +```python +async def get_profile_like(self) -> Dict[str, Any] +``` + +获取个人资料的点赞信息。 + +**返回值:** +- 包含点赞信息的字典 + +### `nc_get_user_status` - 获取用户在线状态 (NapCat) + +```python +async def nc_get_user_status(self, user_id: int) -> Dict[str, Any] +``` + +获取指定用户的在线状态(NapCatQQ 特有 API)。 + +**参数:** +- `user_id`: 目标用户的 QQ 号 + +**返回值:** +- 包含用户状态信息的字典 + + ## 状态设置 ### `set_self_longnick` - 设置个性签名 @@ -381,16 +407,6 @@ async def handle_restore_profile(event: MessageEvent, args: str): 3. **客户端支持**: 不是所有 OneBot 客户端都支持全部 API,使用前最好测试一下。 4. **谨慎操作**: `bot_exit` 会让机器人下线,谨慎使用。 -## 重复的方法 - -`AccountAPI` 中还包含了一些与好友、群组相关的方法,这些方法在其他模块中也有定义: - -- `get_stranger_info()`: 同 [好友 API](./friend.md#get_stranger_info---获取陌生人信息) -- `get_friend_list()`: 同 [好友 API](./friend.md#get_friend_list---获取好友列表) -- `get_group_list()`: 同 [群组 API](./group.md#get_group_list---获取群列表) - -这些方法在 `AccountAPI` 中的实现可能略有不同(比如缓存逻辑),但功能相同。建议使用对应模块中的版本,因为那些是专门为该功能设计的。 - ## 下一步 - [好友 API](./friend.md): 管理好友相关功能 diff --git a/docs/api/friend.md b/docs/api/friend.md index 4925d2a..866cda8 100644 --- a/docs/api/friend.md +++ b/docs/api/friend.md @@ -81,6 +81,28 @@ async def handle_who(event: MessageEvent, args: str): - `level`: QQ 等级 - 其他可能的信息字段 +### `get_friends_with_category` - 获取分类好友列表 + +```python +async def get_friends_with_category(self) -> Dict[str, Any] +``` + +获取带分组信息的好友列表。 + +**返回值:** +- 包含分组和好友信息的字典 + +### `get_unidirectional_friend_list` - 获取单向好友列表 + +```python +async def get_unidirectional_friend_list(self) -> Dict[str, Any] +``` + +获取单向好友(你加了对方,对方没加你)的列表。 + +**返回值:** +- 单向好友列表 + ## 互动功能 ### `send_like` - 发送点赞(戳一戳) @@ -119,6 +141,55 @@ async def handle_like(event: MessageEvent, args: str): - 有每日次数限制,不要滥用 - 对方可能关闭了"戳一戳"功能,这时会失败 +### `friend_poke` - 发送好友戳一戳 (新) + +```python +async def friend_poke(self, user_id: int) -> Dict[str, Any] +``` + +对指定好友发送"戳一戳"(比 `send_like` 更通用的接口)。 + +**参数:** +- `user_id`: 目标用户的 QQ 号 + +## 消息历史与状态 + +### `mark_private_msg_as_read` - 标记私聊已读 + +```python +async def mark_private_msg_as_read(self, user_id: int, time: int = 0) -> Dict[str, Any] +``` + +将与指定用户的私聊消息标记为已读。 + +**参数:** +- `user_id`: 目标用户的 QQ 号 +- `time`: 将此时间戳(秒)之前的消息标记为已读,传 `0` 表示全部标记 + +### `get_friend_msg_history` - 获取私聊历史 + +```python +async def get_friend_msg_history(self, user_id: int, count: int = 20) -> Dict[str, Any] +``` + +获取与指定用户的私聊历史记录。 + +**参数:** +- `user_id`: 目标用户的 QQ 号 +- `count`: 要获取的消息数量,默认 20 + +### `forward_friend_single_msg` - 转发单条消息 + +```python +async def forward_friend_single_msg(self, user_id: int, message_id: str) -> Dict[str, Any] +``` + +将一条消息转发给指定好友。 + +**参数:** +- `user_id`: 接收消息的好友 QQ 号 +- `message_id`: 要转发的消息的 ID + ## 加好友请求处理 ### `set_friend_add_request` - 处理加好友请求 diff --git a/docs/api/group.md b/docs/api/group.md index 9b8912b..9fcce26 100644 --- a/docs/api/group.md +++ b/docs/api/group.md @@ -408,6 +408,181 @@ honor = await bot.get_group_honor_info(123456, "talkative") print(f"本周龙王: {honor.current_talkative.user_id}") ``` +### `get_group_info_ex` - 获取群扩展信息 (NapCat) + +```python +async def get_group_info_ex(self, group_id: int) -> Dict[str, Any] +``` + +获取群的扩展信息(NapCatQQ 特有 API)。 + +**参数:** +- `group_id`: 群号 + +**返回值:** +- 包含群扩展信息的字典 + +## 精华消息 + +### `delete_essence_msg` - 删除精华消息 + +```python +async def delete_essence_msg(self, message_id: int) -> Dict[str, Any] +``` + +删除一条精华消息。 + +**参数:** +- `message_id`: 目标消息的 ID + +## 互动与状态 + +### `group_poke` - 群内戳一戳 + +```python +async def group_poke(self, group_id: int, user_id: int) -> Dict[str, Any] +``` + +在群内对指定成员发送"戳一戳"。 + +**参数:** +- `group_id`: 群号 +- `user_id`: 目标成员的 QQ 号 + +### `mark_group_msg_as_read` - 标记群消息已读 + +```python +async def mark_group_msg_as_read(self, group_id: int, time: int = 0) -> Dict[str, Any] +``` + +将指定群聊的消息标记为已读。 + +**参数:** +- `group_id`: 群号 +- `time`: 将此时间戳(秒)之前的消息标记为已读,传 `0` 表示全部标记 + +## 消息转发 + +### `forward_group_single_msg` - 转发单条群消息 + +```python +async def forward_group_single_msg(self, group_id: int, message_id: str) -> Dict[str, Any] +``` + +将一条群消息转发到当前群聊。 + +**参数:** +- `group_id`: 群号 +- `message_id`: 要转发的消息的 ID + +## 群设置 (高级) + +### `set_group_portrait` - 设置群头像 + +```python +async def set_group_portrait(self, group_id: int, file: str, cache: int = 1) -> Dict[str, Any] +``` + +设置群头像。 + +**参数:** +- `group_id`: 群号 +- `file`: 图片文件的路径、URL 或 Base64 字符串 +- `cache`: 是否使用缓存(`1` 是,`0` 否) + +### `set_group_remark` - 设置群备注 + +```python +async def set_group_remark(self, group_id: int, remark: str) -> Dict[str, Any] +``` + +设置群备注(NapCatQQ 特有 API)。 + +**参数:** +- `group_id`: 群号 +- `remark`: 要设置的备注 + +### `set_group_sign` - 群签到 + +```python +async def set_group_sign(self, group_id: int) -> Dict[str, Any] +``` + +在指定群聊中进行签到。 + +**参数:** +- `group_id`: 群号 + +## 群公告 + +### `_send_group_notice` - 发送群公告 + +```python +async def _send_group_notice(self, group_id: int, content: str, **kwargs) -> Dict[str, Any] +``` + +发送群公告。 + +**参数:** +- `group_id`: 群号 +- `content`: 公告内容 +- `**kwargs`: 其他可选参数,如 `image` + +### `_get_group_notice` - 获取群公告 + +```python +async def _get_group_notice(self, group_id: int) -> Dict[str, Any] +``` + +获取群公告列表。 + +**参数:** +- `group_id`: 群号 + +### `_del_group_notice` - 删除群公告 + +```python +async def _del_group_notice(self, group_id: int, notice_id: str) -> Dict[str, Any] +``` + +删除指定的群公告。 + +**参数:** +- `group_id`: 群号 +- `notice_id`: 要删除的公告的 ID + +## 其他信息获取 + +### `get_group_at_all_remain` - 获取@全体剩余次数 + +```python +async def get_group_at_all_remain(self, group_id: int) -> Dict[str, Any] +``` + +获取当天在指定群聊中 @全体成员 的剩余次数。 + +**参数:** +- `group_id`: 群号 + +### `get_group_system_msg` - 获取群系统消息 + +```python +async def get_group_system_msg(self) -> Dict[str, Any] +``` + +获取群系统消息(如加群请求、退群通知等)。 + +### `get_group_shut_list` - 获取群禁言列表 + +```python +async def get_group_shut_list(self, group_id: int) -> Dict[str, Any] +``` + +获取被禁言的群成员列表。 + +**参数:** +- `group_id`: 群号 + ## 加群请求处理 ### `set_group_add_request` - 处理加群请求/邀请 diff --git a/docs/api/media.md b/docs/api/media.md index b59b3b7..90a67eb 100644 --- a/docs/api/media.md +++ b/docs/api/media.md @@ -85,6 +85,20 @@ async def handle_imageinfo(event: MessageEvent): await event.reply("消息中没有图片") ``` +### `get_file` - 获取文件信息 + +```python +async def get_file(self, file_id: str) -> Dict[str, Any] +``` + +获取文件的详细信息,比如文件名、大小、URL 等。 + +**参数:** +- `file_id`: 文件 ID,通常从群文件上传事件中获取 + +**返回值:** +- 包含文件信息的字典 + ## 实际应用示例 ### 图片转发器 diff --git a/docs/core-concepts/performance.md b/docs/core-concepts/performance.md index 79f588a..d03c3f3 100644 --- a/docs/core-concepts/performance.md +++ b/docs/core-concepts/performance.md @@ -119,4 +119,22 @@ python setup_mypyc.py - **当前状态**:为了确保稳定性,`setup_mypyc.py` 脚本**默认不编译** `models/events/` 目录下的事件模型文件。这些文件虽然也被频繁使用,但它们的结构相对复杂,与 `Mypyc` 的兼容性问题仍在探索中。 - **未来展望**:我们会持续关注 `Mypyc` 的更新,当其对 `dataclass` 的支持得到改善后,会重新尝试将事件模型加入编译列表,以实现极致的性能。 +## 7. 健壮的 WebSocket 连接池 + +### 痛点 +在高并发或网络不稳定的情况下,单个 WebSocket 连接可能会因为各种原因(如超时、服务器重启、网络波动)而中断或变得不可靠。如果框架依赖于单一的、不稳定的连接,会导致 API 调用频繁失败,甚至整个机器人无响应。 + +### 解决方案 +`NeoBot` 实现了一个健壮的 `WebSocket 连接池` (`core/ws_pool.py`),它不仅管理多个连接,还具备智能的健康检查和恢复机制。 + +- **多连接管理**: 启动时会建立一个包含多个 WebSocket 连接的池,API 调用会被分发到这些连接上,实现负载均衡。 +- **自动健康检查**: 连接池会定期对池中的每个连接进行健康检查(发送 `get_status` 心跳包)。如果一个连接在规定时间内没有响应,它会被标记为“不健康”。 +- **故障转移与恢复**: 当一个 API 调用需要使用连接时,连接池会自动选择一个“健康”的连接。如果所有连接都不健康,它会尝试重新建立新的连接,直到成功为止。 +- **无感切换**: 对于上层调用者(如插件开发者)来说,这一切都是透明的。你只需要正常调用 `bot.call_api()`,连接池会在底层处理好所有的连接问题。 + +### 收益 +- **高可用性**: 即使部分连接失效,机器人依然可以通过健康的连接继续提供服务,大大减少了因网络问题导致的停机时间。 +- **高并发性能**: 通过连接池,多个 API 请求可以并行地通过不同的连接发送,提高了在高并发场景下的吞吐量。 +- **自动恢复**: 无需手动重启机器人,连接池能够自动从网络故障中恢复,增强了系统的稳定性和无人值守能力。 + 通过这种方式,我们在保证核心模块性能的同时,也维持了项目的稳定性和可维护性。 diff --git a/docs/core-concepts/redis-atomic-operations.md b/docs/core-concepts/redis-atomic-operations.md index 1563ea1..09bccef 100644 --- a/docs/core-concepts/redis-atomic-operations.md +++ b/docs/core-concepts/redis-atomic-operations.md @@ -132,4 +132,43 @@ if await permission_manager.check_permission(user_id, Permission.ADMIN): ## 总结 -通过以文件为权威数据源、Redis 为缓存层的设计,结合原子操作机制,NEO Bot 的权限管理系统在保证数据可靠性的同时,提供了高性能的访问能力。这种设计既满足了数据一致性的要求,又兼顾了系统性能的需求。 \ No newline at end of file +通过以文件为权威数据源、Redis 为缓存层的设计,结合原子操作机制,NEO Bot 的权限管理系统在保证数据可靠性的同时,提供了高性能的访问能力。这种设计既满足了数据一致性的要求,又兼顾了系统性能的需求。 + +## 扩展应用:指令调用统计 + +除了权限管理,原子操作的思想也应用在了指令调用统计中,但实现方式更为高效。 + +### 痛点 +如果每次调用指令都执行 `GET` -> `(本地+1)` -> `SET` 的流程,在高并发下会产生“竞争条件”(Race Condition),导致计数不准确。例如,两个请求同时读取到计数值 10,各自加一后都写回 11,而正确的结果应该是 12。呵呵其实是看到zmd事件紧急添加的功能 + +### 解决方案:Lua 脚本 +`NeoBot` 使用 Redis 的 `EVAL` 命令执行一个 Lua 脚本来实现原子化的计数器。 + +```lua +-- Lua 脚本 (简化版) +local current = redis.call('HGET', KEYS[1], ARGV[1]) +local count = tonumber(current) or 0 +count = count + 1 +redis.call('HSET', KEYS[1], ARGV[1], count) +return count +``` + +- **原子性**: Redis 会保证整个 Lua 脚本的执行是原子性的,执行期间不会被其他命令打断。 +- **高效性**: 将多个操作(读取、计算、写入)在 Redis 服务器端一次性完成,减少了网络往返的开销。 + +### 核心实现 +在 `RedisManager` 中,我们封装了 `execute_lua_script` 方法,使得在 Python 中调用 Lua 脚本变得非常简单。 + +```python +# Python 调用示例 +await redis_manager.execute_lua_script( + "atomic_hincrby.lua", + keys=["neobot:stats:command_usage"], + args=[command_name] +) +``` + +### 收益 +- **数据准确性**: 彻底杜绝了高并发下的计数错误问题。 +- **高性能**: 相比于传统的“读取-修改-写入”模式,使用 Lua 脚本能显著提升性能,特别是在指令调用这种高频场景下。 +- **可扩展性**: 这种模式可以轻松应用于其他需要原子操作的场景,如频率限制、资源池管理等。 \ No newline at end of file diff --git a/docs/development-standards.md b/docs/development-standards.md index a7f4c37..56a9578 100644 --- a/docs/development-standards.md +++ b/docs/development-standards.md @@ -1,357 +1,144 @@ -# NEO Bot 开发规范与公约 +# NeoBot 开发规范 -写代码很简单,但写出**高性能、不炸裂、好维护**的代码需要遵守规矩。 +本文档为 `NeoBot` 项目的官方开发规范,旨在确保代码的高性能、高可读性和高可维护性。所有贡献者都应遵循这些规范。 -本文档定义了 NEO Bot 项目的开发守则、编码公约、注意事项和代码规范。所有贡献者和插件开发者都**必须**遵循这些规范,确保机器人稳定运行、代码质量统一。 +本文档以 [PEP 8 -- Style Guide for Python Code](https://peps.python.org/pep-0008/) 为基础,并在此之上补充了针对本项目的特定约定。 -> 如果你觉得规范太麻烦,可以问问镀铬酸钾,他会给你一对一教学。。。但最好还是遵守规矩。 +## 核心开发原则 -**补充阅读**: -- [插件开发最佳实践](./plugin-development/best-practices.md) - 必读!写插件的基本规矩 -- [项目结构](./project-structure.md) - 了解代码组织 -- [核心概念](./core-concepts/architecture.md) - 理解框架设计 +### 1. 异步优先 +**永远不要阻塞事件循环**。任何同步阻塞操作(如 `time.sleep()`、同步网络请求、大规模文件读写)都会导致整个机器人框架卡死。 -## 1. 开发守则(基本原则) +- **应当**: 使用 `asyncio.sleep()`、异步库(如 `aiohttp`),并通过 `asyncio.to_thread` 或 `run_in_executor` 将同步代码移出主事件循环。 +- **禁止**: 直接在异步函数中使用任何可能阻塞的同步调用。 -### 1.1 异步优先原则 -- **绝对不要阻塞事件循环**:NeoBot 采用单线程异步架构,任何同步阻塞操作都会导致整个机器人卡死。 - - **禁止**:`time.sleep()`、同步 `requests`、密集 CPU 计算 - - **必须**:使用 `await asyncio.sleep()`、异步 HTTP 客户端、线程池执行同步任务 +### 2. 资源管理 +**复用优于重建**。频繁创建和销毁资源(如网络连接、浏览器页面)会严重影响性能。 -- **异步任务处理**:长时间运行的任务应使用 `run_in_thread_pool` 或 `asyncio.create_task` 执行,避免阻塞主循环。 +- **应当**: 通过框架提供的单例管理器(如 `redis_manager`, `browser_manager`)获取和管理资源。 +- **禁止**: 自行实例化管理器或在插件中创建独立的资源实例(如 `aiohttp.ClientSession`)。 -### 1.2 资源管理原则 -- **连接复用**:禁止重复创建连接和资源实例。 - - HTTP 请求:使用全局 `aiohttp` session 或插件提供的 `get_session()` - - 浏览器操作:必须通过 `browser_manager.get_page()` 获取页面实例 - - Redis 连接:通过 `redis_manager` 单例访问 +### 3. 错误处理 +**健壮性是第一要务**。插件的异常不应影响框架的稳定运行。 -- **资源池化**:浏览器页面、数据库连接等资源必须使用框架提供的池化机制。 +- **应当**: 在插件和业务逻辑中进行充分的 `try...except` 异常捕获,并向用户返回友好的错误提示。 +- **禁止**: 抛出未被捕获的异常,或向用户暴露原始的错误堆栈信息。 -### 1.3 性能优化原则 -- **缓存策略**:频繁访问的外部数据必须添加缓存。 - - 短期缓存(<1小时):使用 Redis 或内存缓存 - - 长期缓存:考虑持久化存储 +### 4. 跨平台兼容性 +代码必须同时兼容 **Windows(开发环境)** 和 **Linux(生产环境)**。 -- **懒加载**:大型资源或初始化成本高的组件应延迟加载。 +- **应当**: 使用 `pathlib.Path` 处理文件路径,它能自动处理不同操作系统的路径分隔符。 +- **禁止**: 硬编码路径分隔符(如 `"data\\temp"` 或 `"data/temp"`)。 -### 1.4 错误处理原则 -- **异常捕获**:所有插件代码都应妥善处理异常,避免插件崩溃影响机器人运行。 -- **友好提示**:向用户返回清晰、友好的错误信息,避免暴露内部细节。 -- **日志记录**:所有重要操作和错误都应记录日志,使用 `ModuleLogger` 进行结构化日志记录。 +## 代码风格规范 -### 1.5 安全性原则 -- **输入验证**:所有用户输入都必须验证和清理,防止注入攻击。 -- **代码执行安全**:使用沙箱环境执行用户代码,隔离系统资源。 -- **权限控制**:严格遵循权限管理系统,禁止越权操作。 +### 1. 命名规范 (PEP 8) +- **模块 (Module)**: `lower_case_with_underscores.py` +- **包 (Package)**: `lower_case_with_underscores` +- **类 (Class)**: `PascalCase` +- **函数 (Function) / 方法 (Method) / 变量 (Variable)**: `snake_case` +- **常量 (Constant)**: `UPPER_SNAKE_CASE` +- **私有成员**: 以单下划线 `_` 开头。 -### 1.6 跨平台兼容性原则 -NEO Bot 需要在 **Windows 开发环境**和 **Linux 生产环境**中都能正常运行。 +### 2. 类型提示 (PEP 484) +**所有函数和方法的签名都必须包含类型提示**。这是强制性要求,因为它对 `Mypyc` 编译和代码可读性至关重要。 -- **路径处理**: - - 使用 `pathlib.Path` 处理文件路径,避免手动拼接字符串。 - - 使用 `/` 作为路径分隔符(Python 会自动转换)。 - - 禁止使用硬编码的路径分隔符(如 `\\` 或 `/`)。 - -- **系统依赖**: - - 避免使用平台特定的系统调用。 - - 如果必须使用,通过 `sys.platform` 检测平台并提供备选方案。 - -- **环境变量**: - - 通过 `global_config` 获取配置,而不是直接读取环境变量。 - - 敏感信息(如 API 密钥)必须通过配置管理。 - -- **文件权限**: - - 在 Linux 上注意文件权限设置,确保 Bot 有读写权限。 - - 临时文件应放在系统临时目录(`tempfile.gettempdir()`)。 - -## 2. 公约(编码约定) - -### 2.1 项目结构公约 -- **插件位置**:所有插件必须放置在 `plugins/` 目录下,单个 `.py` 文件或包含 `__init__.py` 的目录。 -- **模块导入**:遵循标准导入顺序:标准库 → 第三方库 → 本地模块。 -- **配置访问**:通过 `global_config` 单例访问配置,禁止硬编码配置值。 - -### 2.2 单例管理器使用公约 -NEO Bot 的核心是**单例管理器**(`core/managers/` 目录下的类)。所有全局资源都必须通过管理器访问。 - -- **禁止重复创建**:严禁自己实例化管理器类,必须通过导入的单例对象访问。 - - ✅ `from core.managers.redis_manager import redis_manager` - - ❌ `RedisManager()` (错误!会创建新实例) - -- **资源池化**:浏览器页面、数据库连接等资源必须使用管理器提供的池化接口。 - - ✅ `await browser_manager.get_page()` - - ❌ `playwright.chromium.launch()` (错误!会创建新浏览器进程) - -- **数据一致性**:单例管理器确保全局数据一致性,不要绕过管理器直接操作底层资源。 - -### 2.2.1 单例模式实现机制 - -NEO Bot 提供了两种单例模式实现方式,位于 `core/utils/singleton.py`: - -#### 1. Singleton 基类(继承方式) -```python -from core.utils.singleton import Singleton - -class MyManager(Singleton): - """通过继承 Singleton 基类实现单例""" - - def __init__(self, config: dict): - """ - 初始化管理器 - - Args: - config: 配置字典 - """ - # 调用父类 __init__ 确保单例初始化 - super().__init__() - - # 检查是否已经初始化(防止 __init__ 被多次调用) - if hasattr(self, '_my_initialized') and self._my_initialized: - return - - # 执行一次性初始化逻辑 - self.config = config - self.resource = None - self._initialize_resource() - - # 标记为已初始化 - self._my_initialized = True - - def _initialize_resource(self): - """初始化资源(只执行一次)""" - self.resource = initialize_resource(self.config) - - async def cleanup(self): - """清理资源(单例管理器应实现清理方法)""" - if self.resource: - await self.resource.close() -``` - -**特性**: -- 通过重写 `__new__` 方法确保每个类只有一个实例 -- 自动处理重复初始化问题,但建议子类添加额外的初始化检查 -- 使用全局字典存储实例,避免类型检查问题 -- 支持带参数的 `__init__` 方法 - -#### 2. @singleton 装饰器(装饰器方式) -```python -from core.utils.singleton import singleton - -@singleton -class MyManager: - """通过装饰器实现单例""" - - def __init__(self, config): - self.config = config - self.resource = None - - async def initialize(self): - self.resource = await load_resource() -``` - -**特性**: -- 将普通类转换为单例类,无需修改类继承关系 -- 保持原始类的元数据(名称、文档字符串等) -- 适用于无法修改基类的现有类 - -#### 3. 使用建议 -- **新管理器类**:优先使用 **Singleton 基类继承方式**,结构更清晰 -- **现有类转换**:使用 **@singleton 装饰器**,无需重构 -- **线程安全**:两种方式都假设在单线程异步环境中使用,如需线程安全请自行加锁 -- **导入方式**:单例类应该通过模块级别的实例变量导出,如: +- **应当**: 明确指定所有参数和返回值的类型。对于可能返回 `None` 的情况,使用 `Optional[...]`。 +- **示例**: ```python - # redis_manager.py - class RedisManager(Singleton): - ... - - redis_manager = RedisManager() # 创建并导出单例实例 + async def get_user_data(user_id: int) -> Optional[Dict[str, Any]]: + # ... ``` -#### 4. 重要注意事项 -- **避免循环导入**:单例类的导入应谨慎处理,避免循环依赖 -- **初始化时机**:单例在第一次导入时创建,确保所需依赖已就绪 -- **__init__ 调用语义**:虽然实例是单例,但 `__init__` 方法可能被多次调用(如重新导入时)。应添加额外检查确保一次性逻辑只执行一次。 -- **资源清理**:单例管理器应在程序退出时清理资源,实现 `cleanup()` 方法 +### 3. 文档字符串 (PEP 257) +**所有公开的模块、类、函数和方法都必须拥有文档字符串**。 -### 2.3 命名公约 -- **文件命名**:使用小写字母和下划线,例如 `my_plugin.py`。 -- **类命名**:使用 `PascalCase`,例如 `CommandManager`。 -- **函数/方法命名**:使用 `snake_case`,例如 `handle_message`。 -- **常量命名**:使用 `UPPER_SNAKE_CASE`,例如 `MAX_RETRY_COUNT`。 -- **变量命名**:使用 `snake_case`,具有描述性,避免单字母变量(循环变量除外)。 +- **格式**: 遵循 Google Python Style Guide 的文档字符串格式。它清晰、简洁且易于阅读。 +- **内容**: + - **模块/类**: 简要描述其职责和功能。 + - **函数/方法**: + - 一行总结其功能。 + - `Args:`: 描述每个参数的类型和含义。 + - `Returns:`: 描述返回值的类型和含义。 + - `Raises:`: (可选) 描述可能抛出的主要异常。 +- **示例**: + ```python + async def fetch_data(url: str, timeout: int = 10) -> str: + """Fetches content from a URL. -### 2.4 类型提示公约 -- **全面使用**:所有函数、方法、类属性都应提供类型提示。**这是强制要求**,因为框架开启了 Mypyc 编译。 -- **性能优化**:类型提示不仅帮助发现 Bug,还能让 Mypyc 生成更高效的机器码。 -- **返回类型**:明确指定返回类型,包括 `None`。 -- **复杂类型**:使用 `typing` 模块中的泛型,如 `List[str]`、`Dict[str, Any]`。 -- **可选参数**:使用 `Optional[...]` 或默认值 `= None`。 + Args: + url: The URL to fetch from. + timeout: The request timeout in seconds. -**示例**: -```python -# 好的写法 -async def handle(event: MessageEvent, args: list[str]) -> None: - ... - -# 不好写法(会导致编译警告) -async def handle(event, args): - ... -``` - -### 2.5 异常处理公约 -- **自定义异常**:使用框架提供的自定义异常类,避免抛出通用的 `Exception`。 -- **异常链**:保留原始异常信息,使用 `raise CustomError(...) from e`。 -- **资源清理**:使用 `try...finally` 或上下文管理器确保资源释放。 - -### 2.6 日志记录公约 -- **模块化日志**:每个模块使用 `ModuleLogger("ModuleName")` 创建专用日志记录器。 -- **日志级别**: - - `DEBUG`:调试信息,详细操作记录 - - `INFO`:常规操作记录 - - `WARNING`:预期内的异常或潜在问题 - - `ERROR`:操作失败但可恢复的错误 - - `CRITICAL`:系统级错误,需要立即关注 - -## 3. 注意事项(常见陷阱) - -### 3.1 异步编程陷阱 -- **忘记 await**:异步函数调用必须使用 `await`,否则任务不会执行。 -- **阻塞循环**:在异步函数中执行同步阻塞操作会冻结整个事件循环。 -- **任务泄漏**:创建的异步任务必须被妥善管理,避免内存泄漏。 - -### 3.2 资源管理陷阱 -- **连接泄漏**:未关闭的 HTTP 连接、数据库连接会导致资源耗尽。 -- **文件句柄泄漏**:打开的文件必须显式关闭或使用上下文管理器。 -- **缓存雪崩**:大量缓存同时过期可能导致系统负载激增。 - -### 3.3 性能陷阱 -- **N+1 查询**:避免在循环中执行数据库或 API 查询,使用批量操作。 -- **内存泄漏**:大型数据结构长时间驻留内存,应定期清理。 -- **重复计算**:相同的计算结果应缓存,避免重复计算。 - -### 3.4 安全性陷阱 -- **SQL 注入**:使用参数化查询或 ORM,禁止拼接 SQL 字符串。 -- **XSS 攻击**:渲染用户输入时必须进行 HTML 转义。 -- **路径遍历**:用户提供的文件路径必须进行规范化验证。 - -## 4. 代码规范(详细指南) - -### 4.1 文档字符串规范(强制要求) - -**所有代码必须包含完整的文档字符串**,这是项目质量保证的基础。缺少文档字符串的代码将在审查中被拒绝。 - -- **模块级文档**:每个模块顶部应有文档字符串,描述模块功能和主要接口。 -- **类级文档**:每个类应有文档字符串,描述类的职责、使用方法和示例。 -- **函数/方法级文档**:每个公共函数和方法必须有文档字符串,包含参数说明、返回值和异常信息。 - -**参数注释要求**: -1. 每个参数都必须有类型提示和简要说明 -2. 返回值必须明确说明类型和含义 -3. 可能抛出的异常必须列出 -4. 复杂的函数应提供使用示例 - -**标准格式示例:** -```python -def process_data(data: List[str], timeout: int = 30) -> Dict[str, Any]: - """ - 处理数据并返回结果。 - - Args: - data: 待处理的数据列表 - timeout: 操作超时时间,单位秒 - - Returns: - 处理结果的字典,包含状态和详情 - - Raises: - TimeoutError: 处理超时时抛出 - ValueError: 数据格式错误时抛出 - - Example: - >>> result = process_data(["item1", "item2"]) - >>> print(result["status"]) - """ -``` - -### 4.2 函数设计规范 -- **单一职责**:每个函数只做一件事,保持功能简洁。 -- **参数数量**:函数参数不宜过多(建议 ≤5),过多时考虑使用 `dataclass` 或 `TypedDict`。 -- **默认参数**:避免使用可变对象作为默认参数,使用 `None` 代替。 - -### 4.3 类设计规范 -- **单一职责**:每个类应有明确的单一职责。 -- **组合优于继承**:优先使用组合而非继承来复用功能。 -- **属性访问控制**:使用 `@property` 装饰器控制属性访问,隐藏内部实现。 - -### 4.4 错误处理规范 -- **错误码统一**:使用框架定义的 `ErrorCode` 枚举,避免自定义魔法数字。 -- **错误响应格式**:使用 `exception_to_error_response` 生成统一错误响应。 -- **用户友好消息**:错误消息应同时包含技术细节(日志)和用户友好提示(界面)。 - -### 4.5 测试规范 -- **测试覆盖率**:核心功能应达到 80% 以上的测试覆盖率。 -- **异步测试**:使用 `pytest-asyncio` 进行异步测试。 -- **测试隔离**:测试用例之间应相互独立,避免依赖执行顺序。 - -## 5. 提交与协作规范 - -### 5.1 Git 提交规范 -- **提交信息格式**:遵循 Conventional Commits 规范 + Returns: + The content of the response as a string. + + Raises: + asyncio.TimeoutError: If the request times out. + """ + # ... ``` - (): - - - -