Files
NeoBot/plugins/bili_parser.py
2026-01-05 21:36:03 +08:00

205 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import re
import json
import requests
from bs4 import BeautifulSoup
from typing import Optional, Dict, Any
from core.logger import logger
from core.command_manager import matcher
from models import MessageEvent, MessageSegment
# Descriptive metadata for this plugin (name, description, usage text).
__plugin_meta__ = {
    "name": "bili_parser",
    "description": "自动解析B站分享卡片提取视频封面和播放量等信息。",
    "usage": "自动触发当检测到B站小程序分享卡片时自动发送视频信息。",
}

# Request headers passed to every `requests` call in this module; carries a
# desktop-browser User-Agent string.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
}
def format_count(num: int) -> str:
    """Format a counter for display, abbreviating large values in units of 万.

    Args:
        num: The count to format. Non-integer values are passed through
            ``str`` unchanged.

    Returns:
        ``str(num)`` for non-integers and for values below 10,000; otherwise
        the value divided by 10,000 with one decimal place and a ``万``
        suffix (e.g. ``12345`` -> ``"1.2万"``).
    """
    if not isinstance(num, int):
        return str(num)
    if num < 10000:
        return str(num)
    # The unit suffix is required: a bare "1.2" would read as a count of 1.2
    # rather than twelve thousand.
    return f"{num / 10000:.1f}万"
def format_duration(seconds: int) -> str:
    """Render a duration given in seconds as zero-padded ``MM:SS``.

    Minutes are not folded into hours, so 3600 seconds renders as ``60:00``.
    Non-integer or negative input yields a fixed placeholder string.
    """
    is_valid = isinstance(seconds, int) and seconds >= 0
    if not is_valid:
        return "滚木"
    whole_minutes = seconds // 60
    leftover_seconds = seconds % 60
    return f"{whole_minutes:02d}:{leftover_seconds:02d}"
def get_real_url(short_url: str) -> Optional[str]:
    """Resolve a short link to the URL it redirects to.

    Sends a HEAD request without following redirects and reads the
    ``Location`` header of the response.

    Args:
        short_url: The short link to resolve.

    Returns:
        The redirect target, or ``None`` when the request fails, the response
        is not a redirect, or it carries no ``Location`` header.
    """
    try:
        response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5)
    except requests.RequestException as e:
        # Use the shared logger (as the other helpers in this module do)
        # instead of printing to stdout.
        logger.error(f"[bili_parser] 获取真实URL失败: {e}")
        return None
    # Accept every redirect status, not only 302: a server may answer with a
    # permanent (301/308) or method-preserving (307) redirect as well.
    if response.status_code in (301, 302, 303, 307, 308):
        return response.headers.get('Location')
    return None
def _extract_initial_state(script_text: str) -> Optional[Dict[str, Any]]:
    """Decode the JSON object assigned to ``window.__INITIAL_STATE__``.

    Returns ``None`` when the assignment is absent or the value is not a JSON
    object. Raises ``json.JSONDecodeError`` when the value is malformed.
    """
    assignment = re.search(r'window\.__INITIAL_STATE__\s*=\s*', script_text)
    if not assignment:
        return None
    # raw_decode parses exactly one JSON value starting at the given index and
    # ignores whatever script code follows it. A non-greedy regex ending at
    # the first "};" would truncate the payload whenever that sequence occurs
    # inside the JSON itself.
    state, _ = json.JSONDecoder().raw_decode(script_text, assignment.end())
    return state if isinstance(state, dict) else None


def _normalize_image_url(url: str) -> str:
    """Strip the ``@...`` suffix and prefix protocol-relative URLs with ``https:``."""
    if not url:
        return url
    url = url.split('@')[0]
    if url.startswith('//'):
        url = 'https:' + url
    return url


def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
    """Fetch a video page and extract its metadata from the embedded state.

    The page is downloaded, the ``<script>`` containing
    ``window.__INITIAL_STATE__`` is located, and its JSON payload is decoded.

    Args:
        video_url: Full URL of the video page.

    Returns:
        A dict with the keys ``title``, ``bvid``, ``duration``, ``cover_url``,
        ``play``, ``like``, ``coin``, ``favorite``, ``share``, ``owner_name``,
        ``owner_avatar`` and ``followers``; or ``None`` when the page cannot
        be fetched or the embedded state cannot be found or decoded.
    """
    try:
        response = requests.get(video_url, headers=HEADERS, timeout=5)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        script_tag = soup.find('script', text=re.compile(r'window\.__INITIAL_STATE__'))
        if not script_tag:
            return None
        data = _extract_initial_state(script_tag.string or '')
        if data is None:
            logger.error("[bili_parser] 解析视频信息失败: 未找到 window.__INITIAL_STATE__ 数据")
            return None
        video_data = data.get('videoData', {})
        up_data = data.get('upData', {})
        stat = video_data.get('stat', {})
        owner = video_data.get('owner', {})
        return {
            "title": video_data.get('title', '未知标题'),
            "bvid": video_data.get('bvid', '未知BV号'),
            "duration": video_data.get('duration', 0),
            "cover_url": _normalize_image_url(video_data.get('pic', '')),
            "play": stat.get('view', 0),
            "like": stat.get('like', 0),
            "coin": stat.get('coin', 0),
            "favorite": stat.get('favorite', 0),
            "share": stat.get('share', 0),
            "owner_name": owner.get('name', '未知UP主'),
            "owner_avatar": _normalize_image_url(owner.get('face', '')),
            "followers": up_data.get('fans', 0),
        }
    except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e:
        # AttributeError covers sections of the state that are present but
        # not objects (e.g. ``videoData`` being null).
        logger.error(f"[bili_parser] 解析视频信息失败: {e}")
        return None
