From a722636938175d39b69612109cd13a4999ba35a6 Mon Sep 17 00:00:00 2001
From: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
Date: Wed, 10 Sep 2025 20:01:10 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0addr=E5=8F=82=E6=95=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 unilabos/app/main.py      |  17 +++++-
 unilabos/app/ws_client.py | 109 +++++++++++++++++++++++++++-----------
 2 files changed, 93 insertions(+), 33 deletions(-)

diff --git a/unilabos/app/main.py b/unilabos/app/main.py
index 08ef94fe..4dc79bbc 100644
--- a/unilabos/app/main.py
+++ b/unilabos/app/main.py
@@ -17,7 +17,7 @@ unilabos_dir = os.path.dirname(os.path.dirname(current_dir))
 if unilabos_dir not in sys.path:
     sys.path.append(unilabos_dir)
 
-from unilabos.config.config import load_config, BasicConfig
+from unilabos.config.config import load_config, BasicConfig, HTTPConfig
 from unilabos.utils.banner_print import print_status, print_unilab_banner
 from unilabos.resources.graphio import modify_to_backend_format
 
@@ -146,6 +146,12 @@ def parse_args():
         default="",
         help="实验室请求的sk",
     )
+    parser.add_argument(
+        "--addr",
+        type=str,
+        default="https://uni-lab.bohrium.com/api/v1",
+        help="实验室后端地址",
+    )
     parser.add_argument(
         "--websocket",
         action="store_true",
@@ -231,6 +237,15 @@ def main():
     # 设置BasicConfig参数
     BasicConfig.ak = args_dict.get("ak", "")
     BasicConfig.sk = args_dict.get("sk", "")
+    # "test" / "local" are aliases for fixed backends; any other value is used verbatim.
+    if args_dict["addr"] == "test":
+        print_status("使用测试环境地址", "info")
+        HTTPConfig.remote_addr = "https://uni-lab.test.bohrium.com/api/v1"
+    elif args_dict["addr"] == "local":
+        print_status("使用本地环境地址", "info")
+        HTTPConfig.remote_addr = "http://127.0.0.1:48197/api/v1"
+    else:
+        HTTPConfig.remote_addr = args_dict.get("addr", "")
     BasicConfig.working_dir = working_dir
     BasicConfig.is_host_mode = not args_dict.get("without_host", False)
     BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py
index 
10b377ce..fecb01c7 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -385,21 +385,21 @@ class TaskScheduler: # 创建取消事件,todo:要移动到query_state中 self.cancel_events[req.job_id] = asyncio.Event() + # 启动callback定时发送 + await self._start_job_callback(req.job_id, req.device_id, req.action, req.task_id, device_action_key) + # 创建兼容HostNode的QueueItem对象 + job_queue_item = QueueItem( + task_type="job_call_back_status", + device_id=req.device_id, + action_name=req.action, + task_id=req.task_id, + job_id=req.job_id, + device_action_key=device_action_key, + next_run_time=time.time(), + ) try: - # 启动callback定时发送 - await self._start_job_callback(req.job_id, req.device_id, req.action, req.task_id, device_action_key) - # 创建兼容HostNode的QueueItem对象 - job_queue_item = QueueItem( - task_type="job_call_back_status", - device_id=req.device_id, - action_name=req.action, - task_id=req.task_id, - job_id=req.job_id, - device_action_key=device_action_key, - next_run_time=time.time(), - ) host_node = HostNode.get_instance(0) if not host_node: logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}") @@ -413,15 +413,9 @@ class TaskScheduler: except Exception as e: logger.error(f"[TaskScheduler] Exception during job start for job_id {req.job_id}: {str(e)}") traceback.print_exc() - # 异常结束,先停止callback,然后发送失败状态 - await self._stop_job_callback( - req.job_id, "failed", serialize_result_info(traceback.format_exc(), False, {}) + self.publish_job_status( + {}, job_queue_item, "failed", serialize_result_info(traceback.format_exc(), False, {}) ) - - host_node = HostNode.get_instance(0) - if host_node: - host_node._device_action_status[device_action_key].job_ids.pop(req.job_id, None) - logger.warning(f"[TaskScheduler] Cleaned up failed job from HostNode: {req.job_id}") except Exception as e: logger.error(f"[TaskScheduler] Error handling job start: {str(e)}") @@ -458,7 +452,7 @@ class TaskScheduler: logger.error(f"[TaskScheduler] HostNode not available for 
cancel goal: {job_id}") # 停止callback并发送取消状态 - await self._stop_job_callback(job_id, "cancelled", "Job was cancelled by user request") + await self._stop_job_callback(job_id, "cancelled") logger.info(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}") else: logger.warning(f"[TaskScheduler] Job not found in active jobs for cancellation: {job_id}") @@ -515,7 +509,7 @@ class TaskScheduler: else: logger.debug(f"[TaskScheduler] Action queue not available for job callback: {job_id}") - async def _stop_job_callback(self, job_id: str, final_status: str, return_info: Optional[str] = None) -> None: + async def _stop_job_callback(self, job_id: str, final_status: str) -> None: """停止job的callback定时发送并发送最终结果""" logger.info(f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}") if job_id not in self.active_jobs: @@ -561,10 +555,24 @@ class TaskScheduler: # 给其他同名action至少执行一次的机会 with self.immediate_execution_lock: self.immediate_execution_flags[item.device_action_key] = time.time() + 3 - # 如果是最终状态,通过_stop_job_callback处理 - asyncio.run_coroutine_threadsafe( - self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop - ).result() + + # 检查是否在同一个事件循环中 + try: + current_loop = asyncio.get_running_loop() + if current_loop == self.message_sender.event_loop: + # 在同一个事件循环中,直接创建任务 + asyncio.create_task(self._stop_job_callback(item.job_id, status)) + else: + # 不在同一个事件循环中,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe( + self._stop_job_callback(item.job_id, status), self.message_sender.event_loop + ) + except RuntimeError: + # 没有运行中的事件循环,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe( + self._stop_job_callback(item.job_id, status), self.message_sender.event_loop + ) + # 执行结果信息上传 message = { "action": "job_status", @@ -579,9 +587,21 @@ class TaskScheduler: "timestamp": time.time(), }, } - asyncio.run_coroutine_threadsafe( - 
self.message_sender.send_message(message), self.message_sender.event_loop - ).result() + + # 同样检查事件循环 + try: + current_loop = asyncio.get_running_loop() + if current_loop == self.message_sender.event_loop: + # 在同一个事件循环中,直接创建任务 + asyncio.create_task(self.message_sender.send_message(message)) + else: + # 不在同一个事件循环中,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe( + self.message_sender.send_message(message), self.message_sender.event_loop + ) + except RuntimeError: + # 没有运行中的事件循环,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe(self.message_sender.send_message(message), self.message_sender.event_loop) logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore @@ -623,7 +643,6 @@ class WebSocketClient(BaseCommunicationClient): self.connected = False # 消息处理 - self.message_queue = asyncio.Queue() if not self.is_disabled else None self.reconnect_count = 0 # 消息发送队列和处理器 @@ -947,7 +966,20 @@ class WebSocketClient(BaseCommunicationClient): }, }, } - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result() + + # 检查是否在同一个事件循环中 + try: + current_loop = asyncio.get_running_loop() + if current_loop == self.event_loop: + # 在同一个事件循环中,直接创建任务 + asyncio.create_task(self.send_message(message)) + else: + # 不在同一个事件循环中,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) + except RuntimeError: + # 没有运行中的事件循环,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) + logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") def publish_job_status( @@ -965,7 +997,20 @@ class WebSocketClient(BaseCommunicationClient): logger.warning("[WebSocket] Not connected, cannot send ping") return message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result() + + # 
检查是否在同一个事件循环中 + try: + current_loop = asyncio.get_running_loop() + if current_loop == self.event_loop: + # 在同一个事件循环中,直接创建任务 + asyncio.create_task(self.send_message(message)) + else: + # 不在同一个事件循环中,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) + except RuntimeError: + # 没有运行中的事件循环,使用 run_coroutine_threadsafe + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) + logger.debug(f"[WebSocket] Ping sent: {ping_id}") def cancel_goal(self, job_id: str) -> None: