mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-22 17:45:16 +00:00
Compare commits
4 Commits
d81297d699
...
f90be18926
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f90be18926 | ||
|
|
604d82140d | ||
|
|
9c4fdd8001 | ||
|
|
71f6deda6b |
@@ -1,17 +1,547 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import traceback
|
import traceback
|
||||||
from typing import List, Sequence, Optional, Literal, Union, Iterator
|
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from pylabrobot.liquid_handling import LiquidHandler
|
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
|
||||||
from pylabrobot.resources import Resource, TipRack, Container, Coordinate, Well
|
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
|
||||||
|
from pylabrobot.liquid_handling.standard import GripDirection
|
||||||
|
from pylabrobot.resources import (
|
||||||
|
Resource,
|
||||||
|
TipRack,
|
||||||
|
Container,
|
||||||
|
Coordinate,
|
||||||
|
Well,
|
||||||
|
Deck,
|
||||||
|
TipSpot,
|
||||||
|
Plate,
|
||||||
|
ResourceStack,
|
||||||
|
ResourceHolder,
|
||||||
|
Lid,
|
||||||
|
Trash,
|
||||||
|
Tip,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class LiquidHandlerAbstract(LiquidHandler):
|
class LiquidHandlerMiddleware(LiquidHandler):
|
||||||
|
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False):
|
||||||
|
self._simulator = simulator
|
||||||
|
if simulator:
|
||||||
|
self._simulate_backend = LiquidHandlerChatterboxBackend(8)
|
||||||
|
self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False)
|
||||||
|
super().__init__(backend, deck)
|
||||||
|
|
||||||
|
async def setup(self, **backend_kwargs):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.setup(**backend_kwargs)
|
||||||
|
return await super().setup(**backend_kwargs)
|
||||||
|
|
||||||
|
def serialize_state(self) -> Dict[str, Any]:
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.serialize_state()
|
||||||
|
return super().serialize_state()
|
||||||
|
|
||||||
|
def load_state(self, state: Dict[str, Any]):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.load_state(state)
|
||||||
|
super().load_state(state)
|
||||||
|
|
||||||
|
def update_head_state(self, state: Dict[int, Optional[Tip]]):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.update_head_state(state)
|
||||||
|
super().update_head_state(state)
|
||||||
|
|
||||||
|
def clear_head_state(self):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.clear_head_state()
|
||||||
|
super().clear_head_state()
|
||||||
|
|
||||||
|
def _run_async_in_thread(self, func, *args, **kwargs):
|
||||||
|
super()._run_async_in_thread(func, *args, **kwargs)
|
||||||
|
|
||||||
|
def _send_assigned_resource_to_backend(self, resource: Resource):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler._send_assigned_resource_to_backend(resource)
|
||||||
|
super()._send_assigned_resource_to_backend(resource)
|
||||||
|
|
||||||
|
def _send_unassigned_resource_to_backend(self, resource: Resource):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler._send_unassigned_resource_to_backend(resource)
|
||||||
|
super()._send_unassigned_resource_to_backend(resource)
|
||||||
|
|
||||||
|
def summary(self):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.summary()
|
||||||
|
super().summary()
|
||||||
|
|
||||||
|
def _assert_positions_unique(self, positions: List[str]):
|
||||||
|
super()._assert_positions_unique(positions)
|
||||||
|
|
||||||
|
def _assert_resources_exist(self, resources: Sequence[Resource]):
|
||||||
|
super()._assert_resources_exist(resources)
|
||||||
|
|
||||||
|
def _check_args(
|
||||||
|
self, method: Callable, backend_kwargs: Dict[str, Any], default: Set[str], strictness: Strictness
|
||||||
|
) -> Set[str]:
|
||||||
|
return super()._check_args(method, backend_kwargs, default, strictness)
|
||||||
|
|
||||||
|
def _make_sure_channels_exist(self, channels: List[int]):
|
||||||
|
super()._make_sure_channels_exist(channels)
|
||||||
|
|
||||||
|
def _format_param(self, value: Any) -> Any:
|
||||||
|
return super()._format_param(value)
|
||||||
|
|
||||||
|
def _log_command(self, name: str, **kwargs) -> None:
|
||||||
|
super()._log_command(name, **kwargs)
|
||||||
|
|
||||||
|
async def pick_up_tips(
|
||||||
|
self,
|
||||||
|
tip_spots: List[TipSpot],
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
offsets: Optional[List[Coordinate]] = None,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
|
||||||
|
return await super().pick_up_tips(tip_spots, use_channels, offsets, **backend_kwargs)
|
||||||
|
|
||||||
|
async def drop_tips(
|
||||||
|
self,
|
||||||
|
tip_spots: Sequence[Union[TipSpot, Trash]],
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
offsets: Optional[List[Coordinate]] = None,
|
||||||
|
allow_nonzero_volume: bool = False,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.drop_tips(
|
||||||
|
tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs
|
||||||
|
)
|
||||||
|
return await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
async def return_tips(
|
||||||
|
self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
|
||||||
|
return await super().return_tips(use_channels, allow_nonzero_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
async def discard_tips(
|
||||||
|
self,
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
allow_nonzero_volume: bool = True,
|
||||||
|
offsets: Optional[List[Coordinate]] = None,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
|
||||||
|
return await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs)
|
||||||
|
|
||||||
|
def _check_containers(self, resources: Sequence[Resource]):
|
||||||
|
super()._check_containers(resources)
|
||||||
|
|
||||||
|
async def aspirate(
|
||||||
|
self,
|
||||||
|
resources: Sequence[Container],
|
||||||
|
vols: List[float],
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
flow_rates: Optional[List[Optional[float]]] = None,
|
||||||
|
offsets: Optional[List[Coordinate]] = None,
|
||||||
|
liquid_height: Optional[List[Optional[float]]] = None,
|
||||||
|
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||||||
|
spread: Literal["wide", "tight", "custom"] = "wide",
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.aspirate(
|
||||||
|
resources,
|
||||||
|
vols,
|
||||||
|
use_channels,
|
||||||
|
flow_rates,
|
||||||
|
offsets,
|
||||||
|
liquid_height,
|
||||||
|
blow_out_air_volume,
|
||||||
|
spread,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
return await super().aspirate(
|
||||||
|
resources,
|
||||||
|
vols,
|
||||||
|
use_channels,
|
||||||
|
flow_rates,
|
||||||
|
offsets,
|
||||||
|
liquid_height,
|
||||||
|
blow_out_air_volume,
|
||||||
|
spread,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def dispense(
|
||||||
|
self,
|
||||||
|
resources: Sequence[Container],
|
||||||
|
vols: List[float],
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
flow_rates: Optional[List[Optional[float]]] = None,
|
||||||
|
offsets: Optional[List[Coordinate]] = None,
|
||||||
|
liquid_height: Optional[List[Optional[float]]] = None,
|
||||||
|
blow_out_air_volume: Optional[List[Optional[float]]] = None,
|
||||||
|
spread: Literal["wide", "tight", "custom"] = "wide",
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.dispense(
|
||||||
|
resources,
|
||||||
|
vols,
|
||||||
|
use_channels,
|
||||||
|
flow_rates,
|
||||||
|
offsets,
|
||||||
|
liquid_height,
|
||||||
|
blow_out_air_volume,
|
||||||
|
spread,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
return await super().dispense(
|
||||||
|
resources,
|
||||||
|
vols,
|
||||||
|
use_channels,
|
||||||
|
flow_rates,
|
||||||
|
offsets,
|
||||||
|
liquid_height,
|
||||||
|
blow_out_air_volume,
|
||||||
|
spread,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def transfer(
|
||||||
|
self,
|
||||||
|
source: Well,
|
||||||
|
targets: List[Well],
|
||||||
|
source_vol: Optional[float] = None,
|
||||||
|
ratios: Optional[List[float]] = None,
|
||||||
|
target_vols: Optional[List[float]] = None,
|
||||||
|
aspiration_flow_rate: Optional[float] = None,
|
||||||
|
dispense_flow_rates: Optional[List[Optional[float]]] = None,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.transfer(
|
||||||
|
source,
|
||||||
|
targets,
|
||||||
|
source_vol,
|
||||||
|
ratios,
|
||||||
|
target_vols,
|
||||||
|
aspiration_flow_rate,
|
||||||
|
dispense_flow_rates,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
return await super().transfer(
|
||||||
|
source,
|
||||||
|
targets,
|
||||||
|
source_vol,
|
||||||
|
ratios,
|
||||||
|
target_vols,
|
||||||
|
aspiration_flow_rate,
|
||||||
|
dispense_flow_rates,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
def use_channels(self, channels: List[int]):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.use_channels(channels)
|
||||||
|
return super().use_channels(channels)
|
||||||
|
|
||||||
|
async def pick_up_tips96(self, tip_rack: TipRack, offset: Coordinate = Coordinate.zero(), **backend_kwargs):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.pick_up_tips96(tip_rack, offset, **backend_kwargs)
|
||||||
|
return await super().pick_up_tips96(tip_rack, offset, **backend_kwargs)
|
||||||
|
|
||||||
|
async def drop_tips96(
|
||||||
|
self,
|
||||||
|
resource: Union[TipRack, Trash],
|
||||||
|
offset: Coordinate = Coordinate.zero(),
|
||||||
|
allow_nonzero_volume: bool = False,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
|
||||||
|
return await super().drop_tips96(resource, offset, allow_nonzero_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]:
|
||||||
|
return super()._get_96_head_origin_tip_rack()
|
||||||
|
|
||||||
|
async def return_tips96(self, allow_nonzero_volume: bool = False, **backend_kwargs):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.return_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||||
|
return await super().return_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
async def discard_tips96(self, allow_nonzero_volume: bool = True, **backend_kwargs):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.discard_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||||
|
return await super().discard_tips96(allow_nonzero_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
async def aspirate96(
|
||||||
|
self,
|
||||||
|
resource: Union[Plate, Container, List[Well]],
|
||||||
|
volume: float,
|
||||||
|
offset: Coordinate = Coordinate.zero(),
|
||||||
|
flow_rate: Optional[float] = None,
|
||||||
|
blow_out_air_volume: Optional[float] = None,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.aspirate96(
|
||||||
|
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
|
||||||
|
)
|
||||||
|
return await super().aspirate96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
async def dispense96(
|
||||||
|
self,
|
||||||
|
resource: Union[Plate, Container, List[Well]],
|
||||||
|
volume: float,
|
||||||
|
offset: Coordinate = Coordinate.zero(),
|
||||||
|
flow_rate: Optional[float] = None,
|
||||||
|
blow_out_air_volume: Optional[float] = None,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.dispense96(
|
||||||
|
resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs
|
||||||
|
)
|
||||||
|
return await super().dispense96(resource, volume, offset, flow_rate, blow_out_air_volume, **backend_kwargs)
|
||||||
|
|
||||||
|
async def stamp(
|
||||||
|
self,
|
||||||
|
source: Plate,
|
||||||
|
target: Plate,
|
||||||
|
volume: float,
|
||||||
|
aspiration_flow_rate: Optional[float] = None,
|
||||||
|
dispense_flow_rate: Optional[float] = None,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
|
||||||
|
return await super().stamp(source, target, volume, aspiration_flow_rate, dispense_flow_rate)
|
||||||
|
|
||||||
|
async def pick_up_resource(
|
||||||
|
self,
|
||||||
|
resource: Resource,
|
||||||
|
offset: Coordinate = Coordinate.zero(),
|
||||||
|
pickup_distance_from_top: float = 0,
|
||||||
|
direction: GripDirection = GripDirection.FRONT,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.pick_up_resource(
|
||||||
|
resource, offset, pickup_distance_from_top, direction, **backend_kwargs
|
||||||
|
)
|
||||||
|
return await super().pick_up_resource(resource, offset, pickup_distance_from_top, direction, **backend_kwargs)
|
||||||
|
|
||||||
|
async def move_picked_up_resource(
|
||||||
|
self,
|
||||||
|
to: Coordinate,
|
||||||
|
offset: Coordinate = Coordinate.zero(),
|
||||||
|
direction: Optional[GripDirection] = None,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_picked_up_resource(to, offset, direction, **backend_kwargs)
|
||||||
|
return await super().move_picked_up_resource(to, offset, direction, **backend_kwargs)
|
||||||
|
|
||||||
|
async def drop_resource(
|
||||||
|
self,
|
||||||
|
destination: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||||||
|
offset: Coordinate = Coordinate.zero(),
|
||||||
|
direction: GripDirection = GripDirection.FRONT,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.drop_resource(destination, offset, direction, **backend_kwargs)
|
||||||
|
return await super().drop_resource(destination, offset, direction, **backend_kwargs)
|
||||||
|
|
||||||
|
async def move_resource(
|
||||||
|
self,
|
||||||
|
resource: Resource,
|
||||||
|
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||||||
|
intermediate_locations: Optional[List[Coordinate]] = None,
|
||||||
|
pickup_offset: Coordinate = Coordinate.zero(),
|
||||||
|
destination_offset: Coordinate = Coordinate.zero(),
|
||||||
|
pickup_distance_from_top: float = 0,
|
||||||
|
pickup_direction: GripDirection = GripDirection.FRONT,
|
||||||
|
drop_direction: GripDirection = GripDirection.FRONT,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_resource(
|
||||||
|
resource,
|
||||||
|
to,
|
||||||
|
intermediate_locations,
|
||||||
|
pickup_offset,
|
||||||
|
destination_offset,
|
||||||
|
pickup_distance_from_top,
|
||||||
|
pickup_direction,
|
||||||
|
drop_direction,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
return await super().move_resource(
|
||||||
|
resource,
|
||||||
|
to,
|
||||||
|
intermediate_locations,
|
||||||
|
pickup_offset,
|
||||||
|
destination_offset,
|
||||||
|
pickup_distance_from_top,
|
||||||
|
pickup_direction,
|
||||||
|
drop_direction,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def move_lid(
|
||||||
|
self,
|
||||||
|
lid: Lid,
|
||||||
|
to: Union[Plate, ResourceStack, Coordinate],
|
||||||
|
intermediate_locations: Optional[List[Coordinate]] = None,
|
||||||
|
pickup_offset: Coordinate = Coordinate.zero(),
|
||||||
|
destination_offset: Coordinate = Coordinate.zero(),
|
||||||
|
pickup_direction: GripDirection = GripDirection.FRONT,
|
||||||
|
drop_direction: GripDirection = GripDirection.FRONT,
|
||||||
|
pickup_distance_from_top: float = 5.7 - 3.33,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_lid(
|
||||||
|
lid,
|
||||||
|
to,
|
||||||
|
intermediate_locations,
|
||||||
|
pickup_offset,
|
||||||
|
destination_offset,
|
||||||
|
pickup_direction,
|
||||||
|
drop_direction,
|
||||||
|
pickup_distance_from_top,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
return await super().move_lid(
|
||||||
|
lid,
|
||||||
|
to,
|
||||||
|
intermediate_locations,
|
||||||
|
pickup_offset,
|
||||||
|
destination_offset,
|
||||||
|
pickup_direction,
|
||||||
|
drop_direction,
|
||||||
|
pickup_distance_from_top,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def move_plate(
|
||||||
|
self,
|
||||||
|
plate: Plate,
|
||||||
|
to: Union[ResourceStack, ResourceHolder, Resource, Coordinate],
|
||||||
|
intermediate_locations: Optional[List[Coordinate]] = None,
|
||||||
|
pickup_offset: Coordinate = Coordinate.zero(),
|
||||||
|
destination_offset: Coordinate = Coordinate.zero(),
|
||||||
|
drop_direction: GripDirection = GripDirection.FRONT,
|
||||||
|
pickup_direction: GripDirection = GripDirection.FRONT,
|
||||||
|
pickup_distance_from_top: float = 13.2 - 3.33,
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_plate(
|
||||||
|
plate,
|
||||||
|
to,
|
||||||
|
intermediate_locations,
|
||||||
|
pickup_offset,
|
||||||
|
destination_offset,
|
||||||
|
drop_direction,
|
||||||
|
pickup_direction,
|
||||||
|
pickup_distance_from_top,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
return await super().move_plate(
|
||||||
|
plate,
|
||||||
|
to,
|
||||||
|
intermediate_locations,
|
||||||
|
pickup_offset,
|
||||||
|
destination_offset,
|
||||||
|
drop_direction,
|
||||||
|
pickup_direction,
|
||||||
|
pickup_distance_from_top,
|
||||||
|
**backend_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.serialize()
|
||||||
|
return super().serialize()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def deserialize(cls, data: dict, allow_marshal: bool = False) -> LiquidHandler:
|
||||||
|
return super().deserialize(data, allow_marshal)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls, path: str) -> LiquidHandler:
|
||||||
|
return super().load(path)
|
||||||
|
|
||||||
|
async def prepare_for_manual_channel_operation(self, channel: int):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.prepare_for_manual_channel_operation(channel)
|
||||||
|
return await super().prepare_for_manual_channel_operation(channel)
|
||||||
|
|
||||||
|
async def move_channel_x(self, channel: int, x: float):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_channel_x(channel, x)
|
||||||
|
return await super().move_channel_x(channel, x)
|
||||||
|
|
||||||
|
async def move_channel_y(self, channel: int, y: float):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_channel_y(channel, y)
|
||||||
|
return await super().move_channel_y(channel, y)
|
||||||
|
|
||||||
|
async def move_channel_z(self, channel: int, z: float):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.move_channel_z(channel, z)
|
||||||
|
return await super().move_channel_z(channel, z)
|
||||||
|
|
||||||
|
def assign_child_resource(self, resource: Resource, location: Optional[Coordinate], reassign: bool = True):
|
||||||
|
if self._simulator:
|
||||||
|
self._simulate_handler.assign_child_resource(resource, location, reassign)
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def probe_tip_presence_via_pickup(
|
||||||
|
self, tip_spots: List[TipSpot], use_channels: Optional[List[int]] = None
|
||||||
|
) -> Dict[str, bool]:
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.probe_tip_presence_via_pickup(tip_spots, use_channels)
|
||||||
|
return await super().probe_tip_presence_via_pickup(tip_spots, use_channels)
|
||||||
|
|
||||||
|
async def probe_tip_inventory(
|
||||||
|
self,
|
||||||
|
tip_spots: List[TipSpot],
|
||||||
|
probing_fn: Optional[TipPresenceProbingMethod] = None,
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
) -> Dict[str, bool]:
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.probe_tip_inventory(tip_spots, probing_fn, use_channels)
|
||||||
|
return await super().probe_tip_inventory(tip_spots, probing_fn, use_channels)
|
||||||
|
|
||||||
|
async def consolidate_tip_inventory(self, tip_racks: List[TipRack], use_channels: Optional[List[int]] = None):
|
||||||
|
if self._simulator:
|
||||||
|
await self._simulate_handler.consolidate_tip_inventory(tip_racks, use_channels)
|
||||||
|
return await super().consolidate_tip_inventory(tip_racks, use_channels)
|
||||||
|
|
||||||
|
|
||||||
|
class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
||||||
"""Extended LiquidHandler with additional operations."""
|
"""Extended LiquidHandler with additional operations."""
|
||||||
|
support_touch_tip = True
|
||||||
|
|
||||||
|
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool):
|
||||||
|
"""Initialize a LiquidHandler.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
backend: Backend to use.
|
||||||
|
deck: Deck to use.
|
||||||
|
"""
|
||||||
|
self._simulator = simulator
|
||||||
|
super().__init__(backend, deck, simulator)
|
||||||
|
|
||||||
# ---------------------------------------------------------------
|
# ---------------------------------------------------------------
|
||||||
# REMOVE LIQUID --------------------------------------------------
|
# REMOVE LIQUID --------------------------------------------------
|
||||||
@@ -48,43 +578,99 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
none_keys: List[str] = [],
|
none_keys: List[str] = [],
|
||||||
):
|
):
|
||||||
"""A complete *remove* (aspirate → waste) operation."""
|
"""A complete *remove* (aspirate → waste) operation."""
|
||||||
trash = self.deck.get_trash_area()
|
|
||||||
try:
|
try:
|
||||||
if is_96_well:
|
if is_96_well:
|
||||||
pass # This mode is not verified
|
pass # This mode is not verified.
|
||||||
else:
|
else:
|
||||||
if len(vols) != len(sources):
|
# 首先应该对任务分组,然后每次1个/8个进行操作处理
|
||||||
raise ValueError("Length of `vols` must match `sources`.")
|
if len(use_channels) == 1:
|
||||||
|
tip = []
|
||||||
for src, vol in zip(sources, vols):
|
for _ in range(len(use_channels)):
|
||||||
await self.move_to(src, dis_to_top=top[0] if top else 0)
|
tip.extend(next(self.current_tip))
|
||||||
tip = next(self.current_tip)
|
|
||||||
await self.pick_up_tips(tip)
|
await self.pick_up_tips(tip)
|
||||||
|
|
||||||
|
for _ in range(len(waste_liquid)):
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=[src],
|
resources=sources,
|
||||||
vols=[vol],
|
vols=[vols[_]],
|
||||||
use_channels=use_channels, # only aspirate96 used, default to None
|
use_channels=use_channels,
|
||||||
flow_rates=[flow_rates[0]] if flow_rates else None,
|
flow_rates=[flow_rates[0]] if flow_rates else None,
|
||||||
offsets=[offsets[0]] if offsets else None,
|
offsets=[offsets[0]] if offsets else None,
|
||||||
liquid_height=[liquid_height[0]] if liquid_height else None,
|
liquid_height=[liquid_height[0]] if liquid_height else None,
|
||||||
blow_out_air_volume=blow_out_air_volume[0] if blow_out_air_volume else None,
|
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
|
||||||
spread=spread,
|
spread=spread,
|
||||||
)
|
)
|
||||||
await self.custom_delay(seconds=delays[0] if delays else 0)
|
if delays is not None:
|
||||||
|
await self.custom_delay(seconds=delays[0])
|
||||||
await self.dispense(
|
await self.dispense(
|
||||||
resources=waste_liquid,
|
resources=[waste_liquid[_]],
|
||||||
vols=[vol],
|
vols=[vols[_]],
|
||||||
use_channels=use_channels,
|
use_channels=use_channels,
|
||||||
flow_rates=[flow_rates[1]] if flow_rates else None,
|
flow_rates=[flow_rates[1]] if flow_rates else None,
|
||||||
offsets=[offsets[1]] if offsets else None,
|
offsets=[offsets[1]] if offsets else None,
|
||||||
|
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None,
|
||||||
liquid_height=[liquid_height[1]] if liquid_height else None,
|
liquid_height=[liquid_height[1]] if liquid_height else None,
|
||||||
blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None,
|
|
||||||
spread=spread,
|
spread=spread,
|
||||||
)
|
)
|
||||||
await self.discard_tips() # For now, each of tips is discarded after use
|
await self.discard_tips()
|
||||||
|
elif len(use_channels) == 8:
|
||||||
|
tip = []
|
||||||
|
for _ in range(len(use_channels)):
|
||||||
|
tip.extend(next(self.current_tip))
|
||||||
|
await self.pick_up_tips(tip)
|
||||||
|
|
||||||
|
# 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理
|
||||||
|
if len(sources) % 8 != 0:
|
||||||
|
raise ValueError(f"Length of `sources` {len(sources)} must be a multiple of 8 for 8-channel mode.")
|
||||||
|
|
||||||
|
# 8个8个来取任务序列
|
||||||
|
|
||||||
|
for i in range(0, len(sources), 8):
|
||||||
|
# 取出8个任务
|
||||||
|
current_targets = waste_liquid[i:i + 8]
|
||||||
|
current_reagent_sources = sources[i:i + 8]
|
||||||
|
current_asp_vols = vols[i:i + 8]
|
||||||
|
current_dis_vols = vols[i:i + 8]
|
||||||
|
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
|
||||||
|
current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8
|
||||||
|
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
||||||
|
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
|
||||||
|
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
||||||
|
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
|
||||||
|
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||||
|
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
|
||||||
|
|
||||||
|
await self.aspirate(
|
||||||
|
resources=current_reagent_sources,
|
||||||
|
vols=current_asp_vols,
|
||||||
|
use_channels=use_channels,
|
||||||
|
flow_rates=current_asp_flow_rates,
|
||||||
|
offsets=current_asp_offset,
|
||||||
|
liquid_height=current_asp_liquid_height,
|
||||||
|
blow_out_air_volume=current_asp_blow_out_air_volume,
|
||||||
|
spread=spread,
|
||||||
|
)
|
||||||
|
if delays is not None:
|
||||||
|
await self.custom_delay(seconds=delays[0])
|
||||||
|
await self.dispense(
|
||||||
|
resources=current_targets,
|
||||||
|
vols=current_dis_vols,
|
||||||
|
use_channels=use_channels,
|
||||||
|
flow_rates=current_dis_flow_rates,
|
||||||
|
offsets=current_dis_offset,
|
||||||
|
liquid_height=current_dis_liquid_height,
|
||||||
|
blow_out_air_volume=current_dis_blow_out_air_volume,
|
||||||
|
spread=spread,
|
||||||
|
)
|
||||||
|
if delays is not None:
|
||||||
|
await self.custom_delay(seconds=delays[1])
|
||||||
|
await self.touch_tip(current_targets)
|
||||||
|
await self.discard_tips()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise RuntimeError(f"Liquid removal failed: {e}") from e
|
traceback.print_exc()
|
||||||
|
raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||||||
|
|
||||||
# ---------------------------------------------------------------
|
# ---------------------------------------------------------------
|
||||||
# ADD LIQUID -----------------------------------------------------
|
# ADD LIQUID -----------------------------------------------------
|
||||||
@@ -163,7 +749,7 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
if delays is not None:
|
if delays is not None:
|
||||||
await self.custom_delay(seconds=delays[1])
|
await self.custom_delay(seconds=delays[1])
|
||||||
await self.touch_tip(targets[_])
|
await self.touch_tip(targets[_])
|
||||||
await self.discard_tips()
|
|
||||||
elif len(use_channels) == 8:
|
elif len(use_channels) == 8:
|
||||||
tip = []
|
tip = []
|
||||||
for _ in range(len(use_channels)):
|
for _ in range(len(use_channels)):
|
||||||
@@ -183,13 +769,13 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
current_asp_vols = asp_vols[i:i + 8]
|
current_asp_vols = asp_vols[i:i + 8]
|
||||||
current_dis_vols = dis_vols[i:i + 8]
|
current_dis_vols = dis_vols[i:i + 8]
|
||||||
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
|
current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8
|
||||||
current_dis_flow_rates = flow_rates[i+8 :i + 16] if flow_rates else [None] * 8
|
current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8
|
||||||
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
||||||
current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8
|
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
|
||||||
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
||||||
current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8
|
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
|
||||||
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||||
current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8
|
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
|
||||||
|
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=current_reagent_sources,
|
resources=current_reagent_sources,
|
||||||
@@ -226,7 +812,8 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
)
|
)
|
||||||
if delays is not None:
|
if delays is not None:
|
||||||
await self.custom_delay(seconds=delays[1])
|
await self.custom_delay(seconds=delays[1])
|
||||||
#await self.touch_tip(current_targets)
|
await self.touch_tip(current_targets)
|
||||||
|
await self.discard_tips()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
@@ -347,11 +934,11 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
current_dis_vols = dis_vols[i:i + 8]
|
current_dis_vols = dis_vols[i:i + 8]
|
||||||
current_asp_flow_rates = asp_flow_rates[i:i + 8]
|
current_asp_flow_rates = asp_flow_rates[i:i + 8]
|
||||||
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
|
||||||
current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8
|
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
|
||||||
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
|
||||||
current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8
|
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
|
||||||
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||||
current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8
|
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
|
||||||
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8
|
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8
|
||||||
|
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
@@ -390,8 +977,9 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
)
|
)
|
||||||
if delays is not None:
|
if delays is not None:
|
||||||
await self.custom_delay(seconds=delays[1])
|
await self.custom_delay(seconds=delays[1])
|
||||||
|
await self.touch_tip(current_targets)
|
||||||
|
await self.discard_tips()
|
||||||
|
|
||||||
#await self.touch_tip(current_targets)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
raise RuntimeError(f"Liquid addition failed: {e}") from e
|
raise RuntimeError(f"Liquid addition failed: {e}") from e
|
||||||
@@ -417,7 +1005,11 @@ class LiquidHandlerAbstract(LiquidHandler):
|
|||||||
print(f"Current time: {time.strftime('%H:%M:%S')}")
|
print(f"Current time: {time.strftime('%H:%M:%S')}")
|
||||||
|
|
||||||
async def touch_tip(self, targets: Sequence[Container]):
|
async def touch_tip(self, targets: Sequence[Container]):
|
||||||
|
|
||||||
"""Touch the tip to the side of the well."""
|
"""Touch the tip to the side of the well."""
|
||||||
|
|
||||||
|
if not self.support_touch_tip:
|
||||||
|
return
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=[targets],
|
resources=[targets],
|
||||||
vols=[0],
|
vols=[0],
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ from pylabrobot.liquid_handling import (
|
|||||||
SingleChannelDispense,
|
SingleChannelDispense,
|
||||||
PickupTipRack,
|
PickupTipRack,
|
||||||
DropTipRack,
|
DropTipRack,
|
||||||
MultiHeadAspirationPlate,
|
MultiHeadAspirationPlate, ChatterBoxBackend, LiquidHandlerChatterboxBackend,
|
||||||
)
|
)
|
||||||
from pylabrobot.liquid_handling.standard import (
|
from pylabrobot.liquid_handling.standard import (
|
||||||
MultiHeadAspirationContainer,
|
MultiHeadAspirationContainer,
|
||||||
@@ -87,7 +87,31 @@ class PRCXI9300Container(Plate):
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class PRCXI9300Trash(Trash):
|
||||||
|
"""PRCXI 9300 的专用 Trash 类,继承自 Trash。
|
||||||
|
|
||||||
|
该类定义了 PRCXI 9300 的工作台布局和槽位信息。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, category: str):
|
||||||
|
super().__init__(name, size_x, size_y, size_z, category=category)
|
||||||
|
self._unilabos_state = {}
|
||||||
|
|
||||||
|
def load_state(self, state: Dict[str, Any]) -> None:
|
||||||
|
"""从给定的状态加载工作台信息。"""
|
||||||
|
#super().load_state(state)
|
||||||
|
self._unilabos_state = state
|
||||||
|
|
||||||
|
def serialize_state(self) -> Dict[str, Dict[str, Any]]:
|
||||||
|
data = super().serialize_state()
|
||||||
|
data.update(self._unilabos_state)
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PRCXI9300Handler(LiquidHandlerAbstract):
|
class PRCXI9300Handler(LiquidHandlerAbstract):
|
||||||
|
support_touch_tip = False
|
||||||
@property
|
@property
|
||||||
def reset_ok(self) -> bool:
|
def reset_ok(self) -> bool:
|
||||||
"""检查设备是否已重置成功。"""
|
"""检查设备是否已重置成功。"""
|
||||||
@@ -105,7 +129,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
|
|||||||
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
|
WorkTablets(Number=count, Code=f"T{count}", Material=child._unilabos_state["Material"])
|
||||||
)
|
)
|
||||||
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup, debug)
|
self._unilabos_backend = PRCXI9300Backend(tablets_info, host, port, timeout, setup, debug)
|
||||||
super().__init__(backend=self._unilabos_backend, deck=deck)
|
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=True)
|
||||||
|
|
||||||
async def create_protocol(
|
async def create_protocol(
|
||||||
self,
|
self,
|
||||||
@@ -434,6 +458,35 @@ class PRCXI9300Backend(LiquidHandlerBackend):
|
|||||||
|
|
||||||
"""Pick up tips from the specified resource."""
|
"""Pick up tips from the specified resource."""
|
||||||
|
|
||||||
|
|
||||||
|
plate = ops[0].resource.parent.parent
|
||||||
|
deck = plate.parent
|
||||||
|
plate_index = deck.children.index(plate)
|
||||||
|
|
||||||
|
if deck.children[plate_index].name == "trash":
|
||||||
|
step = self.api_client.UnLoad(
|
||||||
|
"Left",
|
||||||
|
dosage=0,
|
||||||
|
plate_no=plate_index+1,
|
||||||
|
is_whole_plate=False,
|
||||||
|
hole_row=1,
|
||||||
|
hole_col=2,# 强制投放第二列了
|
||||||
|
blending_times=0,
|
||||||
|
balance_height=0,
|
||||||
|
plate_or_hole=f"H{hole_col}-8,T{PlateNo}",
|
||||||
|
hole_numbers="1,2,3,4,5,6,7,8",
|
||||||
|
)
|
||||||
|
self.steps_todo_list.append(step)
|
||||||
|
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
# 判断是不是个trash?
|
||||||
|
# if all(isinstance(op.resource, Trash) for op in ops):
|
||||||
|
# print("All drop operations are for trash.")
|
||||||
|
# await self.discard_tips()
|
||||||
|
# return
|
||||||
|
|
||||||
if len(ops) != 8:
|
if len(ops) != 8:
|
||||||
raise ValueError(f"PRCXI9300Backend drop_tips: Expected 8 pickups, got {len(ops)}")
|
raise ValueError(f"PRCXI9300Backend drop_tips: Expected 8 pickups, got {len(ops)}")
|
||||||
|
|
||||||
@@ -1001,7 +1054,7 @@ if __name__ == "__main__":
|
|||||||
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
|
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
plate6 = PRCXI9300Container(name="plateT6", size_x=50, size_y=50, size_z=10, category="plate")
|
plate6 = PRCXI9300Trash(name="trash", size_x=50, size_y=50, size_z=10, category="trash")
|
||||||
plate6.load_state({
|
plate6.load_state({
|
||||||
"Material": {
|
"Material": {
|
||||||
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
|
"uuid": "57b1e4711e9e4a32b529f3132fc5931f",
|
||||||
@@ -1010,12 +1063,14 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul
|
from pylabrobot.resources.opentrons.tip_racks import tipone_96_tiprack_200ul
|
||||||
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
|
from pylabrobot.resources.opentrons.plates import corning_96_wellplate_360ul_flat
|
||||||
|
|
||||||
tip_rack = tipone_96_tiprack_200ul("TipRack")
|
tip_rack = tipone_96_tiprack_200ul("TipRack")
|
||||||
well_containers = corning_96_wellplate_360ul_flat("Plate")
|
well_containers = corning_96_wellplate_360ul_flat("Plate")
|
||||||
# from pprint import pprint
|
# from pprint import pprint
|
||||||
# pprint(well_containers.children)
|
# pprint(well_containers.children)
|
||||||
plate1.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
|
plate1.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
|
||||||
plate2.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
|
plate2.assign_child_resource(well_containers, location=Coordinate(0, 0, 0))
|
||||||
|
|
||||||
deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
|
deck.assign_child_resource(plate1, location=Coordinate(0, 0, 0))
|
||||||
deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
|
deck.assign_child_resource(plate2, location=Coordinate(0, 0, 0))
|
||||||
deck.assign_child_resource(plate3, location=Coordinate(0, 0, 0))
|
deck.assign_child_resource(plate3, location=Coordinate(0, 0, 0))
|
||||||
@@ -1039,8 +1094,8 @@ if __name__ == "__main__":
|
|||||||
reagent_sources=well_containers.children[-16:],
|
reagent_sources=well_containers.children[-16:],
|
||||||
targets=well_containers.children[:16],
|
targets=well_containers.children[:16],
|
||||||
use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
|
use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
|
||||||
flow_rates=[None] * 16,
|
flow_rates=[None] * 32,
|
||||||
offsets=[Coordinate(0, 0, 0)] * 16,
|
offsets=[Coordinate(0, 0, 0)] * 32,
|
||||||
liquid_height=[None] * 16,
|
liquid_height=[None] * 16,
|
||||||
blow_out_air_volume=[None] * 16,
|
blow_out_air_volume=[None] * 16,
|
||||||
delays=None,
|
delays=None,
|
||||||
@@ -1049,6 +1104,17 @@ if __name__ == "__main__":
|
|||||||
spread="wide",
|
spread="wide",
|
||||||
))
|
))
|
||||||
|
|
||||||
|
# asyncio.run(handler.remove_liquid(
|
||||||
|
# vols=[100]*16,
|
||||||
|
# sources=well_containers.children[-16:],
|
||||||
|
# waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
|
||||||
|
# use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
|
||||||
|
# flow_rates=[None] * 32,
|
||||||
|
# offsets=[Coordinate(0, 0, 0)] * 32,
|
||||||
|
# liquid_height=[None] * 32,
|
||||||
|
# blow_out_air_volume=[None] * 32,
|
||||||
|
# spread="wide",
|
||||||
|
# ))
|
||||||
# asyncio.run(handler.transfer_liquid(
|
# asyncio.run(handler.transfer_liquid(
|
||||||
# asp_vols=[100]*16,
|
# asp_vols=[100]*16,
|
||||||
# dis_vols=[100]*16,
|
# dis_vols=[100]*16,
|
||||||
|
|||||||
Reference in New Issue
Block a user