Files
NeoBot/plugins/web_parser/parsers/douyin.py
K2Cr2O1 e61383398d feat(douyin): 添加多API并发解析抖音视频功能
重构抖音解析器,将原有单API解析拆分为独立方法,并新增mmp API作为备选方案
使用asyncio.as_completed并发请求多个API,取最快返回的有效结果
2026-02-15 09:59:02 +08:00

332 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import re
import aiohttp
import asyncio
from typing import Optional, Dict, Any, List
from core.utils.logger import logger
from models import MessageEvent, MessageSegment
from ..base import BaseParser
from ..utils import extract_original_text
from cachetools import TTLCache
class DouyinParser(BaseParser):
"""
抖音视频解析器
"""
def __init__(self):
super().__init__()
self.name = "抖音解析器"
self.url_pattern = re.compile(r"https?://v\.douyin\.com/[a-zA-Z0-9_-]+/?", re.IGNORECASE)
self.short_pattern = re.compile(r"(?:https?://)?v\.douyin\.com/[a-zA-Z0-9_-]+/?", re.IGNORECASE)
self.nickname = "抖音视频解析"
# 消息去重缓存
self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
async def _parse_api_xhus(self, url: str) -> Optional[Dict[str, Any]]:
"""
使用 xhus API 解析抖音视频
Args:
url (str): 抖音视频URL
Returns:
Optional[Dict[str, Any]]: 视频信息字典如果失败则返回None
"""
try:
api_url = f"http://api.xhus.cn/api/douyin?url={url}"
session = self.get_session()
async with session.get(api_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status != 200:
logger.error(f"[{self.name}] xhus API请求失败状态码: {response.status}")
return None
response_data = await response.json()
if not isinstance(response_data, dict):
logger.error(f"[{self.name}] xhus API返回格式错误: {response_data}")
return None
if response_data.get("code") != 200:
logger.error(f"[{self.name}] xhus API返回错误: {response_data}")
return None
data = response_data.get("data", {})
if not data:
logger.error(f"[{self.name}] xhus API返回数据为空")
return None
return {
"type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image",
"video_url": data.get("url", ""),
"video_url_HQ": data.get("url", ""),
"nickname": data.get("author", "未知作者"),
"desc": data.get("title", "无描述"),
"aweme_id": data.get("uid", ""),
"like": data.get("like", 0),
"cover": data.get("cover", ""),
"time": data.get("time", 0),
"author_avatar": data.get("avatar", ""),
"music": data.get("music", {}),
}
except Exception as e:
logger.error(f"[{self.name}] xhus API解析失败: {e}")
return None
async def _parse_api_mmp(self, url: str) -> Optional[Dict[str, Any]]:
"""
使用 mmp API 解析抖音视频
Args:
url (str): 抖音视频URL
Returns:
Optional[Dict[str, Any]]: 视频信息字典如果失败则返回None
"""
try:
api_url = f"https://api.mmp.cc/api/Jiexi?url={url}"
session = self.get_session()
async with session.get(api_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status != 200:
logger.error(f"[{self.name}] mmp API请求失败状态码: {response.status}")
return None
response_data = await response.json()
if not isinstance(response_data, dict):
logger.error(f"[{self.name}] mmp API返回格式错误: {response_data}")
return None
if response_data.get("code") != 200:
logger.error(f"[{self.name}] mmp API返回错误: {response_data}")
return None
data = response_data.get("data", {})
if not data:
logger.error(f"[{self.name}] mmp API返回数据为空")
return None
return {
"type": data.get("type", "video"),
"video_url": data.get("video_url", ""),
"video_url_HQ": data.get("video_url_HQ", ""),
"nickname": data.get("nickname", "未知作者"),
"desc": data.get("desc", "无描述"),
"aweme_id": data.get("aweme_id", ""),
"like": data.get("like", 0),
"cover": data.get("cover", ""),
"time": data.get("time", 0),
"author_avatar": data.get("author_avatar", ""),
"music": data.get("music", {}),
}
except Exception as e:
logger.error(f"[{self.name}] mmp API解析失败: {e}")
return None
async def parse(self, url: str) -> Optional[Dict[str, Any]]:
"""
解析抖音视频信息并发请求多个API取最快返回的结果
Args:
url (str): 抖音视频URL
Returns:
Optional[Dict[str, Any]]: 视频信息字典如果失败则返回None
"""
async def try_api(coro, api_name: str) -> tuple:
try:
result = await coro
return (result, api_name)
except Exception as e:
logger.error(f"[{self.name}] {api_name} API异常: {e}")
return (None, api_name)
tasks = [
try_api(self._parse_api_xhus(url), "xhus"),
try_api(self._parse_api_mmp(url), "mmp"),
]
for coro in asyncio.as_completed(tasks):
result, api_name = await coro
if result:
logger.info(f"[{self.name}] 使用 {api_name} API 成功解析")
return result
logger.error(f"[{self.name}] 所有API解析均失败")
return None
async def get_real_url(self, short_url: str) -> Optional[str]:
"""
获取抖音短链接的真实URL
Args:
short_url (str): 抖音短链接
Returns:
Optional[str]: 真实URL如果失败则返回None
"""
try:
session = self.get_session()
async with session.get(short_url, allow_redirects=True, timeout=aiohttp.ClientTimeout(total=10)) as response:
redirected_url = str(response.url)
# 检查重定向后的URL是否是有效的视频或图文页
if 'douyin.com/video/' in redirected_url or 'douyin.com/note/' in redirected_url:
logger.info(f"[{self.name}] 成功获取真实URL: {redirected_url}")
return redirected_url
else:
logger.warning(f"[{self.name}] 短链接 {short_url} 重定向到了非预期的页面: {redirected_url}")
return None
except Exception as e:
logger.error(f"[{self.name}] 获取真实URL失败: {e}")
return None
async def format_response(self, event: MessageEvent, data: Dict[str, Any]) -> List[Any]:
"""
格式化抖音视频响应消息
Args:
event (MessageEvent): 消息事件对象
data (Dict[str, Any]): 视频信息
Returns:
List[Any]: 消息段列表
"""
# 构建回复消息,包含原分享中的文本内容(如果有)
original_text = extract_original_text(event.message, self.url_pattern)
# 构建回复消息
text_parts = ["抖音视频解析"]
text_parts.append("--------------------")
if original_text:
text_parts.append(f" 分享内容: {original_text}")
text_parts.append("--------------------")
text_parts.append(f" 作者: {data['nickname']}")
text_parts.append(f" 抖音号: {data['aweme_id']}")
text_parts.append(f" 标题: {data['desc']}")
text_parts.append(f" 点赞: {self.format_count(data['like'])}")
text_parts.append(f" 类型: {data['type']}")
# 如果是音乐,添加音乐信息
if data.get('music'):
music_info = data['music']
text_parts.append("--------------------")
text_parts.append(" 背景音乐:")
text_parts.append(f" 标题: {music_info.get('title', '')}")
text_parts.append(f" 作者: {music_info.get('author', '')}")
text_parts.append("--------------------")
text_message = "\n".join(text_parts)
# 准备转发消息节点
nodes = []
# 添加文本信息节点
text_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=self.nickname,
message=text_message
)
nodes.append(text_node)
# 添加封面图片节点(如果有)
if data.get('cover'):
try:
cover_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=self.nickname,
message=[
MessageSegment.text("抖音视频封面:\n"),
MessageSegment.image(data['cover'])
]
)
nodes.append(cover_node)
except Exception as e:
logger.warning(f"[{self.name}] 无法添加封面图片: {e}")
# 添加作者头像节点(如果有)
if data.get('author_avatar'):
try:
avatar_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=self.nickname,
message=[
MessageSegment.text("作者头像:\n"),
MessageSegment.image(data['author_avatar'])
]
)
nodes.append(avatar_node)
except Exception as e:
logger.warning(f"[{self.name}] 无法添加作者头像: {e}")
# 尝试添加视频直链(单独节点)
video_success = False
direct_message = None
try:
if data.get('video_url'):
video_url = data.get('video_url', '')
# 检查视频类型
if data.get('type') == 'video':
video_message = MessageSegment.video(video_url)
direct_message = video_message
video_type_text = "视频直链:"
else: # image类型
video_message = MessageSegment.image(video_url) # 单个图片
direct_message = video_message
video_type_text = "图集首图:"
# 构建视频/图片节点
video_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=self.nickname,
message=[
MessageSegment.text(video_type_text + "\n"),
video_message
]
)
nodes.append(video_node)
video_success = True
except Exception as e:
logger.error(f"[{self.name}] 无法添加视频/图片: {e}")
# 如果无法添加视频,添加提示信息
if not video_success:
no_video_node = event.bot.build_forward_node(
user_id=event.self_id,
nickname=self.nickname,
message="视频解析成功,但无法获取直链或播放视频。"
)
nodes.append(no_video_node)
# 同时直接发送视频/图片(如果获取到直链)
if direct_message:
try:
await event.reply(direct_message)
except Exception as e:
logger.error(f"[{self.name}] 直接发送视频/图片失败: {e}")
return nodes
def should_handle_url(self, url: str) -> bool:
"""
判断是否应该处理该URL
Args:
url (str): URL
Returns:
bool: 是否应该处理
"""
# 检查是否是抖音相关域名
return ('douyin.com' in url or bool(self.url_pattern.search(url)) or bool(self.short_pattern.search(url)))