* feat: 整合开发历史

* codepy安全性升级

* 优化一些东西

* 再次优化

* 更新一下 requirements.txt

* CQ码支持以及视频解析
This commit is contained in:
镀铬酸钾
2026-01-05 21:41:48 +08:00
committed by GitHub
parent 4a18909c4f
commit dfe70d27fd
2 changed files with 362 additions and 32 deletions

View File

@@ -1,10 +1,9 @@
# -*- coding: utf-8 -*-
import re
import json
import html
import requests
from bs4 import BeautifulSoup
from typing import Optional, Tuple, Dict, Any
from typing import Optional, Dict, Any
from core.logger import logger
from core.command_manager import matcher
@@ -29,6 +28,14 @@ def format_count(num: int) -> str:
return f"{num / 10000:.1f}"
def format_duration(seconds: int) -> str:
"""将秒数格式化为 MM:SS 的形式"""
if not isinstance(seconds, int) or seconds < 0:
return "滚木"
minutes, seconds = divmod(seconds, 60)
return f"{minutes:02d}:{seconds:02d}"
def get_real_url(short_url: str) -> Optional[str]:
try:
response = requests.head(short_url, headers=HEADERS, allow_redirects=False, timeout=5)
@@ -52,23 +59,35 @@ def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
data = json.loads(json_str)
video_data = data.get('videoData', {})
up_data = data.get('upData', {})
stat = video_data.get('stat', {})
owner = video_data.get('owner', {})
cover_url = video_data.get('pic', '')
if cover_url:
cover_url = cover_url.split('@')[0]
if cover_url.startswith('//'):
cover_url = 'https:' + cover_url
owner_avatar = owner.get('face', '')
if owner_avatar:
if owner_avatar.startswith('//'):
owner_avatar = 'https:' + owner_avatar
owner_avatar = owner_avatar.split('@')[0]
return {
"title": video_data.get('title', '未知标题'),
"bvid": video_data.get('bvid', '未知BV号'),
"duration": video_data.get('duration', 0),
"cover_url": cover_url,
"play": stat.get('view', 0),
"like": stat.get('like', 0),
"coin": stat.get('coin', 0),
"favorite": stat.get('favorite', 0),
"share": stat.get('share', 0),
"owner_name": owner.get('name', '未知UP主'),
"owner_avatar": owner_avatar,
"followers": up_data.get('fans', 0),
}
except (requests.RequestException, KeyError, AttributeError, json.JSONDecodeError) as e:
@@ -76,33 +95,52 @@ def parse_video_info(video_url: str) -> Optional[Dict[str, Any]]:
return None
def get_direct_video_url(video_url: str) -> Optional[str]:
"""
调用第三方API解析B站视频直链
:param video_url: B站视频的完整URL
:return: 视频直链URL如果失败则返回None
"""
api_url = f"https://api.mir6.com/api/bzjiexi?url={video_url}&type=json"
try:
response = requests.get(api_url, headers=HEADERS, timeout=10)
response.raise_for_status()
data = response.json()
if data.get("code") == 200 and data.get("data"):
return data["data"][0].get("video_url")
except (requests.RequestException, json.JSONDecodeError, KeyError, IndexError) as e:
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
return None
@matcher.on_message()
async def handle_bili_share(event: MessageEvent):
if not event.raw_message.startswith('[CQ:json,data='):
return
# 遍历消息段寻找JSON CQ码
for segment in event.message:
if segment.type == "json":
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
try:
# 直接从segment的data中获取json字符串
json_data = json.loads(segment.data.get("data", "{}"))
# 提取B站短链接
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if not short_url or "b23.tv" not in short_url:
continue # 如果不是B站链接继续检查下一个segment
short_url = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}")
# 找到了有效的B站链接处理并跳出循环
await process_bili_link(event, short_url)
break
logger.info(f"[bili_parser] 检测到JSON CQ码: {event.raw_message}")
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
continue
try:
json_str_raw = event.raw_message.strip('[CQ:json,data=]').rstrip(']')
json_str_decoded = html.unescape(json_str_raw)
data = json.loads(json_str_decoded)
short_url = data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
if not short_url or "b23.tv" not in short_url:
logger.warning("[bili_parser] JSON中未找到有效的b23.tv链接。")
return
short_url = short_url.split('?')[0]
logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}")
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"[bili_parser] 解析JSON失败: {e}")
return
async def process_bili_link(event: MessageEvent, short_url: str):
"""处理B站链接获取信息并回复"""
real_url = get_real_url(short_url)
if not real_url:
logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。")
@@ -115,11 +153,25 @@ async def handle_bili_share(event: MessageEvent):
await event.reply("无法获取视频信息可能是B站接口变动或视频不存在。")
return
# 检查视频时长
if video_info['duration'] > 300: # 5分钟 = 300秒
video_message = "视频太长了。。。"
else:
direct_url = get_direct_video_url(real_url)
if direct_url:
video_message = MessageSegment.video(direct_url)
else:
video_message = "视频解析失败,无法获取直链。"
text_message = (
f"BiliBili 视频解析\n"
f"--------------------\n"
f" UP主: {video_info['owner_name']}\n"
f" 粉丝: {format_count(video_info['followers'])}\n"
f"--------------------\n"
f" 标题: {video_info['title']}\n"
f" BV号: {video_info['bvid']}\n"
f" 时长: {format_duration(video_info['duration'])}\n"
f"--------------------\n"
f" 数据:\n"
f" 播放: {format_count(video_info['play'])}\n"
@@ -130,12 +182,23 @@ async def handle_bili_share(event: MessageEvent):
f" B站链接: {short_url}"
)
image_message_segment = [MessageSegment.text("B站封面"),MessageSegment.image(video_info['cover_url'])]
image_message_segment = [
MessageSegment.text("B站封面"),
MessageSegment.image(video_info['cover_url'])
]
up_info_segment = [
MessageSegment.text("UP主头像"),
MessageSegment.image(video_info['owner_avatar'])
]
nodes = [
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=text_message),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment)
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=image_message_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=up_info_segment),
event.bot.build_forward_node(user_id=event.self_id, nickname="B站视频解析", message=video_message)
]
logger.success(f"[bili_parser] 成功解析视频信息并准备以聊天记录形式回复: {video_info['title']}")
await event.bot.send_group_forward_msg(group_id=event.group_id, messages=nodes)
# 使用更通用的 send_forwarded_messages 方法,自动判断私聊或群聊
await event.bot.send_forwarded_messages(target=event, nodes=nodes)