修复status密集发送时,消息出错

This commit is contained in:
Xuwznln
2025-09-10 18:52:23 +08:00
parent 361eae2f6d
commit f68340d932

View File

@@ -142,9 +142,13 @@ class TaskScheduler:
# 执行相应的任务
should_continue = False
if item.task_type == "query_action_status":
should_continue = asyncio.run_coroutine_threadsafe(self._process_query_status_item(item), self.message_sender.event_loop).result()
should_continue = asyncio.run_coroutine_threadsafe(
self._process_query_status_item(item), self.message_sender.event_loop
).result()
elif item.task_type == "job_call_back_status":
should_continue = asyncio.run_coroutine_threadsafe(self._process_job_callback_item(item), self.message_sender.event_loop).result()
should_continue = asyncio.run_coroutine_threadsafe(
self._process_job_callback_item(item), self.message_sender.event_loop
).result()
else:
logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}")
continue
@@ -622,8 +626,9 @@ class WebSocketClient(BaseCommunicationClient):
self.message_queue = asyncio.Queue() if not self.is_disabled else None
self.reconnect_count = 0
# 消息发送锁(解决并发写入问题)- 延迟初始化
self.send_lock = None
# 消息发送队列和处理器
self.send_queue = None # 延迟初始化
self.send_queue_task = None # 发送队列处理任务
# 任务调度器
self.task_scheduler = None
@@ -709,8 +714,8 @@ class WebSocketClient(BaseCommunicationClient):
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
# 在正确的事件循环中创建
self.send_lock = asyncio.Lock()
# 在正确的事件循环中创建发送队列
self.send_queue = asyncio.Queue(maxsize=1000) # 限制队列大小防止内存溢出
# 运行连接逻辑
self.event_loop.run_until_complete(self._connection_handler())
@@ -746,8 +751,20 @@ class WebSocketClient(BaseCommunicationClient):
logger.info(f"[WebSocket] Connected to {self.websocket_url}")
# 启动发送队列处理器
self.send_queue_task = asyncio.create_task(self._send_queue_processor())
try:
# 处理消息
await self._message_handler()
finally:
# 停止发送队列处理器
if self.send_queue_task and not self.send_queue_task.done():
self.send_queue_task.cancel()
try:
await self.send_queue_task
except asyncio.CancelledError:
pass
except websockets.exceptions.ConnectionClosed:
logger.warning("[WebSocket] Connection closed")
@@ -779,6 +796,67 @@ class WebSocketClient(BaseCommunicationClient):
await self.websocket.close()
self.websocket = None
async def _send_queue_processor(self):
"""处理发送队列中的消息"""
logger.debug("[WebSocket] Send queue processor started")
if not self.send_queue:
logger.error("[WebSocket] Send queue not initialized")
return
try:
while self.connected and self.websocket:
try:
# 使用超时避免无限等待
message = await asyncio.wait_for(self.send_queue.get(), timeout=1.0)
# 批量处理:收集短时间内的多个消息
messages_to_send = [message]
batch_size = 0
max_batch_size = 10 # 最大批处理数量
# 尝试获取更多消息(非阻塞)
while batch_size < max_batch_size and not self.send_queue.empty():
try:
additional_msg = self.send_queue.get_nowait()
messages_to_send.append(additional_msg)
batch_size += 1
except asyncio.QueueEmpty:
break
# 发送消息
for msg in messages_to_send:
if self.websocket and self.connected:
try:
message_str = json.dumps(msg, ensure_ascii=False)
await self.websocket.send(message_str)
logger.trace( # type: ignore
f"[WebSocket] Message sent: {msg.get('action', 'unknown')}"
)
except Exception as e:
logger.error(f"[WebSocket] Failed to send message: {str(e)}")
# 如果发送失败,将消息重新放回队列(可选)
# await self.send_queue.put(msg)
break
# 在批量发送之间添加小延迟,避免过载
if batch_size > 5:
await asyncio.sleep(0.001)
except asyncio.TimeoutError:
# 超时是正常的,继续循环
continue
except asyncio.CancelledError:
logger.debug("[WebSocket] Send queue processor cancelled")
break
except Exception as e:
logger.error(f"[WebSocket] Error in send queue processor: {str(e)}")
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"[WebSocket] Fatal error in send queue processor: {str(e)}")
finally:
logger.debug("[WebSocket] Send queue processor stopped")
# 消息处理方法
async def _message_handler(self):
"""处理接收到的消息"""
@@ -831,24 +909,23 @@ class WebSocketClient(BaseCommunicationClient):
# MessageSender接口实现
async def send_message(self, message: Dict[str, Any]) -> None:
"""内部发送消息方法,使用锁确保线程安全"""
if not self.connected or not self.websocket:
"""内部发送消息方法,将消息放入发送队列"""
if not self.connected:
logger.warning("[WebSocket] Not connected, cannot send message")
return
# 检查是否已初始化(在事件循环启动后才会创建)
if not self.send_lock:
logger.warning("[WebSocket] Send lock not initialized, cannot send message safely")
# 检查发送队列是否已初始化
if not self.send_queue:
logger.warning("[WebSocket] Send queue not initialized, cannot send message")
return
message_str = json.dumps(message, ensure_ascii=False)
# 使用异步锁防止并发写入导致的竞态条件
async with self.send_lock:
try:
await self.websocket.send(message_str)
logger.debug(f"[WebSocket] Message sent: {message['action']}")
except Exception as e:
logger.error(f"[WebSocket] Failed to send message: {str(e)}")
# 尝试将消息放入队列(非阻塞)
self.send_queue.put_nowait(message)
logger.trace(f"[WebSocket] Message queued: {message['action']}") # type: ignore
except asyncio.QueueFull:
logger.error(f"[WebSocket] Send queue full, dropping message: {message['action']}")
# 可选:在队列满时采取其他策略,如等待或丢弃旧消息
def is_connected(self) -> bool:
"""检查是否已连接TaskScheduler调用的接口"""