mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 13:01:12 +00:00
847 lines
33 KiB
Python
847 lines
33 KiB
Python
import uuid
|
||
from pydantic import BaseModel, field_serializer, field_validator
|
||
from pydantic import Field
|
||
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING
|
||
|
||
from unilabos.utils.log import logger
|
||
|
||
if TYPE_CHECKING:
|
||
from unilabos.devices.workstation.workstation_base import WorkstationBase
|
||
from pylabrobot.resources import Resource as PLRResource
|
||
|
||
|
||
class ResourceDictPositionSize(BaseModel):
|
||
depth: float = Field(description="Depth", default=0.0)
|
||
width: float = Field(description="Width", default=0.0)
|
||
height: float = Field(description="Height", default=0.0)
|
||
|
||
|
||
class ResourceDictPositionScale(BaseModel):
|
||
x: float = Field(description="x scale", default=0.0)
|
||
y: float = Field(description="y scale", default=0.0)
|
||
z: float = Field(description="z scale", default=0.0)
|
||
|
||
|
||
class ResourceDictPositionObject(BaseModel):
|
||
x: float = Field(description="X coordinate", default=0.0)
|
||
y: float = Field(description="Y coordinate", default=0.0)
|
||
z: float = Field(description="Z coordinate", default=0.0)
|
||
|
||
|
||
class ResourceDictPosition(BaseModel):
|
||
size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize)
|
||
scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale)
|
||
layout: Literal["2d"] = Field(description="Resource layout", default="2d")
|
||
position: ResourceDictPositionObject = Field(
|
||
description="Resource position", default_factory=ResourceDictPositionObject
|
||
)
|
||
position3d: ResourceDictPositionObject = Field(
|
||
description="Resource position in 3D space", default_factory=ResourceDictPositionObject
|
||
)
|
||
rotation: ResourceDictPositionObject = Field(
|
||
description="Resource rotation", default_factory=ResourceDictPositionObject
|
||
)
|
||
|
||
|
||
# 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化
|
||
class ResourceDict(BaseModel):
|
||
id: str = Field(description="Resource ID")
|
||
uuid: str = Field(description="Resource UUID")
|
||
name: str = Field(description="Resource name")
|
||
description: str = Field(description="Resource description", default="")
|
||
schema: Dict[str, Any] = Field(description="Resource schema", default_factory=dict)
|
||
model: Dict[str, Any] = Field(description="Resource model", default_factory=dict)
|
||
icon: str = Field(description="Resource icon", default="")
|
||
parent: Optional["ResourceDict"] = Field(
|
||
description="Parent resource object", default=None, serialization_alias="parent_uuid"
|
||
)
|
||
type: Literal["device"] | str = Field(description="Resource type")
|
||
klass: str = Field(alias="class", description="Resource class name")
|
||
position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
|
||
config: Dict[str, Any] = Field(description="Resource configuration")
|
||
data: Dict[str, Any] = Field(description="Resource data")
|
||
|
||
@field_serializer("parent")
|
||
def _serialize_parent(self, parent: Optional["ResourceDict"]):
|
||
return self.parent_uuid
|
||
|
||
@field_validator("parent", mode="before")
|
||
@classmethod
|
||
def _deserialize_parent(cls, parent: Optional["ResourceDict"]):
|
||
if isinstance(parent, ResourceDict):
|
||
return parent
|
||
else:
|
||
return None
|
||
|
||
@property
|
||
def parent_uuid(self) -> str:
|
||
"""获取父节点的UUID"""
|
||
return self.parent.uuid if self.parent is not None else ""
|
||
|
||
@property
|
||
def parent_name(self) -> Optional[str]:
|
||
"""获取父节点的UUID"""
|
||
return self.parent.name if self.parent is not None else None
|
||
|
||
@property
|
||
def is_root_node(self) -> bool:
|
||
"""判断资源是否为根节点"""
|
||
return self.parent is None
|
||
|
||
|
||
class GraphData(BaseModel):
|
||
"""图数据结构,包含节点和边"""
|
||
|
||
nodes: List["ResourceTreeInstance"] = Field(description="Resource nodes list", default_factory=list)
|
||
links: List[Dict[str, Any]] = Field(description="Resource links/edges list", default_factory=list)
|
||
|
||
|
||
class ResourceDictInstance(object):
|
||
"""ResourceDict的实例,同时提供一些方法"""
|
||
|
||
def __init__(self, res_content: "ResourceDict"):
|
||
self.res_content = res_content
|
||
self.children = []
|
||
self.typ = "dict"
|
||
|
||
@classmethod
|
||
def get_resource_instance_from_dict(cls, content: Dict[str, Any]) -> "ResourceDictInstance":
|
||
"""从字典创建资源实例"""
|
||
if "id" not in content:
|
||
content["id"] = content["name"]
|
||
if "uuid" not in content:
|
||
content["uuid"] = str(uuid.uuid4())
|
||
if "description" in content and content["description"] is None:
|
||
del content["description"]
|
||
if "model" in content and content["model"] is None:
|
||
del content["model"]
|
||
if "schema" in content and content["schema"] is None:
|
||
del content["schema"]
|
||
if "x" in content.get("position", {}):
|
||
# 说明是老版本的position格式,转换成新的
|
||
content["position"] = {"position": content["position"]}
|
||
if not content.get("class"):
|
||
content["class"] = ""
|
||
if not content.get("config"): # todo: 后续从后端保证字段非空
|
||
content["config"] = {}
|
||
if not content.get("data"):
|
||
content["data"] = {}
|
||
return ResourceDictInstance(ResourceDict.model_validate(content))
|
||
|
||
def get_nested_dict(self) -> Dict[str, Any]:
|
||
"""获取资源实例的嵌套字典表示"""
|
||
res_dict = self.res_content.model_dump(by_alias=True)
|
||
res_dict["children"] = {child.res_content.name: child.get_nested_dict() for child in self.children}
|
||
res_dict["parent"] = self.res_content.parent_name
|
||
res_dict["position"] = self.res_content.position.position.model_dump()
|
||
return res_dict
|
||
|
||
|
||
class ResourceTreeInstance(object):
|
||
"""
|
||
资源树,表示一个根节点及其所有子节点的层次结构,继承ResourceDictInstance表示自己是根节点
|
||
"""
|
||
|
||
@staticmethod
|
||
def _build_uuid_map(resource_list: List[ResourceDictInstance]) -> Dict[str, ResourceDictInstance]:
|
||
"""构建uuid到资源对象的映射,并检查重复"""
|
||
uuid_map: Dict[str, ResourceDictInstance] = {}
|
||
for res_instance in resource_list:
|
||
res = res_instance.res_content
|
||
if res.uuid in uuid_map:
|
||
raise ValueError(f"发现重复的uuid: {res.uuid}")
|
||
uuid_map[res.uuid] = res_instance
|
||
return uuid_map
|
||
|
||
@staticmethod
|
||
def _build_uuid_instance_map(
|
||
resource_list: List[ResourceDictInstance],
|
||
) -> Dict[str, ResourceDictInstance]:
|
||
"""构建uuid到资源实例的映射"""
|
||
return {res_instance.res_content.uuid: res_instance for res_instance in resource_list}
|
||
|
||
@staticmethod
|
||
def _collect_tree_nodes(
|
||
root_instance: ResourceDictInstance, uuid_map: Dict[str, ResourceDict]
|
||
) -> List[ResourceDictInstance]:
|
||
"""使用BFS收集属于某个根节点的所有节点"""
|
||
# BFS遍历,根据parent_uuid字段找到所有属于这棵树的节点
|
||
tree_nodes = [root_instance]
|
||
visited = {root_instance.res_content.uuid}
|
||
queue = [root_instance.res_content.uuid]
|
||
|
||
while queue:
|
||
current_uuid = queue.pop(0)
|
||
# 查找所有parent_uuid指向当前节点的子节点
|
||
for uuid_str, res in uuid_map.items():
|
||
if res.parent_uuid == current_uuid and uuid_str not in visited:
|
||
child_instance = ResourceDictInstance(res)
|
||
tree_nodes.append(child_instance)
|
||
visited.add(uuid_str)
|
||
queue.append(uuid_str)
|
||
|
||
return tree_nodes
|
||
|
||
def __init__(self, resource: ResourceDictInstance):
|
||
self.root_node = resource
|
||
self._validate_tree()
|
||
|
||
def _validate_tree(self):
|
||
"""
|
||
验证树结构的一致性
|
||
- 验证uuid唯一性
|
||
- 验证parent-children关系一致性
|
||
|
||
Raises:
|
||
ValueError: 当发现不一致时
|
||
"""
|
||
known_uuids: set = set()
|
||
|
||
def validate_node(node: ResourceDictInstance):
|
||
# 检查uuid唯一性
|
||
if node.res_content.uuid in known_uuids:
|
||
raise ValueError(f"发现重复的uuid: {node.res_content.uuid}")
|
||
if node.res_content.uuid:
|
||
known_uuids.add(node.res_content.uuid)
|
||
else:
|
||
print(f"警告: 资源 {node.res_content.id} 没有uuid")
|
||
|
||
# 验证并递归处理子节点
|
||
for child in node.children:
|
||
if child.res_content.parent != node.res_content:
|
||
parent_id = child.res_content.parent.id if child.res_content.parent else None
|
||
raise ValueError(
|
||
f"节点 {child.res_content.id} 的parent引用不正确,应该指向 {node.res_content.id},但实际指向 {parent_id}"
|
||
)
|
||
validate_node(child)
|
||
|
||
validate_node(self.root_node)
|
||
|
||
def get_all_nodes(self) -> List[ResourceDictInstance]:
|
||
"""
|
||
获取树中的所有节点(深度优先遍历)
|
||
|
||
Returns:
|
||
所有节点的资源实例列表
|
||
"""
|
||
nodes = []
|
||
|
||
def collect_nodes(node: ResourceDictInstance):
|
||
nodes.append(node)
|
||
for child in node.children:
|
||
collect_nodes(child)
|
||
|
||
collect_nodes(self.root_node)
|
||
return nodes
|
||
|
||
def find_by_uuid(self, target_uuid: str) -> Optional[ResourceDictInstance]:
|
||
"""
|
||
通过uuid查找节点
|
||
|
||
Args:
|
||
target_uuid: 目标uuid
|
||
|
||
Returns:
|
||
找到的节点资源实例,如果没找到返回None
|
||
"""
|
||
|
||
def search(node: ResourceDictInstance) -> Optional[ResourceDictInstance]:
|
||
if node.res_content.uuid == target_uuid:
|
||
return node
|
||
for child in node.children:
|
||
res = search(child)
|
||
if res:
|
||
return res
|
||
return None
|
||
|
||
result = search(self.root_node)
|
||
return result
|
||
|
||
|
||
class ResourceTreeSet(object):
|
||
"""
|
||
多个根节点的resource集合,包含多个ResourceTree
|
||
"""
|
||
|
||
def __init__(self, resource_list: List[List[ResourceDictInstance]] | List[ResourceTreeInstance]):
|
||
"""
|
||
初始化资源树集合
|
||
|
||
Args:
|
||
resource_list: 可以是以下两种类型之一:
|
||
- List[ResourceTree]: 已经构建好的树列表
|
||
- List[List[ResourceInstanceDict]]: 嵌套列表,每个内部列表代表一棵树
|
||
|
||
Raises:
|
||
TypeError: 当传入不支持的类型时
|
||
"""
|
||
if not resource_list:
|
||
self.trees: List[ResourceTreeInstance] = []
|
||
elif isinstance(resource_list[0], ResourceTreeInstance):
|
||
# 已经是ResourceTree列表
|
||
self.trees = cast(List[ResourceTreeInstance], resource_list)
|
||
elif isinstance(resource_list[0], list):
|
||
pass
|
||
else:
|
||
raise TypeError(
|
||
f"不支持的类型: {type(resource_list[0])}。"
|
||
f"ResourceTreeSet 只接受 List[ResourceTree] 或 List[List[ResourceInstanceDict]]"
|
||
)
|
||
|
||
@classmethod
|
||
def from_plr_resources(cls, resources: List["PLRResource"]) -> "ResourceTreeSet":
|
||
"""
|
||
从plr资源创建ResourceTreeSet
|
||
"""
|
||
|
||
def replace_plr_type(source: str):
|
||
replace_info = {
|
||
"plate": "plate",
|
||
"well": "well",
|
||
"tip_spot": "container",
|
||
"trash": "container",
|
||
"deck": "deck",
|
||
"tip_rack": "container",
|
||
}
|
||
if source in replace_info:
|
||
return replace_info[source]
|
||
else:
|
||
print("转换pylabrobot的时候,出现未知类型", source)
|
||
return "container"
|
||
|
||
def build_uuid_mapping(res: "PLRResource", uuid_list: list):
|
||
"""递归构建uuid映射字典"""
|
||
uuid_list.append(getattr(res, "unilabos_uuid", ""))
|
||
for child in res.children:
|
||
build_uuid_mapping(child, uuid_list)
|
||
|
||
def resource_plr_inner(
|
||
d: dict, parent_resource: Optional[ResourceDict], states: dict, uuids: list
|
||
) -> ResourceDictInstance:
|
||
current_uuid = uuids.pop(0)
|
||
|
||
# 先构建当前节点的字典(不包含children)
|
||
r_dict = {
|
||
"id": d["name"],
|
||
"uuid": current_uuid,
|
||
"name": d["name"],
|
||
"parent": parent_resource, # 直接传入 ResourceDict 对象
|
||
"type": replace_plr_type(d.get("category", "")),
|
||
"class": d.get("class", ""),
|
||
"position": (
|
||
{"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]}
|
||
if d["location"]
|
||
else {"x": 0, "y": 0, "z": 0}
|
||
),
|
||
"config": {k: v for k, v in d.items() if k not in ["name", "children", "parent_name", "location"]},
|
||
"data": states[d["name"]],
|
||
}
|
||
|
||
# 先转换为 ResourceDictInstance,获取其中的 ResourceDict
|
||
current_instance = ResourceDictInstance.get_resource_instance_from_dict(r_dict)
|
||
current_resource = current_instance.res_content
|
||
|
||
# 递归处理子节点,传入当前节点的 ResourceDict 作为 parent
|
||
current_instance.children = [
|
||
resource_plr_inner(child, current_resource, states, uuids) for child in d["children"]
|
||
]
|
||
|
||
return current_instance
|
||
|
||
trees = []
|
||
for resource in resources:
|
||
# 构建uuid列表
|
||
uuid_list = []
|
||
build_uuid_mapping(resource, uuid_list)
|
||
|
||
serialized_data = resource.serialize()
|
||
all_states = resource.serialize_all_state()
|
||
|
||
# 根节点没有父节点,传入 None
|
||
root_instance = resource_plr_inner(serialized_data, None, all_states, uuid_list)
|
||
tree_instance = ResourceTreeInstance(root_instance)
|
||
trees.append(tree_instance)
|
||
return cls(trees)
|
||
|
||
def to_plr_resources(self) -> Tuple[List["PLRResource"], List[Dict[str, str]]]:
|
||
"""
|
||
将 ResourceTreeSet 转换为 PLR 资源列表
|
||
|
||
Returns:
|
||
Tuple[List[PLRResource], List[Dict[str, str]]]:
|
||
- PLR 资源实例列表
|
||
- 每个资源对应的 name_to_uuid 映射字典列表
|
||
"""
|
||
from pylabrobot.resources import Resource as PLRResource
|
||
from pylabrobot.utils.object_parsing import find_subclass
|
||
import inspect
|
||
|
||
# 类型映射
|
||
TYPE_MAP = {"plate": "plate", "well": "well", "container": "tip_spot", "deck": "deck", "tip_rack": "tip_rack"}
|
||
|
||
def collect_node_data(node: ResourceDictInstance, name_to_uuid: dict, all_states: dict):
|
||
"""一次遍历收集 name_to_uuid 和 all_states"""
|
||
name_to_uuid[node.res_content.name] = node.res_content.uuid
|
||
all_states[node.res_content.name] = node.res_content.data
|
||
for child in node.children:
|
||
collect_node_data(child, name_to_uuid, all_states)
|
||
|
||
def node_to_plr_dict(node: ResourceDictInstance, has_model: bool):
|
||
"""转换节点为 PLR 字典格式"""
|
||
res = node.res_content
|
||
plr_type = TYPE_MAP.get(res.type, "tip_spot")
|
||
if res.type not in TYPE_MAP:
|
||
logger.warning(f"未知类型 {res.type},使用默认类型 tip_spot")
|
||
|
||
d = {
|
||
"name": res.name,
|
||
"type": plr_type,
|
||
"size_x": res.config.get("size_x", 0),
|
||
"size_y": res.config.get("size_y", 0),
|
||
"size_z": res.config.get("size_z", 0),
|
||
"location": {
|
||
"x": res.position.position.x,
|
||
"y": res.position.position.y,
|
||
"z": res.position.position.z,
|
||
"type": "Coordinate",
|
||
},
|
||
"rotation": {"x": 0, "y": 0, "z": 0, "type": "Rotation"},
|
||
"category": plr_type,
|
||
"children": [node_to_plr_dict(child, has_model) for child in node.children],
|
||
"parent_name": res.parent_name,
|
||
**res.config,
|
||
}
|
||
if has_model:
|
||
d["model"] = res.config.get("model", None)
|
||
return d
|
||
|
||
plr_resources = []
|
||
name_to_uuid_maps = []
|
||
tracker = DeviceNodeResourceTracker()
|
||
|
||
for tree in self.trees:
|
||
name_to_uuid: Dict[str, str] = {}
|
||
all_states: Dict[str, Any] = {}
|
||
collect_node_data(tree.root_node, name_to_uuid, all_states)
|
||
|
||
has_model = tree.root_node.res_content.type != "deck"
|
||
plr_dict = node_to_plr_dict(tree.root_node, has_model)
|
||
|
||
try:
|
||
sub_cls = find_subclass(plr_dict["type"], PLRResource)
|
||
if sub_cls is None:
|
||
raise ValueError(f"无法找到类型 {plr_dict['type']} 对应的 PLR 资源类")
|
||
|
||
spec = inspect.signature(sub_cls)
|
||
if "category" not in spec.parameters:
|
||
plr_dict.pop("category", None)
|
||
|
||
plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True)
|
||
plr_resource.load_all_state(all_states)
|
||
|
||
# 使用 DeviceNodeResourceTracker 设置 UUID
|
||
tracker.loop_set_uuid(plr_resource, name_to_uuid)
|
||
|
||
plr_resources.append(plr_resource)
|
||
name_to_uuid_maps.append(name_to_uuid)
|
||
|
||
except Exception as e:
|
||
logger.error(f"转换 PLR 资源失败: {e}")
|
||
import traceback
|
||
|
||
logger.error(f"堆栈: {traceback.format_exc()}")
|
||
raise
|
||
|
||
return plr_resources, name_to_uuid_maps
|
||
|
||
@classmethod
|
||
def from_nested_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet":
|
||
"""
|
||
从扁平化的资源列表创建ResourceTreeSet,自动按根节点分组
|
||
|
||
Args:
|
||
nested_list: 扁平化的资源实例列表,可能包含多个根节点
|
||
|
||
Returns:
|
||
ResourceTreeSet实例
|
||
|
||
Raises:
|
||
ValueError: 当没有找到任何根节点时
|
||
"""
|
||
# 找到所有根节点
|
||
known_uuids = {res_instance.res_content.uuid for res_instance in nested_list}
|
||
root_instances = [
|
||
ResourceTreeInstance(res_instance)
|
||
for res_instance in nested_list
|
||
if res_instance.res_content.is_root_node or res_instance.res_content.parent_uuid not in known_uuids
|
||
]
|
||
return cls(root_instances)
|
||
|
||
@property
|
||
def root_nodes(self) -> List[ResourceDictInstance]:
|
||
"""
|
||
获取所有树的根节点
|
||
|
||
Returns:
|
||
所有根节点的资源实例列表
|
||
"""
|
||
return [tree.root_node for tree in self.trees]
|
||
|
||
@property
|
||
def all_nodes(self) -> List[ResourceDictInstance]:
|
||
"""
|
||
获取所有树中的所有节点
|
||
|
||
Returns:
|
||
所有节点的资源实例列表
|
||
"""
|
||
return [node for tree in self.trees for node in tree.get_all_nodes()]
|
||
|
||
def find_by_uuid(self, target_uuid: str) -> Optional[ResourceDictInstance]:
|
||
"""
|
||
在所有树中通过uuid查找节点
|
||
|
||
Args:
|
||
target_uuid: 目标uuid
|
||
|
||
Returns:
|
||
找到的节点资源实例,如果没找到返回None
|
||
"""
|
||
for tree in self.trees:
|
||
result = tree.find_by_uuid(target_uuid)
|
||
if result:
|
||
return result
|
||
return None
|
||
|
||
def dump(self) -> List[List[Dict[str, Any]]]:
|
||
"""
|
||
将 ResourceTreeSet 序列化为嵌套列表格式
|
||
|
||
序列化时:
|
||
- parent 自动转换为 parent_uuid(在 ResourceDict.model_dump 中处理)
|
||
- children 不会被序列化(exclude=True)
|
||
|
||
Returns:
|
||
List[List[Dict]]: 每个内层列表代表一棵树的扁平化资源字典列表
|
||
"""
|
||
result = []
|
||
for tree in self.trees:
|
||
# 获取树的所有节点并序列化
|
||
tree_nodes = [node.res_content.model_dump(by_alias=True) for node in tree.get_all_nodes()]
|
||
result.append(tree_nodes)
|
||
return result
|
||
|
||
@classmethod
|
||
def load(cls, data: List[List[Dict[str, Any]]]) -> "ResourceTreeSet":
|
||
"""
|
||
从序列化的嵌套列表格式反序列化为 ResourceTreeSet
|
||
|
||
Args:
|
||
data: List[List[Dict]]: 序列化的数据,每个内层列表代表一棵树
|
||
|
||
Returns:
|
||
ResourceTreeSet: 反序列化后的资源树集合
|
||
"""
|
||
# 将每个字典转换为 ResourceInstanceDict
|
||
# FIXME: 需要重新确定parent关系
|
||
nested_lists = []
|
||
for tree_data in data:
|
||
flatten_instances = [
|
||
ResourceDictInstance.get_resource_instance_from_dict(node_dict) for node_dict in tree_data
|
||
]
|
||
nested_lists.append(flatten_instances)
|
||
|
||
# 使用现有的构造函数创建 ResourceTreeSet
|
||
return cls(nested_lists)
|
||
|
||
|
||
class DeviceNodeResourceTracker(object):
|
||
|
||
def __init__(self):
|
||
self.resources = []
|
||
self.resource2parent_resource = {}
|
||
self.uuid_to_resources = {}
|
||
pass
|
||
|
||
def prefix_path(self, resource):
|
||
resource_prefix_path = "/"
|
||
resource_parent = getattr(resource, "parent", None)
|
||
while resource_parent is not None:
|
||
resource_prefix_path = f"/{resource_parent.name}" + resource_prefix_path
|
||
resource_parent = resource_parent.parent
|
||
|
||
return resource_prefix_path
|
||
|
||
def map_uuid_to_resource(self, resource, uuid_map: Dict[str, str]):
|
||
for old_uuid, new_uuid in uuid_map.items():
|
||
if old_uuid != new_uuid:
|
||
if old_uuid in self.uuid_to_resources:
|
||
instance = self.uuid_to_resources.pop(old_uuid)
|
||
if isinstance(resource, dict):
|
||
resource["uuid"] = new_uuid
|
||
else: # 实例的
|
||
setattr(instance, "unilabos_uuid", new_uuid)
|
||
self.uuid_to_resources[new_uuid] = instance
|
||
print(f"更新uuid映射: {old_uuid} -> {new_uuid} | {instance}")
|
||
|
||
def loop_set_uuid(self, resource, name_to_uuid_map: Dict[str, str]) -> int:
|
||
"""
|
||
递归遍历资源树,根据 name 设置所有节点的 uuid
|
||
|
||
Args:
|
||
resource: 资源对象(可以是dict或实例)
|
||
name_to_uuid_map: name到uuid的映射字典,{name: uuid}
|
||
|
||
Returns:
|
||
更新的资源数量
|
||
"""
|
||
if isinstance(resource, list):
|
||
return sum(self.loop_set_uuid(r, name_to_uuid_map) for r in resource)
|
||
|
||
update_count = 0
|
||
|
||
# 先递归处理所有子节点
|
||
children = getattr(resource, "children", [])
|
||
for child in children:
|
||
update_count += self.loop_set_uuid(child, name_to_uuid_map)
|
||
|
||
# 获取当前资源的name
|
||
if isinstance(resource, dict):
|
||
resource_name = resource.get("name")
|
||
else:
|
||
resource_name = getattr(resource, "name", None)
|
||
|
||
# 如果name在映射中,则设置uuid
|
||
if resource_name and resource_name in name_to_uuid_map:
|
||
new_uuid = name_to_uuid_map[resource_name]
|
||
# 更新资源的uuid
|
||
if isinstance(resource, dict):
|
||
resource["uuid"] = new_uuid
|
||
else:
|
||
# 对于PLR资源,设置unilabos_uuid
|
||
setattr(resource, "unilabos_uuid", new_uuid)
|
||
self.uuid_to_resources[new_uuid] = resource
|
||
update_count += 1
|
||
logger.debug(f"设置资源UUID: {resource_name} -> {new_uuid}")
|
||
|
||
return update_count
|
||
|
||
def loop_update_uuid(self, resource, uuid_map: Dict[str, str]) -> int:
|
||
"""
|
||
递归遍历资源树,更新所有节点的uuid
|
||
|
||
Args:
|
||
resource: 资源对象(可以是dict或实例)
|
||
uuid_map: uuid映射字典,{old_uuid: new_uuid}
|
||
|
||
Returns:
|
||
更新的资源数量
|
||
"""
|
||
if isinstance(resource, list):
|
||
return sum(self.loop_update_uuid(r, uuid_map) for r in resource)
|
||
|
||
update_count = 0
|
||
|
||
# 先递归处理所有子节点
|
||
children = getattr(resource, "children", [])
|
||
for child in children:
|
||
update_count += self.loop_update_uuid(child, uuid_map)
|
||
|
||
# 获取当前资源的uuid
|
||
if isinstance(resource, dict):
|
||
current_uuid = resource.get("uuid")
|
||
else:
|
||
current_uuid = getattr(resource, "unilabos_uuid", None)
|
||
|
||
# 如果当前uuid在映射中,则更新
|
||
if current_uuid and current_uuid in uuid_map:
|
||
new_uuid = uuid_map[current_uuid]
|
||
if current_uuid != new_uuid:
|
||
# 更新资源的uuid
|
||
if isinstance(resource, dict):
|
||
resource["uuid"] = new_uuid
|
||
else:
|
||
# 对于PLR资源,更新unilabos_uuid
|
||
if hasattr(resource, "unilabos_uuid"):
|
||
setattr(resource, "unilabos_uuid", new_uuid)
|
||
|
||
# 更新uuid_to_resources映射
|
||
if current_uuid in self.uuid_to_resources:
|
||
instance = self.uuid_to_resources.pop(current_uuid)
|
||
self.uuid_to_resources[new_uuid] = instance
|
||
|
||
update_count += 1
|
||
logger.debug(f"更新uuid: {current_uuid} -> {new_uuid} | {resource}")
|
||
|
||
return update_count
|
||
|
||
def parent_resource(self, resource):
|
||
if id(resource) in self.resource2parent_resource:
|
||
return self.resource2parent_resource[id(resource)]
|
||
else:
|
||
return resource
|
||
|
||
def add_resource(self, resource):
|
||
for r in self.resources:
|
||
if id(r) == id(resource):
|
||
return
|
||
self.resources.append(resource)
|
||
|
||
def clear_resource(self):
|
||
self.resources = []
|
||
|
||
def figure_resource(self, query_resource, try_mode=False):
|
||
if isinstance(query_resource, list):
|
||
return [self.figure_resource(r, try_mode) for r in query_resource]
|
||
elif (
|
||
isinstance(query_resource, dict) and "id" not in query_resource and "name" not in query_resource
|
||
): # 临时处理,要删除的,driver有太多类型错误标注
|
||
return [self.figure_resource(r, try_mode) for r in query_resource.values()]
|
||
res_id = (
|
||
query_resource.id # type: ignore
|
||
if hasattr(query_resource, "id")
|
||
else (query_resource.get("id") if isinstance(query_resource, dict) else None)
|
||
)
|
||
res_name = (
|
||
query_resource.name # type: ignore
|
||
if hasattr(query_resource, "name")
|
||
else (query_resource.get("name") if isinstance(query_resource, dict) else None)
|
||
)
|
||
res_identifier = res_id if res_id else res_name
|
||
identifier_key = "id" if res_id else "name"
|
||
resource_cls_type = type(query_resource)
|
||
if res_identifier is None:
|
||
logger.warning(f"resource {query_resource} 没有id或name,暂不能对应figure")
|
||
res_list = []
|
||
for r in self.resources:
|
||
if isinstance(query_resource, dict):
|
||
res_list.extend(self.loop_find_resource(r, object, identifier_key, query_resource[identifier_key]))
|
||
else:
|
||
res_list.extend(
|
||
self.loop_find_resource(
|
||
r, resource_cls_type, identifier_key, getattr(query_resource, identifier_key)
|
||
)
|
||
)
|
||
if not try_mode:
|
||
assert len(res_list) > 0, f"没有找到资源 {query_resource},请检查资源是否存在"
|
||
assert len(res_list) == 1, f"{query_resource} 找到多个资源,请检查资源是否唯一: {res_list}"
|
||
else:
|
||
return [i[1] for i in res_list]
|
||
# 后续加入其他对比方式
|
||
self.resource2parent_resource[id(query_resource)] = res_list[0][0]
|
||
self.resource2parent_resource[id(res_list[0][1])] = res_list[0][0]
|
||
return res_list[0][1]
|
||
|
||
def loop_find_resource(
|
||
self, resource, target_resource_cls_type, identifier_key, compare_value, parent_res=None
|
||
) -> List[Tuple[Any, Any]]:
|
||
res_list = []
|
||
# print(resource, target_resource_cls_type, identifier_key, compare_value)
|
||
children = getattr(resource, "children", [])
|
||
for child in children:
|
||
res_list.extend(
|
||
self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource)
|
||
)
|
||
if issubclass(type(resource), target_resource_cls_type):
|
||
if target_resource_cls_type == dict:
|
||
if identifier_key in resource:
|
||
if resource[identifier_key] == compare_value:
|
||
res_list.append((parent_res, resource))
|
||
elif hasattr(resource, identifier_key):
|
||
if getattr(resource, identifier_key) == compare_value:
|
||
res_list.append((parent_res, resource))
|
||
return res_list
|
||
|
||
def filter_find_list(self, res_list, compare_std_dict):
|
||
new_list = []
|
||
for res in res_list:
|
||
for k, v in compare_std_dict.items():
|
||
if hasattr(res, k):
|
||
if getattr(res, k) == v:
|
||
new_list.append(res)
|
||
return new_list
|
||
|
||
|
||
if __name__ == "__main__":
|
||
from pylabrobot.resources import corning_6_wellplate_16point8ml_flat
|
||
|
||
# 测试 from_plr_resources 和 to_plr_resources 的往返转换
|
||
print("=" * 60)
|
||
print("测试 PLR 资源转换往返")
|
||
print("=" * 60)
|
||
|
||
# 1. 创建一个 PLR 资源并设置 UUID
|
||
original_plate = corning_6_wellplate_16point8ml_flat("test_plate")
|
||
|
||
# 使用 DeviceNodeResourceTracker 设置 UUID
|
||
tracker = DeviceNodeResourceTracker()
|
||
name_to_uuid = {}
|
||
|
||
# 递归生成 name_to_uuid 映射
|
||
def build_uuid_map(resource):
|
||
name_to_uuid[resource.name] = str(uuid.uuid4())
|
||
for child in resource.children:
|
||
build_uuid_map(child)
|
||
|
||
build_uuid_map(original_plate)
|
||
|
||
# 使用 tracker 的 loop_set_uuid 方法设置 UUID
|
||
tracker.loop_set_uuid(original_plate, name_to_uuid)
|
||
|
||
print(f"\n1. 原始 PLR 资源: {original_plate.name}")
|
||
print(f" - UUID: {getattr(original_plate, 'unilabos_uuid', 'N/A')}")
|
||
print(f" - 子节点数量: {len(original_plate.children)}")
|
||
if original_plate.children:
|
||
print(f" - 第一个子节点: {original_plate.children[0].name}")
|
||
print(f" - 第一个子节点 UUID: {getattr(original_plate.children[0], 'unilabos_uuid', 'N/A')}")
|
||
|
||
# 2. 将 PLR 资源转换为 ResourceTreeSet
|
||
resource_tree_set = ResourceTreeSet.from_plr_resources([original_plate])
|
||
print(f"\n2. 转换为 ResourceTreeSet:")
|
||
print(f" - 树的数量: {len(resource_tree_set.trees)}")
|
||
print(f" - 根节点: {resource_tree_set.root_nodes[0].res_content.name}")
|
||
print(f" - 所有节点数量: {len(resource_tree_set.all_nodes)}")
|
||
|
||
# 3. 将 ResourceTreeSet 转换回 PLR 资源
|
||
plr_resources, name_to_uuid_maps = resource_tree_set.to_plr_resources()
|
||
converted_plate = plr_resources[0]
|
||
print(f"\n3. 转换回 PLR 资源: {converted_plate.name}")
|
||
print(f" - 子节点数量: {len(converted_plate.children)}")
|
||
if converted_plate.children:
|
||
print(f" - 第一个子节点: {converted_plate.children[0].name}")
|
||
|
||
# 4. 验证 UUID 映射
|
||
name_to_uuid = name_to_uuid_maps[0]
|
||
print(f"\n4. UUID 映射:")
|
||
print(f" - 映射条目数: {len(name_to_uuid)}")
|
||
print(f" - 示例映射: {list(name_to_uuid.items())[:3]}")
|
||
|
||
# 5. 验证 unilabos_uuid 属性
|
||
print(f"\n5. 验证 unilabos_uuid 设置:")
|
||
if hasattr(converted_plate, "unilabos_uuid"):
|
||
print(f" - 根节点 UUID: {getattr(converted_plate, 'unilabos_uuid')}")
|
||
if converted_plate.children and hasattr(converted_plate.children[0], "unilabos_uuid"):
|
||
print(f" - 第一个子节点 UUID: {getattr(converted_plate.children[0], 'unilabos_uuid')}")
|
||
else:
|
||
print(" - 警告: unilabos_uuid 未设置")
|
||
|
||
# 6. 验证 UUID 保持不变
|
||
print(f"\n6. 验证 UUID 在往返过程中保持不变:")
|
||
original_uuid = getattr(original_plate, "unilabos_uuid")
|
||
converted_uuid = getattr(converted_plate, "unilabos_uuid")
|
||
print(f" - 原始 UUID: {original_uuid}")
|
||
print(f" - 转换后 UUID: {converted_uuid}")
|
||
print(f" - UUID 保持不变: {original_uuid == converted_uuid}")
|
||
|
||
# 7. 再次往返转换,验证稳定性
|
||
resource_tree_set_2 = ResourceTreeSet.from_plr_resources([converted_plate])
|
||
plr_resources_2, name_to_uuid_maps_2 = resource_tree_set_2.to_plr_resources()
|
||
print(f"\n7. 第二次往返转换:")
|
||
print(f" - 资源名称: {plr_resources_2[0].name}")
|
||
print(f" - 子节点数量: {len(plr_resources_2[0].children)}")
|
||
print(f" - UUID 依然保持: {getattr(plr_resources_2[0], 'unilabos_uuid') == original_uuid}")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("✅ 测试完成! 所有转换正常工作")
|
||
print("=" * 60)
|