def get_direct_video_url(video_url: str) -> Optional[str]:
    """Ask a third-party API for a direct stream URL of a Bilibili video.

    Args:
        video_url: Full URL of the Bilibili video page.

    Returns:
        The direct video URL, or ``None`` when the request fails or the
        response does not have the expected shape.
    """
    api_url = "https://api.mir6.com/api/bzjiexi"
    # Pass the target through `params` so it is URL-encoded. Interpolating it
    # raw lets the video URL's own "?"/"&" query parameters leak into, and
    # corrupt, the API query string.
    query = {"url": video_url, "type": "json"}
    try:
        response = requests.get(api_url, params=query, headers=HEADERS, timeout=10)
        response.raise_for_status()
        data = response.json()
        if data.get("code") == 200 and data.get("data"):
            return data["data"][0].get("video_url")
    except (
        requests.RequestException,
        ValueError,  # includes json.JSONDecodeError from response.json()
        KeyError,
        IndexError,
        AttributeError,  # payload or its first entry is not an object
        TypeError,  # "data" is not subscriptable
    ) as e:
        logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
    return None
@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
    """Detect a Bilibili share card in an incoming message and process it.

    Walks the message segments looking for ``json`` segments, reads
    ``meta.detail_1.qqdocurl`` from the card payload, and hands the first
    ``b23.tv`` link found to ``process_bili_link``. Segments that are not
    JSON cards, cannot be parsed, or do not carry a ``b23.tv`` link are
    skipped.

    Args:
        event: The incoming message event.
    """
    for segment in event.message:
        if segment.type != "json":
            continue
        logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
        # Keep the try body limited to the parsing itself, so that errors
        # raised while processing the link are not reported as JSON failures.
        try:
            # The card payload is stored as a JSON string under "data".
            json_data = json.loads(segment.data.get("data", "{}"))
            short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
        except (json.JSONDecodeError, KeyError, AttributeError, TypeError) as e:
            # AttributeError/TypeError: the payload is not a string, or
            # "meta"/"detail_1" is not an object.
            logger.error(f"[bili_parser] 解析JSON失败: {e}")
            continue
        # Not a Bilibili link: keep looking at the remaining segments.
        if not isinstance(short_url, str) or "b23.tv" not in short_url:
            continue
        # Drop the query string from the short link.
        short_url = short_url.split('?')[0]
        logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}")
        # Only the first valid link in a message is handled.
        await process_bili_link(event, short_url)
        break
async def process_bili_link(event: MessageEvent, short_url: str):
    """Resolve a Bilibili short link, gather video info and reply with it.

    Steps: resolve the short link, scrape the video metadata, fetch a direct
    video URL for videos of at most 300 seconds, then send everything as a
    forwarded-message bundle of four nodes (text summary, cover, uploader
    avatar, video or a fallback text).

    Args:
        event: The message event to reply to.
        short_url: The short link extracted from the share card.
    """
    # Imported locally so this function does not depend on a module-level import.
    import asyncio

    # The helpers below use blocking `requests` calls. Run them in the
    # default executor so the event loop is not stalled for the duration of
    # their timeouts.
    loop = asyncio.get_running_loop()

    real_url = await loop.run_in_executor(None, get_real_url, short_url)
    if not real_url:
        logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。")
        await event.reply("无法解析B站短链接。")
        return

    video_info = await loop.run_in_executor(None, parse_video_info, real_url)
    if not video_info:
        logger.error(f"[bili_parser] 无法从 {real_url} 解析视频信息。")
        await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
        return

    # Only videos of at most 5 minutes (300 seconds) are resolved to a direct link.
    if video_info['duration'] > 300:
        video_message = "视频太长了。。。"
    else:
        direct_url = await loop.run_in_executor(None, get_direct_video_url, real_url)
        if direct_url:
            video_message = MessageSegment.video(direct_url)
        else:
            video_message = "视频解析失败,无法获取直链。"

    text_message = (
        f"BiliBili 视频解析\n"
        f"--------------------\n"
        f" UP主: {video_info['owner_name']}\n"
        f" 粉丝: {format_count(video_info['followers'])}\n"
        f"--------------------\n"
        f" 标题: {video_info['title']}\n"
        f" BV号: {video_info['bvid']}\n"
        f" 时长: {format_duration(video_info['duration'])}\n"
        f"--------------------\n"
        f" 数据:\n"
        f" 播放: {format_count(video_info['play'])}\n"
        f" 点赞: {format_count(video_info['like'])}\n"
        f" 投币: {format_count(video_info['coin'])}\n"
        f" 收藏: {format_count(video_info['favorite'])}\n"
        f" 转发: {format_count(video_info['share'])}\n"
        f" B站链接: {short_url}"
    )
    image_message_segment = [
        MessageSegment.text("B站封面"),
        MessageSegment.image(video_info['cover_url'])
    ]
    up_info_segment = [
        MessageSegment.text("UP主头像"),
        MessageSegment.image(video_info['owner_avatar'])
    ]
    # One forward node per part; all are attributed to the bot itself.
    nodes = [
        event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message),
        event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment),
        event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=up_info_segment),
        event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=video_message)
    ]
    logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
    # The event itself is passed as the target; per the original author's
    # note, send_forwarded_messages picks private vs. group chat on its own.
    await event.bot.send_forwarded_messages(target=event, nodes=nodes)