事件工厂111

This commit is contained in:
2026-01-01 18:43:14 +08:00
parent 046dd0860f
commit 3ba15d38f9
15 changed files with 1017 additions and 245 deletions

136
README.md
View File

@@ -1 +1,135 @@
1 bot for qqnt # NEO Bot Framework
这是一个基于 Python 的轻量级 OneBot 11 协议机器人框架,专为内部团队开发设计。它通过 WebSocket 连接到 OneBot 实现端(如 NapCatQQ提供了事件分发、指令管理和 API 调用封装。
## ✨ 特性
* **OneBot 11 标准支持**:完整支持 OneBot 11 的消息、通知、请求和元事件。
* **类型安全**:基于 `dataclasses` 的强类型事件模型,开发体验更佳。
* **插件系统**:轻量级的装饰器风格插件系统,支持指令 (`@matcher.command`) 和事件监听 (`@matcher.on_notice`, `@matcher.on_request`)。
* **异步核心**:基于 `asyncio``websockets` 的高性能异步核心。
* **自动重连**:内置 WebSocket 断线重连机制。
## 📂 项目结构
```
NEO/
├── base_plugins/ # 基础插件目录,新建插件文件即可自动加载
│ └── echo.py # 示例插件:实现 /echo 指令
├── core/ # 核心框架代码
│ ├── bot.py # Bot API 封装,提供 send_group_msg 等方法
│ ├── command_manager.py # 命令与事件分发器
│ ├── config_loader.py # 配置加载器
│ └── ws.py # WebSocket 客户端核心
├── models/ # 数据模型
│ ├── events/ # OneBot 事件定义 (Message, Notice, Request, Meta)
│ ├── message.py # 消息段定义 (MessageSegment)
│ └── sender.py # 发送者定义 (Sender)
├── config.toml # 配置文件
├── main.py # 启动入口
└── requirements.txt # 项目依赖
```
## 🚀 快速开始
### 1. 环境准备
* Python 3.8+
* OneBot 11 实现端(推荐 [NapCatQQ](https://github.com/NapNeko/NapCatQQ) 或 LLOneBot
### 2. 安装依赖
```bash
pip install -r requirements.txt
```
### 3. 配置文件
修改根目录下的 `config.toml`,配置 WebSocket 连接信息:
```toml
[napcat_ws]
uri = "ws://127.0.0.1:30004" # OneBot 实现端的 WebSocket 地址
token = "your_token" # Access Token (如果有)
reconnect_interval = 5 # 断线重连间隔(秒)
[bot]
command = ["/"] # 指令前缀,支持多个,如 ["/", "#"]
```
### 4. 运行
```bash
python main.py
```
## 🛠️ 开发指南
### 创建新插件
`base_plugins` 目录下创建一个新的 `.py` 文件(例如 `my_plugin.py`),框架会自动加载它。
### 示例代码
#### 1. 注册消息指令
使用 `@matcher.command("指令名")` 注册指令。
```python
from core.command_manager import matcher
from core.bot import Bot
from models import MessageEvent
# 注册 /hello 指令
@matcher.command("hello")
async def handle_hello(bot: Bot, event: MessageEvent, args: list[str]):
# args 是去除指令后的参数列表
await event.reply("你好!这里是 NEO Bot。")
```
#### 2. 监听通知事件
使用 `@matcher.on_notice("通知类型")` 监听通知。
```python
from core.command_manager import matcher
from core.bot import Bot
from models import GroupIncreaseNoticeEvent
# 监听群成员增加事件
@matcher.on_notice("group_increase")
async def welcome_new_member(bot: Bot, event: GroupIncreaseNoticeEvent):
await bot.send_group_msg(event.group_id, f"欢迎新成员 {event.user_id} 加入!")
```
#### 3. 监听请求事件
使用 `@matcher.on_request("请求类型")` 监听请求。
```python
from core.command_manager import matcher
from core.bot import Bot
from models import FriendRequestEvent
# 自动同意好友请求
@matcher.on_request("friend")
async def auto_approve_friend(bot: Bot, event: FriendRequestEvent):
await bot.call_api("set_friend_add_request", {
"flag": event.flag,
"approve": True
})
```
## 📚 事件模型说明
项目采用了基于工厂模式的事件处理系统,所有事件定义在 `models/events/` 下:
* **MessageEvent**: 消息事件,包含 `PrivateMessageEvent``GroupMessageEvent`。支持 `await event.reply()` 快速回复。
* **NoticeEvent**: 通知事件,如 `FriendAddNoticeEvent`, `GroupRecallNoticeEvent` 等。
* **RequestEvent**: 请求事件,如 `FriendRequestEvent`, `GroupRequestEvent`
* **MetaEvent**: 元事件,如心跳 `HeartbeatEvent`
所有事件均继承自 `OneBotEvent`,并包含 `bot` 属性用于调用 API。
---
*Internal Use Only - DOGSOHA ond baby2016*

View File

@@ -5,11 +5,11 @@ Echo 插件
""" """
from core.command_manager import matcher from core.command_manager import matcher
from core.bot import Bot from core.bot import Bot
from models.event import Event from models import MessageEvent
@matcher.command("echo") @matcher.command("echo")
async def handle_echo(bot: Bot, event: Event, args: list[str]): async def handle_echo(bot: Bot, event: MessageEvent, args: list[str]):
""" """
处理 echo 指令,原样回复用户输入的内容 处理 echo 指令,原样回复用户输入的内容

View File

@@ -7,7 +7,7 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from .ws import WS from .ws import WS
from ..models.event import Event from models import OneBotEvent
class Bot: class Bot:
@@ -33,40 +33,53 @@ class Bot:
""" """
return await self.ws.call_api(action, params) return await self.ws.call_api(action, params)
async def send_group_msg(self, group_id: int, message: str) -> dict: async def send_group_msg(self, group_id: int, message: str, auto_escape: bool = False) -> dict:
""" """
发送群消息 发送群消息
:param group_id: 群号 :param group_id: 群号
:param message: 消息内容 :param message: 消息内容
:param auto_escape: 是否自动转义
:return: API 响应结果 :return: API 响应结果
""" """
return await self.call_api( return await self.call_api(
"send_group_msg", {"group_id": group_id, "message": message} "send_group_msg", {"group_id": group_id, "message": message, "auto_escape": auto_escape}
) )
async def send_private_msg(self, user_id: int, message: str) -> dict: async def send_private_msg(self, user_id: int, message: str, auto_escape: bool = False) -> dict:
""" """
发送私聊消息 发送私聊消息
:param user_id: 用户 QQ 号 :param user_id: 用户 QQ 号
:param message: 消息内容 :param message: 消息内容
:param auto_escape: 是否自动转义
:return: API 响应结果 :return: API 响应结果
""" """
return await self.call_api( return await self.call_api(
"send_private_msg", {"user_id": user_id, "message": message} "send_private_msg", {"user_id": user_id, "message": message, "auto_escape": auto_escape}
) )
async def send(self, event: "Event", message: str) -> dict: async def send(self, event: "OneBotEvent", message: str, auto_escape: bool = False) -> dict:
""" """
智能发送消息,根据事件类型自动选择发送方式 智能发送消息,根据事件类型自动选择发送方式
:param event: 触发事件对象 :param event: 触发事件对象
:param message: 消息内容 :param message: 消息内容
:param auto_escape: 是否自动转义
:return: API 响应结果 :return: API 响应结果
""" """
if event.message_type == "group" and event.group_id: # 如果是消息事件,直接调用 reply
return await self.send_group_msg(event.group_id, message) if hasattr(event, "reply"):
elif event.user_id: await event.reply(message, auto_escape)
return await self.send_private_msg(event.user_id, message) return {"status": "ok", "msg": "Replied via event.reply()"}
# 尝试从事件中获取 user_id 或 group_id
user_id = getattr(event, "user_id", None)
group_id = getattr(event, "group_id", None)
if group_id:
return await self.send_group_msg(group_id, message, auto_escape)
elif user_id:
return await self.send_private_msg(user_id, message, auto_escape)
return {"status": "failed", "msg": "Unknown message target"} return {"status": "failed", "msg": "Unknown message target"}

View File

@@ -73,6 +73,23 @@ class CommandManager:
return decorator return decorator
# --- 统一事件分发入口 ---
async def handle_event(self, bot, event):
"""
统一事件分发入口
:param bot: Bot 实例
:param event: 事件对象
"""
post_type = event.post_type
if post_type == 'message':
await self.handle_message(bot, event)
elif post_type == 'notice':
await self.handle_notice(bot, event)
elif post_type == 'request':
await self.handle_request(bot, event)
# --- 消息分发逻辑 --- # --- 消息分发逻辑 ---
async def handle_message(self, bot, event): async def handle_message(self, bot, event):
""" """

View File

@@ -11,7 +11,7 @@ from datetime import datetime
import websockets import websockets
from models import Event from models import EventFactory
from .bot import Bot from .bot import Bot
from .command_manager import matcher from .command_manager import matcher
@@ -76,6 +76,7 @@ class WS:
data = json.loads(message) data = json.loads(message)
# 1. 处理 API 响应 # 1. 处理 API 响应
# 如果消息中包含 echo 字段,说明是 API 调用的响应
echo_id = data.get("echo") echo_id = data.get("echo")
if echo_id and echo_id in self._pending_requests: if echo_id and echo_id in self._pending_requests:
future = self._pending_requests.pop(echo_id) future = self._pending_requests.pop(echo_id)
@@ -84,12 +85,14 @@ class WS:
continue continue
# 2. 处理上报事件 # 2. 处理上报事件
# 如果消息中包含 post_type 字段,说明是 OneBot 上报的事件
if "post_type" in data: if "post_type" in data:
# 使用 create_task 异步执行,避免阻塞 # 使用 create_task 异步执行,避免阻塞 WebSocket 接收循环
asyncio.create_task(self.on_event(data)) asyncio.create_task(self.on_event(data))
except Exception as e: except Exception as e:
print(f" 解析消息异常: {e}") print(f" 解析消息异常: {e}")
traceback.print_exc()
async def on_event(self, raw_data: dict): async def on_event(self, raw_data: dict):
""" """
@@ -98,40 +101,25 @@ class WS:
:param raw_data: 原始事件数据字典 :param raw_data: 原始事件数据字典
""" """
try: try:
# 解析为 Event 对象 # 使用工厂创建事件对象
event = Event.from_dict(raw_data) event = EventFactory.create_event(raw_data)
event.bot = self.bot event.bot = self.bot # 注入 Bot 实例
# 格式化时间用于打印 # 打印日志
t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S") t = datetime.fromtimestamp(event.time).strftime("%H:%M:%S")
# --- 分流处理 ---
# A. 消息事件 (Message)
if event.post_type == "message": if event.post_type == "message":
print( sender_name = event.sender.nickname if event.sender else "Unknown"
f" [{t}] [消息] {event.message_type} | {event.user_id}: {event.raw_message}" print(f" [{t}] [消息] {event.message_type} | {event.user_id}({sender_name}): {event.raw_message}")
)
await matcher.handle_message(self.bot, event)
# B. 通知事件 (Notice)
elif event.post_type == "notice": elif event.post_type == "notice":
print( print(f" [{t}] [通知] {event.notice_type}")
f" [{t}] [通知] {event.notice_type} | 来自: {event.group_id or '私聊'}"
)
await matcher.handle_notice(self.bot, event)
# C. 请求事件 (Request)
elif event.post_type == "request": elif event.post_type == "request":
print(f" [{t}] [请求] {event.request_type} | 内容: {event.comment}") print(f" [{t}] [请求] {event.request_type}")
await matcher.handle_request(self.bot, event)
# D. 元事件 (Meta Event) - 通常用来心跳检测,可不处理 # 分发事件
elif event.post_type == "meta_event": await matcher.handle_event(self.bot, event)
pass
except Exception as e: except Exception as e:
print(f"事件分发失败: {e}") print(f" 事件处理异常: {e}")
traceback.print_exc() traceback.print_exc()
async def call_api(self, action: str, params: dict = None): async def call_api(self, action: str, params: dict = None):

View File

@@ -1,3 +1,25 @@
from .event import Event, MessageSegment from .message import MessageSegment
from .sender import Sender
from .events.base import OneBotEvent
from .events.message import MessageEvent, PrivateMessageEvent, GroupMessageEvent
from .events.notice import NoticeEvent
from .events.request import RequestEvent
from .events.meta import MetaEvent
from .events.factory import EventFactory
__all__ = ["Event", "MessageSegment", "Sender"] # Alias for backward compatibility
Event = OneBotEvent
__all__ = [
"MessageSegment",
"Sender",
"OneBotEvent",
"Event",
"MessageEvent",
"PrivateMessageEvent",
"GroupMessageEvent",
"NoticeEvent",
"RequestEvent",
"MetaEvent",
"EventFactory",
]

View File

@@ -1,11 +0,0 @@
"""
基础数据模型模块
"""
class BaseModel:
"""
基础模型类
"""
pass

View File

@@ -1,191 +0,0 @@
"""
事件模型模块
定义了 Event 类和 MessageSegment 类,用于封装 OneBot 11 的上报事件和消息段。
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from .sender import Sender
if TYPE_CHECKING:
from core.bot import Bot
@dataclass
class MessageSegment:
"""
消息段,对应 OneBot 11 标准中的消息段对象
"""
type: str
"""消息段类型,如 text, image, at 等"""
data: Dict[str, Any]
"""消息段数据"""
@property
def text(self) -> str:
"""
获取文本内容(仅当 type 为 text 时有效)
:return: 文本内容
"""
return self.data.get("text", "") if self.type == "text" else ""
@property
def image_url(self) -> str:
"""
获取图片 URL仅当 type 为 image 时有效)
:return: 图片 URL
"""
return self.data.get("url", "") if self.type == "image" else ""
def is_at(self, user_id: int = None) -> bool:
"""
判断是否为 @某人
:param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型
:return: 是否匹配
"""
if self.type != "at":
return False
if user_id is None:
return True
return str(self.data.get("qq")) == str(user_id)
def __repr__(self):
return f"[MS:{self.type}:{self.data}]"
@dataclass
class Event:
"""
事件类,封装了 OneBot 11 的上报事件
"""
post_type: str
"""上报类型: message, notice, request, meta_event"""
self_id: int
"""收到消息的机器人 QQ 号"""
time: int
"""事件发生的时间戳"""
# --- 消息事件字段 ---
message_type: Optional[str] = None
"""消息类型: group, private"""
sub_type: Optional[str] = None
"""消息子类型"""
message_id: Optional[int] = None
"""消息 ID"""
user_id: Optional[int] = None
"""发送者 QQ 号"""
raw_message: Optional[str] = None
"""原始消息内容"""
message: List[MessageSegment] = field(default_factory=list)
"""消息内容列表"""
sender: Optional[Sender] = None
"""发送者信息"""
group_id: Optional[int] = None
"""群号"""
target_id: Optional[int] = None
"""目标 QQ 号"""
# --- 通知事件字段 ---
notice_type: Optional[str] = None
"""通知类型"""
operator_id: Optional[int] = None
"""操作者 QQ 号"""
duration: Optional[int] = None
"""时长"""
honor_type: Optional[str] = None
"""荣誉类型"""
# --- 请求事件字段 ---
request_type: Optional[str] = None
"""请求类型"""
flag: Optional[str] = None
"""请求 flag"""
comment: Optional[str] = None
"""验证信息"""
# 注入的 Bot 实例
bot: Optional["Bot"] = field(default=None, init=False)
"""关联的 Bot 实例"""
async def reply(self, message: str) -> dict:
"""
快捷回复消息
:param message: 回复内容
:return: API 响应结果
"""
if not self.bot:
return {"status": "failed", "msg": "Bot instance not attached to event"}
return await self.bot.send(self, message)
@classmethod
def from_dict(cls, data: dict):
"""
从字典创建 Event 对象
:param data: 原始事件数据字典
:return: Event 对象
"""
raw_msg_array = data.get("message")
segments = []
if isinstance(raw_msg_array, list):
segments = [
MessageSegment(type=seg["type"], data=seg["data"])
for seg in raw_msg_array
]
sender_data = data.get("sender")
sender_obj = None
if isinstance(sender_data, dict):
sender_obj = Sender(
**{k: v for k, v in sender_data.items() if k in Sender.__annotations__}
)
# 数据整合
processed_data = data.copy()
processed_data["message"] = segments
processed_data["sender"] = sender_obj
# 字段过滤:只提取 dataclass 中定义的字段
valid_data = {
k: v for k, v in processed_data.items() if k in cls.__annotations__
}
return cls(**valid_data)
# --- 快捷判断工具 ---
@property
def is_message(self) -> bool:
"""是否为消息事件"""
return self.post_type == "message"
@property
def is_notice(self) -> bool:
"""是否为通知事件"""
return self.post_type == "notice"
@property
def is_request(self) -> bool:
"""是否为请求事件"""
return self.post_type == "request"

68
models/events/base.py Normal file
View File

@@ -0,0 +1,68 @@
"""
基础事件模型模块
定义了所有 OneBot 11 事件的基类和事件类型枚举。
"""
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Optional
from abc import ABC, abstractmethod
if TYPE_CHECKING:
from core.bot import Bot
class EventType:
"""
事件类型枚举
"""
META = 'meta_event' # 元事件
REQUEST = 'request' # 请求事件
NOTICE = 'notice' # 通知事件
MESSAGE = 'message' # 消息事件
MESSAGE_SENT = 'message_sent' # 消息发送事件
@dataclass
class OneBotEvent(ABC):
"""
OneBot 事件基类
所有具体的事件类型都应该继承自此类
"""
time: int
"""事件发生的时间戳"""
self_id: int
"""收到事件的机器人 QQ 号"""
_bot: Optional["Bot"] = field(default=None, init=False)
"""Bot 实例引用,用于快捷调用 API"""
@property
@abstractmethod
def post_type(self) -> str:
"""
上报类型
"""
pass
@property
def bot(self) -> "Bot":
"""
获取 Bot 实例
:return: Bot 实例
:raises ValueError: 如果 Bot 实例未设置
"""
if self._bot is None:
raise ValueError("Bot instance not set for this event")
return self._bot
@bot.setter
def bot(self, value: "Bot"):
"""
设置 Bot 实例
:param value: Bot 实例
"""
self._bot = value

275
models/events/factory.py Normal file
View File

@@ -0,0 +1,275 @@
"""
事件工厂模块
用于根据 JSON 数据创建对应的事件对象。
"""
from typing import Any, Dict
from models.message import MessageSegment
from models.sender import Sender
from .base import OneBotEvent, EventType
from .message import GroupMessageEvent, PrivateMessageEvent, Anonymous
from .notice import (
NoticeEvent, FriendAddNoticeEvent, FriendRecallNoticeEvent,
GroupRecallNoticeEvent, GroupIncreaseNoticeEvent,
GroupDecreaseNoticeEvent, GroupAdminNoticeEvent, GroupBanNoticeEvent,
GroupUploadNoticeEvent, GroupUploadFile
)
from .request import RequestEvent, FriendRequestEvent, GroupRequestEvent
from .meta import MetaEvent, HeartbeatEvent, LifeCycleEvent, HeartbeatStatus
class EventFactory:
"""
事件工厂类
"""
@staticmethod
def create_event(data: Dict[str, Any]) -> OneBotEvent:
"""
根据数据创建事件对象
:param data: 事件数据字典
:return: 对应的事件对象
:raises ValueError: 如果事件类型未知
"""
post_type = data.get("post_type")
# 提取公共字段
common_args = {
"time": data.get("time", 0),
"self_id": data.get("self_id", 0),
}
if post_type == EventType.MESSAGE or post_type == EventType.MESSAGE_SENT:
return EventFactory._create_message_event(data, common_args)
elif post_type == EventType.NOTICE:
return EventFactory._create_notice_event(data, common_args)
elif post_type == EventType.REQUEST:
return EventFactory._create_request_event(data, common_args)
elif post_type == EventType.META:
return EventFactory._create_meta_event(data, common_args)
else:
# 未知类型的事件,抛出异常
raise ValueError(f"Unknown event type: {post_type}")
@staticmethod
def _create_message_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent:
"""
创建消息事件
:param data: 事件数据
:param common_args: 公共参数
:return: 消息事件对象
"""
message_type = data.get("message_type")
# 解析消息段
message_list = []
raw_message_list = data.get("message", [])
if isinstance(raw_message_list, list):
for item in raw_message_list:
if isinstance(item, dict):
message_list.append(MessageSegment(type=item.get("type", ""), data=item.get("data", {})))
# 解析发送者
sender_data = data.get("sender", {})
sender = Sender(
user_id=sender_data.get("user_id", 0),
nickname=sender_data.get("nickname", ""),
sex=sender_data.get("sex", "unknown"),
age=sender_data.get("age", 0),
card=sender_data.get("card"),
area=sender_data.get("area"),
level=sender_data.get("level"),
role=sender_data.get("role"),
title=sender_data.get("title"),
)
msg_args = {
**common_args,
"message_type": message_type,
"sub_type": data.get("sub_type", ""),
"message_id": data.get("message_id", 0),
"user_id": data.get("user_id", 0),
"message": message_list,
"raw_message": data.get("raw_message", ""),
"font": data.get("font", 0),
"sender": sender,
}
if message_type == "private":
return PrivateMessageEvent(**msg_args)
elif message_type == "group":
anonymous_data = data.get("anonymous")
anonymous = None
if anonymous_data:
anonymous = Anonymous(
id=anonymous_data.get("id", 0),
name=anonymous_data.get("name", ""),
flag=anonymous_data.get("flag", "")
)
return GroupMessageEvent(
**msg_args,
group_id=data.get("group_id", 0),
anonymous=anonymous,
)
else:
# 未知消息类型,抛出异常
raise ValueError(f"Unknown message type: {message_type}")
@staticmethod
def _create_notice_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent:
"""
创建通知事件
:param data: 事件数据
:param common_args: 公共参数
:return: 通知事件对象
"""
notice_type = data.get("notice_type", "")
# 好友相关通知
if notice_type == "friend_add":
return FriendAddNoticeEvent(
**common_args,
notice_type=notice_type,
user_id=data.get("user_id", 0)
)
elif notice_type == "friend_recall":
return FriendRecallNoticeEvent(
**common_args,
notice_type=notice_type,
user_id=data.get("user_id", 0),
message_id=data.get("message_id", 0)
)
# 群相关通知
elif notice_type == "group_recall":
return GroupRecallNoticeEvent(
**common_args,
notice_type=notice_type,
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
operator_id=data.get("operator_id", 0),
message_id=data.get("message_id", 0)
)
elif notice_type == "group_increase":
return GroupIncreaseNoticeEvent(
**common_args,
notice_type=notice_type,
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
operator_id=data.get("operator_id", 0),
sub_type=data.get("sub_type", "")
)
elif notice_type == "group_decrease":
return GroupDecreaseNoticeEvent(
**common_args,
notice_type=notice_type,
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
operator_id=data.get("operator_id", 0),
sub_type=data.get("sub_type", "")
)
elif notice_type == "group_admin":
return GroupAdminNoticeEvent(
**common_args,
notice_type=notice_type,
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
sub_type=data.get("sub_type", "")
)
elif notice_type == "group_ban":
return GroupBanNoticeEvent(
**common_args,
notice_type=notice_type,
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
operator_id=data.get("operator_id", 0),
duration=data.get("duration", 0),
sub_type=data.get("sub_type", "")
)
elif notice_type == "group_upload":
file_data = data.get("file", {})
file = GroupUploadFile(
id=file_data.get("id", ""),
name=file_data.get("name", ""),
size=file_data.get("size", 0),
busid=file_data.get("busid", 0)
)
return GroupUploadNoticeEvent(
**common_args,
notice_type=notice_type,
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
file=file
)
else:
# 未知通知类型,返回基础通知事件
return NoticeEvent(**common_args, notice_type=notice_type)
@staticmethod
def _create_request_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent:
"""
创建请求事件
:param data: 事件数据
:param common_args: 公共参数
:return: 请求事件对象
"""
request_type = data.get("request_type", "")
if request_type == "friend":
return FriendRequestEvent(
**common_args,
request_type=request_type,
user_id=data.get("user_id", 0),
comment=data.get("comment", ""),
flag=data.get("flag", "")
)
elif request_type == "group":
return GroupRequestEvent(
**common_args,
request_type=request_type,
sub_type=data.get("sub_type", ""),
group_id=data.get("group_id", 0),
user_id=data.get("user_id", 0),
comment=data.get("comment", ""),
flag=data.get("flag", "")
)
else:
# 未知请求类型,返回基础请求事件
return RequestEvent(**common_args, request_type=request_type)
@staticmethod
def _create_meta_event(data: Dict[str, Any], common_args: Dict[str, Any]) -> OneBotEvent:
"""
创建元事件
:param data: 事件数据
:param common_args: 公共参数
:return: 元事件对象
"""
meta_event_type = data.get("meta_event_type", "")
if meta_event_type == "heartbeat":
status_data = data.get("status", {})
status = HeartbeatStatus(
online=status_data.get("online"),
good=status_data.get("good", True)
)
return HeartbeatEvent(
**common_args,
meta_event_type=meta_event_type,
status=status,
interval=data.get("interval", 0)
)
elif meta_event_type == "lifecycle":
return LifeCycleEvent(
**common_args,
meta_event_type=meta_event_type,
sub_type=data.get("sub_type", "")
)
else:
# 未知元事件类型,返回基础元事件
return MetaEvent(**common_args, meta_event_type=meta_event_type)

116
models/events/message.py Normal file
View File

@@ -0,0 +1,116 @@
"""
消息事件模型模块
定义了消息相关的事件类,包括 MessageEvent, PrivateMessageEvent, GroupMessageEvent。
"""
from dataclasses import dataclass, field
from typing import List, Optional
from models.message import MessageSegment
from models.sender import Sender
from .base import OneBotEvent, EventType
@dataclass
class Anonymous:
"""
匿名信息
"""
id: int = 0
"""匿名用户 ID"""
name: str = ""
"""匿名用户名称"""
flag: str = ""
"""匿名用户 flag"""
@dataclass
class MessageEvent(OneBotEvent):
"""
消息事件基类
"""
message_type: str
"""消息类型: private (私聊), group (群聊)"""
sub_type: str
"""
消息子类型
如果是私聊消息,可能是 friend, group, other, normal, anonymous, notice
如果是群聊消息,可能是 normal, anonymous, notice
"""
message_id: int
"""消息 ID"""
user_id: int
"""发送者 QQ 号"""
message: List[MessageSegment] = field(default_factory=list)
"""消息内容列表"""
raw_message: str = ""
"""原始消息内容"""
font: int = 0
"""字体"""
sender: Optional[Sender] = None
"""发送者信息"""
@property
def post_type(self) -> str:
return EventType.MESSAGE
async def reply(self, message: str, auto_escape: bool = False):
"""
回复消息(抽象方法,由子类实现)
:param message: 回复内容
:param auto_escape: 是否自动转义
"""
raise NotImplementedError("reply method must be implemented by subclasses")
@dataclass
class PrivateMessageEvent(MessageEvent):
"""
私聊消息事件
"""
async def reply(self, message: str, auto_escape: bool = False):
"""
回复私聊消息
:param message: 回复内容
:param auto_escape: 是否自动转义
"""
await self.bot.send_private_msg(
user_id=self.user_id, message=message, auto_escape=auto_escape
)
@dataclass
class GroupMessageEvent(MessageEvent):
"""
群聊消息事件
"""
group_id: int = 0
"""群号"""
anonymous: Optional[Anonymous] = None
"""匿名信息"""
async def reply(self, message: str, auto_escape: bool = False):
"""
回复群聊消息
:param message: 回复内容
:param auto_escape: 是否自动转义
"""
await self.bot.send_group_msg(
group_id=self.group_id, message=message, auto_escape=auto_escape
)

67
models/events/meta.py Normal file
View File

@@ -0,0 +1,67 @@
"""
元事件模型模块
定义了元事件相关的事件类,包括心跳事件和生命周期事件。
"""
from dataclasses import dataclass, field
from typing import Optional
from .base import OneBotEvent, EventType
@dataclass
class HeartbeatStatus:
"""
心跳状态接口
"""
online: Optional[bool] = None # 是否在线
good: bool = True # 状态是否良好
class LifeCycleSubType:
"""
生命周期子类型枚举
"""
ENABLE = 'enable' # 启用
DISABLE = 'disable' # 禁用
CONNECT = 'connect' # 连接
@dataclass
class MetaEvent(OneBotEvent):
"""
元事件基类
"""
meta_event_type: str
"""元事件类型"""
@property
def post_type(self) -> str:
return EventType.META
@dataclass
class HeartbeatEvent(MetaEvent):
"""
心跳事件,用于确认连接状态
"""
meta_event_type: str = 'heartbeat'
"""元事件类型:心跳事件"""
status: HeartbeatStatus = field(default_factory=HeartbeatStatus)
"""状态信息"""
interval: int = 0
"""心跳间隔时间(ms)"""
@dataclass
class LifeCycleEvent(MetaEvent):
"""
生命周期事件,用于通知框架生命周期变化
"""
meta_event_type: str = 'lifecycle'
"""元事件类型:生命周期事件"""
sub_type: LifeCycleSubType = LifeCycleSubType.ENABLE
"""子类型:启用、禁用、连接"""

159
models/events/notice.py Normal file
View File

@@ -0,0 +1,159 @@
"""
通知事件模型模块
定义了通知相关的事件类,包括好友通知和群组通知等。
"""
from dataclasses import dataclass, field
from .base import OneBotEvent, EventType
@dataclass
class NoticeEvent(OneBotEvent):
"""
通知事件基类
"""
notice_type: str
"""通知类型"""
@property
def post_type(self) -> str:
return EventType.NOTICE
@dataclass
class FriendAddNoticeEvent(NoticeEvent):
"""
好友添加通知
"""
user_id: int = 0
"""新好友 QQ 号"""
@dataclass
class FriendRecallNoticeEvent(NoticeEvent):
"""
好友消息撤回通知
"""
user_id: int = 0
"""消息发送者 QQ 号"""
message_id: int = 0
"""被撤回的消息 ID"""
@dataclass
class GroupNoticeEvent(NoticeEvent):
"""
群组通知事件基类
"""
group_id: int = 0
"""群号"""
user_id: int = 0
"""用户 QQ 号"""
@dataclass
class GroupRecallNoticeEvent(GroupNoticeEvent):
"""
群消息撤回通知
"""
operator_id: int = 0
"""操作者 QQ 号"""
message_id: int = 0
"""被撤回的消息 ID"""
@dataclass
class GroupIncreaseNoticeEvent(GroupNoticeEvent):
"""
群成员增加通知
"""
operator_id: int = 0
"""操作者 QQ 号"""
sub_type: str = ""
"""
子类型
approve: 管理员同意入群
invite: 管理员邀请入群
"""
@dataclass
class GroupDecreaseNoticeEvent(GroupNoticeEvent):
"""
群成员减少通知
"""
operator_id: int = 0
"""操作者 QQ 号(如果是主动退群,则和 user_id 相同)"""
sub_type: str = ""
"""
子类型
leave: 主动退群
kick: 成员被踢
kick_me: 登录号被踢
disband: 群被解散
"""
@dataclass
class GroupAdminNoticeEvent(GroupNoticeEvent):
"""
群管理员变动通知
"""
sub_type: str = ""
"""
子类型
set: 设置管理员
unset: 取消管理员
"""
@dataclass
class GroupBanNoticeEvent(GroupNoticeEvent):
"""
群禁言通知
"""
operator_id: int = 0
"""操作者 QQ 号(管理员)"""
duration: int = 0
"""禁言时长(秒)0 表示解除禁言"""
sub_type: str = ""
"""
子类型
ban: 禁言
lift_ban: 解除禁言
"""
@dataclass
class GroupUploadFile:
"""
群文件信息
"""
id: str = ""
"""文件 ID"""
name: str = ""
"""文件名"""
size: int = 0
"""文件大小(Byte)"""
busid: int = 0
"""文件总线 ID"""
@dataclass
class GroupUploadNoticeEvent(GroupNoticeEvent):
"""
群文件上传通知
"""
file: GroupUploadFile = field(default_factory=GroupUploadFile)
"""文件信息"""

61
models/events/request.py Normal file
View File

@@ -0,0 +1,61 @@
"""
请求事件模型模块
定义了请求相关的事件类。
"""
from dataclasses import dataclass
from .base import OneBotEvent, EventType
@dataclass
class RequestEvent(OneBotEvent):
"""
请求事件基类
"""
request_type: str
"""请求类型"""
@property
def post_type(self) -> str:
return EventType.REQUEST
@dataclass
class FriendRequestEvent(RequestEvent):
"""
加好友请求事件
"""
user_id: int = 0
"""发送请求的 QQ 号"""
comment: str = ""
"""验证信息"""
flag: str = ""
"""请求 flag在调用处理请求的 API 时需要传入此 flag"""
@dataclass
class GroupRequestEvent(RequestEvent):
"""
加群请求/邀请事件
"""
sub_type: str = ""
"""
子类型
add: 加群请求
invite: 邀请登录号入群
"""
group_id: int = 0
"""群号"""
user_id: int = 0
"""发送请求的 QQ 号"""
comment: str = ""
"""验证信息"""
flag: str = ""
"""请求 flag在调用处理请求的 API 时需要传入此 flag"""

54
models/message.py Normal file
View File

@@ -0,0 +1,54 @@
"""
消息段模型模块
定义了 MessageSegment 类,用于封装 OneBot 11 的消息段。
"""
from dataclasses import dataclass
from typing import Any, Dict
@dataclass
class MessageSegment:
"""
消息段,对应 OneBot 11 标准中的消息段对象
"""
type: str
"""消息段类型,如 text, image, at 等"""
data: Dict[str, Any]
"""消息段数据"""
@property
def text(self) -> str:
"""
获取文本内容(仅当 type 为 text 时有效)
:return: 文本内容
"""
return self.data.get("text", "") if self.type == "text" else ""
@property
def image_url(self) -> str:
"""
获取图片 URL仅当 type 为 image 时有效)
:return: 图片 URL
"""
return self.data.get("url", "") if self.type == "image" else ""
def is_at(self, user_id: int = None) -> bool:
"""
判断是否为 @某人
:param user_id: 指定的 QQ 号,如果为 None 则只判断是否为 at 类型
:return: 是否匹配
"""
if self.type != "at":
return False
if user_id is None:
return True
return str(self.data.get("qq")) == str(user_id)
def __repr__(self):
return f"[MS:{self.type}:{self.data}]"