diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 3d7761e..08ef94f 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -258,7 +258,7 @@ def main(): print_unilab_banner(args_dict) # 注册表 - build_registry(args_dict["registry_path"], False, args_dict["upload_registry"]) + lab_registry = build_registry(args_dict["registry_path"], False, args_dict["upload_registry"]) if args_dict["graph"] is None: request_startup_json = http_client.request_startup_json() if not request_startup_json: @@ -279,6 +279,27 @@ def main(): graph_res.physical_setup_graph = graph resource_edge_info = modify_to_backend_format(data["links"]) + materials = lab_registry.obtain_registry_resource_info() + materials.extend(lab_registry.obtain_registry_device_info()) + materials = {k["id"]: k for k in materials} + nodes = {k["id"]: k for k in data["nodes"]} + edge_info = len(resource_edge_info) + for ind, i in enumerate(resource_edge_info[::-1]): + source_node = nodes[i["source"]] + target_node = nodes[i["target"]] + source_handle = i["sourceHandle"] + target_handle = i["targetHandle"] + source_handler_keys = [h["handler_key"] for h in materials[source_node["class"]]["handles"] if h["io_type"] == 'source'] + target_handler_keys = [h["handler_key"] for h in materials[target_node["class"]]["handles"] if h["io_type"] == 'target'] + if not source_handle in source_handler_keys: + print_status(f"节点 {source_node['id']} 的source端点 {source_handle} 不存在,请检查,支持的端点 {source_handler_keys}", "error") + resource_edge_info.pop(edge_info - ind - 1) + continue + if not target_handle in target_handler_keys: + print_status(f"节点 {target_node['id']} 的target端点 {target_handle} 不存在,请检查,支持的端点 {target_handler_keys}", "error") + resource_edge_info.pop(edge_info - ind - 1) + continue + devices_and_resources = dict_from_graph(graph_res.physical_setup_graph) # args_dict["resources_config"] = initialize_resources(list(deepcopy(devices_and_resources).values())) args_dict["resources_config"] = list(devices_and_resources.values()) diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 7a50bf4..c8bab67 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -58,10 +58,12 @@ class HTTPClient: headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=100, ) + if self.backend_go and response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"添加物料关系失败: {response.text}") if response.status_code != 200 and response.status_code != 201: logger.error(f"添加物料关系失败: {response.status_code}, {response.text}") - elif self.backend_go: - logger.info(f"添加物料关系 {response.text}") return response def resource_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response: @@ -80,10 +82,12 @@ class HTTPClient: headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"}, timeout=100, ) + if self.backend_go and response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"添加物料失败: {response.text}") if response.status_code != 200: logger.error(f"添加物料失败: {response.text}") - elif self.backend_go: - logger.info(f"添加物料 {response.text}") return response def resource_get(self, id: str, with_children: bool = False) -> Dict[str, Any]: diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 69c70b1..4d87f4a 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -142,9 +142,9 @@ class TaskScheduler: # 执行相应的任务 should_continue = False if item.task_type == "query_action_status": - should_continue = await self._process_query_status_item(item) + should_continue = asyncio.run_coroutine_threadsafe(self._process_query_status_item(item), self.message_sender.event_loop).result() elif item.task_type == "job_call_back_status": - should_continue = await self._process_job_callback_item(item) + should_continue = asyncio.run_coroutine_threadsafe(self._process_job_callback_item(item), self.message_sender.event_loop).result() else: logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}") continue @@ -558,10 +558,9 @@ class TaskScheduler: with self.immediate_execution_lock: self.immediate_execution_flags[item.device_action_key] = time.time() + 3 # 如果是最终状态,通过_stop_job_callback处理 - if self.message_sender.event_loop: - asyncio.run_coroutine_threadsafe( - self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop - ).result() + asyncio.run_coroutine_threadsafe( + self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop + ).result() # 执行结果信息上传 message = { "action": "job_status", @@ -576,11 +575,9 @@ class TaskScheduler: "timestamp": time.time(), }, } - try: - loop = asyncio.get_event_loop() - loop.create_task(self.message_sender.send_message(message)) - except RuntimeError: - asyncio.run(self.message_sender.send_message(message)) + asyncio.run_coroutine_threadsafe( + self.message_sender.send_message(message), self.message_sender.event_loop + ).result() logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore @@ -616,7 +613,7 @@ class WebSocketClient(BaseCommunicationClient): # WebSocket连接相关 self.websocket = None self.connection_loop = None - self.event_loop = None + self.event_loop: asyncio.AbstractEventLoop = None # type: ignore self.connection_thread = None self.is_running = False self.connected = False @@ -625,6 +622,9 @@ class WebSocketClient(BaseCommunicationClient): self.message_queue = asyncio.Queue() if not self.is_disabled else None self.reconnect_count = 0 + # 消息发送锁(解决并发写入问题)- 延迟初始化 + self.send_lock = None + # 任务调度器 self.task_scheduler = None @@ -709,6 +709,9 @@ class WebSocketClient(BaseCommunicationClient): self.event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.event_loop) + # 在正确的事件循环中创建锁 + self.send_lock = asyncio.Lock() + # 运行连接逻辑 self.event_loop.run_until_complete(self._connection_handler()) except Exception as e: @@ -826,24 +829,26 @@ class WebSocketClient(BaseCommunicationClient): if host_node: host_node.handle_pong_response(pong_data) - # 消息发送方法 - async def _send_message(self, message: Dict[str, Any]): - """内部发送消息方法""" + # MessageSender接口实现 + async def send_message(self, message: Dict[str, Any]) -> None: + """内部发送消息方法,使用锁确保线程安全""" if not self.connected or not self.websocket: logger.warning("[WebSocket] Not connected, cannot send message") return - try: - message_str = json.dumps(message, ensure_ascii=False) - await self.websocket.send(message_str) - logger.debug(f"[WebSocket] Message sent: {message['action']}") - except Exception as e: - logger.error(f"[WebSocket] Failed to send message: {str(e)}") + # 检查锁是否已初始化(在事件循环启动后才会创建) + if not self.send_lock: + logger.warning("[WebSocket] Send lock not initialized, cannot send message safely") + return - # MessageSender接口实现 - async def send_message(self, message: Dict[str, Any]) -> None: - """发送消息(TaskScheduler调用的接口)""" - await self._send_message(message) + message_str = json.dumps(message, ensure_ascii=False) + # 使用异步锁防止并发写入导致的竞态条件 + async with self.send_lock: + try: + await self.websocket.send(message_str) + logger.debug(f"[WebSocket] Message sent: {message['action']}") + except Exception as e: + logger.error(f"[WebSocket] Failed to send message: {str(e)}") def is_connected(self) -> bool: """检查是否已连接(TaskScheduler调用的接口)""" @@ -865,8 +870,7 @@ class WebSocketClient(BaseCommunicationClient): }, }, } - if self.event_loop: - asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result() logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") def publish_job_status( @@ -884,8 +888,7 @@ class WebSocketClient(BaseCommunicationClient): logger.warning("[WebSocket] Not connected, cannot send ping") return message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} - if self.event_loop: - asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) + asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result() logger.debug(f"[WebSocket] Ping sent: {ping_id}") def cancel_goal(self, job_id: str) -> None: