Dev (#24)
* feat: 整合开发历史 * codepy安全性升级 * 优化一些东西 * 再次优化 * 更新一下 requirements.txt * CQ码支持以及视频解析 * hotfix
This commit is contained in:
@@ -112,40 +112,62 @@ def get_direct_video_url(video_url: str) -> Optional[str]:
|
|||||||
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
|
logger.error(f"[bili_parser] 调用第三方API解析视频失败: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
BILI_URL_PATTERN = re.compile(r"https?://(?:www\.)?(bilibili\.com/video/[a-zA-Z0-9_]+|b23\.tv/[a-zA-Z0-9]+)")
|
||||||
|
|
||||||
|
|
||||||
@matcher.on_message()
|
@matcher.on_message()
|
||||||
async def handle_bili_share(event: MessageEvent):
|
async def handle_bili_share(event: MessageEvent):
|
||||||
# 遍历消息段,寻找JSON CQ码
|
"""
|
||||||
|
处理消息,检测B站分享链接(JSON卡片或文本链接)并进行解析。
|
||||||
|
:param event: 消息事件对象
|
||||||
|
"""
|
||||||
|
url_to_process = None
|
||||||
|
|
||||||
|
# 1. 优先解析JSON卡片中的短链接
|
||||||
for segment in event.message:
|
for segment in event.message:
|
||||||
if segment.type == "json":
|
if segment.type == "json":
|
||||||
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
|
logger.info(f"[bili_parser] 检测到JSON CQ码: {segment.data}")
|
||||||
try:
|
try:
|
||||||
# 直接从segment的data中获取json字符串
|
|
||||||
json_data = json.loads(segment.data.get("data", "{}"))
|
json_data = json.loads(segment.data.get("data", "{}"))
|
||||||
|
|
||||||
# 提取B站短链接
|
|
||||||
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
|
short_url = json_data.get("meta", {}).get("detail_1", {}).get("qqdocurl")
|
||||||
|
|
||||||
if not short_url or "b23.tv" not in short_url:
|
if short_url and "b23.tv" in short_url:
|
||||||
continue # 如果不是B站链接,继续检查下一个segment
|
url_to_process = short_url.split('?')[0]
|
||||||
|
logger.success(f"[bili_parser] 成功从JSON卡片中提取到B站短链接: {url_to_process}")
|
||||||
short_url = short_url.split('?')[0]
|
break # 找到后立即跳出循环
|
||||||
logger.success(f"[bili_parser] 成功提取到B站短链接: {short_url}")
|
|
||||||
|
|
||||||
# 找到了有效的B站链接,处理并跳出循环
|
|
||||||
await process_bili_link(event, short_url)
|
|
||||||
break
|
|
||||||
|
|
||||||
except (json.JSONDecodeError, KeyError) as e:
|
except (json.JSONDecodeError, KeyError) as e:
|
||||||
logger.error(f"[bili_parser] 解析JSON失败: {e}")
|
logger.error(f"[bili_parser] 解析JSON失败: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# 2. 如果未在JSON卡片中找到链接,则在文本消息中查找
|
||||||
|
if not url_to_process:
|
||||||
|
for segment in event.message:
|
||||||
|
if segment.type == "text":
|
||||||
|
text_content = segment.data.get("text", "")
|
||||||
|
match = BILI_URL_PATTERN.search(text_content)
|
||||||
|
if match:
|
||||||
|
url_to_process = match.group(0)
|
||||||
|
logger.success(f"[bili_parser] 成功从文本中提取到B站链接: {url_to_process}")
|
||||||
|
break # 找到后立即跳出循环
|
||||||
|
|
||||||
async def process_bili_link(event: MessageEvent, short_url: str):
|
# 3. 如果找到了任何类型的B站链接,则进行处理
|
||||||
"""处理B站链接,获取信息并回复"""
|
if url_to_process:
|
||||||
real_url = get_real_url(short_url)
|
await process_bili_link(event, url_to_process)
|
||||||
if not real_url:
|
|
||||||
logger.error(f"[bili_parser] 无法从 {short_url} 获取真实URL。")
|
async def process_bili_link(event: MessageEvent, url: str):
|
||||||
await event.reply("无法解析B站短链接。")
|
"""
|
||||||
return
|
处理B站链接(长链接或短链接),获取信息并回复
|
||||||
|
:param event: 消息事件对象
|
||||||
|
:param url: 待处理的B站链接
|
||||||
|
"""
|
||||||
|
if "b23.tv" in url:
|
||||||
|
real_url = get_real_url(url)
|
||||||
|
if not real_url:
|
||||||
|
logger.error(f"[bili_parser] 无法从 {url} 获取真实URL。")
|
||||||
|
await event.reply("无法解析B站短链接。")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
real_url = url.split('?')[0]
|
||||||
|
|
||||||
video_info = parse_video_info(real_url)
|
video_info = parse_video_info(real_url)
|
||||||
if not video_info:
|
if not video_info:
|
||||||
@@ -155,7 +177,7 @@ async def process_bili_link(event: MessageEvent, short_url: str):
|
|||||||
|
|
||||||
# 检查视频时长
|
# 检查视频时长
|
||||||
if video_info['duration'] > 300: # 5分钟 = 300秒
|
if video_info['duration'] > 300: # 5分钟 = 300秒
|
||||||
video_message = "视频太长了。。。"
|
video_message = "视频时长超过5分钟,不进行解析。"
|
||||||
else:
|
else:
|
||||||
direct_url = get_direct_video_url(real_url)
|
direct_url = get_direct_video_url(real_url)
|
||||||
if direct_url:
|
if direct_url:
|
||||||
@@ -179,7 +201,7 @@ async def process_bili_link(event: MessageEvent, short_url: str):
|
|||||||
f" 投币: {format_count(video_info['coin'])}\n"
|
f" 投币: {format_count(video_info['coin'])}\n"
|
||||||
f" 收藏: {format_count(video_info['favorite'])}\n"
|
f" 收藏: {format_count(video_info['favorite'])}\n"
|
||||||
f" 转发: {format_count(video_info['share'])}\n"
|
f" 转发: {format_count(video_info['share'])}\n"
|
||||||
f" B站链接: {short_url}"
|
f" B站链接: {url}"
|
||||||
)
|
)
|
||||||
|
|
||||||
image_message_segment = [
|
image_message_segment = [
|
||||||
|
|||||||
Reference in New Issue
Block a user