feat(douyin): 添加多API并发解析抖音视频功能

重构抖音解析器,将原有单API解析拆分为独立方法,并新增mmp API作为备选方案
使用asyncio.as_completed并发请求多个API,取最快返回的有效结果
This commit is contained in:
2026-02-15 09:59:02 +08:00
parent aeb896e794
commit e61383398d

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re import re
import aiohttp import aiohttp
import asyncio
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from core.utils.logger import logger from core.utils.logger import logger
@@ -24,9 +25,9 @@ class DouyinParser(BaseParser):
# 消息去重缓存 # 消息去重缓存
self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10) self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
async def parse(self, url: str) -> Optional[Dict[str, Any]]: async def _parse_api_xhus(self, url: str) -> Optional[Dict[str, Any]]:
""" """
解析抖音视频信息 使用 xhus API 解析抖音视频
Args: Args:
url (str): 抖音视频URL url (str): 抖音视频URL
@@ -35,31 +36,29 @@ class DouyinParser(BaseParser):
Optional[Dict[str, Any]]: 视频信息字典如果失败则返回None Optional[Dict[str, Any]]: 视频信息字典如果失败则返回None
""" """
try: try:
# 使用第三方API解析抖音视频
api_url = f"http://api.xhus.cn/api/douyin?url={url}" api_url = f"http://api.xhus.cn/api/douyin?url={url}"
session = self.get_session() session = self.get_session()
async with session.get(api_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response: async with session.get(api_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status != 200: if response.status != 200:
logger.error(f"[{self.name}] API请求失败状态码: {response.status}") logger.error(f"[{self.name}] xhus API请求失败状态码: {response.status}")
return None return None
response_data = await response.json() response_data = await response.json()
if not isinstance(response_data, dict): if not isinstance(response_data, dict):
logger.error(f"[{self.name}] API返回格式错误: {response_data}") logger.error(f"[{self.name}] xhus API返回格式错误: {response_data}")
return None return None
if response_data.get("code") != 200: if response_data.get("code") != 200:
logger.error(f"[{self.name}] API返回错误: {response_data}") logger.error(f"[{self.name}] xhus API返回错误: {response_data}")
return None return None
data = response_data.get("data", {}) data = response_data.get("data", {})
if not data: if not data:
logger.error(f"[{self.name}] API返回数据为空") logger.error(f"[{self.name}] xhus API返回数据为空")
return None return None
# 转换API响应格式
return { return {
"type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image", "type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image",
"video_url": data.get("url", ""), "video_url": data.get("url", ""),
@@ -74,13 +73,92 @@ class DouyinParser(BaseParser):
"music": data.get("music", {}), "music": data.get("music", {}),
} }
except (aiohttp.ClientError, KeyError, AttributeError, ValueError) as e:
logger.error(f"[{self.name}] 解析抖音视频信息失败: {e}")
logger.debug(f"失败的URL: {url}")
except Exception as e: except Exception as e:
logger.error(f"[{self.name}] 解析抖音视频时发生未知错误: {e}") logger.error(f"[{self.name}] xhus API解析失败: {e}")
logger.debug(f"失败的URL: {url}") return None
async def _parse_api_mmp(self, url: str) -> Optional[Dict[str, Any]]:
    """
    Resolve a Douyin video through the mmp API.

    Args:
        url (str): Douyin video URL.

    Returns:
        Optional[Dict[str, Any]]: Normalised video info dict, or None when
        the HTTP request fails, the payload is not a dict, the API reports
        a non-200 ``code``, the ``data`` field is empty, or any exception
        is raised along the way.
    """
    try:
        api_url = "https://api.mmp.cc/api/Jiexi"
        session = self.get_session()
        # Hand the target over via ``params`` so it is percent-encoded.
        # Interpolating it raw into the query string lets any "&" or "#"
        # inside the Douyin URL truncate the ``url`` argument.
        async with session.get(
            api_url,
            params={"url": url},
            headers=self.HEADERS,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as response:
            if response.status != 200:
                logger.error(f"[{self.name}] mmp API请求失败状态码: {response.status}")
                return None

            response_data = await response.json()

            if not isinstance(response_data, dict):
                logger.error(f"[{self.name}] mmp API返回格式错误: {response_data}")
                return None

            if response_data.get("code") != 200:
                logger.error(f"[{self.name}] mmp API返回错误: {response_data}")
                return None

            data = response_data.get("data", {})
            if not data:
                logger.error(f"[{self.name}] mmp API返回数据为空")
                return None

            # Copy the fields across, filling defaults for anything missing.
            return {
                "type": data.get("type", "video"),
                "video_url": data.get("video_url", ""),
                "video_url_HQ": data.get("video_url_HQ", ""),
                "nickname": data.get("nickname", "未知作者"),
                "desc": data.get("desc", "无描述"),
                "aweme_id": data.get("aweme_id", ""),
                "like": data.get("like", 0),
                "cover": data.get("cover", ""),
                "time": data.get("time", 0),
                "author_avatar": data.get("author_avatar", ""),
                "music": data.get("music", {}),
            }

    except Exception as e:
        # Best-effort backend: report the failure and let the caller fall
        # back to another API instead of propagating.
        logger.error(f"[{self.name}] mmp API解析失败: {e}")
        return None
async def parse(self, url: str) -> Optional[Dict[str, Any]]:
    """
    Resolve Douyin video info by racing every backend API concurrently.

    The first backend to finish with a truthy result wins. Requests that
    are still in flight at that point are cancelled so they do not keep
    running after this method has returned.

    Args:
        url (str): Douyin video URL.

    Returns:
        Optional[Dict[str, Any]]: Video info dict from the fastest
        successful backend, or None if every backend failed.
    """

    async def try_api(coro, api_name: str) -> tuple:
        # Tag the outcome with its backend name and turn exceptions into a
        # None result so one failing API cannot abort the race.
        try:
            return (await coro, api_name)
        except Exception as e:
            logger.error(f"[{self.name}] {api_name} API异常: {e}")
            return (None, api_name)

    # Explicit tasks (rather than bare coroutines) give us handles that can
    # be cancelled once a winner is known.
    tasks = [
        asyncio.create_task(try_api(self._parse_api_xhus(url), "xhus")),
        asyncio.create_task(try_api(self._parse_api_mmp(url), "mmp")),
    ]

    try:
        for finished in asyncio.as_completed(tasks):
            result, api_name = await finished
            if result:
                logger.info(f"[{self.name}] 使用 {api_name} API 成功解析")
                return result
    finally:
        # Runs on early return, on failure and when parse() itself is
        # cancelled: stop the losers and wait for them to unwind.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    logger.error(f"[{self.name}] 所有API解析均失败")
    return None
async def get_real_url(self, short_url: str) -> Optional[str]: async def get_real_url(self, short_url: str) -> Optional[str]: