fix pump transfer. fix resource update when protocol & ros callback

This commit is contained in:
Xuwznln
2026-02-05 23:21:43 +08:00
parent 1d181743ea
commit 5dda5c61ce
8 changed files with 213 additions and 148 deletions

View File

@@ -452,8 +452,9 @@ unilab --ak your_ak --sk your_sk -g test/experiments/mock_devices/mock_all.json
**操作步骤:** **操作步骤:**
1. 将两个 `container` 拖拽到 `workstation` 1. 将两个 `container` 拖拽到 `workstation`
2.`virtual_transfer_pump` 拖拽到 `workstation` 2.`virtual_multiway_valve` 拖拽到 `workstation`
3. 在画布上连接它们(建立父子关系) 3. `virtual_transfer_pump` 拖拽到 `workstation`
4. 在画布上连接它们(建立父子关系)
![设备连接](image/links.png) ![设备连接](image/links.png)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 275 KiB

After

Width:  |  Height:  |  Size: 415 KiB

View File

@@ -95,8 +95,29 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float:
return total_volume return total_volume
def is_integrated_pump(node_name): def is_integrated_pump(node_class: str, node_name: str = "") -> bool:
return "pump" in node_name and "valve" in node_name """
判断是否为泵阀一体设备
"""
class_lower = (node_class or "").lower()
name_lower = (node_name or "").lower()
if "pump" not in class_lower and "pump" not in name_lower:
return False
integrated_markers = [
"valve",
"pump_valve",
"pumpvalve",
"integrated",
"transfer_pump",
]
for marker in integrated_markers:
if marker in class_lower or marker in name_lower:
return True
return False
def find_connected_pump(G, valve_node): def find_connected_pump(G, valve_node):
@@ -186,7 +207,9 @@ def build_pump_valve_maps(G, pump_backbone):
debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") debug_print(f"🔧 过滤后的骨架: {filtered_backbone}")
for node in filtered_backbone: for node in filtered_backbone:
if is_integrated_pump(G.nodes[node]["class"]): node_data = G.nodes.get(node, {})
node_class = node_data.get("class", "") or ""
if is_integrated_pump(node_class, node):
pumps_from_node[node] = node pumps_from_node[node] = node
valve_from_node[node] = node valve_from_node[node] = node
debug_print(f" - 集成泵-阀: {node}") debug_print(f" - 集成泵-阀: {node}")

View File

@@ -31,14 +31,14 @@ class VirtualTransferPump:
# 从config或kwargs中获取参数确保类型正确 # 从config或kwargs中获取参数确保类型正确
if config: if config:
self.max_volume = float(config.get('max_volume', 25.0)) self.max_volume = float(config.get("max_volume", 25.0))
self.port = config.get('port', 'VIRTUAL') self.port = config.get("port", "VIRTUAL")
else: else:
self.max_volume = float(kwargs.get('max_volume', 25.0)) self.max_volume = float(kwargs.get("max_volume", 25.0))
self.port = kwargs.get('port', 'VIRTUAL') self.port = kwargs.get("port", "VIRTUAL")
self._transfer_rate = float(kwargs.get('transfer_rate', 0)) self._transfer_rate = float(kwargs.get("transfer_rate", 0))
self.mode = kwargs.get('mode', VirtualPumpMode.Normal) self.mode = kwargs.get("mode", VirtualPumpMode.Normal)
# 状态变量 - 确保都是正确类型 # 状态变量 - 确保都是正确类型
self._status = "Idle" self._status = "Idle"
@@ -54,7 +54,9 @@ class VirtualTransferPump:
self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}") self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}")
print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨") print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨")
print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s") print(
f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s"
)
print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}") print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode): def post_init(self, ros_node: BaseROS2DeviceNode):
@@ -189,7 +191,9 @@ class VirtualTransferPump:
operation_emoji = "📍" operation_emoji = "📍"
self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}") self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}")
self.logger.info(f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)") self.logger.info(
f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)"
)
self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s") self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s")
self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s")
@@ -207,7 +211,11 @@ class VirtualTransferPump:
for i in range(steps + 1): for i in range(steps + 1):
# 计算当前位置和进度 # 计算当前位置和进度
progress = (i / steps) * 100 if steps > 0 else 100 progress = (i / steps) * 100 if steps > 0 else 100
current_pos = start_position + (target_position - start_position) * (i / steps) if steps > 0 else target_position current_pos = (
start_position + (target_position - start_position) * (i / steps)
if steps > 0
else target_position
)
# 更新状态 # 更新状态
if i < steps: if i < steps:
@@ -244,7 +252,9 @@ class VirtualTransferPump:
# 📊 最终状态日志 # 📊 最终状态日志
if volume_to_move > 0.01: if volume_to_move > 0.01:
self.logger.info(f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") self.logger.info(
f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL"
)
# 返回符合action定义的结果 # 返回符合action定义的结果
return { return {
@@ -252,7 +262,7 @@ class VirtualTransferPump:
"message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})", "message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})",
"final_position": self._position, "final_position": self._position,
"final_volume": self._current_volume, "final_volume": self._current_volume,
"operation_type": operation_type "operation_type": operation_type,
} }
except Exception as e: except Exception as e:
@@ -262,7 +272,7 @@ class VirtualTransferPump:
"success": False, "success": False,
"message": error_msg, "message": error_msg,
"final_position": self._position, "final_position": self._position,
"final_volume": self._current_volume "final_volume": self._current_volume,
} }
# 其他泵操作方法 # 其他泵操作方法
@@ -388,7 +398,9 @@ class VirtualTransferPump:
return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差 return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差
def __str__(self): def __str__(self):
return f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" return (
f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})"
)
def __repr__(self): def __repr__(self):
return self.__str__() return self.__str__()

