From 9c4fdd800192cafc197cda1e306b1016abcc12be Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Wed, 16 Jul 2025 20:07:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Esimulator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liquid_handler_abstract.py | 553 +++++++++++++++++- .../devices/liquid_handling/prcxi/prcxi.py | 4 +- 2 files changed, 543 insertions(+), 14 deletions(-) diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 9795dc8..cf4a625 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -1,18 +1,547 @@ from __future__ import annotations import traceback -from typing import List, Sequence, Optional, Literal, Union, Iterator +from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set import asyncio import time -from pylabrobot.liquid_handling import LiquidHandler -from pylabrobot.resources import Resource, TipRack, Container, Coordinate, Well +from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness +from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod +from pylabrobot.liquid_handling.standard import GripDirection +from pylabrobot.resources import ( + Resource, + TipRack, + Container, + Coordinate, + Well, + Deck, + TipSpot, + Plate, + ResourceStack, + ResourceHolder, + Lid, + Trash, + Tip, +) -class LiquidHandlerAbstract(LiquidHandler): +class LiquidHandlerMiddleware(LiquidHandler): + def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False): + self._simulator = simulator + if simulator: + self._simulate_backend = LiquidHandlerChatterboxBackend(8) + self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False) + super().__init__(backend, deck) + + async def setup(self, **backend_kwargs): + if self._simulator: + await self._simulate_handler.setup(**backend_kwargs) + return await super().setup(**backend_kwargs) + + def serialize_state(self) -> Dict[str, Any]: + if self._simulator: + self._simulate_handler.serialize_state() + return super().serialize_state() + + def load_state(self, state: Dict[str, Any]): + if self._simulator: + self._simulate_handler.load_state(state) + super().load_state(state) + + def update_head_state(self, state: Dict[int, Optional[Tip]]): + if self._simulator: + self._simulate_handler.update_head_state(state) + super().update_head_state(state) + + def clear_head_state(self): + if self._simulator: + self._simulate_handler.clear_head_state() + super().clear_head_state() + + def _run_async_in_thread(self, func, *args, **kwargs): + super()._run_async_in_thread(func, *args, **kwargs) + + def _send_assigned_resource_to_backend(self, resource: Resource): + if self._simulator: + self._simulate_handler._send_assigned_resource_to_backend(resource) + super()._send_assigned_resource_to_backend(resource) + + def _send_unassigned_resource_to_backend(self, resource: Resource): + if self._simulator: + self._simulate_handler._send_unassigned_resource_to_backend(resource) + super()._send_unassigned_resource_to_backend(resource) + + def summary(self): + if self._simulator: + self._simulate_handler.summary() + super().summary() + + def _assert_positions_unique(self, positions: List[str]): + super()._assert_positions_unique(positions) + + def _assert_resources_exist(self, resources: Sequence[Resource]): + super()._assert_resources_exist(resources) + + def _check_args( + self, method: Callable, backend_kwargs: Dict[str, Any], default: Set[str], strictness: Strictness + ) -> Set[str]: + return super()._check_args(method, backend_kwargs, default, strictness) + + def _make_sure_channels_exist(self, channels: List[int]): + super()._make_sure_channels_exist(channels) + + def _format_param(self, value: Any) -> Any: + return super()._format_param(value) + + def _log_command(self, name: str, **kwargs) -> None: + super()._log_command(name, **kwargs) + + async def pick_up_tips( + self, + tip_spots: List[TipSpot], + use_channels: Optional[List[int]] = None, + offsets: Optional[List[Coordinate]] = None, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs) + return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs) + + async def drop_tips( + self, + tip_spots: Sequence[Union[TipSpot, Trash]], + use_channels: Optional[List[int]] = None, + offsets: Optional[List[Coordinate]] = None, + allow_nonzero_volume: bool = False, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.drop_tips( + tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs + ) + return await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs) + + async def return_tips( + self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs + ): + if self._simulator: + await self._simulate_handler.return_tips(use_channels, allow_nonzero_volume, **backend_kwargs) + return await super().return_tips(use_channels, allow_nonzero_volume, **backend_kwargs) + + async def discard_tips( + self, + use_channels: Optional[List[int]] = None, + allow_nonzero_volume: bool = True, + offsets: Optional[List[Coordinate]] = None, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs) + return await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs) + + def _check_containers(self, resources: Sequence[Resource]): + super()._check_containers(resources) + + async def aspirate( + self, + resources: Sequence[Container], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Optional[List[Coordinate]] = None, + liquid_height: Optional[List[Optional[float]]] = None, + blow_out_air_volume: Optional[List[Optional[float]]] = None, + spread: Literal["wide", "tight", "custom"] = "wide", + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.aspirate( + resources, + vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + return await super().aspirate( + resources, + vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + + async def dispense( + self, + resources: Sequence[Container], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Optional[List[Coordinate]] = None, + liquid_height: Optional[List[Optional[float]]] = None, + blow_out_air_volume: Optional[List[Optional[float]]] = None, + spread: Literal["wide", "tight", "custom"] = "wide", + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.dispense( + resources, + vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + return await super().dispense( + resources, + vols, + use_channels, + flow_rates, + offsets, + liquid_height, + blow_out_air_volume, + spread, + **backend_kwargs, + ) + + async def transfer( + self, + source: Well, + targets: List[Well], + source_vol: Optional[float] = None, + ratios: Optional[List[float]] = None, + target_vols: Optional[List[float]] = None, + aspiration_flow_rate: Optional[float] = None, + dispense_flow_rates: Optional[List[Optional[float]]] = None, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.transfer( + source, + targets, + source_vol, + ratios, + target_vols, + aspiration_flow_rate, + dispense_flow_rates, + **backend_kwargs, + ) + return await super().transfer( + source, + targets, + source_vol, + ratios, + target_vols, + aspiration_flow_rate, + dispense_flow_rates, + **backend_kwargs, + ) + + def use_channels(self, channels: List[int]): + if self._simulator: + self._simulate_handler.use_channels(channels) + return super().use_channels(channels) + + async def pick_up_tips96(self, tip_rack: TipRack, offset: Coordinate = Coordinate.zero(), **backend_kwargs): + if self._simulator: + await self._simulate_handler.pick_up_tips96(tip_rack, offset, **backend_kwargs) + return await super().pick_up_tips96(tip_rack, offset, **backend_kwargs) + + async def drop_tips96( + self, + resource: Union[TipRack, Trash], + offset: Coordinate = Coordinate.zero(), + allow_nonzero_volume: bool = False, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs) + return await super().drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs) + + def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]: + return super()._get_96_head_origin_tip_rack() + + async def return_tips96(self, allow_nonzero_volume: bool = False, **backend_kwargs): + if self._simulator: + await self._simulate_handler.return_tips96(allow_nonzero_volume, **backend_kwargs) + return await super().return_tips96(allow_nonzero_volume, **backend_kwargs) + + async def discard_tips96(self, allow_nonzero_volume: bool = True, **backend_kwargs): + if self._simulator: + await self._simulate_handler.discard_tips96(allow_nonzero_volume, **backend_kwargs) + return await super().discard_tips96(allow_nonzero_volume, **backend_kwargs) + + async def aspirate96( + self, + resource: Union[Plate, Container, List[Well]], + volume: float, + offset: Coordinate = Coordinate.zero(), + flow_rate: Optional[float] = None, + blow_out_air_volume: Optional[float] = None, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.aspirate96( + resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs + ) + return await super().aspirate96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs) + + async def dispense96( + self, + resource: Union[Plate, Container, List[Well]], + volume: float, + offset: Coordinate = Coordinate.zero(), + flow_rate: Optional[float] = None, + blow_out_air_volume: Optional[float] = None, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.dispense96( + resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs + ) + return await super().dispense96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs) + + async def stamp( + self, + source: Plate, + target: Plate, + volume: float, + aspiration_flow_rate: Optional[float] = None, + dispense_flow_rate: Optional[float] = None, + ): + if self._simulator: + await self._simulate_handler.stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate) + return await super().stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate) + + async def pick_up_resource( + self, + resource: Resource, + offset: Coordinate = Coordinate.zero(), + pickup_distance_from_top: float = 0, + direction: GripDirection = GripDirection.FRONT, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.pick_up_resource( + resource, offset, pickup_distance_from_top, direction, **backend_kwargs + ) + return await super().pick_up_resource(resource, offset, pickup_distance_from_top, direction, **backend_kwargs) + + async def move_picked_up_resource( + self, + to: Coordinate, + offset: Coordinate = Coordinate.zero(), + direction: Optional[GripDirection] = None, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.move_picked_up_resource(to, offset, direction, **backend_kwargs) + return await super().move_picked_up_resource(to, offset, direction, **backend_kwargs) + + async def drop_resource( + self, + destination: Union[ResourceStack, ResourceHolder, Resource, Coordinate], + offset: Coordinate = Coordinate.zero(), + direction: GripDirection = GripDirection.FRONT, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.drop_resource(destination, offset, direction, **backend_kwargs) + return await super().drop_resource(destination, offset, direction, **backend_kwargs) + + async def move_resource( + self, + resource: Resource, + to: Union[ResourceStack, ResourceHolder, Resource, Coordinate], + intermediate_locations: Optional[List[Coordinate]] = None, + pickup_offset: Coordinate = Coordinate.zero(), + destination_offset: Coordinate = Coordinate.zero(), + pickup_distance_from_top: float = 0, + pickup_direction: GripDirection = GripDirection.FRONT, + drop_direction: GripDirection = GripDirection.FRONT, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.move_resource( + resource, + to, + intermediate_locations, + pickup_offset, + destination_offset, + pickup_distance_from_top, + pickup_direction, + drop_direction, + **backend_kwargs, + ) + return await super().move_resource( + resource, + to, + intermediate_locations, + pickup_offset, + destination_offset, + pickup_distance_from_top, + pickup_direction, + drop_direction, + **backend_kwargs, + ) + + async def move_lid( + self, + lid: Lid, + to: Union[Plate, ResourceStack, Coordinate], + intermediate_locations: Optional[List[Coordinate]] = None, + pickup_offset: Coordinate = Coordinate.zero(), + destination_offset: Coordinate = Coordinate.zero(), + pickup_direction: GripDirection = GripDirection.FRONT, + drop_direction: GripDirection = GripDirection.FRONT, + pickup_distance_from_top: float = 5.7 - 3.33, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.move_lid( + lid, + to, + intermediate_locations, + pickup_offset, + destination_offset, + pickup_direction, + drop_direction, + pickup_distance_from_top, + **backend_kwargs, + ) + return await super().move_lid( + lid, + to, + intermediate_locations, + pickup_offset, + destination_offset, + pickup_direction, + drop_direction, + pickup_distance_from_top, + **backend_kwargs, + ) + + async def move_plate( + self, + plate: Plate, + to: Union[ResourceStack, ResourceHolder, Resource, Coordinate], + intermediate_locations: Optional[List[Coordinate]] = None, + pickup_offset: Coordinate = Coordinate.zero(), + destination_offset: Coordinate = Coordinate.zero(), + drop_direction: GripDirection = GripDirection.FRONT, + pickup_direction: GripDirection = GripDirection.FRONT, + pickup_distance_from_top: float = 13.2 - 3.33, + **backend_kwargs, + ): + if self._simulator: + await self._simulate_handler.move_plate( + plate, + to, + intermediate_locations, + pickup_offset, + destination_offset, + drop_direction, + pickup_direction, + pickup_distance_from_top, + **backend_kwargs, + ) + return await super().move_plate( + plate, + to, + intermediate_locations, + pickup_offset, + destination_offset, + drop_direction, + pickup_direction, + pickup_distance_from_top, + **backend_kwargs, + ) + + def serialize(self): + if self._simulator: + self._simulate_handler.serialize() + return super().serialize() + + @classmethod + def deserialize(cls, data: dict, allow_marshal: bool = False) -> LiquidHandler: + return super().deserialize(data, allow_marshal) + + @classmethod + def load(cls, path: str) -> LiquidHandler: + return super().load(path) + + async def prepare_for_manual_channel_operation(self, channel: int): + if self._simulator: + await self._simulate_handler.prepare_for_manual_channel_operation(channel) + return await super().prepare_for_manual_channel_operation(channel) + + async def move_channel_x(self, channel: int, x: float): + if self._simulator: + await self._simulate_handler.move_channel_x(channel, x) + return await super().move_channel_x(channel, x) + + async def move_channel_y(self, channel: int, y: float): + if self._simulator: + await self._simulate_handler.move_channel_y(channel, y) + return await super().move_channel_y(channel, y) + + async def move_channel_z(self, channel: int, z: float): + if self._simulator: + await self._simulate_handler.move_channel_z(channel, z) + return await super().move_channel_z(channel, z) + + def assign_child_resource(self, resource: Resource, location: Optional[Coordinate], reassign: bool = True): + if self._simulator: + self._simulate_handler.assign_child_resource(resource, location, reassign) + pass + + async def probe_tip_presence_via_pickup( + self, tip_spots: List[TipSpot], use_channels: Optional[List[int]] = None + ) -> Dict[str, bool]: + if self._simulator: + await self._simulate_handler.probe_tip_presence_via_pickup(tip_spots, use_channels) + return await super().probe_tip_presence_via_pickup(tip_spots, use_channels) + + async def probe_tip_inventory( + self, + tip_spots: List[TipSpot], + probing_fn: Optional[TipPresenceProbingMethod] = None, + use_channels: Optional[List[int]] = None, + ) -> Dict[str, bool]: + if self._simulator: + await self._simulate_handler.probe_tip_inventory(tip_spots, probing_fn, use_channels) + return await super().probe_tip_inventory(tip_spots, probing_fn, use_channels) + + async def consolidate_tip_inventory(self, tip_racks: List[TipRack], use_channels: Optional[List[int]] = None): + if self._simulator: + await self._simulate_handler.consolidate_tip_inventory(tip_racks, use_channels) + return await super().consolidate_tip_inventory(tip_racks, use_channels) + + +class LiquidHandlerAbstract(LiquidHandlerMiddleware): """Extended LiquidHandler with additional operations.""" + def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool): + """Initialize a LiquidHandler. + + Args: + backend: Backend to use. + deck: Deck to use. + """ + self._simulator = simulator + super().__init__(backend, deck, simulator) + # --------------------------------------------------------------- # REMOVE LIQUID -------------------------------------------------- # --------------------------------------------------------------- @@ -52,7 +581,7 @@ class LiquidHandlerAbstract(LiquidHandler): try: if is_96_well: pass # This mode is not verified. - else: + else: # 首先应该对任务分组,然后每次1个/8个进行操作处理 if len(use_channels) == 1: tip = [] @@ -175,7 +704,7 @@ class LiquidHandlerAbstract(LiquidHandler): else: if len(asp_vols) != len(targets): raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") - + # 首先应该对任务分组,然后每次1个/8个进行操作处理 if len(use_channels) == 1: tip = [] @@ -229,7 +758,7 @@ class LiquidHandlerAbstract(LiquidHandler): # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 if len(targets) % 8 != 0: raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") - + # 8个8个来取任务序列 for i in range(0, len(targets), 8): @@ -337,7 +866,7 @@ class LiquidHandlerAbstract(LiquidHandler): else: if len(asp_vols) != len(targets): raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") - + # 首先应该对任务分组,然后每次1个/8个进行操作处理 if len(use_channels) == 1: tip = [] @@ -392,7 +921,7 @@ class LiquidHandlerAbstract(LiquidHandler): # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 if len(targets) % 8 != 0: raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") - + # 8个8个来取任务序列 for i in range(0, len(targets), 8): @@ -401,7 +930,7 @@ class LiquidHandlerAbstract(LiquidHandler): current_reagent_sources = sources[i:i + 8] current_asp_vols = asp_vols[i:i + 8] current_dis_vols = dis_vols[i:i + 8] - current_asp_flow_rates = asp_flow_rates[i:i + 8] + current_asp_flow_rates = asp_flow_rates[i:i + 8] current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8 current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 @@ -420,7 +949,7 @@ class LiquidHandlerAbstract(LiquidHandler): liquid_height=current_asp_liquid_height, spread=spread, ) - + if delays is not None: await self.custom_delay(seconds=delays[0]) await self.dispense( @@ -446,7 +975,7 @@ class LiquidHandlerAbstract(LiquidHandler): ) if delays is not None: await self.custom_delay(seconds=delays[1]) - + #await self.touch_tip(current_targets) except Exception as e: traceback.print_exc() diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 75dd31a..122607b 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -14,7 +14,7 @@ from pylabrobot.liquid_handling import ( SingleChannelDispense, PickupTipRack, DropTipRack, - MultiHeadAspirationPlate, + MultiHeadAspirationPlate, ChatterBoxBackend, LiquidHandlerChatterboxBackend, ) from pylabrobot.liquid_handling.standard import ( MultiHeadAspirationContainer, @@ -105,7 +105,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"]) ) self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup, debug) - super().__init__(backend=self._unilabos_backend, deck=deck) + super().__init__(backend=self._unilabos_backend, deck=deck, simulator=True) async def create_protocol( self,