mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-12 10:45:10 +00:00
Compare commits
4 Commits
d81297d699
...
f90be18926
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f90be18926 | ||
|
|
604d82140d | ||
|
|
9c4fdd8001 | ||
|
|
71f6deda6b |
@@ -1,17 +1,547 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import traceback
|
||||
from typing import List, Sequence, Optional, Literal, Union, Iterator
|
||||
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from pylabrobot.liquid_handling import LiquidHandler
|
||||
from pylabrobot.resources import Resource, TipRack, Container, Coordinate, Well
|
||||
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
|
||||
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
|
||||
from pylabrobot.liquid_handling.standard import GripDirection
|
||||
from pylabrobot.resources import (
|
||||
Resource,
|
||||
TipRack,
|
||||
Container,
|
||||
Coordinate,
|
||||
Well,
|
||||
Deck,
|
||||
TipSpot,
|
||||
Plate,
|
||||
ResourceStack,
|
||||
ResourceHolder,
|
||||
Lid,
|
||||
Trash,
|
||||
Tip,
|
||||
)
|
||||
|
||||
|
||||
class LiquidHandlerAbstract(LiquidHandler):
|
||||
class LiquidHandlerMiddleware(LiquidHandler):
|
||||
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False):
|
||||
self._simulator = simulator
|
||||
if simulator:
|
||||
self._simulate_backend = LiquidHandlerChatterboxBackend(8)
|
||||
self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False)
|
||||
super().__init__(backend, deck)
|
||||
|
||||
async def setup(self, **backend_kwargs):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.setup(**backend_kwargs)
|
||||
return await super().setup(**backend_kwargs)
|
||||
|
||||
def serialize_state(self) -> Dict[str, Any]:
|
||||
if self._simulator:
|
||||
self._simulate_handler.serialize_state()
|
||||
return super().serialize_state()
|
||||
|
||||
def load_state(self, state: Dict[str, Any]):
|
||||
if self._simulator:
|
||||
self._simulate_handler.load_state(state)
|
||||
super().load_state(state)
|
||||
|
||||
def update_head_state(self, state: Dict[int, Optional[Tip]]):
|
||||
if self._simulator:
|
||||
self._simulate_handler.update_head_state(state)
|
||||
super().update_head_state(state)
|
||||
|
||||
def clear_head_state(self):
|
||||
if self._simulator:
|
||||
self._simulate_handler.clear_head_state()
|
||||
super().clear_head_state()
|
||||
|
||||
def _run_async_in_thread(self, func, *args, **kwargs):
|
||||
super()._run_async_in_thread(func, *args, **kwargs)
|
||||
|
||||
def _send_assigned_resource_to_backend(self, resource: Resource):
|
||||
if self._simulator:
|
||||
self._simulate_handler._send_assigned_resource_to_backend(resource)
|
||||
super()._send_assigned_resource_to_backend(resource)
|
||||
|
||||
def _send_unassigned_resource_to_backend(self, resource: Resource):
|
||||
if self._simulator:
|
||||
self._simulate_handler._send_unassigned_resource_to_backend(resource)
|
||||
super()._send_unassigned_resource_to_backend(resource)
|
||||
|
||||
def summary(self):
|
||||
if self._simulator:
|
||||
self._simulate_handler.summary()
|
||||
super().summary()
|
||||
|
||||
def _assert_positions_unique(self, positions: List[str]):
|
||||
super()._assert_positions_unique(positions)
|
||||
|
||||
def _assert_resources_exist(self, resources: Sequence[Resource]):
|
||||
super()._assert_resources_exist(resources)
|
||||
|
||||
def _check_args(
|
||||
self, method: Callable, backend_kwargs: Dict[str, Any], default: Set[str], strictness: Strictness
|
||||
) -> Set[str]:
|
||||
return super()._check_args(method, backend_kwargs, default, strictness)
|
||||
|
||||
def _make_sure_channels_exist(self, channels: List[int]):
|
||||
super()._make_sure_channels_exist(channels)
|
||||
|
||||
def _format_param(self, value: Any) -> Any:
|
||||
return super()._format_param(value)
|
||||
|
||||
def _log_command(self, name: str, **kwargs) -> None:
|
||||
super()._log_command(name, **kwargs)
|
||||
|
||||
async def pick_up_tips(
|
||||
self,
|
||||
tip_spots: List[TipSpot],
|
||||
use_channels: Optional[List[int]] = None,
|
||||
offsets: Optional[List[Coordinate]] = None,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
|
||||
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
|
||||
|
||||
async def drop_tips(
|
||||
self,
|
||||
tip_spots: Sequence[Union[TipSpot, Trash]],
|
||||
use_channels: Optional[List[int]] = None,
|
||||
offsets: Optional[List[Coordinate]] = None,
|
||||
allow_nonzero_volume: bool = False,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.drop_tips(
|
||||
tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs
|
||||
)
|
||||
return await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs)
|
||||
|
||||
async def return_tips(
|
||||
self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
|
||||
return await super().return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
|
||||
|
||||
async def discard_tips(
|
||||
self,
|
||||
use_channels: Optional[List[int]] = None,
|
||||
allow_nonzero_volume: bool = True,
|
||||
offsets: Optional[List[Coordinate]] = None,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
|
||||
return await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
|
||||
|
||||
def _check_containers(self, resources: Sequence[Resource]):
|
||||
super()._check_containers(resources)
|
||||
|
||||
async def aspirate(
|
||||
self,
|
||||
resources: Sequence[Container],
|
||||
vols: List[float],
|
||||
use_channels: Optional[List[int]] = None,
|
||||
flow_rates: Optional[List[Optional[float]]] = None,
|
||||
offsets: Optional[List[Coordinate]] = None,
|
||||
liquid_height: Optional[List[Optional[float]]] = None,
|
||||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||||
spread: Literal["wide", "tight", "custom"] = "wide",
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.aspirate(
|
||||
resources,
|
||||
vols,
|
||||
use_channels,
|
||||
flow_rates,
|
||||
offsets,
|
||||
liquid_height,
|
||||
blow_out_air_volume,
|
||||
spread,
|
||||
**backend_kwargs,
|
||||
)
|
||||
return await super().aspirate(
|
||||
resources,
|
||||
vols,
|
||||
use_channels,
|
||||
flow_rates,
|
||||
offsets,
|
||||
liquid_height,
|
||||
blow_out_air_volume,
|
||||
spread,
|
||||
**backend_kwargs,
|
||||
)
|
||||
|
||||
async def dispense(
|
||||
self,
|
||||
resources: Sequence[Container],
|
||||
vols: List[float],
|
||||
use_channels: Optional[List[int]] = None,
|
||||
flow_rates: Optional[List[Optional[float]]] = None,
|
||||
offsets: Optional[List[Coordinate]] = None,
|
||||
liquid_height: Optional[List[Optional[float]]] = None,
|
||||
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||||
spread: Literal["wide", "tight", "custom"] = "wide",
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.dispense(
|
||||
resources,
|
||||
vols,
|
||||
use_channels,
|
||||
flow_rates,
|
||||
offsets,
|
||||
liquid_height,
|
||||
blow_out_air_volume,
|
||||
spread,
|
||||
**backend_kwargs,
|
||||
)
|
||||
return await super().dispense(
|
||||
resources,
|
||||
vols,
|
||||
use_channels,
|
||||
flow_rates,
|
||||
offsets,
|
||||
liquid_height,
|
||||
blow_out_air_volume,
|
||||
spread,
|
||||
**backend_kwargs,
|
||||
)
|
||||
|
||||
async def transfer(
|
||||
self,
|
||||
source: Well,
|
||||
targets: List[Well],
|
||||
source_vol: Optional[float] = None,
|
||||
ratios: Optional[List[float]] = None,
|
||||
target_vols: Optional[List[float]] = None,
|
||||
aspiration_flow_rate: Optional[float] = None,
|
||||
dispense_flow_rates: Optional[List[Optional[float]]] = None,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.transfer(
|
||||
source,
|
||||
targets,
|
||||
source_vol,
|
||||
ratios,
|
||||
target_vols,
|
||||
aspiration_flow_rate,
|
||||
dispense_flow_rates,
|
||||
**backend_kwargs,
|
||||
)
|
||||
return await super().transfer(
|
||||
source,
|
||||
targets,
|
||||
source_vol,
|
||||
ratios,
|
||||
target_vols,
|
||||
aspiration_flow_rate,
|
||||
dispense_flow_rates,
|
||||
**backend_kwargs,
|
||||
)
|
||||
|
||||
def use_channels(self, channels: List[int]):
|
||||
if self._simulator:
|
||||
self._simulate_handler.use_channels(channels)
|
||||
return super().use_channels(channels)
|
||||
|
||||
async def pick_up_tips96(self, tip_rack: TipRack, offset: Coordinate = Coordinate.zero(), **backend_kwargs):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.pick_up_tips96(tip_rack, offset, **backend_kwargs)
|
||||
return await super().pick_up_tips96(tip_rack, offset, **backend_kwargs)
|
||||
|
||||
async def drop_tips96(
|
||||
self,
|
||||
resource: Union[TipRack, Trash],
|
||||
offset: Coordinate = Coordinate.zero(),
|
||||
allow_nonzero_volume: bool = False,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
|
||||
return await super().drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
|
||||
|
||||
def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]:
|
||||
return super()._get_96_head_origin_tip_rack()
|
||||
|
||||
async def return_tips96(self, allow_nonzero_volume: bool = False, **backend_kwargs):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.return_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||
return await super().return_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||
|
||||
async def discard_tips96(self, allow_nonzero_volume: bool = True, **backend_kwargs):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.discard_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||
return await super().discard_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||
|
||||
async def aspirate96(
|
||||
self,
|
||||
resource: Union[Plate, Container, List[Well]],
|
||||
volume: float,
|
||||
offset: Coordinate = Coordinate.zero(),
|
||||
flow_rate: Optional[float] = None,
|
||||
blow_out_air_volume: Optional[float] = None,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.aspirate96(
|
||||
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
|
||||
)
|
||||
return await super().aspirate96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
|
||||
|
||||
async def dispense96(
|
||||
self,
|
||||
resource: Union[Plate, Container, List[Well]],
|
||||
volume: float,
|
||||
offset: Coordinate = Coordinate.zero(),
|
||||
flow_rate: Optional[float] = None,
|
||||
blow_out_air_volume: Optional[float] = None,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.dispense96(
|
||||
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
|
||||
)
|
||||
return await super().dispense96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
|
||||
|
||||
async def stamp(
|
||||
self,
|
||||
source: Plate,
|
||||
target: Plate,
|
||||
volume: float,
|
||||
aspiration_flow_rate: Optional[float] = None,
|
||||
dispense_flow_rate: Optional[float] = None,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
|
||||
return await super().stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
|
||||
|
||||
async def pick_up_resource(
|
||||
self,
|
||||
resource: Resource,
|
||||
offset: Coordinate = Coordinate.zero(),
|
||||
pickup_distance_from_top: float = 0,
|
||||
direction: GripDirection = GripDirection.FRONT,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.pick_up_resource(
|
||||
resource, offset, pickup_distance_from_top, direction, **backend_kwargs
|
||||
)
|
||||
return await super().pick_up_resource(resource, offset, pickup_distance_from_top, direction, **backend_kwargs)
|
||||
|
||||
async def move_picked_up_resource(
|
||||
self,
|
||||
to: Coordinate,
|
||||
offset: Coordinate = Coordinate.zero(),
|
||||
direction: Optional[GripDirection] = None,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_picked_up_resource(to, offset, direction, **backend_kwargs)
|
||||
return await super().move_picked_up_resource(to, offset, direction, **backend_kwargs)
|
||||
|
||||
async def drop_resource(
|
||||
self,
|
||||
destination: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||||
offset: Coordinate = Coordinate.zero(),
|
||||
direction: GripDirection = GripDirection.FRONT,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.drop_resource(destination, offset, direction, **backend_kwargs)
|
||||
return await super().drop_resource(destination, offset, direction, **backend_kwargs)
|
||||
|
||||
async def move_resource(
|
||||
self,
|
||||
resource: Resource,
|
||||
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||||
intermediate_locations: Optional[List[Coordinate]] = None,
|
||||
pickup_offset: Coordinate = Coordinate.zero(),
|
||||
destination_offset: Coordinate = Coordinate.zero(),
|
||||
pickup_distance_from_top: float = 0,
|
||||
pickup_direction: GripDirection = GripDirection.FRONT,
|
||||
drop_direction: GripDirection = GripDirection.FRONT,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_resource(
|
||||
resource,
|
||||
to,
|
||||
intermediate_locations,
|
||||
pickup_offset,
|
||||
destination_offset,
|
||||
pickup_distance_from_top,
|
||||
pickup_direction,
|
||||
drop_direction,
|
||||
**backend_kwargs,
|
||||
)
|
||||
return await super().move_resource(
|
||||
resource,
|
||||
to,
|
||||
intermediate_locations,
|
||||
pickup_offset,
|
||||
destination_offset,
|
||||
pickup_distance_from_top,
|
||||
pickup_direction,
|
||||
drop_direction,
|
||||
**backend_kwargs,
|
||||
)
|
||||
|
||||
async def move_lid(
|
||||
self,
|
||||
lid: Lid,
|
||||
to: Union[Plate, ResourceStack, Coordinate],
|
||||
intermediate_locations: Optional[List[Coordinate]] = None,
|
||||
pickup_offset: Coordinate = Coordinate.zero(),
|
||||
destination_offset: Coordinate = Coordinate.zero(),
|
||||
pickup_direction: GripDirection = GripDirection.FRONT,
|
||||
drop_direction: GripDirection = GripDirection.FRONT,
|
||||
pickup_distance_from_top: float = 5.7 - 3.33,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_lid(
|
||||
lid,
|
||||
to,
|
||||
intermediate_locations,
|
||||
pickup_offset,
|
||||
destination_offset,
|
||||
pickup_direction,
|
||||
drop_direction,
|
||||
pickup_distance_from_top,
|
||||
**backend_kwargs,
|
||||
)
|
||||
return await super().move_lid(
|
||||
lid,
|
||||
to,
|
||||
intermediate_locations,
|
||||
pickup_offset,
|
||||
destination_offset,
|
||||
pickup_direction,
|
||||
drop_direction,
|
||||
pickup_distance_from_top,
|
||||
**backend_kwargs,
|
||||
)
|
||||
|
||||
async def move_plate(
|
||||
self,
|
||||
plate: Plate,
|
||||
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||||
intermediate_locations: Optional[List[Coordinate]] = None,
|
||||
pickup_offset: Coordinate = Coordinate.zero(),
|
||||
destination_offset: Coordinate = Coordinate.zero(),
|
||||
drop_direction: GripDirection = GripDirection.FRONT,
|
||||
pickup_direction: GripDirection = GripDirection.FRONT,
|
||||
pickup_distance_from_top: float = 13.2 - 3.33,
|
||||
**backend_kwargs,
|
||||
):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_plate(
|
||||
plate,
|
||||
to,
|
||||
intermediate_locations,
|
||||
pickup_offset,
|
||||
destination_offset,
|
||||
drop_direction,
|
||||
pickup_direction,
|
||||
pickup_distance_from_top,
|
||||
**backend_kwargs,
|
||||
)
|
||||
return await super().move_plate(
|
||||
plate,
|
||||
to,
|
||||
intermediate_locations,
|
||||
pickup_offset,
|
||||
destination_offset,
|
||||
drop_direction,
|
||||
pickup_direction,
|
||||
pickup_distance_from_top,
|
||||
**backend_kwargs,
|
||||
)
|
||||
|
||||
def serialize(self):
|
||||
if self._simulator:
|
||||
self._simulate_handler.serialize()
|
||||
return super().serialize()
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, data: dict, allow_marshal: bool = False) -> LiquidHandler:
|
||||
return super().deserialize(data, allow_marshal)
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: str) -> LiquidHandler:
|
||||
return super().load(path)
|
||||
|
||||
async def prepare_for_manual_channel_operation(self, channel: int):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.prepare_for_manual_channel_operation(channel)
|
||||
return await super().prepare_for_manual_channel_operation(channel)
|
||||
|
||||
async def move_channel_x(self, channel: int, x: float):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_channel_x(channel, x)
|
||||
return await super().move_channel_x(channel, x)
|
||||
|
||||
async def move_channel_y(self, channel: int, y: float):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_channel_y(channel, y)
|
||||
return await super().move_channel_y(channel, y)
|
||||
|
||||
async def move_channel_z(self, channel: int, z: float):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.move_channel_z(channel, z)
|
||||
return await super().move_channel_z(channel, z)
|
||||
|
||||
def assign_child_resource(self, resource: Resource, location: Optional[Coordinate], reassign: bool = True):
|
||||
if self._simulator:
|
||||
self._simulate_handler.assign_child_resource(resource, location, reassign)
|
||||
pass
|
||||
|
||||
async def probe_tip_presence_via_pickup(
|
||||
self, tip_spots: List[TipSpot], use_channels: Optional[List[int]] = None
|
||||
) -> Dict[str, bool]:
|
||||
if self._simulator:
|
||||
await self._simulate_handler.probe_tip_presence_via_pickup(tip_spots, use_channels)
|
||||
return await super().probe_tip_presence_via_pickup(tip_spots, use_channels)
|
||||
|
||||
async def probe_tip_inventory(
|
||||
self,
|
||||
tip_spots: List[TipSpot],
|
||||
probing_fn: Optional[TipPresenceProbingMethod] = None,
|
||||
use_channels: Optional[List[int]] = None,
|
||||
) -> Dict[str, bool]:
|
||||
if self._simulator:
|
||||
await self._simulate_handler.probe_tip_inventory(tip_spots, probing_fn, use_channels)
|
||||
return await super().probe_tip_inventory(tip_spots, probing_fn, use_channels)
|
||||
|
||||
async def consolidate_tip_inventory(self, tip_racks: List[TipRack], use_channels: Optional[List[int]] = None):
|
||||
if self._simulator:
|
||||
await self._simulate_handler.consolidate_tip_inventory(tip_racks, use_channels)
|
||||
return await super().consolidate_tip_inventory(tip_racks, use_channels)
|
||||
|
||||
|
||||
class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
||||
"""Extended LiquidHandler with additional operations."""
|
||||
support_touch_tip = True
|
||||
|
||||
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool):
|
||||
"""Initialize a LiquidHandler.
|
||||
|
||||
Args:
|
||||
backend: Backend to use.
|
||||
deck: Deck to use.
|
||||
"""
|
||||
self._simulator = simulator
|
||||
super().__init__(backend, deck, simulator)
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# REMOVE LIQUID --------------------------------------------------
|
||||
@@ -48,43 +578,99 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
none_keys: List[str] = [],
|
||||
):
|
||||
"""A complete *remove* (aspirate → waste) operation."""
|
||||
trash = self.deck.get_trash_area()
|
||||
|
||||
try:
|
||||
if is_96_well:
|
||||
pass # This mode is not verified
|
||||
pass # This mode is not verified.
|
||||
else:
|
||||
if len(vols) != len(sources):
|
||||
raise ValueError("Length of `vols` must match `sources`.")
|
||||
|
||||
for src, vol in zip(sources, vols):
|
||||
await self.move_to(src, dis_to_top=top[0] if top else 0)
|
||||
tip = next(self.current_tip)
|
||||
# 首先应该对任务分组,然后每次1个/8个进行操作处理
|
||||
if len(use_channels) == 1:
|
||||
tip = []
|
||||
for _ in range(len(use_channels)):
|
||||
tip.extend(next(self.current_tip))
|
||||
await self.pick_up_tips(tip)
|
||||
await self.aspirate(
|
||||
resources=[src],
|
||||
vols=[vol],
|
||||
use_channels=use_channels, # only aspirate96 used, default to None
|
||||
flow_rates=[flow_rates[0]] if flow_rates else None,
|
||||
offsets=[offsets[0]] if offsets else None,
|
||||
liquid_height=[liquid_height[0]] if liquid_height else None,
|
||||
blow_out_air_volume=blow_out_air_volume[0] if blow_out_air_volume else None,
|
||||
spread=spread,
|
||||
)
|
||||
await self.custom_delay(seconds=delays[0] if delays else 0)
|
||||
await self.dispense(
|
||||
resources=waste_liquid,
|
||||
vols=[vol],
|
||||
use_channels=use_channels,
|
||||
flow_rates=[flow_rates[1]] if flow_rates else None,
|
||||
offsets=[offsets[1]] if offsets else None,
|
||||
liquid_height=[liquid_height[1]] if liquid_height else None,
|
||||
blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None,
|
||||
spread=spread,
|
||||
)
|
||||
await self.discard_tips() # For now, each of tips is discarded after use
|
||||
|
||||
for _ in range(len(waste_liquid)):
|
||||
await self.aspirate(
|
||||
resources=sources,
|
||||
vols=[vols[_]],
|
||||
use_channels=use_channels,
|
||||
flow_rates=[flow_rates[0]] if flow_rates else None,
|
||||
offsets=[offsets[0]] if offsets else None,
|
||||
liquid_height=[liquid_height[0]] if liquid_height else None,
|
||||
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
|
||||
spread=spread,
|
||||
)
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[0])
|
||||
await self.dispense(
|
||||
resources=[waste_liquid[_]],
|
||||
vols=[vols[_]],
|
||||
use_channels=use_channels,
|
||||
flow_rates=[flow_rates[1]] if flow_rates else None,
|
||||
offsets=[offsets[1]] if offsets else None,
|
||||
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None,
|
||||
liquid_height=[liquid_height[1]] if liquid_height else None,
|
||||
spread=spread,
|
||||
)
|
||||
await self.discard_tips()
|
||||
elif len(use_channels) == 8:
|
||||
tip = []
|
||||
for _ in range(len(use_channels)):
|
||||
tip.extend(next(self.current_tip))
|
||||
await self.pick_up_tips(tip)
|
||||
|
||||
# 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理
|
||||
if len(sources) % 8 != 0:
|
||||
raise ValueError(f"Length of `sources` {len(sources)} must be a multiple of 8 for 8-channel mode.")
|
||||
|
||||
# 8个8个来取任务序列
|
||||
|
||||
for i in range(0, len(sources), 8):
|
||||
# 取出8个任务
|
||||
current_targets = waste_liquid[i:i + 8]
|
||||
current_reagent_sources = sources[i:i + 8]
|
||||
current_asp_vols = vols[i:i + 8]
|
||||
current_dis_vols = vols[i:i + 8]
|
||||
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
|
||||
current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8
|
||||
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
||||
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
|
||||
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
||||
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
|
||||
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
|
||||
|
||||
await self.aspirate(
|
||||
resources=current_reagent_sources,
|
||||
vols=current_asp_vols,
|
||||
use_channels=use_channels,
|
||||
flow_rates=current_asp_flow_rates,
|
||||
offsets=current_asp_offset,
|
||||
liquid_height=current_asp_liquid_height,
|
||||
blow_out_air_volume=current_asp_blow_out_air_volume,
|
||||
spread=spread,
|
||||
)
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[0])
|
||||
await self.dispense(
|
||||
resources=current_targets,
|
||||
vols=current_dis_vols,
|
||||
use_channels=use_channels,
|
||||
flow_rates=current_dis_flow_rates,
|
||||
offsets=current_dis_offset,
|
||||
liquid_height=current_dis_liquid_height,
|
||||
blow_out_air_volume=current_dis_blow_out_air_volume,
|
||||
spread=spread,
|
||||
)
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[1])
|
||||
await self.touch_tip(current_targets)
|
||||
await self.discard_tips()
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Liquid removal failed: {e}") from e
|
||||
traceback.print_exc()
|
||||
raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# ADD LIQUID -----------------------------------------------------
|
||||
@@ -119,7 +705,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
else:
|
||||
if len(asp_vols) != len(targets):
|
||||
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
|
||||
|
||||
|
||||
# 首先应该对任务分组,然后每次1个/8个进行操作处理
|
||||
if len(use_channels) == 1:
|
||||
tip = []
|
||||
@@ -163,7 +749,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[1])
|
||||
await self.touch_tip(targets[_])
|
||||
await self.discard_tips()
|
||||
|
||||
elif len(use_channels) == 8:
|
||||
tip = []
|
||||
for _ in range(len(use_channels)):
|
||||
@@ -173,7 +759,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
# 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理
|
||||
if len(targets) % 8 != 0:
|
||||
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
|
||||
|
||||
|
||||
# 8个8个来取任务序列
|
||||
|
||||
for i in range(0, len(targets), 8):
|
||||
@@ -183,14 +769,14 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
current_asp_vols = asp_vols[i:i + 8]
|
||||
current_dis_vols = dis_vols[i:i + 8]
|
||||
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
|
||||
current_dis_flow_rates = flow_rates[i+8 :i + 16] if flow_rates else [None] * 8
|
||||
current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8
|
||||
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
||||
current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8
|
||||
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
|
||||
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
||||
current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8
|
||||
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
|
||||
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||
current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8
|
||||
|
||||
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
|
||||
|
||||
await self.aspirate(
|
||||
resources=current_reagent_sources,
|
||||
vols=current_asp_vols,
|
||||
@@ -226,7 +812,8 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
)
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[1])
|
||||
#await self.touch_tip(current_targets)
|
||||
await self.touch_tip(current_targets)
|
||||
await self.discard_tips()
|
||||
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
@@ -281,7 +868,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
else:
|
||||
if len(asp_vols) != len(targets):
|
||||
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
|
||||
|
||||
|
||||
# 首先应该对任务分组,然后每次1个/8个进行操作处理
|
||||
if len(use_channels) == 1:
|
||||
tip = []
|
||||
@@ -336,7 +923,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
# 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理
|
||||
if len(targets) % 8 != 0:
|
||||
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
|
||||
|
||||
|
||||
# 8个8个来取任务序列
|
||||
|
||||
for i in range(0, len(targets), 8):
|
||||
@@ -345,13 +932,13 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
current_reagent_sources = sources[i:i + 8]
|
||||
current_asp_vols = asp_vols[i:i + 8]
|
||||
current_dis_vols = dis_vols[i:i + 8]
|
||||
current_asp_flow_rates = asp_flow_rates[i:i + 8]
|
||||
current_asp_flow_rates = asp_flow_rates[i:i + 8]
|
||||
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
||||
current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8
|
||||
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
|
||||
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
||||
current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8
|
||||
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
|
||||
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||
current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8
|
||||
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
|
||||
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8
|
||||
|
||||
await self.aspirate(
|
||||
@@ -364,7 +951,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
liquid_height=current_asp_liquid_height,
|
||||
spread=spread,
|
||||
)
|
||||
|
||||
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[0])
|
||||
await self.dispense(
|
||||
@@ -390,8 +977,9 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
)
|
||||
if delays is not None:
|
||||
await self.custom_delay(seconds=delays[1])
|
||||
await self.touch_tip(current_targets)
|
||||
await self.discard_tips()
|
||||
|
||||
#await self.touch_tip(current_targets)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||||
@@ -417,7 +1005,11 @@ class LiquidHandlerAbstract(LiquidHandler):
|
||||
print(f"Current time: {time.strftime('%H:%M:%S')}")
|
||||
|
||||
async def touch_tip(self, targets: Sequence[Container]):
|
||||
|
||||
"""Touch the tip to the side of the well."""
|
||||
|
||||
if not self.support_touch_tip:
|
||||
return
|
||||
await self.aspirate(
|
||||
resources=[targets],
|
||||
vols=[0],
|
||||
|
||||
@@ -14,7 +14,7 @@ from pylabrobot.liquid_handling import (
|
||||
SingleChannelDispense,
|
||||
PickupTipRack,
|
||||
DropTipRack,
|
||||
MultiHeadAspirationPlate,
|
||||
MultiHeadAspirationPlate, ChatterBoxBackend, LiquidHandlerChatterboxBackend,
|
||||
)
|
||||
from pylabrobot.liquid_handling.standard import (
|
||||
MultiHeadAspirationContainer,
|
||||
@@ -87,7 +87,31 @@ class PRCXI9300Container(Plate):
|
||||
return data
|
||||
|
||||
|
||||
class PRCXI9300Trash(Trash):
|
||||
"""PRCXI 9300 的专用 Trash 类,继承自 Trash。
|
||||
|
||||
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str):
|
||||
super().__init__(name, size_x, size_y, size_z, category=category)
|
||||
self._unilabos_state = {}
|
||||
|
||||
def load_state(self, state: Dict[str, Any]) -> None:
|
||||
"""从给定的状态加载工作台信息。"""
|
||||
#super().load_state(state)
|
||||
self._unilabos_state = state
|
||||
|
||||
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
|
||||
data = super().serialize_state()
|
||||
data.update(self._unilabos_state)
|
||||
return data
|
||||
|
||||
|
||||
|
||||
|
||||
class PRCXI9300Handler(LiquidHandlerAbstract):
|
||||
support_touch_tip = False
|
||||
@property
|
||||
def reset_ok(self) -> bool:
|
||||
"""检查设备是否已重置成功。"""
|
||||
@@ -105,7 +129,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
||||
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
|
||||
)
|
||||
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup, debug)
|
||||
super().__init__(backend=self._unilabos_backend, deck=deck)
|
||||
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=True)
|
||||
|
||||
async def create_protocol(
|
||||
self,
|
||||
@@ -434,6 +458,35 @@ class PRCXI9300Backend(LiquidHandlerBackend):
|
||||
|
||||
"""Pick up tips from the specified resource."""
|
||||
|
||||
|
||||
plate = ops[0].resource.parent.parent
|
||||
deck = plate.parent
|
||||
plate_index = deck.children.index(plate)
|
||||
|
||||
if deck.children[plate_index].name == "trash":
|
||||
step = self.api_client.UnLoad(
|
||||
"Left",
|
||||
dosage=0,
|
||||
plate_no=plate_index+1,
|
||||
is_whole_plate=False,
|
||||
hole_row=1,
|
||||
hole_col=2,# 强制投放第二列了
|
||||
blending_times=0,
|
||||
balance_height=0,
|
||||
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
|
||||
hole_numbers="1,2,3,4,5,6,7,8",
|
||||
)
|
||||
self.steps_todo_list.append(step)
|
||||
|
||||
|
||||
return
|
||||
|
||||
# 判断是不是个trash?
|
||||
# if all(isinstance(op.resource, Trash) for op in ops):
|
||||
# print("All drop operations are for trash.")
|
||||
# await self.discard_tips()
|
||||
# return
|
||||
|
||||
if len(ops) != 8:
|
||||
raise ValueError(f"PRCXI9300Backend drop_tips: Expected 8 pickups, got {len(ops)}")
|
||||
|
||||
@@ -1001,7 +1054,7 @@ if __name__ == "__main__":
|
||||
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
|
||||
}
|
||||
})
|
||||
plate6 = PRCXI9300Container(name="plateT6", size_x=50, size_y=50, size_z=10, category="plate")
|
||||
plate6 = PRCXI9300Trash(name="trash", size_x=50, size_y=50, size_z=10, category="trash")
|
||||
plate6.load_state({
|
||||
"Material": {
|
||||
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
|
||||
@@ -1009,13 +1062,15 @@ if __name__ == "__main__":
|
||||
})
|
||||
|
||||
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul
|
||||
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
|
||||
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
|
||||
|
||||
tip_rack = tipone_96_tiprack_200ul("TipRack")
|
||||
well_containers = corning_96_wellplate_360ul_flat("Plate")
|
||||
# from pprint import pprint
|
||||
# pprint(well_containers.children)
|
||||
plate1.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
|
||||
plate2.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
|
||||
|
||||
deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
|
||||
deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
|
||||
deck.assign_child_resource(plate3, location=Coordinate(0, 0, 0))
|
||||
@@ -1027,10 +1082,10 @@ if __name__ == "__main__":
|
||||
handler.set_tiprack([tip_rack]) # Set the tip rack for the handler
|
||||
asyncio.run(handler.setup()) # Initialize the handler and setup the connection
|
||||
asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection
|
||||
# asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7]))
|
||||
#asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7]))
|
||||
# asyncio.run(handler.aspirate(well_containers.children[:8],[50]*8, [0,1,2,3,4,5,6,7]))
|
||||
# asyncio.run(handler.dispense(well_containers.children[:8],[50]*8,[0,1,2,3,4,5,6,7]))
|
||||
# asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
|
||||
#asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
|
||||
# asyncio.run(handler.mix(well_containers.children[:8], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
|
||||
#print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
|
||||
asyncio.run(handler.add_liquid(
|
||||
@@ -1039,8 +1094,8 @@ if __name__ == "__main__":
|
||||
reagent_sources=well_containers.children[-16:],
|
||||
targets=well_containers.children[:16],
|
||||
use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
|
||||
flow_rates=[None] * 16,
|
||||
offsets=[Coordinate(0, 0, 0)] * 16,
|
||||
flow_rates=[None] * 32,
|
||||
offsets=[Coordinate(0, 0, 0)] * 32,
|
||||
liquid_height=[None] * 16,
|
||||
blow_out_air_volume=[None] * 16,
|
||||
delays=None,
|
||||
@@ -1049,6 +1104,17 @@ if __name__ == "__main__":
|
||||
spread="wide",
|
||||
))
|
||||
|
||||
# asyncio.run(handler.remove_liquid(
|
||||
# vols=[100]*16,
|
||||
# sources=well_containers.children[-16:],
|
||||
# waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
|
||||
# use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
|
||||
# flow_rates=[None] * 32,
|
||||
# offsets=[Coordinate(0, 0, 0)] * 32,
|
||||
# liquid_height=[None] * 32,
|
||||
# blow_out_air_volume=[None] * 32,
|
||||
# spread="wide",
|
||||
# ))
|
||||
# asyncio.run(handler.transfer_liquid(
|
||||
# asp_vols=[100]*16,
|
||||
# dis_vols=[100]*16,
|
||||
|
||||
Reference in New Issue
Block a user