Files
NeoBot/core/api/message.py
2026-01-02 14:22:35 +08:00

142 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
消息相关 API 模块
"""
from typing import Union, List, Dict, Any, TYPE_CHECKING
from .base import BaseAPI
if TYPE_CHECKING:
from models import MessageSegment, OneBotEvent
class MessageAPI(BaseAPI):
"""
消息相关 API Mixin
"""
async def send_group_msg(self, group_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
发送群消息
:param group_id: 群号
:param message: 消息内容可以是字符串、MessageSegment 对象或 MessageSegment 列表
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
:return: API 响应结果
"""
return await self.call_api(
"send_group_msg", {"group_id": group_id, "message": self._process_message(message), "auto_escape": auto_escape}
)
async def send_private_msg(self, user_id: int, message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
发送私聊消息
:param user_id: 用户 QQ 号
:param message: 消息内容可以是字符串、MessageSegment 对象或 MessageSegment 列表
:param auto_escape: 是否自动转义(仅当 message 为字符串时有效)
:return: API 响应结果
"""
return await self.call_api(
"send_private_msg", {"user_id": user_id, "message": self._process_message(message), "auto_escape": auto_escape}
)
async def send(self, event: "OneBotEvent", message: Union[str, "MessageSegment", List["MessageSegment"]], auto_escape: bool = False) -> Dict[str, Any]:
"""
智能发送消息,根据事件类型自动选择发送方式
:param event: 触发事件对象
:param message: 消息内容
:param auto_escape: 是否自动转义
:return: API 响应结果
"""
# 如果是消息事件,直接调用 reply
if hasattr(event, "reply"):
await event.reply(message, auto_escape)
return {"status": "ok", "msg": "Replied via event.reply()"}
# 尝试从事件中获取 user_id 或 group_id
user_id = getattr(event, "user_id", None)
group_id = getattr(event, "group_id", None)
if group_id:
return await self.send_group_msg(group_id, message, auto_escape)
elif user_id:
return await self.send_private_msg(user_id, message, auto_escape)
return {"status": "failed", "msg": "Unknown message target"}
async def delete_msg(self, message_id: int) -> Dict[str, Any]:
"""
撤回消息
:param message_id: 消息 ID
:return: API 响应结果
"""
return await self.call_api("delete_msg", {"message_id": message_id})
async def get_msg(self, message_id: int) -> Dict[str, Any]:
"""
获取消息
:param message_id: 消息 ID
:return: API 响应结果
"""
return await self.call_api("get_msg", {"message_id": message_id})
async def get_forward_msg(self, id: str) -> Dict[str, Any]:
"""
获取合并转发消息
:param id: 合并转发 ID
:return: API 响应结果
"""
return await self.call_api("get_forward_msg", {"id": id})
async def can_send_image(self) -> Dict[str, Any]:
"""
检查是否可以发送图片
:return: API 响应结果
"""
return await self.call_api("can_send_image")
async def can_send_record(self) -> Dict[str, Any]:
"""
检查是否可以发送语音
:return: API 响应结果
"""
return await self.call_api("can_send_record")
def _process_message(self, message: Union[str, "MessageSegment", List["MessageSegment"]]) -> Union[str, List[Dict[str, Any]]]:
"""
处理消息内容,将其转换为 API 可接受的格式
:param message: 原始消息内容
:return: 处理后的消息内容
"""
if isinstance(message, str):
return message
# 避免循环导入,在运行时导入
from models import MessageSegment
if isinstance(message, MessageSegment):
return [self._segment_to_dict(message)]
if isinstance(message, list):
return [self._segment_to_dict(m) for m in message if isinstance(m, MessageSegment)]
return str(message)
def _segment_to_dict(self, segment: "MessageSegment") -> Dict[str, Any]:
"""
将 MessageSegment 对象转换为字典
:param segment: MessageSegment 对象
:return: 字典格式的消息段
"""
return {
"type": segment.type,
"data": segment.data
}