Daily build

This commit is contained in:
2026-01-02 14:22:35 +08:00
parent 87fcb78dba
commit 3f76e7d022
17 changed files with 1425 additions and 400 deletions

141
core/api/message.py Normal file
View File

@@ -0,0 +1,141 @@
"""
消息相关 API 模块
"""
from typing import Union, List, Dict, Any, TYPE_CHECKING
from .base import BaseAPI
if TYPE_CHECKING:
from models import MessageSegment, OneBotEvent
class MessageAPI(BaseAPI):
"""
消息相关 API Mixin
"""
async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
发送群消息
:param group_id: 群号
:param message: 消息内容可以是字符串、MessageSegment 对象或 MessageSegment 列表
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
:return: API 响应结果
"""
return await self.call_api(
"send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape}
)
async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
发送私聊消息
:param user_id: 用户 QQ 号
:param message: 消息内容可以是字符串、MessageSegment 对象或 MessageSegment 列表
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
:return: API 响应结果
"""
return await self.call_api(
"send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape}
)
async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
智能发送消息,根据事件类型自动选择发送方式
:param event: 触发事件对象
:param message: 消息内容
:param auto_escape: 是否自动转义
:return: API 响应结果
"""
# 如果是消息事件,直接调用 reply
if hasattr(event, "reply"):
await event.reply(message, auto_escape)
return {"status": "ok", "msg": "Replied via event.reply()"}
# 尝试从事件中获取 user_id 或 group_id
user_id = getattr(event, "user_id", None)
group_id = getattr(event, "group_id", None)
if group_id:
return await self.send_group_msg(group_id, message, auto_escape)
elif user_id:
return await self.send_private_msg(user_id, message, auto_escape)
return {"status": "failed", "msg": "Unknown message target"}
async def delete_msg(self, message_id: int) -> Dict[str, Any]:
"""
撤回消息
:param message_id: 消息 ID
:return: API 响应结果
"""
return await self.call_api("delete_msg", {"message_id": message_id})
async def get_msg(self, message_id: int) -> Dict[str, Any]:
"""
获取消息
:param message_id: 消息 ID
:return: API 响应结果
"""
return await self.call_api("get_msg", {"message_id": message_id})
async def get_forward_msg(self, id: str) -> Dict[str, Any]:
"""
获取合并转发消息
:param id: 合并转发 ID
:return: API 响应结果
"""
return await self.call_api("get_forward_msg", {"id": id})
async def can_send_image(self) -> Dict[str, Any]:
"""
检查是否可以发送图片
:return: API 响应结果
"""
return await self.call_api("can_send_image")
async def can_send_record(self) -> Dict[str, Any]:
"""
检查是否可以发送语音
:return: API 响应结果
"""
return await self.call_api("can_send_record")
def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]:
"""
处理消息内容,将其转换为 API 可接受的格式
:param message: 原始消息内容
:return: 处理后的消息内容
"""
if isinstance(message, str):
return message
# 避免循环导入,在运行时导入
from models import MessageSegment
if isinstance(message, MessageSegment):
return [self._segment_to_dict(message)]
if isinstance(message, list):
return [self._segment_to_dict(m) for m in message if isinstance(m, MessageSegment)]
return str(message)
def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]:
"""
将 MessageSegment 对象转换为字典
:param segment: MessageSegment 对象
:return: 字典格式的消息段
"""
return {
"type": segment.type,
"data": segment.data
}