diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 82925851..4d8f7134 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -6,8 +6,8 @@ from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKIN from unilabos.utils.log import logger if TYPE_CHECKING: - # from unilabos.devices.workstation.workstation_base import WorkstationBase - from pylabrobot.resources import Resource as PLRResource, corning_6_wellplate_16point8ml_flat + from unilabos.devices.workstation.workstation_base import WorkstationBase + from pylabrobot.resources import Resource as PLRResource class ResourceDictPositionSize(BaseModel): @@ -372,49 +372,79 @@ class ResourceTreeSet(object): - PLR 资源实例列表 - 每个资源对应的 name_to_uuid 映射字典列表 """ - from unilabos.resources.graphio import resource_ulab_to_plr + from pylabrobot.resources import Resource as PLRResource + from pylabrobot.utils.object_parsing import find_subclass + import inspect + + # 类型映射 + TYPE_MAP = {"plate": "plate", "well": "well", "container": "tip_spot", "deck": "deck", "tip_rack": "tip_rack"} + + def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict): + """一次遍历收集 name_to_uuid 和 all_states""" + name_to_uuid[node.res_content.name] = node.res_content.uuid + all_states[node.res_content.name] = node.res_content.data + for child in node.children: + collect_node_data(child, name_to_uuid, all_states) + + def node_to_plr_dict(node: ResourceDictInstance, has_model: bool): + """转换节点为 PLR 字典格式""" + res = node.res_content + plr_type = TYPE_MAP.get(res.type, "tip_spot") + if res.type not in TYPE_MAP: + logger.warning(f"未知类型 {res.type},使用默认类型 tip_spot") + + d = { + "name": res.name, + "type": plr_type, + "size_x": res.config.get("size_x", 0), + "size_y": res.config.get("size_y", 0), + "size_z": res.config.get("size_z", 0), + "location": { + "x": res.position.position.x, + "y": res.position.position.y, + "z": res.position.position.z, + "type": "Coordinate", + }, + "rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"}, + "category": plr_type, + "children": [node_to_plr_dict(child, has_model) for child in node.children], + "parent_name": res.parent_name, + **res.config, + } + if has_model: + d["model"] = res.config.get("model", None) + return d plr_resources = [] name_to_uuid_maps = [] - - def build_name_to_uuid_map(node: ResourceDictInstance, result: Dict[str, str]): - """递归构建 name 到 uuid 的映射""" - result[node.res_content.name] = node.res_content.uuid - for child in node.children: - build_name_to_uuid_map(child, result) + tracker = DeviceNodeResourceTracker() for tree in self.trees: - # 构建 name_to_uuid 映射 - name_to_uuid = {} - build_name_to_uuid_map(tree.root_node, name_to_uuid) + name_to_uuid: Dict[str, str] = {} + all_states: Dict[str, Any] = {} + collect_node_data(tree.root_node, name_to_uuid, all_states) - # 使用 get_nested_dict 获取字典表示 - resource_dict = tree.root_node.get_nested_dict() - - # 判断是否包含 model(Deck 下没有 model) - plr_model = tree.root_node.res_content.type != "deck" + has_model = tree.root_node.res_content.type != "deck" + plr_dict = node_to_plr_dict(tree.root_node, has_model) try: - # 使用 resource_ulab_to_plr 创建 PLR 资源实例 - plr_resource = resource_ulab_to_plr(resource_dict, plr_model=plr_model) + sub_cls = find_subclass(plr_dict["type"], PLRResource) + if sub_cls is None: + raise ValueError(f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类") - # 设置 unilabos_uuid 属性到资源及其所有子节点 - def set_uuid_recursive(plr_res: "PLRResource", node: ResourceDictInstance): - """递归设置 PLR 资源的 unilabos_uuid 属性""" - setattr(plr_res, "unilabos_uuid", node.res_content.uuid) - # 匹配子节点(通过 name) - for plr_child in plr_res.children: - matching_node = next( - (child for child in node.children if child.res_content.name == plr_child.name), - None, - ) - if matching_node: - set_uuid_recursive(plr_child, matching_node) + spec = inspect.signature(sub_cls) + if "category" not in spec.parameters: + plr_dict.pop("category", None) - set_uuid_recursive(plr_resource, tree.root_node) + plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) + plr_resource.load_all_state(all_states) + + # 使用 DeviceNodeResourceTracker 设置 UUID + tracker.loop_set_uuid(plr_resource, name_to_uuid) plr_resources.append(plr_resource) name_to_uuid_maps.append(name_to_uuid) + except Exception as e: logger.error(f"转换 PLR 资源失败: {e}") import traceback @@ -685,9 +715,7 @@ class DeviceNodeResourceTracker(object): res_list = [] for r in self.resources: if isinstance(query_resource, dict): - res_list.extend( - self.loop_find_resource(r, object, identifier_key, query_resource[identifier_key]) - ) + res_list.extend(self.loop_find_resource(r, object, identifier_key, query_resource[identifier_key])) else: res_list.extend( self.loop_find_resource( @@ -735,134 +763,84 @@ class DeviceNodeResourceTracker(object): if __name__ == "__main__": - import sys - import os + from pylabrobot.resources import corning_6_wellplate_16point8ml_flat - a = corning_6_wellplate_16point8ml_flat("a").serialize() - # 尝试导入 pylabrobot,如果失败则尝试从本地 pylabrobot_repo 导入 - try: - from pylabrobot.resources import Resource, Coordinate - except ImportError: - # 尝试添加本地 pylabrobot_repo 路径 - # __file__ is unilabos/ros/nodes/resource_tracker.py - # We need to go up 4 levels to get to project root - current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) - pylabrobot_path = os.path.join(current_dir, "pylabrobot_repo") - if os.path.exists(pylabrobot_path): - sys.path.insert(0, pylabrobot_path) - try: - from pylabrobot.resources import Resource, Coordinate - except ImportError: - print("pylabrobot 未安装,且无法从本地 pylabrobot_repo 导入") - print("如需运行测试,请先安装: pip install pylabrobot") - exit(0) - else: - print("pylabrobot 未安装,跳过测试") - print("如需运行测试,请先安装: pip install pylabrobot") - exit(0) + # 测试 from_plr_resources 和 to_plr_resources 的往返转换 + print("=" * 60) + print("测试 PLR 资源转换往返") + print("=" * 60) - # 创建一个简单的测试资源 - def create_test_resource(name: str): - """创建一个简单的测试用资源""" - # 创建父资源 - parent = Resource(name=name, size_x=100.0, size_y=100.0, size_z=50.0, category="container") + # 1. 创建一个 PLR 资源并设置 UUID + original_plate = corning_6_wellplate_16point8ml_flat("test_plate") - # 添加一些子资源 - for i in range(3): - child = Resource(name=f"{name}_child_{i}", size_x=20.0, size_y=20.0, size_z=10.0, category="container") - child.location = Coordinate(x=i * 30, y=0, z=0) - parent.assign_child_resource(child, location=child.location) + # 使用 DeviceNodeResourceTracker 设置 UUID + tracker = DeviceNodeResourceTracker() + name_to_uuid = {} - return parent + # 递归生成 name_to_uuid 映射 + def build_uuid_map(resource): + name_to_uuid[resource.name] = str(uuid.uuid4()) + for child in resource.children: + build_uuid_map(child) - print("=" * 80) - print("测试 1: 基本序列化和反序列化") - print("=" * 80) + build_uuid_map(original_plate) - # 创建原始 PLR 资源 - original_resource = create_test_resource("test_resource") - print(f"\n1. 创建原始 PLR 资源: {original_resource.name}") - print(f" 子节点数量: {len(original_resource.children)}") + # 使用 tracker 的 loop_set_uuid 方法设置 UUID + tracker.loop_set_uuid(original_plate, name_to_uuid) - # 手动设置 unilabos_uuid(模拟实际使用场景) - def set_test_uuid(res: "PLRResource", prefix="uuid"): - """递归设置测试用的 uuid""" - import uuid as uuid_module + print(f"\n1. 原始 PLR 资源: {original_plate.name}") + print(f" - UUID: {getattr(original_plate, 'unilabos_uuid', 'N/A')}") + print(f" - 子节点数量: {len(original_plate.children)}") + if original_plate.children: + print(f" - 第一个子节点: {original_plate.children[0].name}") + print(f" - 第一个子节点 UUID: {getattr(original_plate.children[0], 'unilabos_uuid', 'N/A')}") - setattr(res, "unilabos_uuid", f"{prefix}-{uuid_module.uuid4()}") - for i, child in enumerate(res.children): - set_test_uuid(child, f"{prefix}-{i}") + # 2. 将 PLR 资源转换为 ResourceTreeSet + resource_tree_set = ResourceTreeSet.from_plr_resources([original_plate]) + print(f"\n2. 转换为 ResourceTreeSet:") + print(f" - 树的数量: {len(resource_tree_set.trees)}") + print(f" - 根节点: {resource_tree_set.root_nodes[0].res_content.name}") + print(f" - 所有节点数量: {len(resource_tree_set.all_nodes)}") - set_test_uuid(original_resource, "root") - print(f" 根节点 UUID: {getattr(original_resource, 'unilabos_uuid', 'None')}") + # 3. 将 ResourceTreeSet 转换回 PLR 资源 + plr_resources, name_to_uuid_maps = resource_tree_set.to_plr_resources() + converted_plate = plr_resources[0] + print(f"\n3. 转换回 PLR 资源: {converted_plate.name}") + print(f" - 子节点数量: {len(converted_plate.children)}") + if converted_plate.children: + print(f" - 第一个子节点: {converted_plate.children[0].name}") - # 转换为 ResourceTreeSet (from_plr_resources) - print("\n2. 使用 from_plr_resources 转换为 ResourceTreeSet") - resource_tree_set = ResourceTreeSet.from_plr_resources([original_resource]) - print(f" 树的数量: {len(resource_tree_set.trees)}") - print(f" 根节点名称: {resource_tree_set.root_nodes[0].res_content.name}") - print(f" 根节点 UUID: {resource_tree_set.root_nodes[0].res_content.uuid}") - print(f" 总节点数: {len(resource_tree_set.all_nodes)}") - - # 转换回 PLR 资源 (to_plr_resources) - print("\n3. 使用 to_plr_resources 转换回 PLR 资源") - try: - plr_resources, name_to_uuid_maps = resource_tree_set.to_plr_resources() - except ModuleNotFoundError as e: - print(f" ❌ 缺少依赖模块: {e}") - print(" 提示: to_plr_resources 方法实现完成,但需要安装额外的依赖(如 networkx)") - print("\n测试部分完成!from_plr_resources 已验证正常工作。") - exit(0) - print(f" PLR 资源数量: {len(plr_resources)}") - print(f" name_to_uuid 映射数量: {len(name_to_uuid_maps)}") - - restored_resource = plr_resources[0] + # 4. 验证 UUID 映射 name_to_uuid = name_to_uuid_maps[0] + print(f"\n4. UUID 映射:") + print(f" - 映射条目数: {len(name_to_uuid)}") + print(f" - 示例映射: {list(name_to_uuid.items())[:3]}") - print(f" 恢复的资源名称: {restored_resource.name}") - print(f" 恢复的资源子节点数: {len(restored_resource.children)}") - print(f" 恢复的资源 UUID: {getattr(restored_resource, 'unilabos_uuid', 'None')}") - print(f" name_to_uuid 映射条目数: {len(name_to_uuid)}") + # 5. 验证 unilabos_uuid 属性 + print(f"\n5. 验证 unilabos_uuid 设置:") + if hasattr(converted_plate, "unilabos_uuid"): + print(f" - 根节点 UUID: {getattr(converted_plate, 'unilabos_uuid')}") + if converted_plate.children and hasattr(converted_plate.children[0], "unilabos_uuid"): + print(f" - 第一个子节点 UUID: {getattr(converted_plate.children[0], 'unilabos_uuid')}") + else: + print(" - 警告: unilabos_uuid 未设置") - # 验证 UUID 映射 - print("\n4. 验证 UUID 映射") - original_uuid = getattr(original_resource, "unilabos_uuid", None) - restored_uuid = getattr(restored_resource, "unilabos_uuid", None) - print(f" 原始根节点 UUID: {original_uuid}") - print(f" 恢复后根节点 UUID: {restored_uuid}") - print(f" UUID 匹配: {original_uuid == restored_uuid}") + # 6. 验证 UUID 保持不变 + print(f"\n6. 验证 UUID 在往返过程中保持不变:") + original_uuid = getattr(original_plate, "unilabos_uuid") + converted_uuid = getattr(converted_plate, "unilabos_uuid") + print(f" - 原始 UUID: {original_uuid}") + print(f" - 转换后 UUID: {converted_uuid}") + print(f" - UUID 保持不变: {original_uuid == converted_uuid}") - # 验证 name_to_uuid 映射完整性 - def count_all_nodes(res: "PLRResource") -> int: - """递归统计节点总数""" - return 1 + sum(count_all_nodes(child) for child in res.children) + # 7. 再次往返转换,验证稳定性 + resource_tree_set_2 = ResourceTreeSet.from_plr_resources([converted_plate]) + plr_resources_2, name_to_uuid_maps_2 = resource_tree_set_2.to_plr_resources() + print(f"\n7. 第二次往返转换:") + print(f" - 资源名称: {plr_resources_2[0].name}") + print(f" - 子节点数量: {len(plr_resources_2[0].children)}") + print(f" - UUID 依然保持: {getattr(plr_resources_2[0], 'unilabos_uuid') == original_uuid}") - original_node_count = count_all_nodes(original_resource) - restored_node_count = count_all_nodes(restored_resource) - mapping_count = len(name_to_uuid) - - print(f"\n 原始资源节点总数: {original_node_count}") - print(f" 恢复资源节点总数: {restored_node_count}") - print(f" 映射字典条目数: {mapping_count}") - print(f" 节点数量匹配: {original_node_count == restored_node_count == mapping_count}") - - # 验证子节点的 UUID - print("\n5. 验证子节点 UUID (前3个)") - for i, (original_child, restored_child) in enumerate( - zip(original_resource.children[:3], restored_resource.children[:3]) - ): - orig_uuid = getattr(original_child, "unilabos_uuid", None) - rest_uuid = getattr(restored_child, "unilabos_uuid", None) - print(f" 子节点 {i}: {original_child.name}") - print(f" 原始 UUID: {orig_uuid}") - print(f" 恢复 UUID: {rest_uuid}") - print(f" 匹配: {orig_uuid == rest_uuid}") - - # 测试 name_to_uuid 映射的正确性 - print("\n6. 验证 name_to_uuid 映射内容 (前5个)") - for i, (name, uuid_val) in enumerate(list(name_to_uuid.items())[:5]): - print(f" {name} -> {uuid_val}") - - print("\n" + "=" * 80) - print("测试完成!") - print("=" * 80) + print("\n" + "=" * 60) + print("✅ 测试完成! 所有转换正常工作") + print("=" * 60) diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index ccc96cb4..f72edf29 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -150,7 +150,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): target_type = import_manager.get_class(type_path) contain_model = not issubclass(target_type, Deck) resource, target_type = self._process_resource_mapping(resource, target_type) - resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) + resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state states[prefix_path] = resource_instance.serialize_all_state() # 使用 prefix_path 作为 key 存储资源状态 if to_dict: @@ -159,7 +159,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): return serialized else: self.resource_tracker.add_resource(resource_instance) - # 立即设置UUID + # 立即设置UUID,state已经在resource_ulab_to_plr中处理过了 if name_to_uuid: self.resource_tracker.loop_set_uuid(resource_instance, name_to_uuid) return resource_instance @@ -244,7 +244,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): v[kk] = vv self.device_instance.load_all_state(v) self.resource_tracker.add_resource(self.device_instance) - self.post_create() + self.post_create() # 对应DeviceClassCreator进行调用 return self.device_instance # type: ignore except Exception as e: # 先静默继续,尝试另外一种创建方法 @@ -265,7 +265,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid) - self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) + self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用,调用的自身的attach_resource except Exception as e: logger.error(f"PyLabRobot创建实例失败: {e}") logger.error(f"PyLabRobot创建实例堆栈: {traceback.format_exc()}")