prcxi resource (#202)

* prcxi resource

* prcxi_resource

* Fix upload error not showing.
Support str type category.

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
This commit is contained in:
shuchang
2025-12-23 15:08:04 +08:00
committed by GitHub
parent 706323dc3e
commit acbaff7bb7
32 changed files with 224537 additions and 201 deletions

View File

@@ -6,7 +6,7 @@ import os
import socket
import time
import uuid
from typing import Any, List, Dict, Optional, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from typing import Any, List, Dict, Optional, OrderedDict, Tuple, TypedDict, Union, Sequence, Iterator, Literal
from pylabrobot.liquid_handling import (
LiquidHandlerBackend,
@@ -28,7 +28,7 @@ from pylabrobot.liquid_handling.standard import (
ResourceMove,
ResourceDrop,
)
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, TubeRack, PlateAdapter
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
@@ -70,50 +70,129 @@ class PRCXI9300Deck(Deck):
super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 6 # PRCXI 9300 有 6 个槽位
class PRCXI9300Container(Plate, TipRack):
"""PRCXI 9300 的专用 Container 类,继承自 Plate和TipRack。
该类定义了 PRCXI 9300 的工作台布局和槽位信息
class PRCXI9300Plate(Plate):
"""
专用孔板类:
1. 继承自 PLR 原生 Plate保留所有物理特性。
2. 增加 material_info 参数,用于在初始化时直接绑定 Unilab UUID
"""
def __init__(
self,
name: str,
size_x: float,
size_y: float,
size_z: float,
category: str,
ordering: collections.OrderedDict,
model: Optional[str] = None,
**kwargs,
):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
category: str = "plate",
ordered_items: collections.OrderedDict = None,
ordering: Optional[collections.OrderedDict] = None,
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
items = ordered_items if ordered_items is not None else ordering
super().__init__(name, size_x, size_y, size_z,
ordered_items=items,
category=category,
model=model, **kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。"""
super().load_state(state)
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
try:
data = super().serialize_state()
except AttributeError:
data = {}
if hasattr(self, '_unilabos_state') and self._unilabos_state:
safe_state = {}
for k, v in self._unilabos_state.items():
# 如果是 Material 字典,深入检查
if k == "Material" and isinstance(v, dict):
safe_material = {}
for mk, mv in v.items():
# 只保留基本数据类型 (字符串, 数字, 布尔值, 列表, 字典)
if isinstance(mv, (str, int, float, bool, list, dict, type(None))):
safe_material[mk] = mv
else:
# 打印日志提醒(可选)
# print(f"Warning: Removing non-serializable key {mk} from {self.name}")
pass
safe_state[k] = safe_material
# 其他顶层属性也进行类型检查
elif isinstance(v, (str, int, float, bool, list, dict, type(None))):
safe_state[k] = v
data.update(safe_state)
return data
class PRCXI9300TipRack(TipRack):
""" 专用吸头盒类 """
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
category: str = "tip_rack",
ordered_items: collections.OrderedDict = None,
ordering: Optional[collections.OrderedDict] = None,
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
items = ordered_items if ordered_items is not None else ordering
super().__init__(name, size_x, size_y, size_z,
ordered_items=items,
category=category,
model=model, **kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def load_state(self, state: Dict[str, Any]) -> None:
super().load_state(state)
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
data = super().serialize_state()
data.update(self._unilabos_state)
try:
data = super().serialize_state()
except AttributeError:
data = {}
if hasattr(self, '_unilabos_state') and self._unilabos_state:
safe_state = {}
for k, v in self._unilabos_state.items():
# 如果是 Material 字典,深入检查
if k == "Material" and isinstance(v, dict):
safe_material = {}
for mk, mv in v.items():
# 只保留基本数据类型 (字符串, 数字, 布尔值, 列表, 字典)
if isinstance(mv, (str, int, float, bool, list, dict, type(None))):
safe_material[mk] = mv
else:
# 打印日志提醒(可选)
# print(f"Warning: Removing non-serializable key {mk} from {self.name}")
pass
safe_state[k] = safe_material
# 其他顶层属性也进行类型检查
elif isinstance(v, (str, int, float, bool, list, dict, type(None))):
safe_state[k] = v
data.update(safe_state)
return data
class PRCXI9300Trash(Trash):
"""PRCXI 9300 的专用 Trash 类,继承自 Trash。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str, **kwargs):
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
category: str = "trash",
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
if name != "trash":
name = "trash"
print("PRCXI9300Trash name must be 'trash', using 'trash' instead.")
super().__init__(name, size_x, size_y, size_z, category=category, **kwargs)
print(f"Warning: PRCXI9300Trash usually expects name='trash' for backend logic, but got '{name}'.")
super().__init__(name, size_x, size_y, size_z, **kwargs)
self._unilabos_state = {}
# 初始化时注入 UUID
if material_info:
self._unilabos_state["Material"] = material_info
def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。"""
@@ -121,10 +200,152 @@ class PRCXI9300Trash(Trash):
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
data = super().serialize_state()
data.update(self._unilabos_state)
try:
data = super().serialize_state()
except AttributeError:
data = {}
if hasattr(self, '_unilabos_state') and self._unilabos_state:
safe_state = {}
for k, v in self._unilabos_state.items():
# 如果是 Material 字典,深入检查
if k == "Material" and isinstance(v, dict):
safe_material = {}
for mk, mv in v.items():
# 只保留基本数据类型 (字符串, 数字, 布尔值, 列表, 字典)
if isinstance(mv, (str, int, float, bool, list, dict, type(None))):
safe_material[mk] = mv
else:
# 打印日志提醒(可选)
# print(f"Warning: Removing non-serializable key {mk} from {self.name}")
pass
safe_state[k] = safe_material
# 其他顶层属性也进行类型检查
elif isinstance(v, (str, int, float, bool, list, dict, type(None))):
safe_state[k] = v
data.update(safe_state)
return data
class PRCXI9300TubeRack(TubeRack):
"""
专用管架类:用于 EP 管架、试管架等。
继承自 PLR 的 TubeRack并支持注入 material_info (UUID)。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
category: str = "tube_rack",
items: Optional[Dict[str, Any]] = None,
ordered_items: Optional[OrderedDict] = None,
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
**kwargs):
# 兼容处理PLR 的 TubeRack 构造函数可能接受 items 或 ordered_items
items_to_pass = items if items is not None else ordered_items
super().__init__(name, size_x, size_y, size_z,
ordered_items=ordered_items,
model=model,
**kwargs)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
try:
data = super().serialize_state()
except AttributeError:
data = {}
if hasattr(self, '_unilabos_state') and self._unilabos_state:
safe_state = {}
for k, v in self._unilabos_state.items():
# 如果是 Material 字典,深入检查
if k == "Material" and isinstance(v, dict):
safe_material = {}
for mk, mv in v.items():
# 只保留基本数据类型 (字符串, 数字, 布尔值, 列表, 字典)
if isinstance(mv, (str, int, float, bool, list, dict, type(None))):
safe_material[mk] = mv
else:
# 打印日志提醒(可选)
# print(f"Warning: Removing non-serializable key {mk} from {self.name}")
pass
safe_state[k] = safe_material
# 其他顶层属性也进行类型检查
elif isinstance(v, (str, int, float, bool, list, dict, type(None))):
safe_state[k] = v
data.update(safe_state)
return data
class PRCXI9300PlateAdapter(PlateAdapter):
"""
专用板式适配器类:用于承载 Plate 的底座(如 PCR 适配器、磁吸架等)。
支持注入 material_info (UUID)。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float,
category: str = "plate_adapter",
model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
# 参数给予默认值 (标准96孔板尺寸)
adapter_hole_size_x: float = 127.76,
adapter_hole_size_y: float = 85.48,
adapter_hole_size_z: float = 10.0, # 假设凹槽深度或板子放置高度
dx: Optional[float] = None,
dy: Optional[float] = None,
dz: float = 0.0, # 默认Z轴偏移
**kwargs):
# 自动居中计算:如果未指定 dx/dy则根据适配器尺寸和孔尺寸计算居中位置
if dx is None:
dx = (size_x - adapter_hole_size_x) / 2
if dy is None:
dy = (size_y - adapter_hole_size_y) / 2
super().__init__(
name=name,
size_x=size_x,
size_y=size_y,
size_z=size_z,
dx=dx,
dy=dy,
dz=dz,
adapter_hole_size_x=adapter_hole_size_x,
adapter_hole_size_y=adapter_hole_size_y,
adapter_hole_size_z=adapter_hole_size_z,
model=model,
**kwargs
)
self._unilabos_state = {}
if material_info:
self._unilabos_state["Material"] = material_info
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
try:
data = super().serialize_state()
except AttributeError:
data = {}
if hasattr(self, '_unilabos_state') and self._unilabos_state:
safe_state = {}
for k, v in self._unilabos_state.items():
# 如果是 Material 字典,深入检查
if k == "Material" and isinstance(v, dict):
safe_material = {}
for mk, mv in v.items():
# 只保留基本数据类型 (字符串, 数字, 布尔值, 列表, 字典)
if isinstance(mv, (str, int, float, bool, list, dict, type(None))):
safe_material[mk] = mv
else:
# 打印日志提醒(可选)
# print(f"Warning: Removing non-serializable key {mk} from {self.name}")
pass
safe_state[k] = safe_material
# 其他顶层属性也进行类型检查
elif isinstance(v, (str, int, float, bool, list, dict, type(None))):
safe_state[k] = v
data.update(safe_state)
return data
class PRCXI9300Handler(LiquidHandlerAbstract):
support_touch_tip = False
@@ -154,10 +375,15 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
tablets_info = []
count = 0
for child in deck.children:
if "Material" in child._unilabos_state:
child_state = getattr(child, "_unilabos_state", {})
if "Material" in child_state:
count += 1
tablets_info.append(
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
WorkTablets(
Number=count,
Code=f"T{count}",
Material=child_state["Material"]
)
)
if is_9320:
print("当前设备是9320")
@@ -434,7 +660,6 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
return await super().move_to(well, dis_to_top, channel)
class PRCXI9300Backend(LiquidHandlerBackend):
"""PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。
@@ -1533,31 +1758,31 @@ if __name__ == "__main__":
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul, opentrons_96_tiprack_10ul
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat, nest_96_wellplate_2ml_deep
def get_well_container(name: str) -> PRCXI9300Container:
def get_well_container(name: str) -> PRCXI9300Plate:
well_containers = corning_96_wellplate_360ul_flat(name).serialize()
plate = PRCXI9300Container(
name=name, size_x=50, size_y=50, size_z=10, category="plate", ordering=well_containers["ordering"]
plate = PRCXI9300Plate(
name=name, size_x=50, size_y=50, size_z=10, category="plate", ordered_items=well_containers["ordering"]
)
plate_serialized = plate.serialize()
plate_serialized["parent_name"] = deck.name
well_containers.update({k: v for k, v in plate_serialized.items() if k not in ["children"]})
new_plate: PRCXI9300Container = PRCXI9300Container.deserialize(well_containers)
new_plate: PRCXI9300Plate = PRCXI9300Plate.deserialize(well_containers)
return new_plate
def get_tip_rack(name: str, child_prefix: str = "tip") -> PRCXI9300Container:
def get_tip_rack(name: str, child_prefix: str = "tip") -> PRCXI9300TipRack:
tip_racks = opentrons_96_tiprack_10ul(name).serialize()
tip_rack = PRCXI9300Container(
tip_rack = PRCXI9300TipRack(
name=name,
size_x=50,
size_y=50,
size_z=10,
category="tip_rack",
ordering=collections.OrderedDict({k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()}),
ordered_items=collections.OrderedDict({k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()}),
)
tip_rack_serialized = tip_rack.serialize()
tip_rack_serialized["parent_name"] = deck.name
tip_racks.update({k: v for k, v in tip_rack_serialized.items() if k not in ["children"]})
new_tip_rack: PRCXI9300Container = PRCXI9300Container.deserialize(tip_racks)
new_tip_rack: PRCXI9300TipRack = PRCXI9300TipRack.deserialize(tip_racks)
return new_tip_rack
plate1 = get_tip_rack("RackT1")
@@ -1604,8 +1829,8 @@ if __name__ == "__main__":
}
}
)
plate7 = PRCXI9300Container(
name="plateT7", size_x=50, size_y=50, size_z=10, category="plate", ordering=collections.OrderedDict()
plate7 = PRCXI9300Plate(
name="plateT7", size_x=50, size_y=50, size_z=10, category="plate", ordered_items=collections.OrderedDict()
)
plate7.load_state({"Material": {"uuid": "04211a2dc93547fe9bf6121eac533650"}})
plate8 = get_tip_rack("PlateT8")
@@ -1679,13 +1904,13 @@ if __name__ == "__main__":
deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
deck.assign_child_resource(
PRCXI9300Container(
PRCXI9300Plate(
name="container_for_nothin3",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict(),
ordered_items=collections.OrderedDict(),
),
location=Coordinate(0, 0, 0),
)
@@ -1693,48 +1918,48 @@ if __name__ == "__main__":
deck.assign_child_resource(plate5, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate6, location=Coordinate(0, 0, 0))
deck.assign_child_resource(
PRCXI9300Container(
PRCXI9300Plate(
name="container_for_nothing7",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict(),
ordered_items=collections.OrderedDict(),
),
location=Coordinate(0, 0, 0),
)
deck.assign_child_resource(
PRCXI9300Container(
PRCXI9300Plate(
name="container_for_nothing8",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict(),
ordered_items=collections.OrderedDict(),
),
location=Coordinate(0, 0, 0),
)
deck.assign_child_resource(plate9, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate10, location=Coordinate(0, 0, 0))
deck.assign_child_resource(
PRCXI9300Container(
PRCXI9300Plate(
name="container_for_nothing11",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict(),
ordered_items=collections.OrderedDict(),
),
location=Coordinate(0, 0, 0),
)
deck.assign_child_resource(
PRCXI9300Container(
PRCXI9300Plate(
name="container_for_nothing12",
size_x=50,
size_y=50,
size_z=10,
category="plate",
ordering=collections.OrderedDict(),
ordered_items=collections.OrderedDict(),
),
location=Coordinate(0, 0, 0),
)