diff --git a/core/managers/reverse_ws_manager.py b/core/managers/reverse_ws_manager.py index a0d270c..b64b39d 100644 --- a/core/managers/reverse_ws_manager.py +++ b/core/managers/reverse_ws_manager.py @@ -52,6 +52,10 @@ class ReverseWSManager: self._message_lock_times: Dict[str, datetime] = {} # 消息锁创建时间 self._lock_ttl = 300 # 锁保留时间(秒) + # 基于消息内容的防重复 + self._processed_messages: Dict[str, datetime] = {} # 已处理的消息内容和时间 + self._message_content_ttl = 5 # 消息内容保留时间(秒) + # 启动清理任务 self._cleanup_task = None @@ -175,6 +179,14 @@ class ReverseWSManager: del self._message_locks[lock_key] if lock_key in self._message_lock_times: del self._message_lock_times[lock_key] + + # 清理过期的消息内容 + expired_messages = [ + msg_key for msg_key, timestamp in self._processed_messages.items() + if (current_time - timestamp).total_seconds() > self._message_content_ttl + ] + for msg_key in expired_messages: + del self._processed_messages[msg_key] except asyncio.CancelledError: break @@ -248,10 +260,16 @@ class ReverseWSManager: # 使用锁防止同一消息被多次处理 message_key = self._get_message_key(event_data) async with self._get_message_lock(message_key): - # 再次检查是否重复(防止并发问题) + # 检查是否重复(基于事件ID) if self._is_duplicate_event(event_data): - self.logger.debug(f"并发检测到重复消息,已忽略: {message_key}") + self.logger.debug(f"并发检测到重复消息(事件ID),已忽略: {message_key}") return + + # 检查是否重复(基于消息内容) + if self._is_duplicate_message(event_data): + self.logger.debug(f"并发检测到重复消息(内容),已忽略: {message_key}") + return + self._mark_event_processed(event_data) # 更新客户端负载 @@ -375,6 +393,28 @@ class ReverseWSManager: event_key = f"{event_data.get('post_type')}:{event_id}" return event_key in self._processed_events + def _is_duplicate_message(self, event_data: Dict[str, Any]) -> bool: + """ + 检查是否为重复消息(基于消息内容)。 + + Args: + event_data: 事件数据 + + Returns: + 是否为重复消息 + """ + if event_data.get('post_type') != 'message': + return False + + # 生成消息内容标识 + message_id = event_data.get('message_id') + user_id = event_data.get('user_id') + raw_message = event_data.get('raw_message', '') + + # 使用消息内容和用户ID作为标识 + content_key = f"content:{raw_message}:{user_id}" + return content_key in self._processed_messages + def _mark_event_processed(self, event_data: Dict[str, Any]) -> None: """ 标记事件已处理。 @@ -389,6 +429,13 @@ class ReverseWSManager: event_key = f"{event_data.get('post_type')}:{event_id}" self._processed_events[event_key] = datetime.now() + # 同时标记消息内容已处理 + if event_data.get('post_type') == 'message': + raw_message = event_data.get('raw_message', '') + user_id = event_data.get('user_id') + content_key = f"content:{raw_message}:{user_id}" + self._processed_messages[content_key] = datetime.now() + def _get_message_key(self, event_data: Dict[str, Any]) -> str: """ 获取消息唯一标识。