Merge branch 'dev' into prcix9320

This commit is contained in:
zhangshixiang
2026-01-15 17:40:25 +08:00
parent be054589b5
commit 269ce440d1
10 changed files with 312 additions and 172 deletions

View File

@@ -1,7 +1,7 @@
import inspect
import traceback
import uuid
from pydantic import BaseModel, field_serializer, field_validator
from pydantic import BaseModel, field_serializer, field_validator, ValidationError
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
@@ -147,20 +147,24 @@ class ResourceDictInstance(object):
if not content.get("extra"): # MagicCode
content["extra"] = {}
if "position" in content:
pose = content.get("pose",{})
if "position" not in pose :
pose = content.get("pose", {})
if "position" not in pose:
if "position" in content["position"]:
pose["position"] = content["position"]["position"]
else:
pose["position"] = {"x": 0, "y": 0, "z": 0}
if "size" not in pose:
pose["size"] = {
"width": content["config"].get("size_x", 0),
"height": content["config"].get("size_y", 0),
"depth": content["config"].get("size_z", 0)
"width": content["config"].get("size_x", 0),
"height": content["config"].get("size_y", 0),
"depth": content["config"].get("size_z", 0),
}
content["pose"] = pose
return ResourceDictInstance(ResourceDict.model_validate(content))
try:
res_dict = ResourceDict.model_validate(content)
return ResourceDictInstance(res_dict)
except ValidationError as err:
raise err
def get_plr_nested_dict(self) -> Dict[str, Any]:
"""获取资源实例的嵌套字典表示"""
@@ -322,7 +326,7 @@ class ResourceTreeSet(object):
)
@classmethod
def from_plr_resources(cls, resources: List["PLRResource"]) -> "ResourceTreeSet":
def from_plr_resources(cls, resources: List["PLRResource"], known_newly_created=False) -> "ResourceTreeSet":
"""
从plr资源创建ResourceTreeSet
"""
@@ -339,6 +343,8 @@ class ResourceTreeSet(object):
}
if source in replace_info:
return replace_info[source]
elif source is None:
return ""
else:
print("转换pylabrobot的时候出现未知类型", source)
return source
@@ -349,7 +355,8 @@ class ResourceTreeSet(object):
if not uid:
uid = str(uuid.uuid4())
res.unilabos_uuid = uid
logger.warning(f"{res}没有uuid请设置后再传入默认填充{uid}\n{traceback.format_exc()}")
if not known_newly_created:
logger.warning(f"{res}没有uuid请设置后再传入默认填充{uid}\n{traceback.format_exc()}")
# 获取unilabos_extra默认为空字典
extra = getattr(res, "unilabos_extra", {})
@@ -448,7 +455,13 @@ class ResourceTreeSet(object):
from pylabrobot.utils.object_parsing import find_subclass
# 类型映射
TYPE_MAP = {"plate": "Plate", "well": "Well", "deck": "Deck", "container": "RegularContainer", "tip_spot": "TipSpot"}
TYPE_MAP = {
"plate": "Plate",
"well": "Well",
"deck": "Deck",
"container": "RegularContainer",
"tip_spot": "TipSpot",
}
def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict, name_to_extra: dict):
"""一次遍历收集 name_to_uuid, all_states 和 name_to_extra"""
@@ -918,6 +931,33 @@ class DeviceNodeResourceTracker(object):
return self._traverse_and_process(resource, process)
def loop_find_with_uuid(self, resource, target_uuid: str):
"""
递归遍历资源树,根据 uuid 查找并返回对应的资源
Args:
resource: 资源对象可以是list、dict或实例
target_uuid: 要查找的uuid
Returns:
找到的资源对象未找到则返回None
"""
found_resource = None
def process(res):
nonlocal found_resource
if found_resource is not None:
return 0 # 已找到,跳过后续处理
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid and current_uuid == target_uuid:
found_resource = res
logger.trace(f"找到资源UUID: {target_uuid}")
return 1
return 0
self._traverse_and_process(resource, process)
return found_resource
def loop_set_extra(self, resource, name_to_extra_map: Dict[str, dict]) -> int:
"""
递归遍历资源树,根据 name 设置所有节点的 extra
@@ -1103,7 +1143,7 @@ class DeviceNodeResourceTracker(object):
for key in keys_to_remove:
self.resource2parent_resource.pop(key, None)
logger.debug(f"成功移除资源: {resource}")
logger.trace(f"[ResourceTracker] 成功移除资源: {resource}")
return True
def clear_resource(self):