View File

@@ -11,6 +11,7 @@ Virtual Workbench Device - 模拟工作台设备
注意:调用来自线程池,使用 threading.Lock 进行同步 注意:调用来自线程池,使用 threading.Lock 进行同步
""" """
import logging import logging
import time import time
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
@@ -22,12 +23,15 @@ from typing_extensions import TypedDict
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
from unilabos.utils.decorator import not_action from unilabos.utils.decorator import not_action
from unilabos.resources.resource_tracker import SampleUUIDsType
# ============ TypedDict 返回类型定义 ============ # ============ TypedDict 返回类型定义 ============
class MoveToHeatingStationResult(TypedDict): class MoveToHeatingStationResult(TypedDict):
"""move_to_heating_station 返回类型""" """move_to_heating_station 返回类型"""
success: bool success: bool
station_id: int station_id: int
material_id: str material_id: str
@@ -37,6 +41,7 @@ class MoveToHeatingStationResult(TypedDict):
class StartHeatingResult(TypedDict): class StartHeatingResult(TypedDict):
"""start_heating 返回类型""" """start_heating 返回类型"""
success: bool success: bool
station_id: int station_id: int
material_id: str material_id: str
@@ -46,6 +51,7 @@ class StartHeatingResult(TypedDict):
class MoveToOutputResult(TypedDict): class MoveToOutputResult(TypedDict):
"""move_to_output 返回类型""" """move_to_output 返回类型"""
success: bool success: bool
station_id: int station_id: int
material_id: str material_id: str
@@ -53,6 +59,7 @@ class MoveToOutputResult(TypedDict):
class PrepareMaterialsResult(TypedDict): class PrepareMaterialsResult(TypedDict):
"""prepare_materials 返回类型 - 批量准备物料""" """prepare_materials 返回类型 - 批量准备物料"""
success: bool success: bool
count: int count: int
material_1: int # 物料编号1 material_1: int # 物料编号1
@@ -65,8 +72,10 @@ class PrepareMaterialsResult(TypedDict):
# ============ 状态枚举 ============ # ============ 状态枚举 ============
class HeatingStationState(Enum): class HeatingStationState(Enum):
"""加热台状态枚举""" """加热台状态枚举"""
IDLE = "idle" # 空闲 IDLE = "idle" # 空闲
OCCUPIED = "occupied" # 已放置物料,等待加热 OCCUPIED = "occupied" # 已放置物料,等待加热
HEATING = "heating" # 加热中 HEATING = "heating" # 加热中
@@ -75,6 +84,7 @@ class HeatingStationState(Enum):
class ArmState(Enum): class ArmState(Enum):
"""机械臂状态枚举""" """机械臂状态枚举"""
IDLE = "idle" # 空闲 IDLE = "idle" # 空闲
BUSY = "busy" # 工作中 BUSY = "busy" # 工作中
@@ -82,6 +92,7 @@ class ArmState(Enum):
@dataclass @dataclass
class HeatingStation: class HeatingStation:
"""加热台数据结构""" """加热台数据结构"""
station_id: int station_id: int
state: HeatingStationState = HeatingStationState.IDLE state: HeatingStationState = HeatingStationState.IDLE
current_material: Optional[str] = None # 当前物料 (如 "A1", "A2") current_material: Optional[str] = None # 当前物料 (如 "A1", "A2")
@@ -137,8 +148,7 @@ class VirtualWorkbench:
# 加热台状态 (station_id -> HeatingStation) - 立即初始化不依赖initialize() # 加热台状态 (station_id -> HeatingStation) - 立即初始化不依赖initialize()
self._heating_stations: Dict[int, HeatingStation] = { self._heating_stations: Dict[int, HeatingStation] = {
i: HeatingStation(station_id=i) i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1)
for i in range(1, self.NUM_HEATING_STATIONS + 1)
} }
self._stations_lock = RLock() # 可重入锁,保护加热台状态 self._stations_lock = RLock() # 可重入锁,保护加热台状态
@@ -178,14 +188,16 @@ class VirtualWorkbench:
station.heating_progress = 0.0 station.heating_progress = 0.0
# 初始化状态 # 初始化状态
self.data.update({ self.data.update(
"status": "Ready", {
"arm_state": ArmState.IDLE.value, "status": "Ready",
"arm_current_task": None, "arm_state": ArmState.IDLE.value,
"heating_stations": self._get_stations_status(), "arm_current_task": None,
"active_tasks_count": 0, "heating_stations": self._get_stations_status(),
"message": "工作台就绪", "active_tasks_count": 0,
}) "message": "工作台就绪",
}
)
self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪") self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪")
return True return True
@@ -204,12 +216,14 @@ class VirtualWorkbench:
with self._tasks_lock: with self._tasks_lock:
self._active_tasks.clear() self._active_tasks.clear()
self.data.update({ self.data.update(
"status": "Offline", {
"arm_state": ArmState.IDLE.value, "status": "Offline",
"heating_stations": {}, "arm_state": ArmState.IDLE.value,
"message": "工作台已关闭", "heating_stations": {},
}) "message": "工作台已关闭",
}
)
return True return True
def _get_stations_status(self) -> Dict[int, Dict[str, Any]]: def _get_stations_status(self) -> Dict[int, Dict[str, Any]]:
@@ -227,12 +241,14 @@ class VirtualWorkbench:
def _update_data_status(self, message: Optional[str] = None): def _update_data_status(self, message: Optional[str] = None):
"""更新状态数据""" """更新状态数据"""
self.data.update({ self.data.update(
"arm_state": self._arm_state.value, {
"arm_current_task": self._arm_current_task, "arm_state": self._arm_state.value,
"heating_stations": self._get_stations_status(), "arm_current_task": self._arm_current_task,
"active_tasks_count": len(self._active_tasks), "heating_stations": self._get_stations_status(),
}) "active_tasks_count": len(self._active_tasks),
}
)
if message: if message:
self.data["message"] = message self.data["message"] = message
@@ -280,6 +296,7 @@ class VirtualWorkbench:
def prepare_materials( def prepare_materials(
self, self,
sample_uuids: SampleUUIDsType,
count: int = 5, count: int = 5,
) -> PrepareMaterialsResult: ) -> PrepareMaterialsResult:
""" """
@@ -297,10 +314,7 @@ class VirtualWorkbench:
# 生成物料列表 A1 - A{count} # 生成物料列表 A1 - A{count}
materials = [i for i in range(1, count + 1)] materials = [i for i in range(1, count + 1)]
self.logger.info( self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}")
f"[准备物料] 生成 {count} 个物料: "
f"A1-A{count} -> material_1~material_{count}"
)
return { return {
"success": True, "success": True,
@@ -315,6 +329,7 @@ class VirtualWorkbench:
def move_to_heating_station( def move_to_heating_station(
self, self,
sample_uuids: SampleUUIDsType,
material_number: int, material_number: int,
) -> MoveToHeatingStationResult: ) -> MoveToHeatingStationResult:
""" """
@@ -407,6 +422,7 @@ class VirtualWorkbench:
def start_heating( def start_heating(
self, self,
sample_uuids: SampleUUIDsType,
station_id: int, station_id: int,
material_number: int, material_number: int,
) -> StartHeatingResult: ) -> StartHeatingResult:
@@ -503,6 +519,7 @@ class VirtualWorkbench:
def move_to_output( def move_to_output(
self, self,
sample_uuids: SampleUUIDsType,
station_id: int, station_id: int,
material_number: int, material_number: int,
) -> MoveToOutputResult: ) -> MoveToOutputResult:

View File

@@ -412,16 +412,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
else: else:
for r in rts.root_nodes: for r in rts.root_nodes:
r.res_content.parent_uuid = self.uuid r.res_content.parent_uuid = self.uuid
rts_plr_instances = rts.to_plr_resources()
if ( if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer):
len(LIQUID_INPUT_SLOT)
and LIQUID_INPUT_SLOT[0] == -1
and len(rts.root_nodes) == 1
and isinstance(rts.root_nodes[0], RegularContainer)
):
# noinspection PyTypeChecker # noinspection PyTypeChecker
container_instance: RegularContainer = rts.root_nodes[0] container_instance: RegularContainer = rts_plr_instances[0]
found_resources = self.resource_tracker.figure_resource({"id": container_instance.name}, try_mode=True) found_resources = self.resource_tracker.figure_resource({"name": container_instance.name}, try_mode=True)
if not len(found_resources): if not len(found_resources):
self.resource_tracker.add_resource(container_instance) self.resource_tracker.add_resource(container_instance)
logger.info(f"添加物料{container_instance.name}到资源跟踪器") logger.info(f"添加物料{container_instance.name}到资源跟踪器")
@@ -430,7 +425,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
found_resource = found_resources[0] found_resource = found_resources[0]
if isinstance(found_resource, RegularContainer): if isinstance(found_resource, RegularContainer):
logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}")
found_resource.state.update(json.loads(container_instance.state)) found_resource.state.update(container_instance.state)
elif isinstance(found_resource, dict): elif isinstance(found_resource, dict):
raise ValueError("已不支持 字典 版本的RegularContainer") raise ValueError("已不支持 字典 版本的RegularContainer")
else: else:
@@ -443,7 +438,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"action": "add", "action": "add",
"data": { "data": {
"data": rts.dump(), "data": rts.dump(),
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else self.uuid,
"first_add": False, "first_add": False,
}, },
} }
@@ -1538,11 +1533,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
if isinstance(rs, list): if isinstance(rs, list):
for r in rs: for r in rs:
res = self.resource_tracker.parent_resource(r) # 获取 resource 对象 res = self.resource_tracker.parent_resource(r) # 获取 resource 对象
if res is None:
res = rs
if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
else: else:
res = self.resource_tracker.parent_resource(rs) res = self.resource_tracker.parent_resource(rs)
if id(res) not in seen: if res is None:
seen.add(id(res)) res = rs
unique_resources.append(res) if id(res) not in seen:
seen.add(id(res))
unique_resources.append(res)
# 使用新的资源树接口 # 使用新的资源树接口
if unique_resources: if unique_resources:

