# -*- coding: utf-8 -*- import re import json import html import requests from bs4 import BeautifulSoup from typing import Optional, Tuple, Dict, Any from core.logger import logger from core.command_manager import matcher from models import MessageEvent, MessageSegment __plugin_meta__ = { "name": "bili_parser", "description": "自动解析B站分享卡片,提取视频封面和播放量等信息。", "usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。", } HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } def format_count(num: int) -> str: if not isinstance(num, int): return str(num) if num < 10000: return str(num) return f"{num / 10000:.1f}万" def get_real_url(short_url: str) -> Optional[str]: try: response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5) if response.status_code == 302: return response.headers.get('Location') except requests.RequestException as e: print(f"获取真实URL失败: {e}") return None def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]: try: response = requests.get(video_url, headers=HEADERS, timeout=5) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__')) if not script_tag: return None json_str = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string).group(1) data = json.loads(json_str) video_data = data.get('videoData', {}) stat = video_data.get('stat', {}) cover_url = video_data.get('pic', '') if cover_url: cover_url = cover_url.split('@')[0] if cover_url.startswith('//'): cover_url = 'https:' + cover_url return { "title": video_data.get('title', '未知标题'), "bvid": video_data.get('bvid', '未知BV号'), "cover_url": cover_url, "play": stat.get('view', 0), "like": stat.get('like', 0), "coin": stat.get('coin', 0), "favorite": stat.get('favorite', 0), "share": stat.get('share', 0), } except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e: print(f"解析视频信息失败: {e}") return None @matcher.on_message() async def handle_bili_share(event: MessageEvent): if not event.raw_message.startswith('[CQ:json,data='): return logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}") try: json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']') json_str_decoded = html.unescape(json_str_raw) data = json.loads(json_str_decoded) short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl") if not short_url or "b23.tv" not in short_url: logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。") return short_url = short_url.split('?')[0] logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}") except (json.JSONDecodeError, KeyError) as e: logger.error(f"[bili_parser] 解析JSON失败: {e}") return real_url = get_real_url(short_url) if not real_url: logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。") await event.reply("无法解析B站短链接。") return video_info = parse_video_info(real_url) if not video_info: logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。") await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。") return text_message = ( f"BiliBili 视频解析\n" f"--------------------\n" f" 标题: {video_info['title']}\n" f" BV号: {video_info['bvid']}\n" f"--------------------\n" f" 数据:\n" f" 播放: {format_count(video_info['play'])}\n" f" 点赞: {format_count(video_info['like'])}\n" f" 投币: {format_count(video_info['coin'])}\n" f" 收藏: {format_count(video_info['favorite'])}\n" f" 转发: {format_count(video_info['share'])}\n" f" B站链接: {short_url}" ) image_message_segment = [MessageSegment.text("B站封面:"),MessageSegment.image(video_info['cover_url'])] nodes = [ event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message), event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment) ] logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}") await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes)