Merge pull request #63 from Fairy-Oracle-Sanctuary/dev

feat(reverse_ws_manager): 增加基于消息内容的防重复处理机制
This commit is contained in:
镀铬酸钾
2026-02-28 21:42:55 +08:00
committed by GitHub

View File

@@ -52,6 +52,10 @@ class ReverseWSManager:
self._message_lock_times: Dict[str, datetime] = {} # 消息锁创建时间
self._lock_ttl = 300 # 锁保留时间(秒)
# 基于消息内容的防重复
self._processed_messages: Dict[str, datetime] = {} # 已处理的消息内容和时间
self._message_content_ttl = 5 # 消息内容保留时间(秒)
# 启动清理任务
self._cleanup_task = None
@@ -176,6 +180,14 @@ class ReverseWSManager:
if lock_key in self._message_lock_times:
del self._message_lock_times[lock_key]
# 清理过期的消息内容
expired_messages = [
msg_key for msg_key, timestamp in self._processed_messages.items()
if (current_time - timestamp).total_seconds() > self._message_content_ttl
]
for msg_key in expired_messages:
del self._processed_messages[msg_key]
except asyncio.CancelledError:
break
except Exception as e:
@@ -248,10 +260,16 @@ class ReverseWSManager:
# 使用锁防止同一消息被多次处理
message_key = self._get_message_key(event_data)
async with self._get_message_lock(message_key):
# 再次检查是否重复(防止并发问题
# 检查是否重复(基于事件ID
if self._is_duplicate_event(event_data):
self.logger.debug(f"并发检测到重复消息,已忽略: {message_key}")
self.logger.debug(f"并发检测到重复消息事件ID,已忽略: {message_key}")
return
# 检查是否重复(基于消息内容)
if self._is_duplicate_message(event_data):
self.logger.debug(f"并发检测到重复消息(内容),已忽略: {message_key}")
return
self._mark_event_processed(event_data)
# 更新客户端负载
@@ -375,6 +393,28 @@ class ReverseWSManager:
event_key = f"{event_data.get('post_type')}:{event_id}"
return event_key in self._processed_events
def _is_duplicate_message(self, event_data: Dict[str, Any]) -> bool:
"""
检查是否为重复消息(基于消息内容)。
Args:
event_data: 事件数据
Returns:
是否为重复消息
"""
if event_data.get('post_type') != 'message':
return False
# 生成消息内容标识
message_id = event_data.get('message_id')
user_id = event_data.get('user_id')
raw_message = event_data.get('raw_message', '')
# 使用消息内容和用户ID作为标识
content_key = f"content:{raw_message}:{user_id}"
return content_key in self._processed_messages
def _mark_event_processed(self, event_data: Dict[str, Any]) -> None:
"""
标记事件已处理。
@@ -389,6 +429,13 @@ class ReverseWSManager:
event_key = f"{event_data.get('post_type')}:{event_id}"
self._processed_events[event_key] = datetime.now()
# 同时标记消息内容已处理
if event_data.get('post_type') == 'message':
raw_message = event_data.get('raw_message', '')
user_id = event_data.get('user_id')
content_key = f"content:{raw_message}:{user_id}"
self._processed_messages[content_key] = datetime.now()
def _get_message_key(self, event_data: Dict[str, Any]) -> str:
"""
获取消息唯一标识。