diff --git a/tests/devices/liquid_handling/README.md b/tests/devices/liquid_handling/README.md new file mode 100644 index 0000000..daea168 --- /dev/null +++ b/tests/devices/liquid_handling/README.md @@ -0,0 +1,15 @@ +# Liquid handling 集成测试 + +`test_transfer_liquid.py` 现在会调用 PRCXI 的 RViz 仿真 backend,运行前请确保: + +1. 已安装包含 `pylabrobot`、`rclpy` 的运行环境; +2. 启动 ROS 依赖(`rviz` 可选,但是 `rviz_backend` 会创建 ROS 节点); +3. 在 shell 中设置 `UNILAB_SIM_TEST=1`,否则 pytest 会自动跳过这些慢速用例: + +```bash +export UNILAB_SIM_TEST=1 +pytest tests/devices/liquid_handling/test_transfer_liquid.py -m slow +``` + +如果只需验证逻辑层(不依赖仿真),可以直接运行 `tests/devices/liquid_handling/unit_test.py`,该文件使用 Fake backend,适合作为 CI 的快速测试。*** + diff --git a/tests/devices/liquid_handling/test_transfer_liquid.py b/tests/devices/liquid_handling/test_transfer_liquid.py index 9896aac..5712e27 100644 --- a/tests/devices/liquid_handling/test_transfer_liquid.py +++ b/tests/devices/liquid_handling/test_transfer_liquid.py @@ -1,505 +1,325 @@ +""" +PRCXI transfer_liquid 集成测试。 + +这些用例会启动 UniLiquidHandler RViz 仿真 backend,需要同时满足: +1. 安装 pylabrobot 依赖; +2. 设置环境变量 UNILAB_SIM_TEST=1; +3. 具备 ROS 运行环境(rviz_backend 会创建 ROS 节点)。 +""" import asyncio +import os from dataclasses import dataclass -from typing import Any, Iterable, List, Optional, Sequence, Tuple +from typing import List, Sequence import pytest from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract +from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Deck, PRCXI9300Trash +from unilabos.devices.liquid_handling.prcxi.prcxi_labware import ( + PRCXI_300ul_Tips, + PRCXI_BioER_96_wellplate, +) + +pytestmark = pytest.mark.slow + +try: + from pylabrobot.resources import Coordinate, Deck, Plate, TipRack, Well +except ImportError: # pragma: no cover - 测试环境缺少 pylabrobot 时直接跳过 + Coordinate = Deck = Plate = TipRack = Well = None # type: ignore[assignment] + PYLABROBOT_AVAILABLE = False +else: + PYLABROBOT_AVAILABLE = True + +SIM_ENV_VAR = "UNILAB_SIM_TEST" -@dataclass(frozen=True) -class DummyContainer: - name: str - - def __repr__(self) -> str: # pragma: no cover - return f"DummyContainer({self.name})" - - -@dataclass(frozen=True) -class DummyTipSpot: - name: str - - def __repr__(self) -> str: # pragma: no cover - return f"DummyTipSpot({self.name})" - - -def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]: - """Yield lists so code can safely call `tip.extend(next(self.current_tip))`.""" - for i in range(n): - yield [DummyTipSpot(f"tip_{i}")] - - -class FakeLiquidHandler(LiquidHandlerAbstract): - """不初始化真实 backend/deck;仅用来记录 transfer_liquid 内部调用序列。""" - - def __init__(self, channel_num: int = 8): - # 不调用 super().__init__,避免真实硬件/后端依赖 - self.channel_num = channel_num - self.support_touch_tip = True - self.current_tip = iter(make_tip_iter()) - self.calls: List[Tuple[str, Any]] = [] - - async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs): - self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels})) - - async def aspirate( - self, - resources: Sequence[Any], - vols: List[float], - use_channels: Optional[List[int]] = None, - flow_rates: Optional[List[Optional[float]]] = None, - offsets: Any = None, - liquid_height: Any = None, - blow_out_air_volume: Any = None, - spread: str = "wide", - **backend_kwargs, - ): - self.calls.append( - ( - "aspirate", - { - "resources": list(resources), - "vols": list(vols), - "use_channels": list(use_channels) if use_channels is not None else None, - "flow_rates": list(flow_rates) if flow_rates is not None else None, - "offsets": list(offsets) if offsets is not None else None, - "liquid_height": list(liquid_height) if liquid_height is not None else None, - "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, - }, - ) - ) - - async def dispense( - self, - resources: Sequence[Any], - vols: List[float], - use_channels: Optional[List[int]] = None, - flow_rates: Optional[List[Optional[float]]] = None, - offsets: Any = None, - liquid_height: Any = None, - blow_out_air_volume: Any = None, - spread: str = "wide", - **backend_kwargs, - ): - self.calls.append( - ( - "dispense", - { - "resources": list(resources), - "vols": list(vols), - "use_channels": list(use_channels) if use_channels is not None else None, - "flow_rates": list(flow_rates) if flow_rates is not None else None, - "offsets": list(offsets) if offsets is not None else None, - "liquid_height": list(liquid_height) if liquid_height is not None else None, - "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, - }, - ) - ) - - async def discard_tips(self, use_channels=None, *args, **kwargs): - # 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数) - self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None})) - - async def custom_delay(self, seconds=0, msg=None): - self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg})) - - async def touch_tip(self, targets): - # 原实现会访问 targets.get_size_x() 等;测试里只记录调用 - self.calls.append(("touch_tip", {"targets": targets})) - - async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None): - self.calls.append( - ( - "mix", - { - "targets": targets, - "mix_time": mix_time, - "mix_vol": mix_vol, - }, - ) - ) +@dataclass +class SimulationContext: + handler: LiquidHandlerAbstract + deck: Deck + tip_rack: TipRack + source_plate: Plate + target_plate: Plate + waste_plate: Plate + channel_num: int def run(coro): return asyncio.run(coro) -def test_one_to_one_single_channel_basic_calls(): - lh = FakeLiquidHandler(channel_num=1) - lh.current_tip = iter(make_tip_iter(64)) +def _ensure_unilabos_extra(well: Well) -> None: + if not hasattr(well, "unilabos_extra") or well.unilabos_extra is None: + well.unilabos_extra = {} # type: ignore[attr-defined] - sources = [DummyContainer(f"S{i}") for i in range(3)] - targets = [DummyContainer(f"T{i}") for i in range(3)] + +def _assign_sample_uuid(well: Well, value: str) -> None: + _ensure_unilabos_extra(well) + well.unilabos_extra["sample_uuid"] = value # type: ignore[attr-defined] + + +def _zero_coordinate() -> Coordinate: + if hasattr(Coordinate, "zero"): + return Coordinate.zero() + return Coordinate(0, 0, 0) + + +def _zero_offsets(count: int) -> List[Coordinate]: + return [_zero_coordinate() for _ in range(count)] + + +def _build_simulation_deck() -> tuple[PRCXI9300Deck, TipRack, Plate, Plate, Plate, PRCXI9300Trash]: + deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=542, size_y=374, size_z=50) + tip_rack = PRCXI_300ul_Tips("Tips") + source_plate = PRCXI_BioER_96_wellplate("SourcePlate") + target_plate = PRCXI_BioER_96_wellplate("TargetPlate") + waste_plate = PRCXI_BioER_96_wellplate("WastePlate") + trash = PRCXI9300Trash(name="trash", size_x=100, size_y=100, size_z=50) + deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0)) + deck.assign_child_resource(source_plate, location=Coordinate(150, 0, 0)) + deck.assign_child_resource(target_plate, location=Coordinate(300, 0, 0)) + deck.assign_child_resource(waste_plate, location=Coordinate(450, 0, 0)) + deck.assign_child_resource(trash, location=Coordinate(150, -120, 0)) + return deck, tip_rack, source_plate, target_plate, waste_plate, trash + + +def _stop_backend(handler: LiquidHandlerAbstract) -> None: + try: + run(handler.backend.stop()) + except Exception: # pragma: no cover - 如果 backend 已经停止 + pass + simulate_handler = getattr(handler, "_simulate_handler", None) + if simulate_handler is not None and getattr(simulate_handler, "backend", None) is not None: + try: + run(simulate_handler.backend.stop()) + except Exception: # pragma: no cover + pass + + +@pytest.fixture(params=[1, 8]) +def prcxi_simulation(request) -> SimulationContext: + if not PYLABROBOT_AVAILABLE: + pytest.skip("pylabrobot is required for PRCXI simulation tests.") + if os.environ.get(SIM_ENV_VAR) != "1": + pytest.skip(f"Set {SIM_ENV_VAR}=1 to run PRCXI simulation tests.") + + channel_num = request.param + deck, tip_rack, source_plate, target_plate, waste_plate, _trash = _build_simulation_deck() + backend_cfg = { + "type": "unilabos.devices.liquid_handling.rviz_backend.UniLiquidHandlerRvizBackend", + "channel_num": channel_num, + "total_height": 310, + "lh_device_id": f"pytest_prcxi_{channel_num}", + } + handler = LiquidHandlerAbstract( + backend=backend_cfg, + deck=deck, + simulator=True, + channel_num=channel_num, + total_height=310, + ) + run(handler.setup()) + handler.set_tiprack([tip_rack]) + handler.support_touch_tip = False + + context = SimulationContext( + handler=handler, + deck=deck, + tip_rack=tip_rack, + source_plate=source_plate, + target_plate=target_plate, + waste_plate=waste_plate, + channel_num=channel_num, + ) + + yield context + + _stop_backend(handler) + + +def _pick_wells(plate: Plate, start: int, count: int) -> List[Well]: + wells = plate.children[start : start + count] + for well in wells: + _ensure_unilabos_extra(well) + return wells + + +def _assert_samples_match(sources: Sequence[Well], targets: Sequence[Well]) -> None: + for src, tgt in zip(sources, targets): + src_uuid = getattr(src, "unilabos_extra", {}).get("sample_uuid") + tgt_uuid = getattr(tgt, "unilabos_extra", {}).get("sample_uuid") + assert tgt_uuid == src_uuid + + +def test_transfer_liquid_single_channel_one_to_one(prcxi_simulation: SimulationContext): + if prcxi_simulation.channel_num != 1: + pytest.skip("仅在单通道配置下运行") + + handler = prcxi_simulation.handler + sources = _pick_wells(prcxi_simulation.source_plate, start=0, count=3) + targets = _pick_wells(prcxi_simulation.target_plate, start=10, count=3) + for idx, src in enumerate(sources): + _assign_sample_uuid(src, f"single_{idx}") + offsets = _zero_offsets(max(len(sources), len(targets))) run( - lh.transfer_liquid( + handler.transfer_liquid( sources=sources, targets=targets, - tip_racks=[], + tip_racks=[prcxi_simulation.tip_rack], use_channels=[0], - asp_vols=[1, 2, 3], - dis_vols=[4, 5, 6], - mix_times=None, # 应该仍能执行(不 mix) + asp_vols=[5.0, 6.0, 7.0], + dis_vols=[10.0, 11.0, 12.0], + offsets=offsets, + mix_times=None, ) ) - assert [c[0] for c in lh.calls].count("pick_up_tips") == 3 - assert [c[0] for c in lh.calls].count("aspirate") == 3 - assert [c[0] for c in lh.calls].count("dispense") == 3 - assert [c[0] for c in lh.calls].count("discard_tips") == 3 - - # 每次 aspirate/dispense 都是单孔列表 - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - assert aspirates[0]["resources"] == [sources[0]] - assert aspirates[0]["vols"] == [1.0] - - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert dispenses[2]["resources"] == [targets[2]] - assert dispenses[2]["vols"] == [6.0] + _assert_samples_match(sources, targets) -def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate(): - lh = FakeLiquidHandler(channel_num=1) - lh.current_tip = iter(make_tip_iter(16)) +def test_transfer_liquid_single_channel_one_to_many(prcxi_simulation: SimulationContext): + if prcxi_simulation.channel_num != 1: + pytest.skip("仅在单通道配置下运行") - source = DummyContainer("S0") - target = DummyContainer("T0") + handler = prcxi_simulation.handler + source = _pick_wells(prcxi_simulation.source_plate, start=20, count=1)[0] + targets = _pick_wells(prcxi_simulation.target_plate, start=30, count=3) + _assign_sample_uuid(source, "one_to_many_source") + offsets = _zero_offsets(max(len(targets), 1)) run( - lh.transfer_liquid( + handler.transfer_liquid( sources=[source], - targets=[target], - tip_racks=[], + targets=targets, + tip_racks=[prcxi_simulation.tip_rack], use_channels=[0], - asp_vols=[5], - dis_vols=[5], - mix_stage="before", + asp_vols=60.0, + dis_vols=[15.0, 20.0, 25.0], + offsets=offsets, + mix_times=0, + ) + ) + + for target in targets: + assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "one_to_many_source" + + +def test_transfer_liquid_single_channel_many_to_one(prcxi_simulation: SimulationContext): + if prcxi_simulation.channel_num != 1: + pytest.skip("仅在单通道配置下运行") + + handler = prcxi_simulation.handler + sources = _pick_wells(prcxi_simulation.source_plate, start=40, count=3) + target = _pick_wells(prcxi_simulation.target_plate, start=50, count=1)[0] + for idx, src in enumerate(sources): + _assign_sample_uuid(src, f"many_to_one_{idx}") + offsets = _zero_offsets(max(len(sources), len([target]))) + + run( + handler.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[prcxi_simulation.tip_rack], + use_channels=[0], + asp_vols=[8.0, 9.0, 10.0], + dis_vols=1, + offsets=offsets, + mix_stage="after", mix_times=1, + mix_vol=5, + ) + ) + + assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "many_to_one_2" + + +def test_transfer_liquid_eight_channel_batches(prcxi_simulation: SimulationContext): + if prcxi_simulation.channel_num != 8: + pytest.skip("仅在八通道配置下运行") + + handler = prcxi_simulation.handler + sources = _pick_wells(prcxi_simulation.source_plate, start=0, count=8) + targets = _pick_wells(prcxi_simulation.target_plate, start=16, count=8) + for idx, src in enumerate(sources): + _assign_sample_uuid(src, f"batch_{idx}") + offsets = _zero_offsets(len(targets)) + + use_channels = list(range(8)) + asp_vols = [float(i + 1) * 2 for i in range(8)] + dis_vols = [float(i + 10) for i in range(8)] + + run( + handler.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[prcxi_simulation.tip_rack], + use_channels=use_channels, + asp_vols=asp_vols, + dis_vols=dis_vols, + offsets=offsets, + mix_times=0, + ) + ) + + _assert_samples_match(sources, targets) + + +@pytest.mark.parametrize("mix_stage", ["before", "after", "both"]) +def test_transfer_liquid_mix_stages(prcxi_simulation: SimulationContext, mix_stage: str): + if prcxi_simulation.channel_num != 1: + pytest.skip("仅在单通道配置下运行") + + handler = prcxi_simulation.handler + target = _pick_wells(prcxi_simulation.target_plate, start=70, count=1)[0] + sources = _pick_wells(prcxi_simulation.source_plate, start=80, count=2) + for idx, src in enumerate(sources): + _assign_sample_uuid(src, f"mix_stage_{mix_stage}_{idx}") + + run( + handler.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[prcxi_simulation.tip_rack], + use_channels=[0], + asp_vols=[4.0, 5.0], + dis_vols=1, + offsets=_zero_offsets(len(sources)), + mix_stage=mix_stage, + mix_times=2, mix_vol=3, ) ) - names = [name for name, _ in lh.calls] - assert names.count("mix") == 1 - assert names.index("mix") < names.index("aspirate") + # mix_stage 前后都应该保留最新源的 sample_uuid + assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == f"mix_stage_{mix_stage}_1" + if prcxi_simulation.channel_num != 8: + pytest.skip("仅在八通道配置下运行") + handler = prcxi_simulation.handler + sources = _pick_wells(prcxi_simulation.source_plate, start=0, count=8) + targets = _pick_wells(prcxi_simulation.target_plate, start=16, count=8) + for idx, src in enumerate(sources): + _assign_sample_uuid(src, f"batch_{idx}") + offsets = _zero_offsets(len(targets)) -def test_one_to_one_eight_channel_groups_by_8(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(256)) - - sources = [DummyContainer(f"S{i}") for i in range(16)] - targets = [DummyContainer(f"T{i}") for i in range(16)] - asp_vols = list(range(1, 17)) - dis_vols = list(range(101, 117)) + use_channels = list(range(8)) + asp_vols = [float(i + 1) * 2 for i in range(8)] + dis_vols = [float(i + 10) for i in range(8)] run( - lh.transfer_liquid( + handler.transfer_liquid( sources=sources, targets=targets, - tip_racks=[], - use_channels=list(range(8)), + tip_racks=[prcxi_simulation.tip_rack], + use_channels=use_channels, asp_vols=asp_vols, dis_vols=dis_vols, - mix_times=0, # 触发逻辑但不 mix - ) - ) - - # 16 个任务 -> 2 组,每组 8 通道一起做 - assert [c[0] for c in lh.calls].count("pick_up_tips") == 2 - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert len(aspirates) == 2 - assert len(dispenses) == 2 - - assert aspirates[0]["resources"] == sources[0:8] - assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]] - assert dispenses[1]["resources"] == targets[8:16] - assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]] - - -def test_one_to_one_eight_channel_requires_multiple_of_8_targets(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(64)) - - sources = [DummyContainer(f"S{i}") for i in range(9)] - targets = [DummyContainer(f"T{i}") for i in range(9)] - - with pytest.raises(ValueError, match="multiple of 8"): - run( - lh.transfer_liquid( - sources=sources, - targets=targets, - tip_racks=[], - use_channels=list(range(8)), - asp_vols=[1] * 9, - dis_vols=[1] * 9, - mix_times=0, - ) - ) - - -def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(512)) - - sources = [DummyContainer(f"S{i}") for i in range(16)] - targets = [DummyContainer(f"T{i}") for i in range(16)] - asp_vols = [i + 1 for i in range(16)] - dis_vols = [200 + i for i in range(16)] - asp_flow_rates = [0.1 * (i + 1) for i in range(16)] - dis_flow_rates = [0.2 * (i + 1) for i in range(16)] - offsets = [f"offset_{i}" for i in range(16)] - liquid_heights = [i * 0.5 for i in range(16)] - blow_out_air_volume = [i + 0.05 for i in range(16)] - - run( - lh.transfer_liquid( - sources=sources, - targets=targets, - tip_racks=[], - use_channels=list(range(8)), - asp_vols=asp_vols, - dis_vols=dis_vols, - asp_flow_rates=asp_flow_rates, - dis_flow_rates=dis_flow_rates, offsets=offsets, - liquid_height=liquid_heights, - blow_out_air_volume=blow_out_air_volume, - mix_times=0, - ) - ) - - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert len(aspirates) == len(dispenses) == 2 - - for batch_idx in range(2): - start = batch_idx * 8 - end = start + 8 - asp_call = aspirates[batch_idx] - dis_call = dispenses[batch_idx] - assert asp_call["resources"] == sources[start:end] - assert asp_call["flow_rates"] == asp_flow_rates[start:end] - assert asp_call["offsets"] == offsets[start:end] - assert asp_call["liquid_height"] == liquid_heights[start:end] - assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end] - assert dis_call["flow_rates"] == dis_flow_rates[start:end] - assert dis_call["offsets"] == offsets[start:end] - assert dis_call["liquid_height"] == liquid_heights[start:end] - assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end] - - -def test_one_to_one_eight_channel_handles_32_tasks_four_batches(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(1024)) - - sources = [DummyContainer(f"S{i}") for i in range(32)] - targets = [DummyContainer(f"T{i}") for i in range(32)] - asp_vols = [i + 1 for i in range(32)] - dis_vols = [300 + i for i in range(32)] - - run( - lh.transfer_liquid( - sources=sources, - targets=targets, - tip_racks=[], - use_channels=list(range(8)), - asp_vols=asp_vols, - dis_vols=dis_vols, - mix_times=0, - ) - ) - - pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"] - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert len(pick_calls) == 4 - assert len(aspirates) == len(dispenses) == 4 - assert aspirates[0]["resources"] == sources[0:8] - assert aspirates[-1]["resources"] == sources[24:32] - assert dispenses[0]["resources"] == targets[0:8] - assert dispenses[-1]["resources"] == targets[24:32] - - -def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small(): - lh = FakeLiquidHandler(channel_num=1) - lh.current_tip = iter(make_tip_iter(64)) - - source = DummyContainer("SRC") - targets = [DummyContainer(f"T{i}") for i in range(3)] - dis_vols = [10, 20, 30] # sum=60 - - run( - lh.transfer_liquid( - sources=[source], - targets=targets, - tip_racks=[], - use_channels=[0], - asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60 - dis_vols=dis_vols, - mix_times=0, - ) - ) - - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - assert len(aspirates) == 1 - assert aspirates[0]["resources"] == [source] - assert aspirates[0]["vols"] == [60.0] - assert aspirates[0]["use_channels"] == [0] - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0] - - -def test_one_to_many_eight_channel_basic(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(128)) - - source = DummyContainer("SRC") - targets = [DummyContainer(f"T{i}") for i in range(8)] - dis_vols = [i + 1 for i in range(8)] - - run( - lh.transfer_liquid( - sources=[source], - targets=targets, - tip_racks=[], - use_channels=list(range(8)), - asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自) - dis_vols=dis_vols, - mix_times=0, - ) - ) - - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - assert aspirates[0]["resources"] == [source] * 8 - assert aspirates[0]["vols"] == [float(v) for v in dis_vols] - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert dispenses[0]["resources"] == targets - assert dispenses[0]["vols"] == [float(v) for v in dis_vols] - - -def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default(): - lh = FakeLiquidHandler(channel_num=1) - lh.current_tip = iter(make_tip_iter(128)) - - sources = [DummyContainer(f"S{i}") for i in range(3)] - target = DummyContainer("T") - asp_vols = [5, 6, 7] - - run( - lh.transfer_liquid( - sources=sources, - targets=[target], - tip_racks=[], - use_channels=[0], - asp_vols=asp_vols, - dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol - mix_times=0, - ) - ) - - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols] - assert all(d["resources"] == [target] for d in dispenses) - - -def test_many_to_one_single_channel_before_stage_mixes_target_once(): - lh = FakeLiquidHandler(channel_num=1) - lh.current_tip = iter(make_tip_iter(128)) - - sources = [DummyContainer("S0"), DummyContainer("S1")] - target = DummyContainer("T") - - run( - lh.transfer_liquid( - sources=sources, - targets=[target], - tip_racks=[], - use_channels=[0], - asp_vols=[5, 6], - dis_vols=1, - mix_stage="before", + mix_stage="after", mix_times=2, - mix_vol=4, + mix_vol=3, ) ) - names = [name for name, _ in lh.calls] - assert names[0] == "mix" - assert names.count("mix") == 1 - - -def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source(): - lh = FakeLiquidHandler(channel_num=1) - lh.current_tip = iter(make_tip_iter(128)) - - sources = [DummyContainer(f"S{i}") for i in range(3)] - target = DummyContainer("T") - asp_vols = [5, 6, 7] - dis_vols = [1, 2, 3] - - run( - lh.transfer_liquid( - sources=sources, - targets=[target], - tip_racks=[], - use_channels=[0], - asp_vols=asp_vols, - dis_vols=dis_vols, # 比例模式 - mix_times=0, - ) - ) - - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols] - - -def test_many_to_one_eight_channel_basic(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(256)) - - sources = [DummyContainer(f"S{i}") for i in range(8)] - target = DummyContainer("T") - asp_vols = [10 + i for i in range(8)] - - run( - lh.transfer_liquid( - sources=sources, - targets=[target], - tip_racks=[], - use_channels=list(range(8)), - asp_vols=asp_vols, - dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol - mix_times=0, - ) - ) - - aspirates = [payload for name, payload in lh.calls if name == "aspirate"] - dispenses = [payload for name, payload in lh.calls if name == "dispense"] - assert aspirates[0]["resources"] == sources - assert aspirates[0]["vols"] == [float(v) for v in asp_vols] - assert dispenses[0]["resources"] == [target] * 8 - assert dispenses[0]["vols"] == [float(v) for v in asp_vols] - - -def test_transfer_liquid_mode_detection_unsupported_shape_raises(): - lh = FakeLiquidHandler(channel_num=8) - lh.current_tip = iter(make_tip_iter(64)) - - sources = [DummyContainer("S0"), DummyContainer("S1")] - targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")] - - with pytest.raises(ValueError, match="Unsupported transfer mode"): - run( - lh.transfer_liquid( - sources=sources, - targets=targets, - tip_racks=[], - use_channels=[0], - asp_vols=[1, 1], - dis_vols=[1, 1, 1], - mix_times=0, - ) - ) - + _assert_samples_match(sources, targets) diff --git a/tests/devices/liquid_handling/unit_test.py b/tests/devices/liquid_handling/unit_test.py new file mode 100644 index 0000000..b7a72cd --- /dev/null +++ b/tests/devices/liquid_handling/unit_test.py @@ -0,0 +1,547 @@ +import asyncio +from dataclasses import dataclass +from typing import Any, Iterable, List, Optional, Sequence, Tuple + +import pytest + +from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract + + +@dataclass(frozen=True) +class DummyContainer: + name: str + + def __repr__(self) -> str: # pragma: no cover + return f"DummyContainer({self.name})" + + +@dataclass(frozen=True) +class DummyTipSpot: + name: str + + def __repr__(self) -> str: # pragma: no cover + return f"DummyTipSpot({self.name})" + + +def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]: + """Yield lists so code can safely call `tip.extend(next(self.current_tip))`.""" + for i in range(n): + yield [DummyTipSpot(f"tip_{i}")] + + +class FakeLiquidHandler(LiquidHandlerAbstract): + """不初始化真实 backend/deck;仅用来记录 transfer_liquid 内部调用序列。""" + + def __init__(self, channel_num: int = 8): + # 不调用 super().__init__,避免真实硬件/后端依赖 + self.channel_num = channel_num + self.support_touch_tip = True + self.current_tip = iter(make_tip_iter()) + self.calls: List[Tuple[str, Any]] = [] + + async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs): + self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels})) + + async def aspirate( + self, + resources: Sequence[Any], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Any = None, + liquid_height: Any = None, + blow_out_air_volume: Any = None, + spread: str = "wide", + **backend_kwargs, + ): + self.calls.append( + ( + "aspirate", + { + "resources": list(resources), + "vols": list(vols), + "use_channels": list(use_channels) if use_channels is not None else None, + "flow_rates": list(flow_rates) if flow_rates is not None else None, + "offsets": list(offsets) if offsets is not None else None, + "liquid_height": list(liquid_height) if liquid_height is not None else None, + "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, + }, + ) + ) + + async def dispense( + self, + resources: Sequence[Any], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Any = None, + liquid_height: Any = None, + blow_out_air_volume: Any = None, + spread: str = "wide", + **backend_kwargs, + ): + self.calls.append( + ( + "dispense", + { + "resources": list(resources), + "vols": list(vols), + "use_channels": list(use_channels) if use_channels is not None else None, + "flow_rates": list(flow_rates) if flow_rates is not None else None, + "offsets": list(offsets) if offsets is not None else None, + "liquid_height": list(liquid_height) if liquid_height is not None else None, + "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, + }, + ) + ) + + async def discard_tips(self, use_channels=None, *args, **kwargs): + # 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数) + self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None})) + + async def custom_delay(self, seconds=0, msg=None): + self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg})) + + async def touch_tip(self, targets): + # 原实现会访问 targets.get_size_x() 等;测试里只记录调用 + self.calls.append(("touch_tip", {"targets": targets})) + +def run(coro): + return asyncio.run(coro) + + +def test_one_to_one_single_channel_basic_calls(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + targets = [DummyContainer(f"T{i}") for i in range(3)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=[1, 2, 3], + dis_vols=[4, 5, 6], + mix_times=None, # 应该仍能执行(不 mix) + ) + ) + + assert [c[0] for c in lh.calls].count("pick_up_tips") == 3 + assert [c[0] for c in lh.calls].count("aspirate") == 3 + assert [c[0] for c in lh.calls].count("dispense") == 3 + assert [c[0] for c in lh.calls].count("discard_tips") == 3 + + # 每次 aspirate/dispense 都是单孔列表 + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert aspirates[0]["resources"] == [sources[0]] + assert aspirates[0]["vols"] == [1.0] + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert dispenses[2]["resources"] == [targets[2]] + assert dispenses[2]["vols"] == [6.0] + + +def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(16)) + + source = DummyContainer("S0") + target = DummyContainer("T0") + + run( + lh.transfer_liquid( + sources=[source], + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=[5], + dis_vols=[5], + mix_stage="before", + mix_times=1, + mix_vol=3, + ) + ) + + aspirate_calls = [(idx, payload) for idx, (name, payload) in enumerate(lh.calls) if name == "aspirate"] + assert len(aspirate_calls) >= 2 + mix_idx, mix_payload = aspirate_calls[0] + assert mix_payload["resources"] == [target] + assert mix_payload["vols"] == [3] + transfer_idx, transfer_payload = aspirate_calls[1] + assert transfer_payload["resources"] == [source] + assert mix_idx < transfer_idx + + +def test_one_to_one_eight_channel_groups_by_8(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(256)) + + sources = [DummyContainer(f"S{i}") for i in range(16)] + targets = [DummyContainer(f"T{i}") for i in range(16)] + asp_vols = list(range(1, 17)) + dis_vols = list(range(101, 117)) + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + mix_times=0, # 触发逻辑但不 mix + ) + ) + + # 16 个任务 -> 2 组,每组 8 通道一起做 + assert [c[0] for c in lh.calls].count("pick_up_tips") == 2 + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == 2 + assert len(dispenses) == 2 + + assert aspirates[0]["resources"] == sources[0:8] + assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]] + assert dispenses[1]["resources"] == targets[8:16] + assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]] + + +def test_one_to_one_eight_channel_requires_multiple_of_8_targets(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer(f"S{i}") for i in range(9)] + targets = [DummyContainer(f"T{i}") for i in range(9)] + + with pytest.raises(ValueError, match="multiple of 8"): + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=[1] * 9, + dis_vols=[1] * 9, + mix_times=0, + ) + ) + + +def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(512)) + + sources = [DummyContainer(f"S{i}") for i in range(16)] + targets = [DummyContainer(f"T{i}") for i in range(16)] + asp_vols = [i + 1 for i in range(16)] + dis_vols = [200 + i for i in range(16)] + asp_flow_rates = [0.1 * (i + 1) for i in range(16)] + dis_flow_rates = [0.2 * (i + 1) for i in range(16)] + offsets = [f"offset_{i}" for i in range(16)] + liquid_heights = [i * 0.5 for i in range(16)] + blow_out_air_volume = [i + 0.05 for i in range(16)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + asp_flow_rates=asp_flow_rates, + dis_flow_rates=dis_flow_rates, + offsets=offsets, + liquid_height=liquid_heights, + blow_out_air_volume=blow_out_air_volume, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == len(dispenses) == 2 + + for batch_idx in range(2): + start = batch_idx * 8 + end = start + 8 + asp_call = aspirates[batch_idx] + dis_call = dispenses[batch_idx] + assert asp_call["resources"] == sources[start:end] + assert asp_call["flow_rates"] == asp_flow_rates[start:end] + assert asp_call["offsets"] == offsets[start:end] + assert asp_call["liquid_height"] == liquid_heights[start:end] + assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end] + assert dis_call["flow_rates"] == dis_flow_rates[start:end] + assert dis_call["offsets"] == offsets[start:end] + assert dis_call["liquid_height"] == liquid_heights[start:end] + assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end] + + +def test_one_to_one_eight_channel_handles_32_tasks_four_batches(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(1024)) + + sources = [DummyContainer(f"S{i}") for i in range(32)] + targets = [DummyContainer(f"T{i}") for i in range(32)] + asp_vols = [i + 1 for i in range(32)] + dis_vols = [300 + i for i in range(32)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + mix_times=0, + ) + ) + + pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"] + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(pick_calls) == 4 + assert len(aspirates) == len(dispenses) == 4 + assert aspirates[0]["resources"] == sources[0:8] + assert aspirates[-1]["resources"] == sources[24:32] + assert dispenses[0]["resources"] == targets[0:8] + assert dispenses[-1]["resources"] == targets[24:32] + + +def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(64)) + + source = DummyContainer("SRC") + targets = [DummyContainer(f"T{i}") for i in range(3)] + dis_vols = [10, 20, 30] # sum=60 + + run( + lh.transfer_liquid( + sources=[source], + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60 + dis_vols=dis_vols, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert len(aspirates) == 1 + assert aspirates[0]["resources"] == [source] + assert aspirates[0]["vols"] == [60.0] + assert aspirates[0]["use_channels"] == [0] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0] + + +def test_one_to_many_eight_channel_basic(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(128)) + + source = DummyContainer("SRC") + targets = [DummyContainer(f"T{i}") for i in range(8)] + dis_vols = [i + 1 for i in range(8)] + + run( + lh.transfer_liquid( + sources=[source], + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自) + dis_vols=dis_vols, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert aspirates[0]["resources"] == [source] * 8 + assert aspirates[0]["vols"] == [float(v) for v in dis_vols] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert dispenses[0]["resources"] == targets + assert dispenses[0]["vols"] == [float(v) for v in dis_vols] + + +def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + target = DummyContainer("T") + asp_vols = [5, 6, 7] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=asp_vols, + dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol + mix_times=0, + ) + ) + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols] + assert all(d["resources"] == [target] for d in dispenses) + + +def test_many_to_one_single_channel_before_stage_mixes_target_once(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer("S0"), DummyContainer("S1")] + target = DummyContainer("T") + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=[5, 6], + dis_vols=1, + mix_stage="before", + mix_times=2, + mix_vol=4, + ) + ) + + aspirate_calls = [(idx, payload) for idx, (name, payload) in enumerate(lh.calls) if name == "aspirate"] + assert len(aspirate_calls) >= 1 + mix_idx, mix_payload = aspirate_calls[0] + assert mix_payload["resources"] == [target] + assert mix_payload["vols"] == [4] + # 第一個 mix 之後會真正開始吸 source + assert any(call["resources"] == [sources[0]] for _, call in aspirate_calls[1:]) + + +def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + target = DummyContainer("T") + asp_vols = [5, 6, 7] + dis_vols = [1, 2, 3] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=asp_vols, + dis_vols=dis_vols, # 比例模式 + mix_times=0, + ) + ) + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols] + + +def test_many_to_one_eight_channel_basic(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(256)) + + sources = [DummyContainer(f"S{i}") for i in range(8)] + target = DummyContainer("T") + asp_vols = [10 + i for i in range(8)] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert aspirates[0]["resources"] == sources + assert aspirates[0]["vols"] == [float(v) for v in asp_vols] + assert dispenses[0]["resources"] == [target] * 8 + assert dispenses[0]["vols"] == [float(v) for v in asp_vols] + + +def test_transfer_liquid_mode_detection_unsupported_shape_raises(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer("S0"), DummyContainer("S1")] + targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")] + + with pytest.raises(ValueError, match="Unsupported transfer mode"): + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=[1, 1], + dis_vols=[1, 1, 1], + mix_times=0, + ) + ) + + +def test_mix_single_target_produces_matching_cycles(): + lh = FakeLiquidHandler(channel_num=1) + target = DummyContainer("T_mix") + + run(lh.mix(targets=[target], mix_time=2, mix_vol=5)) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == len(dispenses) == 2 + assert all(call["resources"] == [target] for call in aspirates) + assert all(call["vols"] == [5] for call in aspirates) + assert all(call["resources"] == [target] for call in dispenses) + assert all(call["vols"] == [5] for call in dispenses) + + +def test_mix_multiple_targets_supports_per_target_offsets(): + lh = FakeLiquidHandler(channel_num=1) + targets = [DummyContainer("T0"), DummyContainer("T1")] + offsets = ["left", "right"] + heights = [0.1, 0.2] + rates = [0.5, 1.0] + + run( + lh.mix( + targets=targets, + mix_time=1, + mix_vol=3, + offsets=offsets, + height_to_bottom=heights, + mix_rate=rates, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert len(aspirates) == 2 + assert aspirates[0]["resources"] == [targets[0]] + assert aspirates[0]["offsets"] == [offsets[0]] + assert aspirates[0]["liquid_height"] == [heights[0]] + assert aspirates[0]["flow_rates"] == [rates[0]] + assert aspirates[1]["resources"] == [targets[1]] + assert aspirates[1]["offsets"] == [offsets[1]] + assert aspirates[1]["liquid_height"] == [heights[1]] + assert aspirates[1]["flow_rates"] == [rates[1]] + + diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index d02129c..a15d91f 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -207,7 +207,14 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] - for resource, volume, channel in zip(resources, vols, use_channels): + # 处理 use_channels 为 None 的情况(通常用于单通道操作) + if use_channels is None: + # 对于单通道操作,推断通道为 [0] + channels_to_use = [0] * len(resources) + else: + channels_to_use = use_channels + + for resource, volume, channel in zip(resources, vols, channels_to_use): res_samples.append({"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)}) res_volumes.append(volume) self.pending_liquids_dict[channel] = { @@ -920,6 +927,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) @@ -983,6 +991,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) @@ -1165,6 +1174,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) await self.aspirate( @@ -1199,6 +1209,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) @@ -1235,6 +1246,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) await self.aspirate( @@ -1271,6 +1283,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) @@ -1327,6 +1340,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[idx:idx + 1] if offsets and len(offsets) > idx else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) # 从源容器吸液(总体积) @@ -1366,6 +1380,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[idx:idx+1] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if touch_tip: await self.touch_tip([target]) @@ -1401,6 +1416,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[i:i + 8] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) # 从源容器吸液(8个通道都从同一个源,但每个通道的吸液体积不同) @@ -1446,6 +1462,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if touch_tip: @@ -1497,10 +1514,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): f"(matching `asp_vols`). Got length {len(dis_vols)}." ) + need_mix_after = mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0 + defer_final_discard = need_mix_after or touch_tip + if len(use_channels) == 1: # 单通道模式:多次吸液,一次分液 - # 先混合前(如果需要) + + # 如果需要 before mix,先 pick up tip 并执行 mix if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + await self.mix( targets=[target], mix_time=mix_times, @@ -1508,8 +1534,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[0:1] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) - + + await self.discard_tips(use_channels=use_channels) + # 从每个源容器吸液并分液到目标容器 for idx, source in enumerate(sources): tip = [] @@ -1527,10 +1556,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None, spread=spread, ) - + if delays is not None: await self.custom_delay(seconds=delays[0]) - + # 分液到目标容器 if use_proportional_mixing: # 按不同比例混合:使用对应的 dis_vols @@ -1546,7 +1575,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): dis_offset = offsets[0] if offsets and len(offsets) > 0 else None dis_liquid_height = liquid_height[0] if liquid_height and len(liquid_height) > 0 else None dis_blow_out = blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None - + await self.dispense( resources=[target], vols=[dis_vol], @@ -1557,14 +1586,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): liquid_height=[dis_liquid_height] if dis_liquid_height is not None else None, spread=spread, ) - + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - - await self.discard_tips(use_channels=use_channels) - + + if not (defer_final_discard and idx == len(sources) - 1): + await self.discard_tips(use_channels=use_channels) + # 最后在目标容器中混合(如果需要) - if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: + if need_mix_after: await self.mix( targets=[target], mix_time=mix_times, @@ -1572,18 +1602,27 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[0:1] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if touch_tip: await self.touch_tip([target]) + + if defer_final_discard: + await self.discard_tips(use_channels=use_channels) elif len(use_channels) == 8: # 8通道模式:需要确保源数量是8的倍数 if len(sources) % 8 != 0: raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.") - - # 每次处理8个源 + + # 如果需要 before mix,先 pick up tips 并执行 mix if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + await self.mix( targets=[target], mix_time=mix_times, @@ -1591,8 +1630,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[0:1] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) + await self.discard_tips([0,1,2,3,4,5,6,7]) + for i in range(0, len(sources), 8): tip = [] for _ in range(len(use_channels)): @@ -1650,11 +1692,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - - await self.discard_tips([0,1,2,3,4,5,6,7]) - + + if not (defer_final_discard and i + 8 >= len(sources)): + await self.discard_tips([0,1,2,3,4,5,6,7]) + # 最后在目标容器中混合(如果需要) - if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: + if need_mix_after: await self.mix( targets=[target], mix_time=mix_times, @@ -1662,11 +1705,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): offsets=offsets[0:1] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, + use_channels=use_channels, ) if touch_tip: await self.touch_tip([target]) + if defer_final_discard: + await self.discard_tips([0,1,2,3,4,5,6,7]) + # except Exception as e: # traceback.print_exc() # raise RuntimeError(f"Liquid addition failed: {e}") from e @@ -1686,7 +1733,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): print(f"Waiting time: {msg}") print(f"Current time: {time.strftime('%H:%M:%S')}") print(f"Time to finish: {time.strftime('%H:%M:%S', time.localtime(time.time() + seconds))}") - await self._ros_node.sleep(seconds) + # Use ROS node sleep if available, otherwise use asyncio.sleep + if hasattr(self, '_ros_node') and self._ros_node is not None: + await self._ros_node.sleep(seconds) + else: + import asyncio + await asyncio.sleep(seconds) if msg: print(f"Done: {msg}") print(f"Current time: {time.strftime('%H:%M:%S')}") @@ -1725,27 +1777,59 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom: Optional[float] = None, offsets: Optional[Coordinate] = None, mix_rate: Optional[float] = None, + use_channels: Optional[List[int]] = None, none_keys: List[str] = [], ): - if mix_time is None: # No mixing required + if mix_time is None or mix_time <= 0: # No mixing required return """Mix the liquid in the target wells.""" + if mix_vol is None: + raise ValueError("`mix_vol` must be provided when `mix_time` is set.") + + targets_list: List[Container] = list(targets) + if len(targets_list) == 0: + return + + def _expand(value, count: int): + if value is None: + return [None] * count + if isinstance(value, (list, tuple)): + if len(value) != count: + raise ValueError("Length of per-target parameters must match targets.") + return list(value) + return [value] * count + + offsets_list = _expand(offsets, len(targets_list)) + heights_list = _expand(height_to_bottom, len(targets_list)) + rates_list = _expand(mix_rate, len(targets_list)) + for _ in range(mix_time): - await self.aspirate( - resources=[targets], - vols=[mix_vol], - flow_rates=[mix_rate] if mix_rate else None, - offsets=[offsets] if offsets else None, - liquid_height=[height_to_bottom] if height_to_bottom else None, - ) - await self.custom_delay(seconds=1) - await self.dispense( - resources=[targets], - vols=[mix_vol], - flow_rates=[mix_rate] if mix_rate else None, - offsets=[offsets] if offsets else None, - liquid_height=[height_to_bottom] if height_to_bottom else None, - ) + for idx, target in enumerate(targets_list): + offset_arg = ( + [offsets_list[idx]] if offsets_list[idx] is not None else None + ) + height_arg = ( + [heights_list[idx]] if heights_list[idx] is not None else None + ) + rate_arg = [rates_list[idx]] if rates_list[idx] is not None else None + + await self.aspirate( + resources=[target], + vols=[mix_vol], + use_channels=use_channels, + flow_rates=rate_arg, + offsets=offset_arg, + liquid_height=height_arg, + ) + await self.custom_delay(seconds=1) + await self.dispense( + resources=[target], + vols=[mix_vol], + use_channels=use_channels, + flow_rates=rate_arg, + offsets=offset_arg, + liquid_height=height_arg, + ) def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]: """Yield tips from a list of TipRacks one-by-one until depleted."""