Add create_resource and test_resource example.

This commit is contained in:
Xuwznln
2026-01-12 21:17:28 +08:00
parent 965bf36e8d
commit ec015e16cd
5 changed files with 144 additions and 66 deletions

View File

@@ -1151,11 +1151,7 @@ def initialize_resource(resource_config: dict, resource_type: Any = None) -> Uni
if resource_class_config["type"] == "pylabrobot":
resource_plr = RESOURCE(name=resource_config["name"])
if resource_type != ResourcePLR:
tree_sets = ResourceTreeSet.from_plr_resources([resource_plr])
# r = resource_plr_to_ulab(resource_plr=resource_plr, parent_name=resource_config.get("parent", None))
# # r = resource_plr_to_ulab(resource_plr=resource_plr)
# if resource_config.get("position") is not None:
# r["position"] = resource_config["position"]
tree_sets = ResourceTreeSet.from_plr_resources([resource_plr], known_newly_created=True)
r = tree_sets.dump()
else:
r = resource_plr

View File

@@ -147,17 +147,17 @@ class ResourceDictInstance(object):
if not content.get("extra"): # MagicCode
content["extra"] = {}
if "position" in content:
pose = content.get("pose",{})
if "position" not in pose :
pose = content.get("pose", {})
if "position" not in pose:
if "position" in content["position"]:
pose["position"] = content["position"]["position"]
else:
pose["position"] = {"x": 0, "y": 0, "z": 0}
if "size" not in pose:
pose["size"] = {
"width": content["config"].get("size_x", 0),
"height": content["config"].get("size_y", 0),
"depth": content["config"].get("size_z", 0)
"width": content["config"].get("size_x", 0),
"height": content["config"].get("size_y", 0),
"depth": content["config"].get("size_z", 0),
}
content["pose"] = pose
return ResourceDictInstance(ResourceDict.model_validate(content))
@@ -322,7 +322,7 @@ class ResourceTreeSet(object):
)
@classmethod
def from_plr_resources(cls, resources: List["PLRResource"]) -> "ResourceTreeSet":
def from_plr_resources(cls, resources: List["PLRResource"], known_newly_created=False) -> "ResourceTreeSet":
"""
从plr资源创建ResourceTreeSet
"""
@@ -349,7 +349,8 @@ class ResourceTreeSet(object):
if not uid:
uid = str(uuid.uuid4())
res.unilabos_uuid = uid
logger.warning(f"{res}没有uuid请设置后再传入默认填充{uid}\n{traceback.format_exc()}")
if not known_newly_created:
logger.warning(f"{res}没有uuid请设置后再传入默认填充{uid}\n{traceback.format_exc()}")
# 获取unilabos_extra默认为空字典
extra = getattr(res, "unilabos_extra", {})
@@ -448,7 +449,13 @@ class ResourceTreeSet(object):
from pylabrobot.utils.object_parsing import find_subclass
# 类型映射
TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer", "tip_spot": "TipSpot"}
TYPE_MAP = {
"plate": "Plate",
"well": "Well",
"deck": "Deck",
"container": "RegularContainer",
"tip_spot": "TipSpot",
}
def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict):
"""一次遍历收集 name_to_uuid, all_states 和 name_to_extra"""
@@ -918,6 +925,33 @@ class DeviceNodeResourceTracker(object):
return self._traverse_and_process(resource, process)
def loop_find_with_uuid(self, resource, target_uuid: str):
"""
递归遍历资源树,根据 uuid 查找并返回对应的资源
Args:
resource: 资源对象可以是list、dict或实例
target_uuid: 要查找的uuid
Returns:
找到的资源对象未找到则返回None
"""
found_resource = None
def process(res):
nonlocal found_resource
if found_resource is not None:
return 0 # 已找到,跳过后续处理
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid and current_uuid == target_uuid:
found_resource = res
logger.trace(f"找到资源UUID: {target_uuid}")
return 1
return 0
self._traverse_and_process(resource, process)
return found_resource
def loop_set_extra(self, resource, name_to_extra_map: Dict[str, dict]) -> int:
"""
递归遍历资源树,根据 name 设置所有节点的 extra