Compare commits

...

4 Commits

Author SHA1 Message Date
Guangxin Zhang
f90be18926 Update trash 2025-07-16 21:34:25 +08:00
Guangxin Zhang
604d82140d Update prcxi.py 2025-07-16 21:04:58 +08:00
Xuwznln
9c4fdd8001 新增simulator 2025-07-16 20:07:23 +08:00
Guangxin Zhang
71f6deda6b Update prcxi.py 2025-07-16 18:22:02 +08:00
2 changed files with 718 additions and 60 deletions

View File

@@ -1,17 +1,547 @@
from __future__ import annotations
import traceback
from typing import List, Sequence, Optional, Literal, Union, Iterator
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set
import asyncio
import time
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.resources import Resource, TipRack, Container, Coordinate, Well
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
from pylabrobot.liquid_handling.standard import GripDirection
from pylabrobot.resources import (
Resource,
TipRack,
Container,
Coordinate,
Well,
Deck,
TipSpot,
Plate,
ResourceStack,
ResourceHolder,
Lid,
Trash,
Tip,
)
class LiquidHandlerAbstract(LiquidHandler):
class LiquidHandlerMiddleware(LiquidHandler):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False):
self._simulator = simulator
if simulator:
self._simulate_backend = LiquidHandlerChatterboxBackend(8)
self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False)
super().__init__(backend, deck)
async def setup(self, **backend_kwargs):
if self._simulator:
await self._simulate_handler.setup(**backend_kwargs)
return await super().setup(**backend_kwargs)
def serialize_state(self) -> Dict[str, Any]:
if self._simulator:
self._simulate_handler.serialize_state()
return super().serialize_state()
def load_state(self, state: Dict[str, Any]):
if self._simulator:
self._simulate_handler.load_state(state)
super().load_state(state)
def update_head_state(self, state: Dict[int, Optional[Tip]]):
if self._simulator:
self._simulate_handler.update_head_state(state)
super().update_head_state(state)
def clear_head_state(self):
if self._simulator:
self._simulate_handler.clear_head_state()
super().clear_head_state()
def _run_async_in_thread(self, func, *args, **kwargs):
super()._run_async_in_thread(func, *args, **kwargs)
def _send_assigned_resource_to_backend(self, resource: Resource):
if self._simulator:
self._simulate_handler._send_assigned_resource_to_backend(resource)
super()._send_assigned_resource_to_backend(resource)
def _send_unassigned_resource_to_backend(self, resource: Resource):
if self._simulator:
self._simulate_handler._send_unassigned_resource_to_backend(resource)
super()._send_unassigned_resource_to_backend(resource)
def summary(self):
if self._simulator:
self._simulate_handler.summary()
super().summary()
def _assert_positions_unique(self, positions: List[str]):
super()._assert_positions_unique(positions)
def _assert_resources_exist(self, resources: Sequence[Resource]):
super()._assert_resources_exist(resources)
def _check_args(
self, method: Callable, backend_kwargs: Dict[str, Any], default: Set[str], strictness: Strictness
) -> Set[str]:
return super()._check_args(method, backend_kwargs, default, strictness)
def _make_sure_channels_exist(self, channels: List[int]):
super()._make_sure_channels_exist(channels)
def _format_param(self, value: Any) -> Any:
return super()._format_param(value)
def _log_command(self, name: str, **kwargs) -> None:
super()._log_command(name, **kwargs)
async def pick_up_tips(
self,
tip_spots: List[TipSpot],
use_channels: Optional[List[int]] = None,
offsets: Optional[List[Coordinate]] = None,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
async def drop_tips(
self,
tip_spots: Sequence[Union[TipSpot, Trash]],
use_channels: Optional[List[int]] = None,
offsets: Optional[List[Coordinate]] = None,
allow_nonzero_volume: bool = False,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.drop_tips(
tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs
)
return await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs)
async def return_tips(
self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs
):
if self._simulator:
await self._simulate_handler.return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
return await super().return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
async def discard_tips(
self,
use_channels: Optional[List[int]] = None,
allow_nonzero_volume: bool = True,
offsets: Optional[List[Coordinate]] = None,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
return await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
def _check_containers(self, resources: Sequence[Resource]):
super()._check_containers(resources)
async def aspirate(
self,
resources: Sequence[Container],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None,
liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
return await super().aspirate(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
async def dispense(
self,
resources: Sequence[Container],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None,
liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Literal["wide", "tight", "custom"] = "wide",
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
return await super().dispense(
resources,
vols,
use_channels,
flow_rates,
offsets,
liquid_height,
blow_out_air_volume,
spread,
**backend_kwargs,
)
async def transfer(
self,
source: Well,
targets: List[Well],
source_vol: Optional[float] = None,
ratios: Optional[List[float]] = None,
target_vols: Optional[List[float]] = None,
aspiration_flow_rate: Optional[float] = None,
dispense_flow_rates: Optional[List[Optional[float]]] = None,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.transfer(
source,
targets,
source_vol,
ratios,
target_vols,
aspiration_flow_rate,
dispense_flow_rates,
**backend_kwargs,
)
return await super().transfer(
source,
targets,
source_vol,
ratios,
target_vols,
aspiration_flow_rate,
dispense_flow_rates,
**backend_kwargs,
)
def use_channels(self, channels: List[int]):
if self._simulator:
self._simulate_handler.use_channels(channels)
return super().use_channels(channels)
async def pick_up_tips96(self, tip_rack: TipRack, offset: Coordinate = Coordinate.zero(), **backend_kwargs):
if self._simulator:
await self._simulate_handler.pick_up_tips96(tip_rack, offset, **backend_kwargs)
return await super().pick_up_tips96(tip_rack, offset, **backend_kwargs)
async def drop_tips96(
self,
resource: Union[TipRack, Trash],
offset: Coordinate = Coordinate.zero(),
allow_nonzero_volume: bool = False,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
return await super().drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]:
return super()._get_96_head_origin_tip_rack()
async def return_tips96(self, allow_nonzero_volume: bool = False, **backend_kwargs):
if self._simulator:
await self._simulate_handler.return_tips96(allow_nonzero_volume, **backend_kwargs)
return await super().return_tips96(allow_nonzero_volume, **backend_kwargs)
async def discard_tips96(self, allow_nonzero_volume: bool = True, **backend_kwargs):
if self._simulator:
await self._simulate_handler.discard_tips96(allow_nonzero_volume, **backend_kwargs)
return await super().discard_tips96(allow_nonzero_volume, **backend_kwargs)
async def aspirate96(
self,
resource: Union[Plate, Container, List[Well]],
volume: float,
offset: Coordinate = Coordinate.zero(),
flow_rate: Optional[float] = None,
blow_out_air_volume: Optional[float] = None,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.aspirate96(
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
)
return await super().aspirate96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
async def dispense96(
self,
resource: Union[Plate, Container, List[Well]],
volume: float,
offset: Coordinate = Coordinate.zero(),
flow_rate: Optional[float] = None,
blow_out_air_volume: Optional[float] = None,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.dispense96(
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
)
return await super().dispense96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
async def stamp(
self,
source: Plate,
target: Plate,
volume: float,
aspiration_flow_rate: Optional[float] = None,
dispense_flow_rate: Optional[float] = None,
):
if self._simulator:
await self._simulate_handler.stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
return await super().stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
async def pick_up_resource(
self,
resource: Resource,
offset: Coordinate = Coordinate.zero(),
pickup_distance_from_top: float = 0,
direction: GripDirection = GripDirection.FRONT,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.pick_up_resource(
resource, offset, pickup_distance_from_top, direction, **backend_kwargs
)
return await super().pick_up_resource(resource, offset, pickup_distance_from_top, direction, **backend_kwargs)
async def move_picked_up_resource(
self,
to: Coordinate,
offset: Coordinate = Coordinate.zero(),
direction: Optional[GripDirection] = None,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.move_picked_up_resource(to, offset, direction, **backend_kwargs)
return await super().move_picked_up_resource(to, offset, direction, **backend_kwargs)
async def drop_resource(
self,
destination: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
offset: Coordinate = Coordinate.zero(),
direction: GripDirection = GripDirection.FRONT,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.drop_resource(destination, offset, direction, **backend_kwargs)
return await super().drop_resource(destination, offset, direction, **backend_kwargs)
async def move_resource(
self,
resource: Resource,
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
intermediate_locations: Optional[List[Coordinate]] = None,
pickup_offset: Coordinate = Coordinate.zero(),
destination_offset: Coordinate = Coordinate.zero(),
pickup_distance_from_top: float = 0,
pickup_direction: GripDirection = GripDirection.FRONT,
drop_direction: GripDirection = GripDirection.FRONT,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.move_resource(
resource,
to,
intermediate_locations,
pickup_offset,
destination_offset,
pickup_distance_from_top,
pickup_direction,
drop_direction,
**backend_kwargs,
)
return await super().move_resource(
resource,
to,
intermediate_locations,
pickup_offset,
destination_offset,
pickup_distance_from_top,
pickup_direction,
drop_direction,
**backend_kwargs,
)
async def move_lid(
self,
lid: Lid,
to: Union[Plate, ResourceStack, Coordinate],
intermediate_locations: Optional[List[Coordinate]] = None,
pickup_offset: Coordinate = Coordinate.zero(),
destination_offset: Coordinate = Coordinate.zero(),
pickup_direction: GripDirection = GripDirection.FRONT,
drop_direction: GripDirection = GripDirection.FRONT,
pickup_distance_from_top: float = 5.7 - 3.33,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.move_lid(
lid,
to,
intermediate_locations,
pickup_offset,
destination_offset,
pickup_direction,
drop_direction,
pickup_distance_from_top,
**backend_kwargs,
)
return await super().move_lid(
lid,
to,
intermediate_locations,
pickup_offset,
destination_offset,
pickup_direction,
drop_direction,
pickup_distance_from_top,
**backend_kwargs,
)
async def move_plate(
self,
plate: Plate,
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
intermediate_locations: Optional[List[Coordinate]] = None,
pickup_offset: Coordinate = Coordinate.zero(),
destination_offset: Coordinate = Coordinate.zero(),
drop_direction: GripDirection = GripDirection.FRONT,
pickup_direction: GripDirection = GripDirection.FRONT,
pickup_distance_from_top: float = 13.2 - 3.33,
**backend_kwargs,
):
if self._simulator:
await self._simulate_handler.move_plate(
plate,
to,
intermediate_locations,
pickup_offset,
destination_offset,
drop_direction,
pickup_direction,
pickup_distance_from_top,
**backend_kwargs,
)
return await super().move_plate(
plate,
to,
intermediate_locations,
pickup_offset,
destination_offset,
drop_direction,
pickup_direction,
pickup_distance_from_top,
**backend_kwargs,
)
def serialize(self):
if self._simulator:
self._simulate_handler.serialize()
return super().serialize()
@classmethod
def deserialize(cls, data: dict, allow_marshal: bool = False) -> LiquidHandler:
return super().deserialize(data, allow_marshal)
@classmethod
def load(cls, path: str) -> LiquidHandler:
return super().load(path)
async def prepare_for_manual_channel_operation(self, channel: int):
if self._simulator:
await self._simulate_handler.prepare_for_manual_channel_operation(channel)
return await super().prepare_for_manual_channel_operation(channel)
async def move_channel_x(self, channel: int, x: float):
if self._simulator:
await self._simulate_handler.move_channel_x(channel, x)
return await super().move_channel_x(channel, x)
async def move_channel_y(self, channel: int, y: float):
if self._simulator:
await self._simulate_handler.move_channel_y(channel, y)
return await super().move_channel_y(channel, y)
async def move_channel_z(self, channel: int, z: float):
if self._simulator:
await self._simulate_handler.move_channel_z(channel, z)
return await super().move_channel_z(channel, z)
def assign_child_resource(self, resource: Resource, location: Optional[Coordinate], reassign: bool = True):
if self._simulator:
self._simulate_handler.assign_child_resource(resource, location, reassign)
pass
async def probe_tip_presence_via_pickup(
self, tip_spots: List[TipSpot], use_channels: Optional[List[int]] = None
) -> Dict[str, bool]:
if self._simulator:
await self._simulate_handler.probe_tip_presence_via_pickup(tip_spots, use_channels)
return await super().probe_tip_presence_via_pickup(tip_spots, use_channels)
async def probe_tip_inventory(
self,
tip_spots: List[TipSpot],
probing_fn: Optional[TipPresenceProbingMethod] = None,
use_channels: Optional[List[int]] = None,
) -> Dict[str, bool]:
if self._simulator:
await self._simulate_handler.probe_tip_inventory(tip_spots, probing_fn, use_channels)
return await super().probe_tip_inventory(tip_spots, probing_fn, use_channels)
async def consolidate_tip_inventory(self, tip_racks: List[TipRack], use_channels: Optional[List[int]] = None):
if self._simulator:
await self._simulate_handler.consolidate_tip_inventory(tip_racks, use_channels)
return await super().consolidate_tip_inventory(tip_racks, use_channels)
class LiquidHandlerAbstract(LiquidHandlerMiddleware):
"""Extended LiquidHandler with additional operations."""
support_touch_tip = True
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool):
"""Initialize a LiquidHandler.
Args:
backend: Backend to use.
deck: Deck to use.
"""
self._simulator = simulator
super().__init__(backend, deck, simulator)
# ---------------------------------------------------------------
# REMOVE LIQUID --------------------------------------------------
@@ -48,43 +578,99 @@ class LiquidHandlerAbstract(LiquidHandler):
none_keys: List[str] = [],
):
"""A complete *remove* (aspirate → waste) operation."""
trash = self.deck.get_trash_area()
try:
if is_96_well:
pass # This mode is not verified
pass # This mode is not verified.
else:
if len(vols) != len(sources):
raise ValueError("Length of `vols` must match `sources`.")
for src, vol in zip(sources, vols):
await self.move_to(src, dis_to_top=top[0] if top else 0)
tip = next(self.current_tip)
# 首先应该对任务分组然后每次1个/8个进行操作处理
if len(use_channels) == 1:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.aspirate(
resources=[src],
vols=[vol],
use_channels=use_channels, # only aspirate96 used, default to None
flow_rates=[flow_rates[0]] if flow_rates else None,
offsets=[offsets[0]] if offsets else None,
liquid_height=[liquid_height[0]] if liquid_height else None,
blow_out_air_volume=blow_out_air_volume[0] if blow_out_air_volume else None,
spread=spread,
)
await self.custom_delay(seconds=delays[0] if delays else 0)
await self.dispense(
resources=waste_liquid,
vols=[vol],
use_channels=use_channels,
flow_rates=[flow_rates[1]] if flow_rates else None,
offsets=[offsets[1]] if offsets else None,
liquid_height=[liquid_height[1]] if liquid_height else None,
blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None,
spread=spread,
)
await self.discard_tips() # For now, each of tips is discarded after use
for _ in range(len(waste_liquid)):
await self.aspirate(
resources=sources,
vols=[vols[_]],
use_channels=use_channels,
flow_rates=[flow_rates[0]] if flow_rates else None,
offsets=[offsets[0]] if offsets else None,
liquid_height=[liquid_height[0]] if liquid_height else None,
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[waste_liquid[_]],
vols=[vols[_]],
use_channels=use_channels,
flow_rates=[flow_rates[1]] if flow_rates else None,
offsets=[offsets[1]] if offsets else None,
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None,
liquid_height=[liquid_height[1]] if liquid_height else None,
spread=spread,
)
await self.discard_tips()
elif len(use_channels) == 8:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
# 对于8个的情况需要判断此时任务是不是能被8通道移液站来成功处理
if len(sources) % 8 != 0:
raise ValueError(f"Length of `sources` {len(sources)} must be a multiple of 8 for 8-channel mode.")
# 8个8个来取任务序列
for i in range(0, len(sources), 8):
# 取出8个任务
current_targets = waste_liquid[i:i + 8]
current_reagent_sources = sources[i:i + 8]
current_asp_vols = vols[i:i + 8]
current_dis_vols = vols[i:i + 8]
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
await self.aspirate(
resources=current_reagent_sources,
vols=current_asp_vols,
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
liquid_height=current_asp_liquid_height,
blow_out_air_volume=current_asp_blow_out_air_volume,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=current_targets,
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
liquid_height=current_dis_liquid_height,
blow_out_air_volume=current_dis_blow_out_air_volume,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips()
except Exception as e:
raise RuntimeError(f"Liquid removal failed: {e}") from e
traceback.print_exc()
raise RuntimeError(f"Liquid addition failed: {e}") from e
# ---------------------------------------------------------------
# ADD LIQUID -----------------------------------------------------
@@ -119,7 +705,7 @@ class LiquidHandlerAbstract(LiquidHandler):
else:
if len(asp_vols) != len(targets):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
# 首先应该对任务分组然后每次1个/8个进行操作处理
if len(use_channels) == 1:
tip = []
@@ -163,7 +749,7 @@ class LiquidHandlerAbstract(LiquidHandler):
if delays is not None:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_])
await self.discard_tips()
elif len(use_channels) == 8:
tip = []
for _ in range(len(use_channels)):
@@ -173,7 +759,7 @@ class LiquidHandlerAbstract(LiquidHandler):
# 对于8个的情况需要判断此时任务是不是能被8通道移液站来成功处理
if len(targets) % 8 != 0:
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
# 8个8个来取任务序列
for i in range(0, len(targets), 8):
@@ -183,14 +769,14 @@ class LiquidHandlerAbstract(LiquidHandler):
current_asp_vols = asp_vols[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
current_dis_flow_rates = flow_rates[i+8 :i + 16] if flow_rates else [None] * 8
current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
await self.aspirate(
resources=current_reagent_sources,
vols=current_asp_vols,
@@ -226,7 +812,8 @@ class LiquidHandlerAbstract(LiquidHandler):
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
#await self.touch_tip(current_targets)
await self.touch_tip(current_targets)
await self.discard_tips()
except Exception as e:
traceback.print_exc()
@@ -281,7 +868,7 @@ class LiquidHandlerAbstract(LiquidHandler):
else:
if len(asp_vols) != len(targets):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
# 首先应该对任务分组然后每次1个/8个进行操作处理
if len(use_channels) == 1:
tip = []
@@ -336,7 +923,7 @@ class LiquidHandlerAbstract(LiquidHandler):
# 对于8个的情况需要判断此时任务是不是能被8通道移液站来成功处理
if len(targets) % 8 != 0:
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
# 8个8个来取任务序列
for i in range(0, len(targets), 8):
@@ -345,13 +932,13 @@ class LiquidHandlerAbstract(LiquidHandler):
current_reagent_sources = sources[i:i + 8]
current_asp_vols = asp_vols[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8]
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8
await self.aspirate(
@@ -364,7 +951,7 @@ class LiquidHandlerAbstract(LiquidHandler):
liquid_height=current_asp_liquid_height,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
@@ -390,8 +977,9 @@ class LiquidHandlerAbstract(LiquidHandler):
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips()
#await self.touch_tip(current_targets)
except Exception as e:
traceback.print_exc()
raise RuntimeError(f"Liquid addition failed: {e}") from e
@@ -417,7 +1005,11 @@ class LiquidHandlerAbstract(LiquidHandler):
print(f"Current time: {time.strftime('%H:%M:%S')}")
async def touch_tip(self, targets: Sequence[Container]):
"""Touch the tip to the side of the well."""
if not self.support_touch_tip:
return
await self.aspirate(
resources=[targets],
vols=[0],

View File

@@ -14,7 +14,7 @@ from pylabrobot.liquid_handling import (
SingleChannelDispense,
PickupTipRack,
DropTipRack,
MultiHeadAspirationPlate,
MultiHeadAspirationPlate, ChatterBoxBackend, LiquidHandlerChatterboxBackend,
)
from pylabrobot.liquid_handling.standard import (
MultiHeadAspirationContainer,
@@ -87,7 +87,31 @@ class PRCXI9300Container(Plate):
return data
class PRCXI9300Trash(Trash):
"""PRCXI 9300 的专用 Trash 类,继承自 Trash。
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
"""
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str):
super().__init__(name, size_x, size_y, size_z, category=category)
self._unilabos_state = {}
def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。"""
#super().load_state(state)
self._unilabos_state = state
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
data = super().serialize_state()
data.update(self._unilabos_state)
return data
class PRCXI9300Handler(LiquidHandlerAbstract):
support_touch_tip = False
@property
def reset_ok(self) -> bool:
"""检查设备是否已重置成功。"""
@@ -105,7 +129,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
)
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup, debug)
super().__init__(backend=self._unilabos_backend, deck=deck)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=True)
async def create_protocol(
self,
@@ -434,6 +458,35 @@ class PRCXI9300Backend(LiquidHandlerBackend):
"""Pick up tips from the specified resource."""
plate = ops[0].resource.parent.parent
deck = plate.parent
plate_index = deck.children.index(plate)
if deck.children[plate_index].name == "trash":
step = self.api_client.UnLoad(
"Left",
dosage=0,
plate_no=plate_index+1,
is_whole_plate=False,
hole_row=1,
hole_col=2,# 强制投放第二列了
blending_times=0,
balance_height=0,
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
hole_numbers="1,2,3,4,5,6,7,8",
)
self.steps_todo_list.append(step)
return
# 判断是不是个trash
# if all(isinstance(op.resource, Trash) for op in ops):
# print("All drop operations are for trash.")
# await self.discard_tips()
# return
if len(ops) != 8:
raise ValueError(f"PRCXI9300Backend drop_tips: Expected 8 pickups, got {len(ops)}")
@@ -1001,7 +1054,7 @@ if __name__ == "__main__":
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
}
})
plate6 = PRCXI9300Container(name="plateT6", size_x=50, size_y=50, size_z=10, category="plate")
plate6 = PRCXI9300Trash(name="trash", size_x=50, size_y=50, size_z=10, category="trash")
plate6.load_state({
"Material": {
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
@@ -1009,13 +1062,15 @@ if __name__ == "__main__":
})
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
tip_rack = tipone_96_tiprack_200ul("TipRack")
well_containers = corning_96_wellplate_360ul_flat("Plate")
# from pprint import pprint
# pprint(well_containers.children)
plate1.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
plate2.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
deck.assign_child_resource(plate3, location=Coordinate(0, 0, 0))
@@ -1027,10 +1082,10 @@ if __name__ == "__main__":
handler.set_tiprack([tip_rack]) # Set the tip rack for the handler
asyncio.run(handler.setup()) # Initialize the handler and setup the connection
asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection
# asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7]))
#asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.aspirate(well_containers.children[:8],[50]*8, [0,1,2,3,4,5,6,7]))
# asyncio.run(handler.dispense(well_containers.children[:8],[50]*8,[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
#asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7]))
# asyncio.run(handler.mix(well_containers.children[:8], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
#print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
asyncio.run(handler.add_liquid(
@@ -1039,8 +1094,8 @@ if __name__ == "__main__":
reagent_sources=well_containers.children[-16:],
targets=well_containers.children[:16],
use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
flow_rates=[None] * 16,
offsets=[Coordinate(0, 0, 0)] * 16,
flow_rates=[None] * 32,
offsets=[Coordinate(0, 0, 0)] * 32,
liquid_height=[None] * 16,
blow_out_air_volume=[None] * 16,
delays=None,
@@ -1049,6 +1104,17 @@ if __name__ == "__main__":
spread="wide",
))
# asyncio.run(handler.remove_liquid(
# vols=[100]*16,
# sources=well_containers.children[-16:],
# waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
# use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# flow_rates=[None] * 32,
# offsets=[Coordinate(0, 0, 0)] * 32,
# liquid_height=[None] * 32,
# blow_out_air_volume=[None] * 32,
# spread="wide",
# ))
# asyncio.run(handler.transfer_liquid(
# asp_vols=[100]*16,
# dis_vols=[100]*16,