From f94985632bb2cf7ca773905c70f6406ae7be5deb Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Wed, 22 Oct 2025 14:50:05 +0800 Subject: [PATCH] support material extra support update_resource_site in extra --- .../workstation/bioyond_studio/station.py | 1 + unilabos/ros/nodes/base_device_node.py | 5 + unilabos/ros/nodes/resource_tracker.py | 112 ++++++++++++++---- 3 files changed, 98 insertions(+), 20 deletions(-) diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index 33975d8c..21957cd2 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -4,6 +4,7 @@ Bioyond Workstation Implementation 集成Bioyond物料管理的工作站示例 """ +import time import traceback from datetime import datetime from typing import Dict, Any, List, Optional, Union diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 289fe513..4382db3d 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -624,6 +624,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: # 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步 additional_params = {} + extra = getattr(plr_resource, "extra", {}) + if len(extra): + self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra)) + if "update_resource_site" in extra: + additional_params["site"] = extra["update_resource_site"] site = additional_add_params.get("site", None) spec = inspect.signature(parent_resource.assign_child_resource) if "spot" in spec.parameters: diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 1506007b..9fb85faa 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -32,7 +32,7 @@ class ResourceDictPositionObject(BaseModel): class ResourceDictPosition(BaseModel): size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) - layout: Literal["2d", "x-y", "z-y", "x-z", ""] = Field(description="Resource layout", default="x-y") + layout: Literal["2d", "x-y", "z-y", "x-z"] = Field(description="Resource layout", default="x-y") position: ResourceDictPositionObject = Field( description="Resource position", default_factory=ResourceDictPositionObject ) @@ -42,7 +42,9 @@ class ResourceDictPosition(BaseModel): rotation: ResourceDictPositionObject = Field( description="Resource rotation", default_factory=ResourceDictPositionObject ) - cross_section_type: Literal["rectangle", "circle", "rounded_rectangle", ""] = Field(description="Cross section type", default="rectangle") + cross_section_type: Literal["rectangle", "circle", "rounded_rectangle"] = Field( + description="Cross section type", default="rectangle" + ) # 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化 @@ -51,7 +53,9 @@ class ResourceDict(BaseModel): uuid: str = Field(description="Resource UUID") name: str = Field(description="Resource name") description: str = Field(description="Resource description", default="") - resource_schema: Dict[str, Any] = Field(description="Resource schema", default_factory=dict, serialization_alias="schema", validation_alias="schema") + resource_schema: Dict[str, Any] = Field( + description="Resource schema", default_factory=dict, serialization_alias="schema", validation_alias="schema" + ) model: Dict[str, Any] = Field(description="Resource model", default_factory=dict) icon: str = Field(description="Resource icon", default="") parent_uuid: Optional["str"] = Field(description="Parent resource uuid", default=None) # 先设定parent_uuid @@ -62,6 +66,7 @@ class ResourceDict(BaseModel): pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) config: Dict[str, Any] = Field(description="Resource configuration") data: Dict[str, Any] = Field(description="Resource data") + extra: Dict[str, Any] = Field(description="Extra data") @field_serializer("parent_uuid") def _serialize_parent(self, parent_uuid: Optional["ResourceDict"]): @@ -138,6 +143,8 @@ class ResourceDictInstance(object): content["config"] = {} if not content.get("data"): content["data"] = {} + if not content.get("extra"): # MagicCode + content["extra"] = {} if "pose" not in content: content["pose"] = content.get("position", {}) return ResourceDictInstance(ResourceDict.model_validate(content)) @@ -322,21 +329,25 @@ class ResourceTreeSet(object): print("转换pylabrobot的时候,出现未知类型", source) return source - def build_uuid_mapping(res: "PLRResource", uuid_list: list): - """递归构建uuid映射字典""" + def build_uuid_mapping(res: "PLRResource", uuid_list: list, parent_uuid: Optional[str] = None): + """递归构建uuid和extra映射字典,返回(current_uuid, parent_uuid, extra)元组列表""" uid = getattr(res, "unilabos_uuid", "") if not uid: uid = str(uuid.uuid4()) res.unilabos_uuid = uid logger.warning(f"{res}没有uuid,请设置后再传入,默认填充{uid}!\n{traceback.format_exc()}") - uuid_list.append(uid) + + # 获取unilabos_extra,默认为空字典 + extra = getattr(res, "unilabos_extra", {}) + + uuid_list.append((uid, parent_uuid, extra)) for child in res.children: - build_uuid_mapping(child, uuid_list) + build_uuid_mapping(child, uuid_list, uid) def resource_plr_inner( d: dict, parent_resource: Optional[ResourceDict], states: dict, uuids: list ) -> ResourceDictInstance: - current_uuid = uuids.pop(0) + current_uuid, parent_uuid, extra = uuids.pop(0) raw_pos = ( {"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]} @@ -359,13 +370,30 @@ class ResourceTreeSet(object): "uuid": current_uuid, "name": d["name"], "parent": parent_resource, # 直接传入 ResourceDict 对象 + "parent_uuid": parent_uuid, # 使用 parent_uuid 而不是 parent 对象 "type": replace_plr_type(d.get("category", "")), "class": d.get("class", ""), "position": pos, "pose": pos, - "config": {k: v for k, v in d.items() if k not in - ["name", "children", "parent_name", "location", "rotation", "size_x", "size_y", "size_z", "cross_section_type", "bottom_type"]}, + "config": { + k: v + for k, v in d.items() + if k + not in [ + "name", + "children", + "parent_name", + "location", + "rotation", + "size_x", + "size_y", + "size_z", + "cross_section_type", + "bottom_type", + ] + }, "data": states[d["name"]], + "extra": extra, } # 先转换为 ResourceDictInstance,获取其中的 ResourceDict @@ -383,7 +411,7 @@ class ResourceTreeSet(object): for resource in resources: # 构建uuid列表 uuid_list = [] - build_uuid_mapping(resource, uuid_list) + build_uuid_mapping(resource, uuid_list, getattr(resource.parent, "unilabos_uuid", None)) serialized_data = resource.serialize() all_states = resource.serialize_all_state() @@ -408,12 +436,13 @@ class ResourceTreeSet(object): # 类型映射 TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer"} - def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict): - """一次遍历收集 name_to_uuid 和 all_states""" + def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict): + """一次遍历收集 name_to_uuid, all_states 和 name_to_extra""" name_to_uuid[node.res_content.name] = node.res_content.uuid all_states[node.res_content.name] = node.res_content.data + name_to_extra[node.res_content.name] = node.res_content.extra for child in node.children: - collect_node_data(child, name_to_uuid, all_states) + collect_node_data(child, name_to_uuid, all_states, name_to_extra) def node_to_plr_dict(node: ResourceDictInstance, has_model: bool): """转换节点为 PLR 字典格式""" @@ -423,6 +452,7 @@ class ResourceTreeSet(object): logger.warning(f"未知类型 {res.type}") d = { + **res.config, "name": res.name, "type": res.config.get("type", plr_type), "size_x": res.config.get("size_x", 0), @@ -438,33 +468,36 @@ class ResourceTreeSet(object): "category": res.config.get("category", plr_type), "children": [node_to_plr_dict(child, has_model) for child in node.children], "parent_name": res.parent_instance_name, - **res.config, + "extra": res.extra, } if has_model: d["model"] = res.config.get("model", None) return d plr_resources = [] - trees = [] tracker = DeviceNodeResourceTracker() for tree in self.trees: name_to_uuid: Dict[str, str] = {} all_states: Dict[str, Any] = {} - collect_node_data(tree.root_node, name_to_uuid, all_states) + name_to_extra: Dict[str, dict] = {} + collect_node_data(tree.root_node, name_to_uuid, all_states, name_to_extra) has_model = tree.root_node.res_content.type != "deck" plr_dict = node_to_plr_dict(tree.root_node, has_model) try: sub_cls = find_subclass(plr_dict["type"], PLRResource) if sub_cls is None: - raise ValueError(f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}") + raise ValueError( + f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类。原始信息:{tree.root_node.res_content}" + ) spec = inspect.signature(sub_cls) if "category" not in spec.parameters: plr_dict.pop("category", None) plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) plr_resource.load_all_state(all_states) - # 使用 DeviceNodeResourceTracker 设置 UUID + # 使用 DeviceNodeResourceTracker 设置 UUID 和 Extra tracker.loop_set_uuid(plr_resource, name_to_uuid) + tracker.loop_set_extra(plr_resource, name_to_extra) plr_resources.append(plr_resource) except Exception as e: @@ -806,6 +839,20 @@ class DeviceNodeResourceTracker(object): else: setattr(resource, "unilabos_uuid", new_uuid) + @staticmethod + def set_resource_extra(resource, extra: dict): + """ + 设置资源的 extra,统一处理 dict 和 instance 两种类型 + + Args: + resource: 资源对象(dict或实例) + extra: extra字典值 + """ + if isinstance(resource, dict): + resource["extra"] = extra + else: + setattr(resource, "unilabos_extra", extra) + def _traverse_and_process(self, resource, process_func) -> int: """ 递归遍历资源树,对每个节点执行处理函数 @@ -854,6 +901,29 @@ class DeviceNodeResourceTracker(object): return self._traverse_and_process(resource, process) + def loop_set_extra(self, resource, name_to_extra_map: Dict[str, dict]) -> int: + """ + 递归遍历资源树,根据 name 设置所有节点的 extra + + Args: + resource: 资源对象(可以是dict或实例) + name_to_extra_map: name到extra的映射字典,{name: extra} + + Returns: + 更新的资源数量 + """ + + def process(res): + resource_name = self._get_resource_attr(res, "name") + if resource_name and resource_name in name_to_extra_map: + extra = name_to_extra_map[resource_name] + self.set_resource_extra(res, extra) + logger.debug(f"设置资源Extra: {resource_name} -> {extra}") + return 1 + return 0 + + return self._traverse_and_process(resource, process) + def loop_update_uuid(self, resource, uuid_map: Dict[str, str]) -> int: """ 递归遍历资源树,更新所有节点的uuid @@ -896,7 +966,9 @@ class DeviceNodeResourceTracker(object): if current_uuid: old = self.uuid_to_resources.get(current_uuid) self.uuid_to_resources[current_uuid] = res - logger.debug(f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}") + logger.debug( + f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}" + ) return 0 self._traverse_and_process(resource, process)