View File

@@ -340,6 +340,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False) plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False)
# 获取父资源 # 获取父资源
res = self.resource_tracker.parent_resource(plr) res = self.resource_tracker.parent_resource(plr)
if res is None:
res = plr
if id(res) not in seen: if id(res) not in seen:
seen.add(id(res)) seen.add(id(res))
unique_resources.append(res) unique_resources.append(res)

View File

@@ -52,7 +52,8 @@ class DeviceClassCreator(Generic[T]):
if self.device_instance is not None: if self.device_instance is not None:
for c in self.children: for c in self.children:
if c.res_content.type != "device": if c.res_content.type != "device":
self.resource_tracker.add_resource(c.get_plr_nested_dict()) res = ResourceTreeSet([ResourceTreeInstance(c)]).to_plr_resources()[0]
self.resource_tracker.add_resource(res)
def create_instance(self, data: Dict[str, Any]) -> T: def create_instance(self, data: Dict[str, Any]) -> T:
""" """
@@ -119,7 +120,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
# return resource, source_type # return resource, source_type
def _process_resource_references( def _process_resource_references(
self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None self, data: Any, processed_child_names: Optional[Dict[str, Any]], to_dict=False, states=None, prefix_path="", name_to_uuid=None
) -> Any: ) -> Any:
""" """
递归处理资源引用替换_resource_child_name对应的资源 递归处理资源引用替换_resource_child_name对应的资源
@@ -164,6 +165,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
states[prefix_path] = resource_instance.serialize_all_state() states[prefix_path] = resource_instance.serialize_all_state()
return serialized return serialized
else: else:
processed_child_names[child_name] = resource_instance
self.resource_tracker.add_resource(resource_instance) self.resource_tracker.add_resource(resource_instance)
# 立即设置UUIDstate已经在resource_ulab_to_plr中处理过了 # 立即设置UUIDstate已经在resource_ulab_to_plr中处理过了
if name_to_uuid: if name_to_uuid:
@@ -182,12 +184,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
result = {} result = {}
for key, value in data.items(): for key, value in data.items():
new_prefix = f"{prefix_path}.{key}" if prefix_path else key new_prefix = f"{prefix_path}.{key}" if prefix_path else key
result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid) result[key] = self._process_resource_references(value, processed_child_names, to_dict, states, new_prefix, name_to_uuid)
return result return result
elif isinstance(data, list): elif isinstance(data, list):
return [ return [
self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) self._process_resource_references(item, processed_child_names, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid)
for i, item in enumerate(data) for i, item in enumerate(data)
] ]
@@ -234,7 +236,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
# 首先处理资源引用 # 首先处理资源引用
states = {} states = {}
processed_data = self._process_resource_references( processed_data = self._process_resource_references(
data, to_dict=True, states=states, name_to_uuid=name_to_uuid data, {}, to_dict=True, states=states, name_to_uuid=name_to_uuid
) )
try: try:
@@ -270,7 +272,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
arg_value = spec_args[param_name].annotation arg_value = spec_args[param_name].annotation
data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value
logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}")
processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid) processed_child_names = {}
processed_data = self._process_resource_references(data, processed_child_names, to_dict=False, name_to_uuid=name_to_uuid)
for child_name, resource_instance in processed_data.items():
for ind, name in enumerate([child.res_content.name for child in self.children]):
if name == child_name:
self.children.pop(ind)
self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用调用的自身的attach_resource self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用调用的自身的attach_resource
except Exception as e: except Exception as e:
logger.error(f"PyLabRobot创建实例失败: {e}") logger.error(f"PyLabRobot创建实例失败: {e}")
@@ -342,9 +349,10 @@ class WorkstationNodeCreator(DeviceClassCreator[T]):
try: try:
# 创建实例额外补充一个给protocol node的字段后面考虑取消 # 创建实例额外补充一个给protocol node的字段后面考虑取消
data["children"] = self.children data["children"] = self.children
for child in self.children: # super(WorkstationNodeCreator, self).create_instance(data)的时候会attach
if child.res_content.type != "device": # for child in self.children:
self.resource_tracker.add_resource(child.get_plr_nested_dict()) # if child.res_content.type != "device":
# self.resource_tracker.add_resource(child.get_plr_nested_dict())
deck_dict = data.get("deck") deck_dict = data.get("deck")
if deck_dict: if deck_dict:
from pylabrobot.resources import Deck, Resource from pylabrobot.resources import Deck, Resource