# -*- coding: utf-8 -*-
"""Bilibili share parser plugin.

Watches incoming messages for Bilibili links (either a JSON share card that
carries a ``b23.tv`` short link, or a plain-text ``bilibili.com/video`` /
``b23.tv`` URL), scrapes the video page for metadata and replies with a
forwarded-message bundle containing the statistics, the cover, the uploader
avatar and, for short videos, a direct video stream.
"""
import asyncio
import json
import re
from typing import Any, Dict, Optional, Union

import aiohttp
from bs4 import BeautifulSoup
from cachetools import TTLCache

from core.utils.logger import logger
from core.managers.command_manager import matcher
from models import MessageEvent, MessageSegment

# TTL cache used for message de-duplication: at most 100 entries, each kept
# for 10 seconds.
processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)

__plugin_meta__ = {
    "name": "bili_parser",
    "description": "自动解析B站分享卡片,提取视频封面和播放量等信息。",
    "usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。",
}

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

# Explicit ClientTimeout objects; passing a bare number as ``timeout`` is the
# legacy aiohttp form.
_PAGE_TIMEOUT = aiohttp.ClientTimeout(total=5)
_API_TIMEOUT = aiohttp.ClientTimeout(total=10)

# HTTP statuses that carry a ``Location`` header pointing at the real URL.
_REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})

# Matches the assignment prefix of the embedded page state; the lookahead
# guarantees that a JSON object starts right after the match.
_INITIAL_STATE_ASSIGN = re.compile(r"window\.__INITIAL_STATE__\s*=\s*(?=\{)")

# Maximum duration (seconds) for which a direct video stream is requested.
_MAX_DIRECT_VIDEO_SECONDS = 300

# Globally shared ClientSession, created lazily by get_session().
_session: Optional[aiohttp.ClientSession] = None


async def get_session() -> aiohttp.ClientSession:
    """Return the shared ``ClientSession``, (re)creating it when missing or closed."""
    global _session
    if _session is None or _session.closed:
        _session = aiohttp.ClientSession()
    return _session


def format_count(num: int) -> str:
    """Format a counter for display.

    Values below 10000 are shown as-is; larger values are shown in units of
    10000 with one decimal and the "万" suffix. Non-int input is returned via
    ``str()`` unchanged.
    """
    if not isinstance(num, int) or num < 10000:
        return str(num)
    return f"{num / 10000:.1f}万"


def format_duration(seconds: int) -> str:
    """Format a number of seconds as ``MM:SS``.

    Returns the placeholder string "滚木" when ``seconds`` is not a
    non-negative int.
    """
    if not isinstance(seconds, int) or seconds < 0:
        return "滚木"
    minutes, remainder = divmod(seconds, 60)
    return f"{minutes:02d}:{remainder:02d}"


async def get_real_url(short_url: str) -> Optional[str]:
    """Resolve a short link to its redirect target.

    Issues a HEAD request without following redirects and returns the
    ``Location`` header when the response is a redirect.

    :param short_url: the short URL to resolve
    :return: the redirect target, or None when there is no redirect or the
        request fails
    """
    try:
        session = await get_session()
        async with session.head(
            short_url,
            headers=HEADERS,
            allow_redirects=False,
            timeout=_PAGE_TIMEOUT,
        ) as response:
            # Accept every redirect status, not only 302, so a change of the
            # redirect type on the server side does not break resolution.
            if response.status in _REDIRECT_STATUSES:
                return response.headers.get('Location')
    except Exception as e:
        # Deliberate best-effort: any failure is logged and reported as None.
        logger.error(f"获取真实URL失败: {e}")
    return None


def _normalize_image_url(url: str) -> str:
    """Drop the ``@...`` suffix and make a protocol-relative URL absolute."""
    if not url:
        return url
    url = url.split('@')[0]
    if url.startswith('//'):
        url = 'https:' + url
    return url


