From 31c9f9a17236d9fc1b349f44e8df0156e3c622fc Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 13 Jan 2026 20:21:37 +0800 Subject: [PATCH] =?UTF-8?q?=E7=89=A9=E6=96=99=E6=9B=B4=E6=96=B0=E4=B9=9F?= =?UTF-8?q?=E6=98=AF=E7=94=A8=E7=88=B6=E8=8A=82=E7=82=B9=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E6=8A=A5=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/app/ws_client.py | 6 +- unilabos/resources/resource_tracker.py | 2 +- unilabos/ros/nodes/base_device_node.py | 185 ++++++++++++++---------- unilabos/ros/nodes/presets/host_node.py | 9 +- 4 files changed, 114 insertions(+), 88 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 4c87d36..ea8c8e7 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -848,7 +848,7 @@ class MessageProcessor: device_action_groups[key_add].append(item["uuid"]) logger.info( - f"[MessageProcessor] Resource migrated: {item['uuid'][:8]} from {device_old_id} to {device_id}" + f"[资源同步] 跨站Transfer: {item['uuid'][:8]} from {device_old_id} to {device_id}" ) else: # 正常update @@ -863,11 +863,11 @@ class MessageProcessor: device_action_groups[key] = [] device_action_groups[key].append(item["uuid"]) - logger.info(f"触发物料更新 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}") + logger.trace(f"[资源同步] 动作 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}") # 为每个(device_id, action)创建独立的更新线程 for (device_id, actual_action), items in device_action_groups.items(): - logger.info(f"设备 {device_id} 物料更新 {actual_action} 数量: {len(items)}") + logger.trace(f"[资源同步] {device_id} 物料动作 {actual_action} 数量: {len(items)}") def _notify_resource_tree(dev_id, act, item_list): try: diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index ea8e5cf..afd4c3b 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -1143,7 +1143,7 @@ class DeviceNodeResourceTracker(object): for key in keys_to_remove: self.resource2parent_resource.pop(key, None) - logger.debug(f"成功移除资源: {resource}") + logger.trace(f"[ResourceTracker] 成功移除资源: {resource}") return True def clear_resource(self): diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 8dd4a28..c99df6c 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -656,61 +656,71 @@ class BaseROS2DeviceNode(Node, Generic[T]): def transfer_to_new_resource( self, plr_resource: "ResourcePLR", tree: ResourceTreeInstance, additional_add_params: Dict[str, Any] - ): + ) -> Optional["ResourcePLR"]: parent_uuid = tree.root_node.res_content.parent_uuid - if parent_uuid: - parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid) - if parent_resource is None: + if not parent_uuid: + self.lab_logger().warning( + f"物料{plr_resource} parent未知,挂载到当前节点下,额外参数:{additional_add_params}" + ) + return None + if parent_uuid == self.uuid: + self.lab_logger().warning( + f"物料{plr_resource}请求挂载到{self.identifier},额外参数:{additional_add_params}" + ) + return None + parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid) + if parent_resource is None: + self.lab_logger().warning( + f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在" + ) + else: + try: + # 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步 + additional_params = {} + extra = getattr(plr_resource, "unilabos_extra", {}) + if len(extra): + self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra)) + if "update_resource_site" in extra: + additional_add_params["site"] = extra["update_resource_site"] + site = additional_add_params.get("site", None) + spec = inspect.signature(parent_resource.assign_child_resource) + if "spot" in spec.parameters: + ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering") + if ordering_dict: + site = list(ordering_dict.keys()).index(site) + additional_params["spot"] = site + old_parent = plr_resource.parent + if old_parent is not None: + # plr并不支持同一个deck的加载和卸载 + self.lab_logger().warning(f"物料{plr_resource}请求从{old_parent}卸载") + old_parent.unassign_child_resource(plr_resource) self.lab_logger().warning( - f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在" + f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}" ) - else: - try: - # 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步 - additional_params = {} - extra = getattr(plr_resource, "unilabos_extra", {}) - if len(extra): - self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra)) - if "update_resource_site" in extra: - additional_add_params["site"] = extra["update_resource_site"] - site = additional_add_params.get("site", None) - spec = inspect.signature(parent_resource.assign_child_resource) - if "spot" in spec.parameters: - ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering") - if ordering_dict: - site = list(ordering_dict.keys()).index(site) - additional_params["spot"] = site - old_parent = plr_resource.parent - if old_parent is not None: - # plr并不支持同一个deck的加载和卸载 - self.lab_logger().warning(f"物料{plr_resource}请求从{old_parent}卸载") - old_parent.unassign_child_resource(plr_resource) - self.lab_logger().warning( - f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}" - ) - # ⭐ assign 之前,需要从 resources 列表中移除 - # 因为资源将不再是顶级资源,而是成为 parent_resource 的子资源 - # 如果不移除,figure_resource 会找到两次:一次在 resources,一次在 parent 的 children - resource_id = id(plr_resource) - for i, r in enumerate(self.resource_tracker.resources): - if id(r) == resource_id: - self.resource_tracker.resources.pop(i) - self.lab_logger().debug( - f"从顶级资源列表中移除 {plr_resource.name}(即将成为 {parent_resource.name} 的子资源)" - ) - break + # ⭐ assign 之前,需要从 resources 列表中移除 + # 因为资源将不再是顶级资源,而是成为 parent_resource 的子资源 + # 如果不移除,figure_resource 会找到两次:一次在 resources,一次在 parent 的 children + resource_id = id(plr_resource) + for i, r in enumerate(self.resource_tracker.resources): + if id(r) == resource_id: + self.resource_tracker.resources.pop(i) + self.lab_logger().debug( + f"从顶级资源列表中移除 {plr_resource.name}(即将成为 {parent_resource.name} 的子资源)" + ) + break - parent_resource.assign_child_resource(plr_resource, location=None, **additional_params) + parent_resource.assign_child_resource(plr_resource, location=None, **additional_params) - func = getattr(self.driver_instance, "resource_tree_transfer", None) - if callable(func): - # 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了) - func(old_parent, plr_resource, parent_resource) - except Exception as e: - self.lab_logger().warning( - f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}" - ) + func = getattr(self.driver_instance, "resource_tree_transfer", None) + if callable(func): + # 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了) + func(old_parent, plr_resource, parent_resource) + return parent_resource + except Exception as e: + self.lab_logger().warning( + f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}" + ) async def s2c_resource_tree(self, req: SerialCommand_Request, res: SerialCommand_Response): """ @@ -725,7 +735,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): def _handle_add( plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any] - ) -> Dict[str, Any]: + ) -> Tuple[Dict[str, Any], List[ResourcePLR]]: """ 处理资源添加操作的内部函数 @@ -737,15 +747,20 @@ class BaseROS2DeviceNode(Node, Generic[T]): Returns: 操作结果字典 """ + parents = [] # 放的是被变更的物料 / 被变更的物料父级 for plr_resource, tree in zip(plr_resources, tree_set.trees): self.resource_tracker.add_resource(plr_resource) - self.transfer_to_new_resource(plr_resource, tree, additional_add_params) + parent = self.transfer_to_new_resource(plr_resource, tree, additional_add_params) + if parent is not None: + parents.append(parent) + else: + parents.append(plr_resource) func = getattr(self.driver_instance, "resource_tree_add", None) if callable(func): func(plr_resources) - return {"success": True, "action": "add"} + return {"success": True, "action": "add"}, parents def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]: """ @@ -780,11 +795,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): if plr_resource.parent is not None: plr_resource.parent.unassign_child_resource(plr_resource) self.resource_tracker.remove_resource(plr_resource) - self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点") + self.lab_logger().info(f"[资源同步] 移除物料 {plr_resource} 及其子节点") for other_plr_resource in other_plr_resources: self.resource_tracker.remove_resource(other_plr_resource) - self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点") + self.lab_logger().info(f"[资源同步] 移除物料 {other_plr_resource} 及其子节点") return { "success": True, @@ -816,11 +831,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): original_instance: ResourcePLR = self.resource_tracker.figure_resource( {"uuid": tree.root_node.res_content.uuid}, try_mode=False ) + original_parent_resource = original_instance.parent + original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None) + target_parent_resource_uuid = tree.root_node.res_content.uuid_parent + not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None + old_name = original_instance.name + new_name = plr_resource.name + parent_appended = False - # Update操作中包含改名:需要先remove再add - if original_instance.name != plr_resource.name: - old_name = original_instance.name - new_name = plr_resource.name + # Update操作中包含改名:需要先remove再add,这里更新父节点即可 + if not not_same_parent and old_name != new_name: self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}") # 收集所有相关的uuid(包括子节点) @@ -829,12 +849,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): _handle_add([original_instance], tree_set, additional_add_params) self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}") + original_instances.append(original_parent_resource) + parent_appended = True # 常规更新:不涉及改名 - original_parent_resource = original_instance.parent - original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None) - target_parent_resource_uuid = tree.root_node.res_content.uuid_parent - self.lab_logger().info( f"物料{original_instance} 原始父节点{original_parent_resource_uuid} " f"目标父节点{target_parent_resource_uuid} 更新" @@ -845,13 +863,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501 # 如果父节点变化,需要重新挂载 - if ( - original_parent_resource_uuid != target_parent_resource_uuid - and original_parent_resource is not None - ): - self.transfer_to_new_resource(original_instance, tree, additional_add_params) + if not_same_parent: + parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params) + original_instances.append(parent) + parent_appended = True else: - # 判断是否变更了resource_site + # 判断是否变更了resource_site,重新登记 target_site = original_instance.unilabos_extra.get("update_resource_site") sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else [] @@ -859,7 +876,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): site_index = sites.index(original_instance) site_name = site_names[site_index] if site_name != target_site: - self.transfer_to_new_resource(original_instance, tree, additional_add_params) + parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params) + if parent is not None: + original_instances.append(parent) + parent_appended = True # 加载状态 original_instance.load_all_state(states) @@ -867,7 +887,8 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().info( f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count} 个" ) - original_instances.append(original_instance) + if not parent_appended: + original_instances.append(original_instance) # 调用driver的update回调 func = getattr(self.driver_instance, "resource_tree_update", None) @@ -884,8 +905,8 @@ class BaseROS2DeviceNode(Node, Generic[T]): action = i.get("action") # remove, add, update resources_uuid: List[str] = i.get("data") # 资源数据 additional_add_params = i.get("additional_add_params", {}) # 额外参数 - self.lab_logger().info( - f"[Resource Tree Update] Processing {action} operation, " f"resources count: {len(resources_uuid)}" + self.lab_logger().trace( + f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}" ) tree_set = None if action in ["add", "update"]: @@ -897,8 +918,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): if tree_set is None: raise ValueError("tree_set不能为None") plr_resources = tree_set.to_plr_resources() - result = _handle_add(plr_resources, tree_set, additional_add_params) - new_tree_set = ResourceTreeSet.from_plr_resources(plr_resources) + result, parents = _handle_add(plr_resources, tree_set, additional_add_params) + parents: List[Optional["ResourcePLR"]] = [i for i in parents if i is not None] + de_dupe_parents = list(set(parents)) + new_tree_set = ResourceTreeSet.from_plr_resources(de_dupe_parents) # 去重 + for tree in new_tree_set.trees: + if tree.root_node.res_content.uuid_parent is None and self.node_name != "host_node": + tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 @@ -917,7 +943,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): plr_resources.append(ResourceTreeSet([tree]).to_plr_resources()[0]) result, original_instances = _handle_update(plr_resources, tree_set, additional_add_params) if not BasicConfig.no_update_feedback: - new_tree_set = ResourceTreeSet.from_plr_resources(original_instances) + new_tree_set = ResourceTreeSet.from_plr_resources(original_instances) # 去重 + for tree in new_tree_set.trees: + if tree.root_node.res_content.uuid_parent is None and self.node_name != "host_node": + tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 @@ -937,15 +966,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 返回处理结果 result_json = {"results": results, "total": len(data)} res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder) - self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations") + # self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations") except json.JSONDecodeError as e: error_msg = f"Invalid JSON format: {str(e)}" - self.lab_logger().error(f"[Resource Tree Update] {error_msg}") + self.lab_logger().error(f"[资源同步] {error_msg}") res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False) except Exception as e: error_msg = f"Unexpected error: {str(e)}" - self.lab_logger().error(f"[Resource Tree Update] {error_msg}") + self.lab_logger().error(f"[资源同步] {error_msg}") self.lab_logger().error(traceback.format_exc()) res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False) diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 0f6b697..cf9fd70 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -362,8 +362,7 @@ class HostNode(BaseROS2DeviceNode): request.command = "" future = sclient.call_async(request) # Use timeout for result as well - future.result(timeout_sec=5.0) - self.lab_logger().debug(f"[Host Node] Re-register completed for {device_namespace}") + future.result() except Exception as e: # Gracefully handle destruction during shutdown if "destruction was requested" in str(e) or self._shutting_down: @@ -1515,7 +1514,7 @@ class HostNode(BaseROS2DeviceNode): # 构建服务地址 srv_address = f"/srv{namespace}/s2c_resource_tree" - self.lab_logger().info(f"[Host Node-Resource] Notifying {device_id} for resource tree {action} operation") + self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------") # 创建服务客户端 sclient = self.create_client(SerialCommand, srv_address) @@ -1550,9 +1549,7 @@ class HostNode(BaseROS2DeviceNode): time.sleep(0.05) response = future.result() - self.lab_logger().info( - f"[Host Node-Resource] Resource tree {action} notification completed for {device_id}" - ) + self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------") return True except Exception as e: