mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 21:41:16 +00:00
liquid states
This commit is contained in:
@@ -105,40 +105,41 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
|
||||
return nested_dict_to_list(resource), Resource
|
||||
return resource, source_type
|
||||
|
||||
def _process_resource_references(self, data: Any, to_dict=False) -> Any:
|
||||
def _process_resource_references(self, data: Any, to_dict=False, states=None, prefix_path="") -> Any:
|
||||
"""
|
||||
递归处理资源引用,替换_resource_child_name对应的资源
|
||||
|
||||
Args:
|
||||
data: 需要处理的数据,可能是字典、列表或其他类型
|
||||
to_dict: 转换成对应的实例,还是转换成对应的字典
|
||||
to_dict: 是否返回字典形式的资源
|
||||
states: 用于保存所有资源状态
|
||||
prefix_path: 当前递归路径
|
||||
|
||||
Returns:
|
||||
处理后的数据
|
||||
"""
|
||||
from pylabrobot.resources import Deck, Resource
|
||||
if states is None:
|
||||
states = {}
|
||||
|
||||
if isinstance(data, dict):
|
||||
# 检查是否包含资源引用
|
||||
if "_resource_child_name" in data:
|
||||
child_name = data["_resource_child_name"]
|
||||
if child_name in self.children:
|
||||
# 找到了对应的资源
|
||||
resource = self.children[child_name]
|
||||
|
||||
# 检查是否需要转换资源类型
|
||||
if "_resource_type" in data:
|
||||
type_path = data["_resource_type"]
|
||||
try:
|
||||
# 尝试导入指定的类型
|
||||
target_type = import_manager.get_class(type_path)
|
||||
contain_model = not issubclass(target_type, Deck)
|
||||
resource, target_type = self._process_resource_mapping(resource, target_type)
|
||||
# 在截图中格式,是deserialize,所以这里要转成plr resource可deserialize的字典
|
||||
# 这样后面执行deserialize的时候能够正确反序列化对应的物料
|
||||
resource_instance: Resource = resource_ulab_to_plr(resource, contain_model)
|
||||
|
||||
# 使用 prefix_path 作为 key 存储资源状态
|
||||
if to_dict:
|
||||
return resource_instance.serialize()
|
||||
serialized = resource_instance.serialize()
|
||||
states[prefix_path] = resource_instance.serialize_all_state()
|
||||
return serialized
|
||||
else:
|
||||
self.resource_tracker.add_resource(resource_instance)
|
||||
return resource_instance
|
||||
@@ -151,18 +152,21 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
|
||||
else:
|
||||
logger.warning(f"找不到资源引用 '{child_name}',保持原值不变")
|
||||
|
||||
# 递归处理字典的每个值
|
||||
# 递归处理每个键值
|
||||
result = {}
|
||||
for key, value in data.items():
|
||||
result[key] = self._process_resource_references(value, to_dict)
|
||||
new_prefix = f"{prefix_path}.{key}" if prefix_path else key
|
||||
result[key] = self._process_resource_references(value, to_dict, states, new_prefix)
|
||||
return result
|
||||
|
||||
# 处理列表类型
|
||||
elif isinstance(data, list):
|
||||
return [self._process_resource_references(item, to_dict) for item in data]
|
||||
return [
|
||||
self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]")
|
||||
for i, item in enumerate(data)
|
||||
]
|
||||
|
||||
# 其他类型直接返回
|
||||
return data
|
||||
else:
|
||||
return data
|
||||
|
||||
def create_instance(self, data: Dict[str, Any]) -> Optional[T]:
|
||||
"""
|
||||
@@ -187,10 +191,18 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
|
||||
logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}")
|
||||
|
||||
# 首先处理资源引用
|
||||
processed_data = self._process_resource_references(data, to_dict=True)
|
||||
states = {}
|
||||
processed_data = self._process_resource_references(data, to_dict=True, states=states)
|
||||
|
||||
try:
|
||||
self.device_instance = deserialize_method(**processed_data)
|
||||
all_states = self.device_instance.serialize_all_state()
|
||||
for k, v in states.items():
|
||||
logger.debug(f"PyLabRobot反序列化设置状态:{k}")
|
||||
for kk, vv in all_states.items():
|
||||
if kk not in v:
|
||||
v[kk] = vv
|
||||
self.device_instance.deck.load_all_state(v)
|
||||
self.resource_tracker.add_resource(self.device_instance)
|
||||
self.post_create()
|
||||
return self.device_instance # type: ignore
|
||||
@@ -225,6 +237,10 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
|
||||
if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction(getattr(self.device_instance, "setup")):
|
||||
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
|
||||
def done_cb(*args):
|
||||
from pylabrobot.resources import set_volume_tracking
|
||||
# from pylabrobot.resources import set_tip_tracking
|
||||
set_volume_tracking(enabled=True)
|
||||
# set_tip_tracking(enabled=True) # 序列化tip_spot has为False
|
||||
logger.debug(f"PyLabRobot设备实例 {self.device_instance} 设置完成")
|
||||
from unilabos.config.config import BasicConfig
|
||||
if BasicConfig.vis_2d_enable:
|
||||
|
||||
Reference in New Issue
Block a user