142 lines
5.1 KiB
Python
142 lines
5.1 KiB
Python
# -*- coding: utf-8 -*-
|
||
import re
|
||
import json
|
||
import html
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from typing import Optional, Tuple, Dict, Any
|
||
|
||
from core.logger import logger
|
||
from core.command_manager import matcher
|
||
from models import MessageEvent, MessageSegment
|
||
|
||
__plugin_meta__ = {
|
||
"name": "bili_parser",
|
||
"description": "自动解析B站分享卡片,提取视频封面和播放量等信息。",
|
||
"usage": "(自动触发)当检测到B站小程序分享卡片时,自动发送视频信息。",
|
||
}
|
||
|
||
HEADERS = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
||
}
|
||
|
||
|
||
def format_count(num: int) -> str:
|
||
if not isinstance(num, int):
|
||
return str(num)
|
||
if num < 10000:
|
||
return str(num)
|
||
return f"{num / 10000:.1f}万"
|
||
|
||
|
||
def get_real_url(short_url: str) -> Optional[str]:
|
||
try:
|
||
response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5)
|
||
if response.status_code == 302:
|
||
return response.headers.get('Location')
|
||
except requests.RequestException as e:
|
||
print(f"获取真实URL失败: {e}")
|
||
return None
|
||
|
||
def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
|
||
try:
|
||
response = requests.get(video_url, headers=HEADERS, timeout=5)
|
||
response.raise_for_status()
|
||
soup = BeautifulSoup(response.text, 'html.parser')
|
||
|
||
script_tag = soup.find('script', text=re.compile('window.__INITIAL_STATE__'))
|
||
if not script_tag:
|
||
return None
|
||
|
||
json_str = re.search(r'window\.__INITIAL_STATE__\s*=\s*(\{.*?\});', script_tag.string).group(1)
|
||
data = json.loads(json_str)
|
||
|
||
video_data = data.get('videoData', {})
|
||
stat = video_data.get('stat', {})
|
||
|
||
cover_url = video_data.get('pic', '')
|
||
if cover_url:
|
||
cover_url = cover_url.split('@')[0]
|
||
if cover_url.startswith('//'):
|
||
cover_url = 'https:' + cover_url
|
||
|
||
return {
|
||
"title": video_data.get('title', '未知标题'),
|
||
"bvid": video_data.get('bvid', '未知BV号'),
|
||
"cover_url": cover_url,
|
||
"play": stat.get('view', 0),
|
||
"like": stat.get('like', 0),
|
||
"coin": stat.get('coin', 0),
|
||
"favorite": stat.get('favorite', 0),
|
||
"share": stat.get('share', 0),
|
||
}
|
||
|
||
except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e:
|
||
print(f"解析视频信息失败: {e}")
|
||
|
||
return None
|
||
|
||
@matcher.on_message()
|
||
async def handle_bili_share(event: MessageEvent):
|
||
if not event.raw_message.startswith('[CQ:json,data='):
|
||
return
|
||
|
||
logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}")
|
||
|
||
try:
|
||
json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']')
|
||
|
||
json_str_decoded = html.unescape(json_str_raw)
|
||
|
||
data = json.loads(json_str_decoded)
|
||
|
||
short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
|
||
|
||
if not short_url or "b23.tv" not in short_url:
|
||
logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。")
|
||
return
|
||
|
||
short_url = short_url.split('?')[0]
|
||
logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}")
|
||
|
||
except (json.JSONDecodeError, KeyError) as e:
|
||
logger.error(f"[bili_parser] 解析JSON失败: {e}")
|
||
return
|
||
|
||
real_url = get_real_url(short_url)
|
||
if not real_url:
|
||
logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。")
|
||
await event.reply("无法解析B站短链接。")
|
||
return
|
||
|
||
video_info = parse_video_info(real_url)
|
||
if not video_info:
|
||
logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
|
||
await event.reply("无法获取视频信息,可能是B站接口变动或视频不存在。")
|
||
return
|
||
|
||
text_message = (
|
||
f"BiliBili 视频解析\n"
|
||
f"--------------------\n"
|
||
f" 标题: {video_info['title']}\n"
|
||
f" BV号: {video_info['bvid']}\n"
|
||
f"--------------------\n"
|
||
f" 数据:\n"
|
||
f" 播放: {format_count(video_info['play'])}\n"
|
||
f" 点赞: {format_count(video_info['like'])}\n"
|
||
f" 投币: {format_count(video_info['coin'])}\n"
|
||
f" 收藏: {format_count(video_info['favorite'])}\n"
|
||
f" 转发: {format_count(video_info['share'])}\n"
|
||
f" B站链接: {short_url}"
|
||
)
|
||
|
||
image_message_segment = [MessageSegment.text("B站封面:"),MessageSegment.image(video_info['cover_url'])]
|
||
|
||
nodes = [
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message),
|
||
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment)
|
||
]
|
||
|
||
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
|
||
await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes)
|