feat(reverse_ws_manager): 增加基于消息内容的防重复处理机制
This commit is contained in:
@@ -52,6 +52,10 @@ class ReverseWSManager:
|
||||
self._message_lock_times: Dict[str, datetime] = {} # 消息锁创建时间
|
||||
self._lock_ttl = 300 # 锁保留时间(秒)
|
||||
|
||||
# 基于消息内容的防重复
|
||||
self._processed_messages: Dict[str, datetime] = {} # 已处理的消息内容和时间
|
||||
self._message_content_ttl = 5 # 消息内容保留时间(秒)
|
||||
|
||||
# 启动清理任务
|
||||
self._cleanup_task = None
|
||||
|
||||
@@ -176,6 +180,14 @@ class ReverseWSManager:
|
||||
if lock_key in self._message_lock_times:
|
||||
del self._message_lock_times[lock_key]
|
||||
|
||||
# 清理过期的消息内容
|
||||
expired_messages = [
|
||||
msg_key for msg_key, timestamp in self._processed_messages.items()
|
||||
if (current_time - timestamp).total_seconds() > self._message_content_ttl
|
||||
]
|
||||
for msg_key in expired_messages:
|
||||
del self._processed_messages[msg_key]
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
@@ -248,10 +260,16 @@ class ReverseWSManager:
|
||||
# 使用锁防止同一消息被多次处理
|
||||
message_key = self._get_message_key(event_data)
|
||||
async with self._get_message_lock(message_key):
|
||||
# 再次检查是否重复(防止并发问题)
|
||||
# 检查是否重复(基于事件ID)
|
||||
if self._is_duplicate_event(event_data):
|
||||
self.logger.debug(f"并发检测到重复消息,已忽略: {message_key}")
|
||||
self.logger.debug(f"并发检测到重复消息(事件ID),已忽略: {message_key}")
|
||||
return
|
||||
|
||||
# 检查是否重复(基于消息内容)
|
||||
if self._is_duplicate_message(event_data):
|
||||
self.logger.debug(f"并发检测到重复消息(内容),已忽略: {message_key}")
|
||||
return
|
||||
|
||||
self._mark_event_processed(event_data)
|
||||
|
||||
# 更新客户端负载
|
||||
@@ -375,6 +393,28 @@ class ReverseWSManager:
|
||||
event_key = f"{event_data.get('post_type')}:{event_id}"
|
||||
return event_key in self._processed_events
|
||||
|
||||
def _is_duplicate_message(self, event_data: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
检查是否为重复消息(基于消息内容)。
|
||||
|
||||
Args:
|
||||
event_data: 事件数据
|
||||
|
||||
Returns:
|
||||
是否为重复消息
|
||||
"""
|
||||
if event_data.get('post_type') != 'message':
|
||||
return False
|
||||
|
||||
# 生成消息内容标识
|
||||
message_id = event_data.get('message_id')
|
||||
user_id = event_data.get('user_id')
|
||||
raw_message = event_data.get('raw_message', '')
|
||||
|
||||
# 使用消息内容和用户ID作为标识
|
||||
content_key = f"content:{raw_message}:{user_id}"
|
||||
return content_key in self._processed_messages
|
||||
|
||||
def _mark_event_processed(self, event_data: Dict[str, Any]) -> None:
|
||||
"""
|
||||
标记事件已处理。
|
||||
@@ -389,6 +429,13 @@ class ReverseWSManager:
|
||||
event_key = f"{event_data.get('post_type')}:{event_id}"
|
||||
self._processed_events[event_key] = datetime.now()
|
||||
|
||||
# 同时标记消息内容已处理
|
||||
if event_data.get('post_type') == 'message':
|
||||
raw_message = event_data.get('raw_message', '')
|
||||
user_id = event_data.get('user_id')
|
||||
content_key = f"content:{raw_message}:{user_id}"
|
||||
self._processed_messages[content_key] = datetime.now()
|
||||
|
||||
def _get_message_key(self, event_data: Dict[str, Any]) -> str:
|
||||
"""
|
||||
获取消息唯一标识。
|
||||
|
||||
Reference in New Issue
Block a user