From 4e52c7d2f40d66b2ce69c10702819c6a0992515b Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 4 Sep 2025 17:11:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Devent=20loop=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/app/ws_client.py | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 69c70b13..2f4b4c7b 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -558,10 +558,9 @@ class TaskScheduler: with self.immediate_execution_lock: self.immediate_execution_flags[item.device_action_key] = time.time() + 3 # 如果是最终状态,通过_stop_job_callback处理 - if self.message_sender.event_loop: - asyncio.run_coroutine_threadsafe( - self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop - ).result() + asyncio.run_coroutine_threadsafe( + self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop + ).result() # 执行结果信息上传 message = { "action": "job_status", @@ -576,11 +575,9 @@ class TaskScheduler: "timestamp": time.time(), }, } - try: - loop = asyncio.get_event_loop() - loop.create_task(self.message_sender.send_message(message)) - except RuntimeError: - asyncio.run(self.message_sender.send_message(message)) + asyncio.run_coroutine_threadsafe( + self.message_sender.send_message(message), self.message_sender.event_loop + ).result() logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore @@ -616,7 +613,7 @@ class WebSocketClient(BaseCommunicationClient): # WebSocket连接相关 self.websocket = None self.connection_loop = None - self.event_loop = None + self.event_loop: asyncio.AbstractEventLoop = None # type: ignore self.connection_thread = None self.is_running = False self.connected = False @@ -826,13 +823,12 @@ class WebSocketClient(BaseCommunicationClient): if host_node: host_node.handle_pong_response(pong_data) - # 消息发送方法 - async def _send_message(self, message: Dict[str, Any]): + # MessageSender接口实现 + async def send_message(self, message: Dict[str, Any]) -> None: """内部发送消息方法""" if not self.connected or not self.websocket: logger.warning("[WebSocket] Not connected, cannot send message") return - try: message_str = json.dumps(message, ensure_ascii=False) await self.websocket.send(message_str) @@ -840,11 +836,6 @@ class WebSocketClient(BaseCommunicationClient): except Exception as e: logger.error(f"[WebSocket] Failed to send message: {str(e)}") - # MessageSender接口实现 - async def send_message(self, message: Dict[str, Any]) -> None: - """发送消息(TaskScheduler调用的接口)""" - await self._send_message(message) - def is_connected(self) -> bool: """检查是否已连接(TaskScheduler调用的接口)""" return self.connected and not self.is_disabled @@ -866,7 +857,7 @@ class WebSocketClient(BaseCommunicationClient): }, } if self.event_loop: - asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") def publish_job_status( @@ -885,7 +876,7 @@ class WebSocketClient(BaseCommunicationClient): return message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} if self.event_loop: - asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) logger.debug(f"[WebSocket] Ping sent: {ping_id}") def cancel_goal(self, job_id: str) -> None: