diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 531f6ef..31d18c1 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -30,10 +30,11 @@ from pylabrobot.liquid_handling.standard import ( ResourceMove, ResourceDrop, ) -from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, TubeRack, PlateAdapter +from pylabrobot.resources import ResourceHolder, ResourceStack, Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, PlateAdapter, TubeRack, create_homogeneous_resources, create_ordered_items_2d -from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract -from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode +from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract, SimpleReturn +from unilabos.resources.itemized_carrier import ItemizedCarrier +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode class PRCXIError(RuntimeError): @@ -122,11 +123,61 @@ class PRCXI9300Plate(Plate): model: Optional[str] = None, material_info: Optional[Dict[str, Any]] = None, **kwargs): - items = ordered_items if ordered_items is not None else ordering - super().__init__(name, size_x, size_y, size_z, - ordered_items=items, - category=category, - model=model, **kwargs) + # 如果 ordered_items 不为 None,直接使用 + if ordered_items is not None: + items = ordered_items + elif ordering is not None: + # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) + # 如果是字符串,说明这是位置名称,需要让 Plate 自己创建 Well 对象 + # 我们只传递位置信息(键),不传递值,使用 ordering 参数 + if ordering and isinstance(next(iter(ordering.values()), None), str): + # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict + # 传递 ordering 参数而不是 ordered_items,让 Plate 自己创建 Well 对象 + items = None + # 使用 ordering 参数,只包含位置信息(键) + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + else: + # ordering 的值是对象(可能是 Well 对象),检查是否有有效的 location + # 如果是反序列化过程,Well 对象可能没有正确的 location,需要让 Plate 重新创建 + sample_value = next(iter(ordering.values()), None) + if sample_value is not None and hasattr(sample_value, 'location'): + # 如果是 Well 对象但 location 为 None,说明是反序列化过程 + # 让 Plate 自己创建 Well 对象 + if sample_value.location is None: + items = None + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + else: + # Well 对象有有效的 location,可以直接使用 + items = ordering + ordering_param = None + elif sample_value is None: + # ordering 的值都是 None,让 Plate 自己创建 Well 对象 + items = None + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + else: + # 其他情况,直接使用 + items = ordering + ordering_param = None + else: + items = None + ordering_param = collections.OrderedDict() # 提供空的 ordering + + # 根据情况传递不同的参数 + if items is not None: + super().__init__(name, size_x, size_y, size_z, + ordered_items=items, + category=category, + model=model, **kwargs) + elif ordering_param is not None: + # 传递 ordering 参数,让 Plate 自己创建 Well 对象 + super().__init__(name, size_x, size_y, size_z, + ordering=ordering_param, + category=category, + model=model, **kwargs) + else: + super().__init__(name, size_x, size_y, size_z, + category=category, + model=model, **kwargs) self._unilabos_state = {} if material_info: @@ -173,11 +224,50 @@ class PRCXI9300TipRack(TipRack): model: Optional[str] = None, material_info: Optional[Dict[str, Any]] = None, **kwargs): - items = ordered_items if ordered_items is not None else ordering - super().__init__(name, size_x, size_y, size_z, - ordered_items=items, - category=category, - model=model, **kwargs) + # 如果 ordered_items 不为 None,直接使用 + if ordered_items is not None: + items = ordered_items + elif ordering is not None: + # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) + # 如果是字符串,说明这是位置名称,需要让 TipRack 自己创建 Tip 对象 + # 我们只传递位置信息(键),不传递值,使用 ordering 参数 + if ordering and isinstance(next(iter(ordering.values()), None), str): + # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict + # 传递 ordering 参数而不是 ordered_items,让 TipRack 自己创建 Tip 对象 + items = None + # 使用 ordering 参数,只包含位置信息(键) + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + else: + # ordering 的值已经是对象,需要过滤掉 None 值 + # 只保留有效的对象,用于 ordered_items 参数 + valid_items = {k: v for k, v in ordering.items() if v is not None} + if valid_items: + items = valid_items + ordering_param = None + else: + # 如果没有有效对象,使用 ordering 参数 + items = None + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + else: + items = None + ordering_param = None + + # 根据情况传递不同的参数 + if items is not None: + super().__init__(name, size_x, size_y, size_z, + ordered_items=items, + category=category, + model=model, **kwargs) + elif ordering_param is not None: + # 传递 ordering 参数,让 TipRack 自己创建 Tip 对象 + super().__init__(name, size_x, size_y, size_z, + ordering=ordering_param, + category=category, + model=model, **kwargs) + else: + super().__init__(name, size_x, size_y, size_z, + category=category, + model=model, **kwargs) self._unilabos_state = {} if material_info: self._unilabos_state["Material"] = material_info @@ -278,12 +368,55 @@ class PRCXI9300TubeRack(TubeRack): material_info: Optional[Dict[str, Any]] = None, **kwargs): - # 兼容处理:PLR 的 TubeRack 构造函数可能接受 items 或 ordered_items - items_to_pass = items if items is not None else ordered_items - super().__init__(name, size_x, size_y, size_z, - ordered_items=ordered_items, - model=model, - **kwargs) + # 如果 ordered_items 不为 None,直接使用 + if ordered_items is not None: + items_to_pass = ordered_items + ordering_param = None + elif ordering is not None: + # 检查 ordering 中的值是否是字符串(从 JSON 反序列化时的情况) + # 如果是字符串,说明这是位置名称,需要让 TubeRack 自己创建 Tube 对象 + # 我们只传递位置信息(键),不传递值,使用 ordering 参数 + if ordering and isinstance(next(iter(ordering.values()), None), str): + # ordering 的值是字符串,只使用键(位置信息)创建新的 OrderedDict + # 传递 ordering 参数而不是 ordered_items,让 TubeRack 自己创建 Tube 对象 + items_to_pass = None + # 使用 ordering 参数,只包含位置信息(键) + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + else: + # ordering 的值已经是对象,需要过滤掉 None 值 + # 只保留有效的对象,用于 ordered_items 参数 + valid_items = {k: v for k, v in ordering.items() if v is not None} + if valid_items: + items_to_pass = valid_items + ordering_param = None + else: + # 如果没有有效对象,使用 ordering 参数 + items_to_pass = None + ordering_param = collections.OrderedDict((k, None) for k in ordering.keys()) + elif items is not None: + # 兼容旧的 items 参数 + items_to_pass = items + ordering_param = None + else: + items_to_pass = None + ordering_param = None + + # 根据情况传递不同的参数 + if items_to_pass is not None: + super().__init__(name, size_x, size_y, size_z, + ordered_items=items_to_pass, + model=model, + **kwargs) + elif ordering_param is not None: + # 传递 ordering 参数,让 TubeRack 自己创建 Tube 对象 + super().__init__(name, size_x, size_y, size_z, + ordering=ordering_param, + model=model, + **kwargs) + else: + super().__init__(name, size_x, size_y, size_z, + model=model, + **kwargs) self._unilabos_state = {} if material_info: @@ -770,6 +903,43 @@ class PRCXI9300Handler(LiquidHandlerAbstract): async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0): return await super().move_to(well, dis_to_top, channel) + async def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool): + return await self._unilabos_backend.shaker_action(time, module_no, amplitude, is_wait) + + async def heater_action(self, temperature: float, time: int): + return await self._unilabos_backend.heater_action(temperature, time) + async def move_plate( + self, + plate: Plate, + to: Resource, + intermediate_locations: Optional[List[Coordinate]] = None, + pickup_offset: Coordinate = Coordinate.zero(), + destination_offset: Coordinate = Coordinate.zero(), + drop_direction: GripDirection = GripDirection.FRONT, + pickup_direction: GripDirection = GripDirection.FRONT, + pickup_distance_from_top: float = 13.2 - 3.33, + **backend_kwargs, + ): + + res = await super().move_plate( + plate, + to, + intermediate_locations, + pickup_offset, + destination_offset, + drop_direction, + pickup_direction, + pickup_distance_from_top, + target_plate_number = to, + **backend_kwargs, + ) + plate.unassign() + to.assign_child_resource(plate, location=Coordinate(0, 0, 0)) + ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{ + "resources": [self.deck] + }) + return res + class PRCXI9300Backend(LiquidHandlerBackend): """PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。