def _extract_initial_state(html: str) -> Optional[Dict[str, Any]]:
    """Return the ``window.__INITIAL_STATE__`` object embedded in ``html``.

    The JSON is decoded with ``raw_decode`` starting at the opening brace, so
    the value is delimited by the JSON grammar itself rather than by the first
    ``};`` in the script (which may occur inside a string value).

    :return: the decoded object, or None when no script assigns the state
    :raises json.JSONDecodeError: when the embedded value is not valid JSON
    """
    soup = BeautifulSoup(html, 'html.parser')
    for tag in soup.find_all('script'):
        script = tag.string
        if not script:
            continue
        found = _INITIAL_STATE_ASSIGN.search(script)
        if found:
            state, _end = json.JSONDecoder().raw_decode(script, found.end())
            return state
    return None


async def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
    """Fetch a video page and extract its metadata from the embedded state.

    :param video_url: full URL of the video page
    :return: a dict with the keys ``title``, ``bvid``, ``duration``,
        ``cover_url``, ``play``, ``like``, ``coin``, ``favorite``, ``share``,
        ``owner_name``, ``owner_avatar`` and ``followers``; None when the page
        cannot be fetched, holds no video data, or cannot be parsed
    """
    try:
        session = await get_session()
        async with session.get(video_url, headers=HEADERS, timeout=_PAGE_TIMEOUT) as response:
            response.raise_for_status()
            text = await response.text()

        data = _extract_initial_state(text)
        if data is None:
            return None

        video_data = data.get('videoData', {})
        if not video_data:
            # Without video data every field would be a placeholder and the
            # cover URL empty; report failure so the caller can say so.
            return None

        up_data = data.get('upData', {})
        stat = video_data.get('stat', {})
        owner = video_data.get('owner', {})

        return {
            "title": video_data.get('title', '未知标题'),
            "bvid": video_data.get('bvid', '未知BV号'),
            "duration": video_data.get('duration', 0),
            "cover_url": _normalize_image_url(video_data.get('pic', '')),
            "play": stat.get('view', 0),
            "like": stat.get('like', 0),
            "coin": stat.get('coin', 0),
            "favorite": stat.get('favorite', 0),
            "share": stat.get('share', 0),
            "owner_name": owner.get('name', '未知UP主'),
            "owner_avatar": _normalize_image_url(owner.get('face', '')),
            "followers": up_data.get('fans', 0),
        }
    except (
        aiohttp.ClientError,
        asyncio.TimeoutError,  # request timeouts are not ClientError subclasses
        KeyError,
        AttributeError,
        json.JSONDecodeError,
    ) as e:
        logger.error(f"解析视频信息失败: {e}")
        return None


async def get_direct_video_url(video_url: str) -> Optional[str]:
    """Ask a third-party API for a direct stream URL of a Bilibili video.

    :param video_url: full URL of the Bilibili video
    :return: the direct video URL, or None on failure
    """
    api_url = "https://api.mir6.com/api/bzjiexi"
    # Passed via ``params`` so that any '&' or '?' inside video_url is escaped
    # instead of being read by the API as additional query parameters.
    query = {"url": video_url, "type": "json"}
    try:
        session = await get_session()
        async with session.get(
            api_url,
            params=query,
            headers=HEADERS,
            timeout=_API_TIMEOUT,
        ) as response:
            response.raise_for_status()
            # content_type=None skips the Content-Type check because the API
            # answers with text/json rather than application/json.
            data = await response.json(content_type=None)
            if data.get("code") == 200 and data.get("data"):
                return data["data"][0].get("video_url")
    except (
        aiohttp.ClientError,
        asyncio.TimeoutError,
        json.JSONDecodeError,
        KeyError,
        IndexError,
        AttributeError,  # payload (or its first item) is not a dict
        TypeError,       # "data" is not indexable
    ) as e:
        logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
    return None


BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/[a-zA-Z0-9_]+|b23\.tv/[a-zA-Z0-9]+)")


