From f68340d9326a704ac0043c2d941de3e29a9d4ebf Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Wed, 10 Sep 2025 18:52:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dstatus=E5=AF=86=E9=9B=86?= =?UTF-8?q?=E5=8F=91=E9=80=81=E6=97=B6=EF=BC=8C=E6=B6=88=E6=81=AF=E5=87=BA?= =?UTF-8?q?=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/app/ws_client.py | 119 +++++++++++++++++++++++++++++++------- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 4d87f4ae..10b377ce 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -142,9 +142,13 @@ class TaskScheduler: # 执行相应的任务 should_continue = False if item.task_type == "query_action_status": - should_continue = asyncio.run_coroutine_threadsafe(self._process_query_status_item(item), self.message_sender.event_loop).result() + should_continue = asyncio.run_coroutine_threadsafe( + self._process_query_status_item(item), self.message_sender.event_loop + ).result() elif item.task_type == "job_call_back_status": - should_continue = asyncio.run_coroutine_threadsafe(self._process_job_callback_item(item), self.message_sender.event_loop).result() + should_continue = asyncio.run_coroutine_threadsafe( + self._process_job_callback_item(item), self.message_sender.event_loop + ).result() else: logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}") continue @@ -622,8 +626,9 @@ class WebSocketClient(BaseCommunicationClient): self.message_queue = asyncio.Queue() if not self.is_disabled else None self.reconnect_count = 0 - # 消息发送锁(解决并发写入问题)- 延迟初始化 - self.send_lock = None + # 消息发送队列和处理器 + self.send_queue = None # 延迟初始化 + self.send_queue_task = None # 发送队列处理任务 # 任务调度器 self.task_scheduler = None @@ -709,8 +714,8 @@ class WebSocketClient(BaseCommunicationClient): self.event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.event_loop) - # 在正确的事件循环中创建锁 - self.send_lock = asyncio.Lock() + # 在正确的事件循环中创建发送队列 + self.send_queue = asyncio.Queue(maxsize=1000) # 限制队列大小防止内存溢出 # 运行连接逻辑 self.event_loop.run_until_complete(self._connection_handler()) @@ -746,8 +751,20 @@ class WebSocketClient(BaseCommunicationClient): logger.info(f"[WebSocket] Connected to {self.websocket_url}") - # 处理消息 - await self._message_handler() + # 启动发送队列处理器 + self.send_queue_task = asyncio.create_task(self._send_queue_processor()) + + try: + # 处理消息 + await self._message_handler() + finally: + # 停止发送队列处理器 + if self.send_queue_task and not self.send_queue_task.done(): + self.send_queue_task.cancel() + try: + await self.send_queue_task + except asyncio.CancelledError: + pass except websockets.exceptions.ConnectionClosed: logger.warning("[WebSocket] Connection closed") @@ -779,6 +796,67 @@ class WebSocketClient(BaseCommunicationClient): await self.websocket.close() self.websocket = None + async def _send_queue_processor(self): + """处理发送队列中的消息""" + logger.debug("[WebSocket] Send queue processor started") + if not self.send_queue: + logger.error("[WebSocket] Send queue not initialized") + return + + try: + while self.connected and self.websocket: + try: + # 使用超时避免无限等待 + message = await asyncio.wait_for(self.send_queue.get(), timeout=1.0) + + # 批量处理:收集短时间内的多个消息 + messages_to_send = [message] + batch_size = 0 + max_batch_size = 10 # 最大批处理数量 + + # 尝试获取更多消息(非阻塞) + while batch_size < max_batch_size and not self.send_queue.empty(): + try: + additional_msg = self.send_queue.get_nowait() + messages_to_send.append(additional_msg) + batch_size += 1 + except asyncio.QueueEmpty: + break + + # 发送消息 + for msg in messages_to_send: + if self.websocket and self.connected: + try: + message_str = json.dumps(msg, ensure_ascii=False) + await self.websocket.send(message_str) + logger.trace( # type: ignore + f"[WebSocket] Message sent: {msg.get('action', 'unknown')}" + ) + except Exception as e: + logger.error(f"[WebSocket] Failed to send message: {str(e)}") + # 如果发送失败,将消息重新放回队列(可选) + # await self.send_queue.put(msg) + break + + # 在批量发送之间添加小延迟,避免过载 + if batch_size > 5: + await asyncio.sleep(0.001) + + except asyncio.TimeoutError: + # 超时是正常的,继续循环 + continue + except asyncio.CancelledError: + logger.debug("[WebSocket] Send queue processor cancelled") + break + except Exception as e: + logger.error(f"[WebSocket] Error in send queue processor: {str(e)}") + await asyncio.sleep(0.1) + + except Exception as e: + logger.error(f"[WebSocket] Fatal error in send queue processor: {str(e)}") + finally: + logger.debug("[WebSocket] Send queue processor stopped") + # 消息处理方法 async def _message_handler(self): """处理接收到的消息""" @@ -831,24 +909,23 @@ class WebSocketClient(BaseCommunicationClient): # MessageSender接口实现 async def send_message(self, message: Dict[str, Any]) -> None: - """内部发送消息方法,使用锁确保线程安全""" - if not self.connected or not self.websocket: + """内部发送消息方法,将消息放入发送队列""" + if not self.connected: logger.warning("[WebSocket] Not connected, cannot send message") return - # 检查锁是否已初始化(在事件循环启动后才会创建) - if not self.send_lock: - logger.warning("[WebSocket] Send lock not initialized, cannot send message safely") + # 检查发送队列是否已初始化 + if not self.send_queue: + logger.warning("[WebSocket] Send queue not initialized, cannot send message") return - message_str = json.dumps(message, ensure_ascii=False) - # 使用异步锁防止并发写入导致的竞态条件 - async with self.send_lock: - try: - await self.websocket.send(message_str) - logger.debug(f"[WebSocket] Message sent: {message['action']}") - except Exception as e: - logger.error(f"[WebSocket] Failed to send message: {str(e)}") + try: + # 尝试将消息放入队列(非阻塞) + self.send_queue.put_nowait(message) + logger.trace(f"[WebSocket] Message queued: {message['action']}") # type: ignore + except asyncio.QueueFull: + logger.error(f"[WebSocket] Send queue full, dropping message: {message['action']}") + # 可选:在队列满时采取其他策略,如等待或丢弃旧消息 def is_connected(self) -> bool: """检查是否已连接(TaskScheduler调用的接口)"""