diff --git a/README.md b/README.md index 359859e..05074fe 100644 --- a/README.md +++ b/README.md @@ -1 +1,135 @@ -1 bot for qqnt +# NEO Bot Framework + +这是一个基于 Python 的轻量级 OneBot 11 协议机器人框架,专为内部团队开发设计。它通过 WebSocket 连接到 OneBot 实现端(如 NapCatQQ),提供了事件分发、指令管理和 API 调用封装。 + +## ✨ 特性 + +* **OneBot 11 标准支持**:完整支持 OneBot 11 的消息、通知、请求和元事件。 +* **类型安全**:基于 `dataclasses` 的强类型事件模型,开发体验更佳。 +* **插件系统**:轻量级的装饰器风格插件系统,支持指令 (`@matcher.command`) 和事件监听 (`@matcher.on_notice`, `@matcher.on_request`)。 +* **异步核心**:基于 `asyncio` 和 `websockets` 的高性能异步核心。 +* **自动重连**:内置 WebSocket 断线重连机制。 + +## 📂 项目结构 + +``` +NEO/ +├── base_plugins/ # 基础插件目录,新建插件文件即可自动加载 +│ └── echo.py # 示例插件:实现 /echo 指令 +├── core/ # 核心框架代码 +│ ├── bot.py # Bot API 封装,提供 send_group_msg 等方法 +│ ├── command_manager.py # 命令与事件分发器 +│ ├── config_loader.py # 配置加载器 +│ └── ws.py # WebSocket 客户端核心 +├── models/ # 数据模型 +│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta) +│ ├── message.py # 消息段定义 (MessageSegment) +│ └── sender.py # 发送者定义 (Sender) +├── config.toml # 配置文件 +├── main.py # 启动入口 +└── requirements.txt # 项目依赖 +``` + +## 🚀 快速开始 + +### 1. 环境准备 + +* Python 3.8+ +* OneBot 11 实现端(推荐 [NapCatQQ](https://github.com/NapNeko/NapCatQQ) 或 LLOneBot) + +### 2. 安装依赖 + +```bash +pip install -r requirements.txt +``` + +### 3. 配置文件 + +修改根目录下的 `config.toml`,配置 WebSocket 连接信息: + +```toml +[napcat_ws] +uri = "ws://127.0.0.1:30004" # OneBot 实现端的 WebSocket 地址 +token = "your_token" # Access Token (如果有) +reconnect_interval = 5 # 断线重连间隔(秒) + +[bot] +command = ["/"] # 指令前缀,支持多个,如 ["/", "#"] +``` + +### 4. 运行 + +```bash +python main.py +``` + +## 🛠️ 开发指南 + +### 创建新插件 + +在 `base_plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。 + +### 示例代码 + +#### 1. 注册消息指令 + +使用 `@matcher.command("指令名")` 注册指令。 + +```python +from core.command_manager import matcher +from core.bot import Bot +from models import MessageEvent + +# 注册 /hello 指令 +@matcher.command("hello") +async def handle_hello(bot: Bot, event: MessageEvent, args: list[str]): + # args 是去除指令后的参数列表 + await event.reply("你好!这里是 NEO Bot。") +``` + +#### 2. 监听通知事件 + +使用 `@matcher.on_notice("通知类型")` 监听通知。 + +```python +from core.command_manager import matcher +from core.bot import Bot +from models import GroupIncreaseNoticeEvent + +# 监听群成员增加事件 +@matcher.on_notice("group_increase") +async def welcome_new_member(bot: Bot, event: GroupIncreaseNoticeEvent): + await bot.send_group_msg(event.group_id, f"欢迎新成员 {event.user_id} 加入!") +``` + +#### 3. 监听请求事件 + +使用 `@matcher.on_request("请求类型")` 监听请求。 + +```python +from core.command_manager import matcher +from core.bot import Bot +from models import FriendRequestEvent + +# 自动同意好友请求 +@matcher.on_request("friend") +async def auto_approve_friend(bot: Bot, event: FriendRequestEvent): + await bot.call_api("set_friend_add_request", { + "flag": event.flag, + "approve": True + }) +``` + +## 📚 事件模型说明 + +项目采用了基于工厂模式的事件处理系统,所有事件定义在 `models/events/` 下: + +* **MessageEvent**: 消息事件,包含 `PrivateMessageEvent` 和 `GroupMessageEvent`。支持 `await event.reply()` 快速回复。 +* **NoticeEvent**: 通知事件,如 `FriendAddNoticeEvent`, `GroupRecallNoticeEvent` 等。 +* **RequestEvent**: 请求事件,如 `FriendRequestEvent`, `GroupRequestEvent`。 +* **MetaEvent**: 元事件,如心跳 `HeartbeatEvent`。 + +所有事件均继承自 `OneBotEvent`,并包含 `bot` 属性用于调用 API。 + +--- +*Internal Use Only - DOGSOHA ond baby2016* diff --git a/base_plugins/echo.py b/base_plugins/echo.py index 87d4005..c38aada 100644 --- a/base_plugins/echo.py +++ b/base_plugins/echo.py @@ -5,11 +5,11 @@ Echo 插件 """ from core.command_manager import matcher from core.bot import Bot -from models.event import Event +from models import MessageEvent @matcher.command("echo") -async def handle_echo(bot: Bot, event: Event, args: list[str]): +async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]): """ 处理 echo 指令,原样回复用户输入的内容 diff --git a/core/bot.py b/core/bot.py index 6420d76..bf97b93 100644 --- a/core/bot.py +++ b/core/bot.py @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: from .ws import WS - from ..models.event import Event + from models import OneBotEvent class Bot: @@ -33,40 +33,53 @@ class Bot: """ return await self.ws.call_api(action, params) - async def send_group_msg(self, group_id: int, message: str) -> dict: + async def send_group_msg(self, group_id: int, message: str, auto_escape: bool = False) -> dict: """ 发送群消息 :param group_id: 群号 :param message: 消息内容 + :param auto_escape: 是否自动转义 :return: API 响应结果 """ return await self.call_api( - "send_group_msg", {"group_id": group_id, "message": message} + "send_group_msg", {"group_id": group_id, "message": message, "auto_escape": auto_escape} ) - async def send_private_msg(self, user_id: int, message: str) -> dict: + async def send_private_msg(self, user_id: int, message: str, auto_escape: bool = False) -> dict: """ 发送私聊消息 :param user_id: 用户 QQ 号 :param message: 消息内容 + :param auto_escape: 是否自动转义 :return: API 响应结果 """ return await self.call_api( - "send_private_msg", {"user_id": user_id, "message": message} + "send_private_msg", {"user_id": user_id, "message": message, "auto_escape": auto_escape} ) - async def send(self, event: "Event", message: str) -> dict: + async def send(self, event: "OneBotEvent", message: str, auto_escape: bool = False) -> dict: """ 智能发送消息,根据事件类型自动选择发送方式 :param event: 触发事件对象 :param message: 消息内容 + :param auto_escape: 是否自动转义 :return: API 响应结果 """ - if event.message_type == "group" and event.group_id: - return await self.send_group_msg(event.group_id, message) - elif event.user_id: - return await self.send_private_msg(event.user_id, message) + # 如果是消息事件,直接调用 reply + if hasattr(event, "reply"): + await event.reply(message, auto_escape) + return {"status": "ok", "msg": "Replied via event.reply()"} + + # 尝试从事件中获取 user_id 或 group_id + user_id = getattr(event, "user_id", None) + group_id = getattr(event, "group_id", None) + + if group_id: + return await self.send_group_msg(group_id, message, auto_escape) + elif user_id: + return await self.send_private_msg(user_id, message, auto_escape) + return {"status": "failed", "msg": "Unknown message target"} diff --git a/core/command_manager.py b/core/command_manager.py index 390c51c..19a035d 100644 --- a/core/command_manager.py +++ b/core/command_manager.py @@ -73,6 +73,23 @@ class CommandManager: return decorator + # --- 统一事件分发入口 --- + async def handle_event(self, bot, event): + """ + 统一事件分发入口 + + :param bot: Bot 实例 + :param event: 事件对象 + """ + post_type = event.post_type + + if post_type == 'message': + await self.handle_message(bot, event) + elif post_type == 'notice': + await self.handle_notice(bot, event) + elif post_type == 'request': + await self.handle_request(bot, event) + # --- 消息分发逻辑 --- async def handle_message(self, bot, event): """ diff --git a/core/ws.py b/core/ws.py index 655fcc8..7a6206d 100644 --- a/core/ws.py +++ b/core/ws.py @@ -11,7 +11,7 @@ from datetime import datetime import websockets -from models import Event +from models import EventFactory from .bot import Bot from .command_manager import matcher @@ -76,6 +76,7 @@ class WS: data = json.loads(message) # 1. 处理 API 响应 + # 如果消息中包含 echo 字段,说明是 API 调用的响应 echo_id = data.get("echo") if echo_id and echo_id in self._pending_requests: future = self._pending_requests.pop(echo_id) @@ -84,12 +85,14 @@ class WS: continue # 2. 处理上报事件 + # 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件 if "post_type" in data: - # 使用 create_task 异步执行,避免阻塞 + # 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环 asyncio.create_task(self.on_event(data)) except Exception as e: print(f" 解析消息异常: {e}") + traceback.print_exc() async def on_event(self, raw_data: dict): """ @@ -98,40 +101,25 @@ class WS: :param raw_data: 原始事件数据字典 """ try: - # 解析为 Event 对象 - event = Event.from_dict(raw_data) - event.bot = self.bot + # 使用工厂创建事件对象 + event = EventFactory.create_event(raw_data) + event.bot = self.bot # 注入 Bot 实例 - # 格式化时间用于打印 + # 打印日志 t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S") - - # --- 分流处理 --- - - # A. 消息事件 (Message) if event.post_type == "message": - print( - f" [{t}] [消息] {event.message_type} | {event.user_id}: {event.raw_message}" - ) - await matcher.handle_message(self.bot, event) - - # B. 通知事件 (Notice) + sender_name = event.sender.nickname if event.sender else "Unknown" + print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}") elif event.post_type == "notice": - print( - f" [{t}] [通知] {event.notice_type} | 来自: {event.group_id or '私聊'}" - ) - await matcher.handle_notice(self.bot, event) - - # C. 请求事件 (Request) + print(f" [{t}] [通知] {event.notice_type}") elif event.post_type == "request": - print(f" [{t}] [请求] {event.request_type} | 内容: {event.comment}") - await matcher.handle_request(self.bot, event) + print(f" [{t}] [请求] {event.request_type}") - # D. 元事件 (Meta Event) - 通常用来心跳检测,可不处理 - elif event.post_type == "meta_event": - pass + # 分发事件 + await matcher.handle_event(self.bot, event) except Exception as e: - print(f"事件分发失败: {e}") + print(f" 事件处理异常: {e}") traceback.print_exc() async def call_api(self, action: str, params: dict = None): diff --git a/models/__init__.py b/models/__init__.py index 7317020..dbd36ec 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -1,3 +1,25 @@ -from .event import Event, MessageSegment +from .message import MessageSegment +from .sender import Sender +from .events.base import OneBotEvent +from .events.message import MessageEvent, PrivateMessageEvent, GroupMessageEvent +from .events.notice import NoticeEvent +from .events.request import RequestEvent +from .events.meta import MetaEvent +from .events.factory import EventFactory -__all__ = ["Event", "MessageSegment", "Sender"] +# Alias for backward compatibility +Event = OneBotEvent + +__all__ = [ + "MessageSegment", + "Sender", + "OneBotEvent", + "Event", + "MessageEvent", + "PrivateMessageEvent", + "GroupMessageEvent", + "NoticeEvent", + "RequestEvent", + "MetaEvent", + "EventFactory", +] diff --git a/models/base.py b/models/base.py deleted file mode 100644 index d28f641..0000000 --- a/models/base.py +++ /dev/null @@ -1,11 +0,0 @@ -""" -基础数据模型模块 -""" - - -class BaseModel: - """ - 基础模型类 - """ - - pass diff --git a/models/event.py b/models/event.py deleted file mode 100644 index b48b148..0000000 --- a/models/event.py +++ /dev/null @@ -1,191 +0,0 @@ -""" -事件模型模块 - -定义了 Event 类和 MessageSegment 类,用于封装 OneBot 11 的上报事件和消息段。 -""" -from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, TYPE_CHECKING - -from .sender import Sender - -if TYPE_CHECKING: - from core.bot import Bot - - -@dataclass -class MessageSegment: - """ - 消息段,对应 OneBot 11 标准中的消息段对象 - """ - - type: str - """消息段类型,如 text, image, at 等""" - - data: Dict[str, Any] - """消息段数据""" - - @property - def text(self) -> str: - """ - 获取文本内容(仅当 type 为 text 时有效) - - :return: 文本内容 - """ - return self.data.get("text", "") if self.type == "text" else "" - - @property - def image_url(self) -> str: - """ - 获取图片 URL(仅当 type 为 image 时有效) - - :return: 图片 URL - """ - return self.data.get("url", "") if self.type == "image" else "" - - def is_at(self, user_id: int = None) -> bool: - """ - 判断是否为 @某人 - - :param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型 - :return: 是否匹配 - """ - if self.type != "at": - return False - if user_id is None: - return True - return str(self.data.get("qq")) == str(user_id) - - def __repr__(self): - return f"[MS:{self.type}:{self.data}]" - - -@dataclass -class Event: - """ - 事件类,封装了 OneBot 11 的上报事件 - """ - - post_type: str - """上报类型: message, notice, request, meta_event""" - - self_id: int - """收到消息的机器人 QQ 号""" - - time: int - """事件发生的时间戳""" - - # --- 消息事件字段 --- - message_type: Optional[str] = None - """消息类型: group, private""" - - sub_type: Optional[str] = None - """消息子类型""" - - message_id: Optional[int] = None - """消息 ID""" - - user_id: Optional[int] = None - """发送者 QQ 号""" - - raw_message: Optional[str] = None - """原始消息内容""" - - message: List[MessageSegment] = field(default_factory=list) - """消息内容列表""" - - sender: Optional[Sender] = None - """发送者信息""" - - group_id: Optional[int] = None - """群号""" - - target_id: Optional[int] = None - """目标 QQ 号""" - - # --- 通知事件字段 --- - notice_type: Optional[str] = None - """通知类型""" - - operator_id: Optional[int] = None - """操作者 QQ 号""" - - duration: Optional[int] = None - """时长""" - - honor_type: Optional[str] = None - """荣誉类型""" - - # --- 请求事件字段 --- - request_type: Optional[str] = None - """请求类型""" - - flag: Optional[str] = None - """请求 flag""" - - comment: Optional[str] = None - """验证信息""" - - # 注入的 Bot 实例 - bot: Optional["Bot"] = field(default=None, init=False) - """关联的 Bot 实例""" - - async def reply(self, message: str) -> dict: - """ - 快捷回复消息 - - :param message: 回复内容 - :return: API 响应结果 - """ - if not self.bot: - return {"status": "failed", "msg": "Bot instance not attached to event"} - return await self.bot.send(self, message) - - @classmethod - def from_dict(cls, data: dict): - """ - 从字典创建 Event 对象 - - :param data: 原始事件数据字典 - :return: Event 对象 - """ - raw_msg_array = data.get("message") - segments = [] - if isinstance(raw_msg_array, list): - segments = [ - MessageSegment(type=seg["type"], data=seg["data"]) - for seg in raw_msg_array - ] - - sender_data = data.get("sender") - sender_obj = None - if isinstance(sender_data, dict): - sender_obj = Sender( - **{k: v for k, v in sender_data.items() if k in Sender.__annotations__} - ) - - # 数据整合 - processed_data = data.copy() - processed_data["message"] = segments - processed_data["sender"] = sender_obj - - # 字段过滤:只提取 dataclass 中定义的字段 - valid_data = { - k: v for k, v in processed_data.items() if k in cls.__annotations__ - } - return cls(**valid_data) - - # --- 快捷判断工具 --- - @property - def is_message(self) -> bool: - """是否为消息事件""" - return self.post_type == "message" - - @property - def is_notice(self) -> bool: - """是否为通知事件""" - return self.post_type == "notice" - - @property - def is_request(self) -> bool: - """是否为请求事件""" - return self.post_type == "request" diff --git a/models/events/base.py b/models/events/base.py new file mode 100644 index 0000000..5bbe8d6 --- /dev/null +++ b/models/events/base.py @@ -0,0 +1,68 @@ +""" +基础事件模型模块 + +定义了所有 OneBot 11 事件的基类和事件类型枚举。 +""" +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Optional +from abc import ABC, abstractmethod + +if TYPE_CHECKING: + from core.bot import Bot + + +class EventType: + """ + 事件类型枚举 + """ + META = 'meta_event' # 元事件 + REQUEST = 'request' # 请求事件 + NOTICE = 'notice' # 通知事件 + MESSAGE = 'message' # 消息事件 + MESSAGE_SENT = 'message_sent' # 消息发送事件 + + +@dataclass +class OneBotEvent(ABC): + """ + OneBot 事件基类 + 所有具体的事件类型都应该继承自此类 + """ + + time: int + """事件发生的时间戳""" + + self_id: int + """收到事件的机器人 QQ 号""" + + _bot: Optional["Bot"] = field(default=None, init=False) + """Bot 实例引用,用于快捷调用 API""" + + @property + @abstractmethod + def post_type(self) -> str: + """ + 上报类型 + """ + pass + + @property + def bot(self) -> "Bot": + """ + 获取 Bot 实例 + + :return: Bot 实例 + :raises ValueError: 如果 Bot 实例未设置 + """ + if self._bot is None: + raise ValueError("Bot instance not set for this event") + return self._bot + + @bot.setter + def bot(self, value: "Bot"): + """ + 设置 Bot 实例 + + :param value: Bot 实例 + """ + self._bot = value diff --git a/models/events/factory.py b/models/events/factory.py new file mode 100644 index 0000000..36b8980 --- /dev/null +++ b/models/events/factory.py @@ -0,0 +1,275 @@ +""" +事件工厂模块 + +用于根据 JSON 数据创建对应的事件对象。 +""" +from typing import Any, Dict + +from models.message import MessageSegment +from models.sender import Sender +from .base import OneBotEvent, EventType +from .message import GroupMessageEvent, PrivateMessageEvent, Anonymous +from .notice import ( + NoticeEvent, FriendAddNoticeEvent, FriendRecallNoticeEvent, + GroupRecallNoticeEvent, GroupIncreaseNoticeEvent, + GroupDecreaseNoticeEvent, GroupAdminNoticeEvent, GroupBanNoticeEvent, + GroupUploadNoticeEvent, GroupUploadFile +) +from .request import RequestEvent, FriendRequestEvent, GroupRequestEvent +from .meta import MetaEvent, HeartbeatEvent, LifeCycleEvent, HeartbeatStatus + + +class EventFactory: + """ + 事件工厂类 + """ + + @staticmethod + def create_event(data: Dict[str, Any]) -> OneBotEvent: + """ + 根据数据创建事件对象 + + :param data: 事件数据字典 + :return: 对应的事件对象 + :raises ValueError: 如果事件类型未知 + """ + post_type = data.get("post_type") + + # 提取公共字段 + common_args = { + "time": data.get("time", 0), + "self_id": data.get("self_id", 0), + } + + if post_type == EventType.MESSAGE or post_type == EventType.MESSAGE_SENT: + return EventFactory._create_message_event(data, common_args) + elif post_type == EventType.NOTICE: + return EventFactory._create_notice_event(data, common_args) + elif post_type == EventType.REQUEST: + return EventFactory._create_request_event(data, common_args) + elif post_type == EventType.META: + return EventFactory._create_meta_event(data, common_args) + else: + # 未知类型的事件,抛出异常 + raise ValueError(f"Unknown event type: {post_type}") + + @staticmethod + def _create_message_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent: + """ + 创建消息事件 + + :param data: 事件数据 + :param common_args: 公共参数 + :return: 消息事件对象 + """ + message_type = data.get("message_type") + + # 解析消息段 + message_list = [] + raw_message_list = data.get("message", []) + if isinstance(raw_message_list, list): + for item in raw_message_list: + if isinstance(item, dict): + message_list.append(MessageSegment(type=item.get("type", ""), data=item.get("data", {}))) + + # 解析发送者 + sender_data = data.get("sender", {}) + sender = Sender( + user_id=sender_data.get("user_id", 0), + nickname=sender_data.get("nickname", ""), + sex=sender_data.get("sex", "unknown"), + age=sender_data.get("age", 0), + card=sender_data.get("card"), + area=sender_data.get("area"), + level=sender_data.get("level"), + role=sender_data.get("role"), + title=sender_data.get("title"), + ) + + msg_args = { + **common_args, + "message_type": message_type, + "sub_type": data.get("sub_type", ""), + "message_id": data.get("message_id", 0), + "user_id": data.get("user_id", 0), + "message": message_list, + "raw_message": data.get("raw_message", ""), + "font": data.get("font", 0), + "sender": sender, + } + + if message_type == "private": + return PrivateMessageEvent(**msg_args) + elif message_type == "group": + anonymous_data = data.get("anonymous") + anonymous = None + if anonymous_data: + anonymous = Anonymous( + id=anonymous_data.get("id", 0), + name=anonymous_data.get("name", ""), + flag=anonymous_data.get("flag", "") + ) + return GroupMessageEvent( + **msg_args, + group_id=data.get("group_id", 0), + anonymous=anonymous, + ) + else: + # 未知消息类型,抛出异常 + raise ValueError(f"Unknown message type: {message_type}") + + @staticmethod + def _create_notice_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent: + """ + 创建通知事件 + + :param data: 事件数据 + :param common_args: 公共参数 + :return: 通知事件对象 + """ + notice_type = data.get("notice_type", "") + + # 好友相关通知 + if notice_type == "friend_add": + return FriendAddNoticeEvent( + **common_args, + notice_type=notice_type, + user_id=data.get("user_id", 0) + ) + elif notice_type == "friend_recall": + return FriendRecallNoticeEvent( + **common_args, + notice_type=notice_type, + user_id=data.get("user_id", 0), + message_id=data.get("message_id", 0) + ) + # 群相关通知 + elif notice_type == "group_recall": + return GroupRecallNoticeEvent( + **common_args, + notice_type=notice_type, + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + operator_id=data.get("operator_id", 0), + message_id=data.get("message_id", 0) + ) + elif notice_type == "group_increase": + return GroupIncreaseNoticeEvent( + **common_args, + notice_type=notice_type, + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + operator_id=data.get("operator_id", 0), + sub_type=data.get("sub_type", "") + ) + elif notice_type == "group_decrease": + return GroupDecreaseNoticeEvent( + **common_args, + notice_type=notice_type, + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + operator_id=data.get("operator_id", 0), + sub_type=data.get("sub_type", "") + ) + elif notice_type == "group_admin": + return GroupAdminNoticeEvent( + **common_args, + notice_type=notice_type, + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + sub_type=data.get("sub_type", "") + ) + elif notice_type == "group_ban": + return GroupBanNoticeEvent( + **common_args, + notice_type=notice_type, + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + operator_id=data.get("operator_id", 0), + duration=data.get("duration", 0), + sub_type=data.get("sub_type", "") + ) + elif notice_type == "group_upload": + file_data = data.get("file", {}) + file = GroupUploadFile( + id=file_data.get("id", ""), + name=file_data.get("name", ""), + size=file_data.get("size", 0), + busid=file_data.get("busid", 0) + ) + return GroupUploadNoticeEvent( + **common_args, + notice_type=notice_type, + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + file=file + ) + else: + # 未知通知类型,返回基础通知事件 + return NoticeEvent(**common_args, notice_type=notice_type) + + @staticmethod + def _create_request_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent: + """ + 创建请求事件 + + :param data: 事件数据 + :param common_args: 公共参数 + :return: 请求事件对象 + """ + request_type = data.get("request_type", "") + + if request_type == "friend": + return FriendRequestEvent( + **common_args, + request_type=request_type, + user_id=data.get("user_id", 0), + comment=data.get("comment", ""), + flag=data.get("flag", "") + ) + elif request_type == "group": + return GroupRequestEvent( + **common_args, + request_type=request_type, + sub_type=data.get("sub_type", ""), + group_id=data.get("group_id", 0), + user_id=data.get("user_id", 0), + comment=data.get("comment", ""), + flag=data.get("flag", "") + ) + else: + # 未知请求类型,返回基础请求事件 + return RequestEvent(**common_args, request_type=request_type) + + @staticmethod + def _create_meta_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent: + """ + 创建元事件 + + :param data: 事件数据 + :param common_args: 公共参数 + :return: 元事件对象 + """ + meta_event_type = data.get("meta_event_type", "") + + if meta_event_type == "heartbeat": + status_data = data.get("status", {}) + status = HeartbeatStatus( + online=status_data.get("online"), + good=status_data.get("good", True) + ) + return HeartbeatEvent( + **common_args, + meta_event_type=meta_event_type, + status=status, + interval=data.get("interval", 0) + ) + elif meta_event_type == "lifecycle": + return LifeCycleEvent( + **common_args, + meta_event_type=meta_event_type, + sub_type=data.get("sub_type", "") + ) + else: + # 未知元事件类型,返回基础元事件 + return MetaEvent(**common_args, meta_event_type=meta_event_type) diff --git a/models/events/message.py b/models/events/message.py new file mode 100644 index 0000000..e2e0bf1 --- /dev/null +++ b/models/events/message.py @@ -0,0 +1,116 @@ +""" +消息事件模型模块 + +定义了消息相关的事件类,包括 MessageEvent, PrivateMessageEvent, GroupMessageEvent。 +""" +from dataclasses import dataclass, field +from typing import List, Optional + +from models.message import MessageSegment +from models.sender import Sender +from .base import OneBotEvent, EventType + + +@dataclass +class Anonymous: + """ + 匿名信息 + """ + id: int = 0 + """匿名用户 ID""" + + name: str = "" + """匿名用户名称""" + + flag: str = "" + """匿名用户 flag""" + + +@dataclass +class MessageEvent(OneBotEvent): + """ + 消息事件基类 + """ + + message_type: str + """消息类型: private (私聊), group (群聊)""" + + sub_type: str + """ + 消息子类型 + 如果是私聊消息,可能是 friend, group, other, normal, anonymous, notice + 如果是群聊消息,可能是 normal, anonymous, notice + """ + + message_id: int + """消息 ID""" + + user_id: int + """发送者 QQ 号""" + + message: List[MessageSegment] = field(default_factory=list) + """消息内容列表""" + + raw_message: str = "" + """原始消息内容""" + + font: int = 0 + """字体""" + + sender: Optional[Sender] = None + """发送者信息""" + + @property + def post_type(self) -> str: + return EventType.MESSAGE + + async def reply(self, message: str, auto_escape: bool = False): + """ + 回复消息(抽象方法,由子类实现) + + :param message: 回复内容 + :param auto_escape: 是否自动转义 + """ + raise NotImplementedError("reply method must be implemented by subclasses") + + +@dataclass +class PrivateMessageEvent(MessageEvent): + """ + 私聊消息事件 + """ + + async def reply(self, message: str, auto_escape: bool = False): + """ + 回复私聊消息 + + :param message: 回复内容 + :param auto_escape: 是否自动转义 + """ + await self.bot.send_private_msg( + user_id=self.user_id, message=message, auto_escape=auto_escape + ) + + +@dataclass +class GroupMessageEvent(MessageEvent): + """ + 群聊消息事件 + """ + + group_id: int = 0 + """群号""" + + anonymous: Optional[Anonymous] = None + """匿名信息""" + + async def reply(self, message: str, auto_escape: bool = False): + """ + 回复群聊消息 + + :param message: 回复内容 + :param auto_escape: 是否自动转义 + """ + await self.bot.send_group_msg( + group_id=self.group_id, message=message, auto_escape=auto_escape + ) diff --git a/models/events/meta.py b/models/events/meta.py new file mode 100644 index 0000000..91b44d8 --- /dev/null +++ b/models/events/meta.py @@ -0,0 +1,67 @@ +""" +元事件模型模块 + +定义了元事件相关的事件类,包括心跳事件和生命周期事件。 +""" +from dataclasses import dataclass, field +from typing import Optional +from .base import OneBotEvent, EventType + + +@dataclass +class HeartbeatStatus: + """ + 心跳状态接口 + """ + online: Optional[bool] = None # 是否在线 + good: bool = True # 状态是否良好 + + +class LifeCycleSubType: + """ + 生命周期子类型枚举 + """ + ENABLE = 'enable' # 启用 + DISABLE = 'disable' # 禁用 + CONNECT = 'connect' # 连接 + + +@dataclass +class MetaEvent(OneBotEvent): + """ + 元事件基类 + """ + + meta_event_type: str + """元事件类型""" + + @property + def post_type(self) -> str: + return EventType.META + + +@dataclass +class HeartbeatEvent(MetaEvent): + """ + 心跳事件,用于确认连接状态 + """ + meta_event_type: str = 'heartbeat' + """元事件类型:心跳事件""" + + status: HeartbeatStatus = field(default_factory=HeartbeatStatus) + """状态信息""" + + interval: int = 0 + """心跳间隔时间(ms)""" + + +@dataclass +class LifeCycleEvent(MetaEvent): + """ + 生命周期事件,用于通知框架生命周期变化 + """ + meta_event_type: str = 'lifecycle' + """元事件类型:生命周期事件""" + + sub_type: LifeCycleSubType = LifeCycleSubType.ENABLE + """子类型:启用、禁用、连接""" diff --git a/models/events/notice.py b/models/events/notice.py new file mode 100644 index 0000000..443382b --- /dev/null +++ b/models/events/notice.py @@ -0,0 +1,159 @@ +""" +通知事件模型模块 + +定义了通知相关的事件类,包括好友通知和群组通知等。 +""" +from dataclasses import dataclass, field +from .base import OneBotEvent, EventType + + +@dataclass +class NoticeEvent(OneBotEvent): + """ + 通知事件基类 + """ + + notice_type: str + """通知类型""" + + @property + def post_type(self) -> str: + return EventType.NOTICE + + +@dataclass +class FriendAddNoticeEvent(NoticeEvent): + """ + 好友添加通知 + """ + user_id: int = 0 + """新好友 QQ 号""" + + +@dataclass +class FriendRecallNoticeEvent(NoticeEvent): + """ + 好友消息撤回通知 + """ + user_id: int = 0 + """消息发送者 QQ 号""" + + message_id: int = 0 + """被撤回的消息 ID""" + + +@dataclass +class GroupNoticeEvent(NoticeEvent): + """ + 群组通知事件基类 + """ + group_id: int = 0 + """群号""" + + user_id: int = 0 + """用户 QQ 号""" + + +@dataclass +class GroupRecallNoticeEvent(GroupNoticeEvent): + """ + 群消息撤回通知 + """ + operator_id: int = 0 + """操作者 QQ 号""" + + message_id: int = 0 + """被撤回的消息 ID""" + + +@dataclass +class GroupIncreaseNoticeEvent(GroupNoticeEvent): + """ + 群成员增加通知 + """ + operator_id: int = 0 + """操作者 QQ 号""" + + sub_type: str = "" + """ + 子类型 + approve: 管理员同意入群 + invite: 管理员邀请入群 + """ + + +@dataclass +class GroupDecreaseNoticeEvent(GroupNoticeEvent): + """ + 群成员减少通知 + """ + operator_id: int = 0 + """操作者 QQ 号(如果是主动退群,则和 user_id 相同)""" + + sub_type: str = "" + """ + 子类型 + leave: 主动退群 + kick: 成员被踢 + kick_me: 登录号被踢 + disband: 群被解散 + """ + + +@dataclass +class GroupAdminNoticeEvent(GroupNoticeEvent): + """ + 群管理员变动通知 + """ + sub_type: str = "" + """ + 子类型 + set: 设置管理员 + unset: 取消管理员 + """ + + +@dataclass +class GroupBanNoticeEvent(GroupNoticeEvent): + """ + 群禁言通知 + """ + operator_id: int = 0 + """操作者 QQ 号(管理员)""" + + duration: int = 0 + """禁言时长(秒),0 表示解除禁言""" + + sub_type: str = "" + """ + 子类型 + ban: 禁言 + lift_ban: 解除禁言 + """ + + +@dataclass +class GroupUploadFile: + """ + 群文件信息 + """ + id: str = "" + """文件 ID""" + + name: str = "" + """文件名""" + + size: int = 0 + """文件大小(Byte)""" + + busid: int = 0 + """文件总线 ID""" + + +@dataclass +class GroupUploadNoticeEvent(GroupNoticeEvent): + """ + 群文件上传通知 + """ + file: GroupUploadFile = field(default_factory=GroupUploadFile) + """文件信息""" diff --git a/models/events/request.py b/models/events/request.py new file mode 100644 index 0000000..87930b6 --- /dev/null +++ b/models/events/request.py @@ -0,0 +1,61 @@ +""" +请求事件模型模块 + +定义了请求相关的事件类。 +""" +from dataclasses import dataclass +from .base import OneBotEvent, EventType + + +@dataclass +class RequestEvent(OneBotEvent): + """ + 请求事件基类 + """ + + request_type: str + """请求类型""" + + @property + def post_type(self) -> str: + return EventType.REQUEST + + +@dataclass +class FriendRequestEvent(RequestEvent): + """ + 加好友请求事件 + """ + user_id: int = 0 + """发送请求的 QQ 号""" + + comment: str = "" + """验证信息""" + + flag: str = "" + """请求 flag,在调用处理请求的 API 时需要传入此 flag""" + + +@dataclass +class GroupRequestEvent(RequestEvent): + """ + 加群请求/邀请事件 + """ + sub_type: str = "" + """ + 子类型 + add: 加群请求 + invite: 邀请登录号入群 + """ + + group_id: int = 0 + """群号""" + + user_id: int = 0 + """发送请求的 QQ 号""" + + comment: str = "" + """验证信息""" + + flag: str = "" + """请求 flag,在调用处理请求的 API 时需要传入此 flag""" diff --git a/models/message.py b/models/message.py new file mode 100644 index 0000000..cabbe67 --- /dev/null +++ b/models/message.py @@ -0,0 +1,54 @@ +""" +消息段模型模块 + +定义了 MessageSegment 类,用于封装 OneBot 11 的消息段。 +""" +from dataclasses import dataclass +from typing import Any, Dict + + +@dataclass +class MessageSegment: + """ + 消息段,对应 OneBot 11 标准中的消息段对象 + """ + + type: str + """消息段类型,如 text, image, at 等""" + + data: Dict[str, Any] + """消息段数据""" + + @property + def text(self) -> str: + """ + 获取文本内容(仅当 type 为 text 时有效) + + :return: 文本内容 + """ + return self.data.get("text", "") if self.type == "text" else "" + + @property + def image_url(self) -> str: + """ + 获取图片 URL(仅当 type 为 image 时有效) + + :return: 图片 URL + """ + return self.data.get("url", "") if self.type == "image" else "" + + def is_at(self, user_id: int = None) -> bool: + """ + 判断是否为 @某人 + + :param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型 + :return: 是否匹配 + """ + if self.type != "at": + return False + if user_id is None: + return True + return str(self.data.get("qq")) == str(user_id) + + def __repr__(self): + return f"[MS:{self.type}:{self.data}]"