def _find_card_url(event: MessageEvent) -> Optional[str]:
    """Return the b23.tv short link carried by a JSON card segment, if any."""
    for segment in event.message:
        if segment.type != "json":
            continue
        logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
        try:
            json_data = json.loads(segment.data.get("data", "{}"))
            short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
        except (json.JSONDecodeError, KeyError, AttributeError, TypeError) as e:
            # AttributeError/TypeError: the payload is not the expected
            # nested-dict shape; treat it like any other unparsable card.
            logger.error(f"[bili_parser] 解析JSON失败: {e}")
            continue
        if isinstance(short_url, str) and "b23.tv" in short_url:
            card_url = short_url.split('?')[0]
            logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {card_url}")
            return card_url
    return None


def _find_text_url(event: MessageEvent) -> Optional[str]:
    """Return the first Bilibili link found in a text segment, if any."""
    for segment in event.message:
        if segment.type != "text":
            continue
        found = BILI_URL_PATTERN.search(segment.data.get("text", ""))
        if found:
            text_url = found.group(0)
            logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {text_url}")
            return text_url
    return None


@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
    """Detect a Bilibili link in a message (JSON card or text) and process it.

    :param event: the message event object
    """
    # De-duplicate: skip a message id that was already seen recently.
    if event.message_id in processed_messages:
        return
    processed_messages[event.message_id] = True

    # Ignore messages sent by the bot itself to avoid an infinite loop.
    if event.user_id == event.self_id:
        return

    # A short link inside a JSON card takes priority over links in text.
    url_to_process = _find_card_url(event) or _find_text_url(event)
    if url_to_process:
        await process_bili_link(event, url_to_process)


async def process_bili_link(event: MessageEvent, url: str):
    """Resolve a Bilibili link, collect the video information and reply.

    :param event: the message event object
    :param url: the Bilibili link to process (long or short form)
    """
    if "b23.tv" in url:
        real_url = await get_real_url(url)
        if not real_url:
            logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。")
            await event.reply("无法解析B站短链接。")
            return
    else:
        real_url = url.split('?')[0]

    video_info = await parse_video_info(real_url)
    if not video_info:
        logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
        await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。")
        return

    # Only videos of at most five minutes get a direct stream attached.
    video_message: Union[str, MessageSegment]
    if video_info['duration'] > _MAX_DIRECT_VIDEO_SECONDS:
        video_message = "视频时长超过5分钟,不进行解析。"
    else:
        direct_url = await get_direct_video_url(real_url)
        if direct_url:
            video_message = MessageSegment.video(direct_url)
        else:
            video_message = "视频解析失败,无法获取直链。"

    text_message = (
        f"BiliBili 视频解析\n"
        f"--------------------\n"
        f" UP主: {video_info['owner_name']}\n"
        f" 粉丝: {format_count(video_info['followers'])}\n"
        f"--------------------\n"
        f" 标题: {video_info['title']}\n"
        f" BV号: {video_info['bvid']}\n"
        f" 时长: {format_duration(video_info['duration'])}\n"
        f"--------------------\n"
        f" 数据:\n"
        f" 播放: {format_count(video_info['play'])}\n"
        f" 点赞: {format_count(video_info['like'])}\n"
        f" 投币: {format_count(video_info['coin'])}\n"
        f" 收藏: {format_count(video_info['favorite'])}\n"
        f" 转发: {format_count(video_info['share'])}\n"
        f" B站链接: {url}"
    )

    image_message_segment = [
        MessageSegment.text("B站封面:"),
        MessageSegment.image(video_info['cover_url']),
    ]

    up_info_segment = [
        MessageSegment.text("UP主头像:"),
        MessageSegment.image(video_info['owner_avatar']),
    ]

    # One forward node per part, all attributed to the bot itself.
    nodes = [
        event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=part)
        for part in (text_message, image_message_segment, up_info_segment, video_message)
    ]

    logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")

    # send_forwarded_messages is given the event as target, so the same call
    # serves both private and group chats.
    await event.bot.send_forwarded_messages(target=event, nodes=nodes)