From 6ddceb8393067ca43a0a0970b0f412cd6ec07bb5 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 4 Sep 2025 19:31:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dedge=E4=B8=8A=E6=8A=A5?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/app/main.py | 6 +++--- unilabos/app/ws_client.py | 25 ++++++++++++++----------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index fd03a2fb..08ef94fe 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -283,7 +283,7 @@ def main(): materials.extend(lab_registry.obtain_registry_device_info()) materials = {k["id"]: k for k in materials} nodes = {k["id"]: k for k in data["nodes"]} - + edge_info = len(resource_edge_info) for ind, i in enumerate(resource_edge_info[::-1]): source_node = nodes[i["source"]] target_node = nodes[i["target"]] @@ -293,11 +293,11 @@ def main(): target_handler_keys = [h["handler_key"] for h in materials[target_node["class"]]["handles"] if h["io_type"] == 'target'] if not source_handle in source_handler_keys: print_status(f"节点 {source_node['id']} 的source端点 {source_handle} 不存在,请检查,支持的端点 {source_handler_keys}", "error") - resource_edge_info.pop(-ind - 1) + resource_edge_info.pop(edge_info - ind - 1) continue if not target_handle in target_handler_keys: print_status(f"节点 {target_node['id']} 的target端点 {target_handle} 不存在,请检查,支持的端点 {target_handler_keys}", "error") - resource_edge_info.pop(-ind - 1) + resource_edge_info.pop(edge_info - ind - 1) continue devices_and_resources = dict_from_graph(graph_res.physical_setup_graph) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 2f4b4c7b..d6aa1b18 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -622,6 +622,9 @@ class WebSocketClient(BaseCommunicationClient): self.message_queue = asyncio.Queue() if not self.is_disabled else None self.reconnect_count = 0 + # 消息发送锁(解决并发写入问题) + self.send_lock = asyncio.Lock() + # 任务调度器 self.task_scheduler = None @@ -825,16 +828,18 @@ class WebSocketClient(BaseCommunicationClient): # MessageSender接口实现 async def send_message(self, message: Dict[str, Any]) -> None: - """内部发送消息方法""" + """内部发送消息方法,使用锁确保线程安全""" if not self.connected or not self.websocket: logger.warning("[WebSocket] Not connected, cannot send message") return - try: - message_str = json.dumps(message, ensure_ascii=False) - await self.websocket.send(message_str) - logger.debug(f"[WebSocket] Message sent: {message['action']}") - except Exception as e: - logger.error(f"[WebSocket] Failed to send message: {str(e)}") + message_str = json.dumps(message, ensure_ascii=False) + # 使用异步锁防止并发写入导致的竞态条件 + async with self.send_lock: + try: + await self.websocket.send(message_str) + logger.debug(f"[WebSocket] Message sent: {message['action']}") + except Exception as e: + logger.error(f"[WebSocket] Failed to send message: {str(e)}") def is_connected(self) -> bool: """检查是否已连接(TaskScheduler调用的接口)""" @@ -856,8 +861,7 @@ class WebSocketClient(BaseCommunicationClient): }, }, } - if self.event_loop: - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result() logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") def publish_job_status( @@ -875,8 +879,7 @@ class WebSocketClient(BaseCommunicationClient): logger.warning("[WebSocket] Not connected, cannot send ping") return message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} - if self.event_loop: - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result() logger.debug(f"[WebSocket] Ping sent: {ping_id}") def cancel_goal(self, job_id: str) -> None: