From 723f7b9724572685cb665a8717b7965da825c3e1 Mon Sep 17 00:00:00 2001 From: K2cr2O1 <2221577113@qq.com> Date: Mon, 5 Jan 2026 21:36:03 +0800 Subject: [PATCH] =?UTF-8?q?CQ=E7=A0=81=E6=94=AF=E6=8C=81=E4=BB=A5=E5=8F=8A?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/message.py | 275 ++++++++++++++++++++++++++++++++++++++++- plugins/bili_parser.py | 119 +++++++++++++----- 2 files changed, 362 insertions(+), 32 deletions(-) diff --git a/models/message.py b/models/message.py index f79d05f..bd93aff 100644 --- a/models/message.py +++ b/models/message.py @@ -42,6 +42,40 @@ class MessageSegment: """ return self.data.get("url", "") if self.type == "image" else "" + @property + def share_url(self) -> str: + """ + 当消息段类型为 'share' 时,快速获取其分享 URL。 + + Returns: + str: 分享的 URL。如果类型不是 'share' 或数据中不含 'url',则返回空字符串。 + """ + return self.data.get("url", "") if self.type == "share" else "" + + @property + def music_url(self) -> str: + """ + 当消息段类型为 'music' 且为 'custom' 类型时,快速获取其 URL。 + + Returns: + str: 音乐的 URL。如果类型不匹配,则返回空字符串。 + """ + if self.type == "music" and self.data.get("type") == "custom": + return self.data.get("url", "") + return "" + + @property + def file_url(self) -> str: + """ + 当消息段类型为 'record', 'video', 'file' 时,快速获取其文件 URL。 + + Returns: + str: 文件的 URL 或路径。如果类型不匹配,则返回空字符串。 + """ + if self.type in ("record", "video", "file"): + return self.data.get("file", "") + return "" + def is_at(self, user_id: int = None) -> bool: """ 检查当前消息段是否是一个 'at' (提及) 消息段。 @@ -81,30 +115,46 @@ class MessageSegment: return MessageSegment(type="text", data={"text": text}) @staticmethod - def at(user_id: int | str) -> "MessageSegment": + def at(user_id: int | str, name: str = None) -> "MessageSegment": """ 创建一个 @某人 的消息段。 Args: user_id (int | str): 要提及的 QQ 号。若为 "all",则表示 @全体成员。 + name (str, optional): 当在群中找不到对应的QQ号时,显示的名称。Defaults to None. Returns: MessageSegment: 一个类型为 'at' 的消息段对象。 """ - return MessageSegment(type="at", data={"qq": str(user_id)}) + data = {"qq": str(user_id)} + if name: + data["name"] = name + return MessageSegment(type="at", data=data) @staticmethod - def image(file: str) -> "MessageSegment": + def image(file: str, image_type: str = None, cache: bool = True, proxy: bool = True, timeout: int = None, sub_type: int = None) -> "MessageSegment": """ 创建一个图片消息段。 Args: file (str): 图片的路径、URL 或 Base64 编码的字符串。 + image_type (str, optional): 图片类型,'flash' 表示闪照。Defaults to None. + cache (bool, optional): 是否使用缓存。Defaults to True. + proxy (bool, optional): 是否通过代理下载。Defaults to True. + timeout (int, optional): 下载超时时间(秒)。Defaults to None. + sub_type (int, optional): 图片子类型,用于特殊图片。Defaults to None. Returns: MessageSegment: 一个类型为 'image' 的消息段对象。 """ - return MessageSegment(type="image", data={"file": file}) + data = {"file": file, "cache": "1" if cache else "0", "proxy": "1" if proxy else "0"} + if image_type: + data["type"] = image_type + if timeout: + data["timeout"] = str(timeout) + if sub_type: + data["subType"] = str(sub_type) + return MessageSegment(type="image", data=data) @staticmethod def face(id: int) -> "MessageSegment": @@ -119,3 +169,220 @@ class MessageSegment: """ return MessageSegment(type="face", data={"id": str(id)}) + @staticmethod + def json(data: str) -> "MessageSegment": + """ + 创建一个 JSON 消息段。 + + Args: + data (str): JSON 字符串。 + + Returns: + MessageSegment: 一个类型为 'json' 的消息段对象。 + """ + return MessageSegment(type="json", data={"data": data}) + @staticmethod + def xml(data: str) -> "MessageSegment": + """ + 创建一个 XML 消息段。 + + Args: + data (str): XML 字符串。 + + Returns: + MessageSegment: 一个类型为 'xml' 的消息段对象。 + """ + return MessageSegment(type="xml", data={"data": data}) + @staticmethod + def share(url: str, title: str, content: str = None, image: str = None) -> "MessageSegment": + """ + 创建一个分享消息段。 + + Args: + url (str): 分享的 URL。 + title (str): 分享的标题。 + content (str, optional): 分享的描述内容。Defaults to None. + image (str, optional): 分享的图片 URL。Defaults to None. + + Returns: + MessageSegment: 一个类型为 'share' 的消息段对象。 + """ + data = {"url": url, "title": title} + if content: + data["content"] = content + if image: + data["image"] = image + return MessageSegment(type="share", data=data) + @staticmethod + def music(type: str, id: str) -> "MessageSegment": + """ + 创建一个音乐消息段。 + + Args: + type (str): 音乐平台类型,如 "qq"、"xiami" 等。 + id (str): 音乐在平台上的唯一标识符。 + + Returns: + MessageSegment: 一个类型为 'music' 的消息段对象。 + """ + return MessageSegment(type="music", data={"type": type, "id": id}) + @staticmethod + def music_custom(url: str, audio: str, title: str, content: str = None, image: str = None) -> "MessageSegment": + """ + 创建一个自定义音乐消息段。 + + Args: + url (str): 音乐的 URL。 + audio (str): 音乐的音频 URL。 + title (str): 音乐的标题。 + content (str, optional): 音乐的描述内容。Defaults to None. + image (str, optional): 音乐的图片 URL。Defaults to None. + + Returns: + MessageSegment: 一个类型为 'music_custom' 的消息段对象。 + """ + data = {"url": url, "audio": audio, "title": title} + if content: + data["content"] = content + if image: + data["image"] = image + return MessageSegment(type="music", data={"type": "custom", **data}) + @staticmethod + def record(file: str, magic: bool = False, cache: bool = True, proxy: bool = True, timeout: int = None) -> "MessageSegment": + """ + 创建一个语音消息段。 + + Args: + file (str): 语音的路径、URL 或 Base64 编码的字符串。 + magic (bool, optional): 是否为变声。Defaults to False. + cache (bool, optional): 是否使用缓存。Defaults to True. + proxy (bool, optional): 是否通过代理下载。Defaults to True. + timeout (int, optional): 下载超时时间(秒)。Defaults to None. + + Returns: + MessageSegment: 一个类型为 'record' 的消息段对象。 + """ + data = {"file": file, "magic": "1" if magic else "0", "cache": "1" if cache else "0", "proxy": "1" if proxy else "0"} + if timeout: + data["timeout"] = str(timeout) + return MessageSegment(type="record", data=data) + @staticmethod + def video(file: str, cover: str = None, c: int = 2) -> "MessageSegment": + """ + 创建一个视频消息段。 + + Args: + file (str): 视频的路径、URL 或 Base64 编码的字符串。 + cover (str, optional): 视频封面,支持http, file和base64。Defaults to None. + c (int, optional): 下载线程数。Defaults to 2. + + Returns: + MessageSegment: 一个类型为 'video' 的消息段对象。 + """ + data = {"file": file, "c": str(c)} + if cover: + data["cover"] = cover + return MessageSegment(type="video", data=data) + @staticmethod + def file(file: str) -> "MessageSegment": + """ + 创建一个文件消息段。 + + Args: + file (str): 文件的路径、URL 或 Base64 编码的字符串。 + + Returns: + MessageSegment: 一个类型为 'file' 的消息段对象。 + """ + return MessageSegment(type="file", data={"file": file}) + + @staticmethod + def reply(message_id: str) -> "MessageSegment": + """ + 创建一个回复消息段。 + + Args: + message_id (str): 被回复的消息 ID。 + + Returns: + MessageSegment: 一个类型为 'reply' 的消息段对象。 + """ + return MessageSegment(type="reply", data={"id": message_id}) + + @staticmethod + def rps() -> "MessageSegment": + """ + 创建一个猜拳魔法表情消息段。 + + Returns: + MessageSegment: 一个类型为 'rps' 的消息段对象。 + """ + return MessageSegment(type="rps", data={}) + + @staticmethod + def dice() -> "MessageSegment": + """ + 创建一个掷骰子魔法表情消息段。 + + Returns: + MessageSegment: 一个类型为 'dice' 的消息段对象。 + """ + return MessageSegment(type="dice", data={}) + + @staticmethod + def shake() -> "MessageSegment": + """ + 创建一个戳一戳消息段。 + + Returns: + MessageSegment: 一个类型为 'shake' 的消息段对象。 + """ + return MessageSegment(type="shake", data={}) + + @staticmethod + def anonymous(ignore: bool = False) -> "MessageSegment": + """ + 创建一个匿名消息段。 + + Args: + ignore (bool, optional): 发送失败时是否忽略。Defaults to False. + + Returns: + MessageSegment: 一个类型为 'anonymous' 的消息段对象。 + """ + return MessageSegment(type="anonymous", data={"ignore": "1" if ignore else "0"}) + + @staticmethod + def contact(contact_type: str, contact_id: int) -> "MessageSegment": + """ + 创建一个推荐好友/群消息段。 + + Args: + contact_type (str): 推荐类型,'qq' 或 'group'。 + contact_id (int): 被推荐的 QQ 号或群号。 + + Returns: + MessageSegment: 一个类型为 'contact' 的消息段对象。 + """ + return MessageSegment(type="contact", data={"type": contact_type, "id": str(contact_id)}) + + @staticmethod + def location(lat: float, lon: float, title: str = "", content: str = "") -> "MessageSegment": + """ + 创建一个位置消息段。 + + Args: + lat (float): 纬度。 + lon (float): 经度。 + title (str, optional): 标题。Defaults to "". + content (str, optional): 内容描述。Defaults to "". + + Returns: + MessageSegment: 一个类型为 'location' 的消息段对象。 + """ + data = {"lat": str(lat), "lon": str(lon)} + if title: + data["title"] = title + if content: + data["content"] = content + return MessageSegment(type="location", data=data) \ No newline at end of file diff --git a/plugins/bili_parser.py b/plugins/bili_parser.py index 8c08865..a44170f 100644 --- a/plugins/bili_parser.py +++ b/plugins/bili_parser.py @@ -1,10 +1,9 @@ # -*- coding: utf-8 -*- import re import json -import html import requests from bs4 import BeautifulSoup -from typing import Optional, Tuple, Dict, Any +from typing import Optional, Dict, Any from core.logger import logger from core.command_manager import matcher @@ -29,6 +28,14 @@ def format_count(num: int) -> str: return f"{num / 10000:.1f}万" +def format_duration(seconds: int) -> str: + """将秒数格式化为 MM:SS 的形式""" + if not isinstance(seconds, int) or seconds < 0: + return "滚木" + minutes, seconds = divmod(seconds, 60) + return f"{minutes:02d}:{seconds:02d}" + + def get_real_url(short_url: str) -> Optional[str]: try: response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) @@ -52,23 +59,35 @@ def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: data = json.loads(json_str) video_data = data.get('videoData', {}) + up_data = data.get('upData', {}) stat = video_data.get('stat', {}) + owner = video_data.get('owner', {}) cover_url = video_data.get('pic', '') if cover_url: cover_url = cover_url.split('@')[0] if cover_url.startswith('//'): cover_url = 'https:' + cover_url + + owner_avatar = owner.get('face', '') + if owner_avatar: + if owner_avatar.startswith('//'): + owner_avatar = 'https:' + owner_avatar + owner_avatar = owner_avatar.split('@')[0] return { "title": video_data.get('title', '未知标题'), "bvid": video_data.get('bvid', '未知BV号'), + "duration": video_data.get('duration', 0), "cover_url": cover_url, "play": stat.get('view', 0), "like": stat.get('like', 0), "coin": stat.get('coin', 0), "favorite": stat.get('favorite', 0), "share": stat.get('share', 0), + "owner_name": owner.get('name', '未知UP主'), + "owner_avatar": owner_avatar, + "followers": up_data.get('fans', 0), } except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e: @@ -76,33 +95,52 @@ def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: return None +def get_direct_video_url(video_url: str) -> Optional[str]: + """ + 调用第三方API解析B站视频直链 + :param video_url: B站视频的完整URL + :return: 视频直链URL,如果失败则返回None + """ + api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json" + try: + response = requests.get(api_url, headers=HEADERS, timeout=10) + response.raise_for_status() + data = response.json() + if data.get("code") == 200 and data.get("data"): + return data["data"][0].get("video_url") + except (requests.RequestException, json.JSONDecodeError, KeyError, IndexError) as e: + logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}") + return None + @matcher.on_message() async def handle_bili_share(event: MessageEvent): - if not event.raw_message.startswith('[CQ:json,data='): - return + # 遍历消息段,寻找JSON CQ码 + for segment in event.message: + if segment.type == "json": + logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}") + try: + # 直接从segment的data中获取json字符串 + json_data = json.loads(segment.data.get("data", "{}")) + + # 提取B站短链接 + short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl") + + if not short_url or "b23.tv" not in short_url: + continue # 如果不是B站链接,继续检查下一个segment + + short_url = short_url.split('?')[0] + logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}") + + # 找到了有效的B站链接,处理并跳出循环 + await process_bili_link(event, short_url) + break - logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}") + except (json.JSONDecodeError, KeyError) as e: + logger.error(f"[bili_parser] 解析JSON失败: {e}") + continue - try: - json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']') - - json_str_decoded = html.unescape(json_str_raw) - - data = json.loads(json_str_decoded) - - short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl") - - if not short_url or "b23.tv" not in short_url: - logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。") - return - - short_url = short_url.split('?')[0] - logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}") - - except (json.JSONDecodeError, KeyError) as e: - logger.error(f"[bili_parser] 解析JSON失败: {e}") - return - +async def process_bili_link(event: MessageEvent, short_url: str): + """处理B站链接,获取信息并回复""" real_url = get_real_url(short_url) if not real_url: logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。") @@ -115,11 +153,25 @@ async def handle_bili_share(event: MessageEvent): await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") return + # 检查视频时长 + if video_info['duration'] > 300: # 5分钟 = 300秒 + video_message = "视频太长了。。。" + else: + direct_url = get_direct_video_url(real_url) + if direct_url: + video_message = MessageSegment.video(direct_url) + else: + video_message = "视频解析失败,无法获取直链。" + text_message = ( f"BiliBili 视频解析\n" f"--------------------\n" + f" UP主: {video_info['owner_name']}\n" + f" 粉丝: {format_count(video_info['followers'])}\n" + f"--------------------\n" f" 标题: {video_info['title']}\n" f" BV号: {video_info['bvid']}\n" + f" 时长: {format_duration(video_info['duration'])}\n" f"--------------------\n" f" 数据:\n" f" 播放: {format_count(video_info['play'])}\n" @@ -130,12 +182,23 @@ async def handle_bili_share(event: MessageEvent): f" B站链接: {short_url}" ) - image_message_segment = [MessageSegment.text("B站封面:"),MessageSegment.image(video_info['cover_url'])] + image_message_segment = [ + MessageSegment.text("B站封面:"), + MessageSegment.image(video_info['cover_url']) + ] + + up_info_segment = [ + MessageSegment.text("UP主头像:"), + MessageSegment.image(video_info['owner_avatar']) + ] nodes = [ event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message), - event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment) + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment), + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=up_info_segment), + event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=video_message) ] logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}") - await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes) + # 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊 + await event.bot.send_forwarded_messages(target=event, nodes=nodes)