mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-08 16:05:14 +00:00
添加抓取后物料上传
This commit is contained in:
@@ -30,10 +30,11 @@ from pylabrobot.liquid_handling.standard import (
|
||||
ResourceMove,
|
||||
ResourceDrop,
|
||||
)
|
||||
from pylabrobot.resources import ResourceHolder, ResourceStack, Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, PlateAdapter, TubeRack
|
||||
from pylabrobot.resources import ResourceHolder, ResourceStack, Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, PlateAdapter, TubeRack, create_homogeneous_resources, create_ordered_items_2d
|
||||
|
||||
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract, SimpleReturn
|
||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
|
||||
from unilabos.resources.itemized_carrier import ItemizedCarrier
|
||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
|
||||
|
||||
|
||||
class PRCXIError(RuntimeError):
|
||||
@@ -86,19 +87,81 @@ class PRCXI9300Container(Plate):
|
||||
category: str,
|
||||
ordering: collections.OrderedDict,
|
||||
model: Optional[str] = None,
|
||||
material_info: Optional[Dict[str, Any]] = None,
|
||||
ordering_layout: str = "col-major",
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
|
||||
self._unilabos_state = {}
|
||||
self.sites = kwargs.get("sites", [])
|
||||
self.sites = create_homogeneous_resources(
|
||||
klass=ResourceHolder,
|
||||
locations=[Coordinate(0, 0, 0)],
|
||||
resource_size_x=size_x,
|
||||
resource_size_y=size_y,
|
||||
resource_size_z=size_z,
|
||||
name_prefix=name,
|
||||
)[0]
|
||||
# 为 ItemizedCarrier 添加 _unilabos_state 属性,以便与其他 PRCXI 组件兼容
|
||||
sites_resource = ItemizedCarrier(
|
||||
name=name+"_sites",
|
||||
sites={name: self.sites},
|
||||
size_x=size_x,
|
||||
size_y=size_y,
|
||||
size_z=size_z,
|
||||
category="warehouse",
|
||||
model=model,
|
||||
)
|
||||
sites_resource._unilabos_state = {} # 添加 _unilabos_state 属性
|
||||
if material_info:
|
||||
sites_resource._unilabos_state["Material"] = material_info
|
||||
|
||||
self.assign_child_resource(sites_resource, location=self.sites.location)
|
||||
|
||||
# 保存排序方式,供graphio.py的坐标映射使用
|
||||
# 使用独立属性避免与父类的layout冲突
|
||||
self.ordering_layout = ordering_layout
|
||||
|
||||
def serialize(self) -> dict:
|
||||
"""序列化时保存 ordering_layout 属性"""
|
||||
data = super().serialize()
|
||||
data['ordering_layout'] = self.ordering_layout
|
||||
return data
|
||||
|
||||
def load_state(self, state: Dict[str, Any]) -> None:
|
||||
"""从给定的状态加载工作台信息。"""
|
||||
super().load_state(state)
|
||||
self._unilabos_state = state
|
||||
|
||||
def get_site(self) -> ResourceHolder:
|
||||
"""获取容器的站点"""
|
||||
return self.sites
|
||||
|
||||
def add_resource_to_site(self, resource) -> None:
|
||||
"""向站点添加资源"""
|
||||
self.sites.assign_child_resource(resource)
|
||||
|
||||
def get_resource_at_site(self):
|
||||
"""获取站点上的资源"""
|
||||
return self.sites.children[0] if self.sites.children else None
|
||||
|
||||
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
|
||||
data = super().serialize_state()
|
||||
data.update(self._unilabos_state)
|
||||
|
||||
# 避免序列化 ResourceHolder 对象
|
||||
if hasattr(self, 'sites') and self.sites:
|
||||
# 创建 sites 的可序列化版本
|
||||
if hasattr(self.sites, '__class__') and 'pylabrobot' in str(self.sites.__class__.__module__):
|
||||
data['sites'] = {
|
||||
"__pylabrobot_object__": True,
|
||||
"class": self.sites.__class__.__name__,
|
||||
"module": self.sites.__class__.__module__,
|
||||
"name": getattr(self.sites, 'name', str(self.sites))
|
||||
}
|
||||
else:
|
||||
data['sites'] = self.sites
|
||||
|
||||
return data
|
||||
class PRCXI9300Plate(Plate):
|
||||
"""
|
||||
@@ -210,9 +273,16 @@ class PRCXI9300TipRack(TipRack):
|
||||
# 使用 ordering 参数,只包含位置信息(键)
|
||||
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
|
||||
else:
|
||||
# ordering 的值已经是对象,可以直接使用
|
||||
items = ordering
|
||||
ordering_param = None
|
||||
# ordering 的值已经是对象,需要过滤掉 None 值
|
||||
# 只保留有效的对象,用于 ordered_items 参数
|
||||
valid_items = {k: v for k, v in ordering.items() if v is not None}
|
||||
if valid_items:
|
||||
items = valid_items
|
||||
ordering_param = None
|
||||
else:
|
||||
# 如果没有有效对象,使用 ordering 参数
|
||||
items = None
|
||||
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
|
||||
else:
|
||||
items = None
|
||||
ordering_param = None
|
||||
@@ -348,9 +418,16 @@ class PRCXI9300TubeRack(TubeRack):
|
||||
# 使用 ordering 参数,只包含位置信息(键)
|
||||
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
|
||||
else:
|
||||
# ordering 的值已经是对象,可以直接使用
|
||||
items_to_pass = ordering
|
||||
ordering_param = None
|
||||
# ordering 的值已经是对象,需要过滤掉 None 值
|
||||
# 只保留有效的对象,用于 ordered_items 参数
|
||||
valid_items = {k: v for k, v in ordering.items() if v is not None}
|
||||
if valid_items:
|
||||
items_to_pass = valid_items
|
||||
ordering_param = None
|
||||
else:
|
||||
# 如果没有有效对象,使用 ordering 参数
|
||||
items_to_pass = None
|
||||
ordering_param = collections.OrderedDict((k, None) for k in ordering.keys())
|
||||
elif items is not None:
|
||||
# 兼容旧的 items 参数
|
||||
items_to_pass = items
|
||||
@@ -804,7 +881,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
||||
**backend_kwargs,
|
||||
):
|
||||
|
||||
return await super().move_plate(
|
||||
res = await super().move_plate(
|
||||
plate,
|
||||
to,
|
||||
intermediate_locations,
|
||||
@@ -816,6 +893,12 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
||||
target_plate_number = to,
|
||||
**backend_kwargs,
|
||||
)
|
||||
plate.unassign()
|
||||
to.assign_child_resource(plate, location=Coordinate(0, 0, 0))
|
||||
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
|
||||
"resources": [self.deck]
|
||||
})
|
||||
return res
|
||||
|
||||
class PRCXI9300Backend(LiquidHandlerBackend):
|
||||
"""PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。
|
||||
|
||||
Reference in New Issue
Block a user