From 55165024dde0565b96c3b7e6a3fce688de29d159 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 4 Sep 2025 20:19:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dasync=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/app/ws_client.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index d6aa1b18..4d87f4ae 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -142,9 +142,9 @@ class TaskScheduler: # 执行相应的任务 should_continue = False if item.task_type == "query_action_status": - should_continue = await self._process_query_status_item(item) + should_continue = asyncio.run_coroutine_threadsafe(self._process_query_status_item(item), self.message_sender.event_loop).result() elif item.task_type == "job_call_back_status": - should_continue = await self._process_job_callback_item(item) + should_continue = asyncio.run_coroutine_threadsafe(self._process_job_callback_item(item), self.message_sender.event_loop).result() else: logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}") continue @@ -622,8 +622,8 @@ class WebSocketClient(BaseCommunicationClient): self.message_queue = asyncio.Queue() if not self.is_disabled else None self.reconnect_count = 0 - # 消息发送锁(解决并发写入问题) - self.send_lock = asyncio.Lock() + # 消息发送锁(解决并发写入问题)- 延迟初始化 + self.send_lock = None # 任务调度器 self.task_scheduler = None @@ -709,6 +709,9 @@ class WebSocketClient(BaseCommunicationClient): self.event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.event_loop) + # 在正确的事件循环中创建锁 + self.send_lock = asyncio.Lock() + # 运行连接逻辑 self.event_loop.run_until_complete(self._connection_handler()) except Exception as e: @@ -832,6 +835,12 @@ class WebSocketClient(BaseCommunicationClient): if not self.connected or not self.websocket: logger.warning("[WebSocket] Not connected, cannot send message") return + + # 检查锁是否已初始化(在事件循环启动后才会创建) + if not self.send_lock: + logger.warning("[WebSocket] Send lock not initialized, cannot send message safely") + return + message_str = json.dumps(message, ensure_ascii=False) # 使用异步锁防止并发写入导致的竞态条件 async with self.send_lock: