diff --git a/plugins/web_parser/parsers/douyin.py b/plugins/web_parser/parsers/douyin.py
index 72cd12b..933a4db 100644
--- a/plugins/web_parser/parsers/douyin.py
+++ b/plugins/web_parser/parsers/douyin.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import re
 import aiohttp
+import asyncio
 from typing import Optional, Dict, Any, List
 
 from core.utils.logger import logger
@@ -24,9 +25,9 @@ class DouyinParser(BaseParser):
         # 消息去重缓存
         self.processed_messages: TTLCache[int, bool] = TTLCache(maxsize=100, ttl=10)
 
-    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
+    async def _parse_api_xhus(self, url: str) -> Optional[Dict[str, Any]]:
         """
-        解析抖音视频信息
+        Parse a Douyin video through the xhus API.
 
         Args:
             url (str): 抖音视频URL
@@ -35,31 +36,30 @@ class DouyinParser(BaseParser):
             Optional[Dict[str, Any]]: 视频信息字典,如果失败则返回None
         """
         try:
-            # 使用第三方API解析抖音视频
-            api_url = f"http://api.xhus.cn/api/douyin?url={url}"
+            # Send the video URL via params so it is percent-encoded, not spliced raw into the query
+            api_url = "http://api.xhus.cn/api/douyin"
 
             session = self.get_session()
-            async with session.get(api_url, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
+            async with session.get(api_url, params={"url": url}, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
                 if response.status != 200:
-                    logger.error(f"[{self.name}] API请求失败,状态码: {response.status}")
+                    logger.error(f"[{self.name}] xhus API请求失败,状态码: {response.status}")
                     return None
 
                 response_data = await response.json()
 
                 if not isinstance(response_data, dict):
-                    logger.error(f"[{self.name}] API返回格式错误: {response_data}")
+                    logger.error(f"[{self.name}] xhus API返回格式错误: {response_data}")
                     return None
 
                 if response_data.get("code") != 200:
-                    logger.error(f"[{self.name}] API返回错误: {response_data}")
+                    logger.error(f"[{self.name}] xhus API返回错误: {response_data}")
                     return None
 
                 data = response_data.get("data", {})
                 if not data:
-                    logger.error(f"[{self.name}] API返回数据为空")
+                    logger.error(f"[{self.name}] xhus API返回数据为空")
                     return None
 
-                # 转换API响应格式
                 return {
                     "type": "video" if not data.get("images") or not isinstance(data.get("images"), list) else "image",
                     "video_url": data.get("url", ""),
@@ -74,13 +74,103 @@ class DouyinParser(BaseParser):
                     "music": data.get("music", {}),
                 }
 
-        except (aiohttp.ClientError, KeyError, AttributeError, ValueError) as e:
-            logger.error(f"[{self.name}] 解析抖音视频信息失败: {e}")
-            logger.debug(f"失败的URL: {url}")
         except Exception as e:
-            logger.error(f"[{self.name}] 解析抖音视频时发生未知错误: {e}")
-            logger.debug(f"失败的URL: {url}")
+            logger.error(f"[{self.name}] xhus API解析失败: {e}")
+            return None
 
+    async def _parse_api_mmp(self, url: str) -> Optional[Dict[str, Any]]:
+        """
+        Parse a Douyin video through the mmp API.
+
+        Args:
+            url (str): Douyin video URL
+
+        Returns:
+            Optional[Dict[str, Any]]: Video info dict, or None on any failure
+        """
+        try:
+            # Send the video URL via params so it is percent-encoded, not spliced raw into the query
+            api_url = "https://api.mmp.cc/api/Jiexi"
+
+            session = self.get_session()
+            async with session.get(api_url, params={"url": url}, headers=self.HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
+                if response.status != 200:
+                    logger.error(f"[{self.name}] mmp API请求失败,状态码: {response.status}")
+                    return None
+
+                response_data = await response.json()
+
+                if not isinstance(response_data, dict):
+                    logger.error(f"[{self.name}] mmp API返回格式错误: {response_data}")
+                    return None
+
+                if response_data.get("code") != 200:
+                    logger.error(f"[{self.name}] mmp API返回错误: {response_data}")
+                    return None
+
+                data = response_data.get("data", {})
+                if not data:
+                    logger.error(f"[{self.name}] mmp API返回数据为空")
+                    return None
+
+                return {
+                    "type": data.get("type", "video"),
+                    "video_url": data.get("video_url", ""),
+                    "video_url_HQ": data.get("video_url_HQ", ""),
+                    "nickname": data.get("nickname", "未知作者"),
+                    "desc": data.get("desc", "无描述"),
+                    "aweme_id": data.get("aweme_id", ""),
+                    "like": data.get("like", 0),
+                    "cover": data.get("cover", ""),
+                    "time": data.get("time", 0),
+                    "author_avatar": data.get("author_avatar", ""),
+                    "music": data.get("music", {}),
+                }
+
+        except Exception as e:
+            logger.error(f"[{self.name}] mmp API解析失败: {e}")
+            return None
+
+    async def parse(self, url: str) -> Optional[Dict[str, Any]]:
+        """
+        Parse Douyin video info by querying several APIs concurrently.
+
+        The first API to return a non-empty result wins; requests that are
+        still in flight at that point are cancelled.
+
+        Args:
+            url (str): Douyin video URL
+
+        Returns:
+            Optional[Dict[str, Any]]: Video info dict, or None if every API failed
+        """
+        async def try_api(coro, api_name: str) -> tuple:
+            try:
+                result = await coro
+                return (result, api_name)
+            except Exception as e:
+                logger.error(f"[{self.name}] {api_name} API异常: {e}")
+                return (None, api_name)
+
+        # Create the tasks explicitly so we keep references to them and can
+        # cancel the losers instead of leaving them running in the background.
+        tasks = [
+            asyncio.ensure_future(try_api(self._parse_api_xhus(url), "xhus")),
+            asyncio.ensure_future(try_api(self._parse_api_mmp(url), "mmp")),
+        ]
+
+        try:
+            for coro in asyncio.as_completed(tasks):
+                result, api_name = await coro
+                if result:
+                    logger.info(f"[{self.name}] 使用 {api_name} API 成功解析")
+                    return result
+        finally:
+            # cancel() is a no-op on tasks that have already finished
+            for task in tasks:
+                task.cancel()
+
+        logger.error(f"[{self.name}] 所有API解析均失败")
         return None
 
     async def get_real_url(self, short_url: str) -> Optional[str]: