test_transfer_liquid_3

unilabos/test/experiments/prcxi_9320_with_res_test.json需要起的部分:
"debug": true,
"setup": false,
"simulator": true
This commit is contained in:
Xianwei Qi
2025-12-31 16:39:18 +08:00
parent 888c6cf542
commit 63eb0c0a4c
4 changed files with 957 additions and 491 deletions

View File

@@ -0,0 +1,15 @@
# Liquid handling 集成测试
`test_transfer_liquid.py` 现在会调用 PRCXI 的 RViz 仿真 backend运行前请确保
1. 已安装包含 `pylabrobot``rclpy` 的运行环境;
2. 启动 ROS 依赖(`rviz` 可选,但是 `rviz_backend` 会创建 ROS 节点);
3. 在 shell 中设置 `UNILAB_SIM_TEST=1`,否则 pytest 会自动跳过这些慢速用例:
```bash
export UNILAB_SIM_TEST=1
pytest tests/devices/liquid_handling/test_transfer_liquid.py -m slow
```
如果只需验证逻辑层(不依赖仿真),可以直接运行 `tests/devices/liquid_handling/unit_test.py`,该文件使用 Fake backend适合作为 CI 的快速测试。***

View File

@@ -1,505 +1,325 @@
"""
PRCXI transfer_liquid 集成测试。
这些用例会启动 UniLiquidHandler RViz 仿真 backend需要同时满足
1. 安装 pylabrobot 依赖;
2. 设置环境变量 UNILAB_SIM_TEST=1
3. 具备 ROS 运行环境rviz_backend 会创建 ROS 节点)。
"""
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Sequence, Tuple
from typing import List, Sequence
import pytest
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Deck, PRCXI9300Trash
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import (
PRCXI_300ul_Tips,
PRCXI_BioER_96_wellplate,
)
pytestmark = pytest.mark.slow
try:
from pylabrobot.resources import Coordinate, Deck, Plate, TipRack, Well
except ImportError: # pragma: no cover - 测试环境缺少 pylabrobot 时直接跳过
Coordinate = Deck = Plate = TipRack = Well = None # type: ignore[assignment]
PYLABROBOT_AVAILABLE = False
else:
PYLABROBOT_AVAILABLE = True
SIM_ENV_VAR = "UNILAB_SIM_TEST"
@dataclass(frozen=True)
class DummyContainer:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyContainer({self.name})"
@dataclass(frozen=True)
class DummyTipSpot:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyTipSpot({self.name})"
def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]:
"""Yield lists so code can safely call `tip.extend(next(self.current_tip))`."""
for i in range(n):
yield [DummyTipSpot(f"tip_{i}")]
class FakeLiquidHandler(LiquidHandlerAbstract):
"""不初始化真实 backend/deck仅用来记录 transfer_liquid 内部调用序列。"""
def __init__(self, channel_num: int = 8):
# 不调用 super().__init__避免真实硬件/后端依赖
self.channel_num = channel_num
self.support_touch_tip = True
self.current_tip = iter(make_tip_iter())
self.calls: List[Tuple[str, Any]] = []
async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs):
self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels}))
async def aspirate(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"aspirate",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def dispense(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"dispense",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def discard_tips(self, use_channels=None, *args, **kwargs):
# 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数)
self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None}))
async def custom_delay(self, seconds=0, msg=None):
self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg}))
async def touch_tip(self, targets):
# 原实现会访问 targets.get_size_x() 等;测试里只记录调用
self.calls.append(("touch_tip", {"targets": targets}))
async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None):
self.calls.append(
(
"mix",
{
"targets": targets,
"mix_time": mix_time,
"mix_vol": mix_vol,
},
)
)
@dataclass
class SimulationContext:
handler: LiquidHandlerAbstract
deck: Deck
tip_rack: TipRack
source_plate: Plate
target_plate: Plate
waste_plate: Plate
channel_num: int
def run(coro):
return asyncio.run(coro)
def test_one_to_one_single_channel_basic_calls():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
def _ensure_unilabos_extra(well: Well) -> None:
if not hasattr(well, "unilabos_extra") or well.unilabos_extra is None:
well.unilabos_extra = {} # type: ignore[attr-defined]
sources = [DummyContainer(f"S{i}") for i in range(3)]
targets = [DummyContainer(f"T{i}") for i in range(3)]
def _assign_sample_uuid(well: Well, value: str) -> None:
_ensure_unilabos_extra(well)
well.unilabos_extra["sample_uuid"] = value # type: ignore[attr-defined]
def _zero_coordinate() -> Coordinate:
if hasattr(Coordinate, "zero"):
return Coordinate.zero()
return Coordinate(0, 0, 0)
def _zero_offsets(count: int) -> List[Coordinate]:
return [_zero_coordinate() for _ in range(count)]
def _build_simulation_deck() -> tuple[PRCXI9300Deck, TipRack, Plate, Plate, Plate, PRCXI9300Trash]:
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=542, size_y=374, size_z=50)
tip_rack = PRCXI_300ul_Tips("Tips")
source_plate = PRCXI_BioER_96_wellplate("SourcePlate")
target_plate = PRCXI_BioER_96_wellplate("TargetPlate")
waste_plate = PRCXI_BioER_96_wellplate("WastePlate")
trash = PRCXI9300Trash(name="trash", size_x=100, size_y=100, size_z=50)
deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
deck.assign_child_resource(source_plate, location=Coordinate(150, 0, 0))
deck.assign_child_resource(target_plate, location=Coordinate(300, 0, 0))
deck.assign_child_resource(waste_plate, location=Coordinate(450, 0, 0))
deck.assign_child_resource(trash, location=Coordinate(150, -120, 0))
return deck, tip_rack, source_plate, target_plate, waste_plate, trash
def _stop_backend(handler: LiquidHandlerAbstract) -> None:
try:
run(handler.backend.stop())
except Exception: # pragma: no cover - 如果 backend 已经停止
pass
simulate_handler = getattr(handler, "_simulate_handler", None)
if simulate_handler is not None and getattr(simulate_handler, "backend", None) is not None:
try:
run(simulate_handler.backend.stop())
except Exception: # pragma: no cover
pass
@pytest.fixture(params=[1, 8])
def prcxi_simulation(request) -> SimulationContext:
if not PYLABROBOT_AVAILABLE:
pytest.skip("pylabrobot is required for PRCXI simulation tests.")
if os.environ.get(SIM_ENV_VAR) != "1":
pytest.skip(f"Set {SIM_ENV_VAR}=1 to run PRCXI simulation tests.")
channel_num = request.param
deck, tip_rack, source_plate, target_plate, waste_plate, _trash = _build_simulation_deck()
backend_cfg = {
"type": "unilabos.devices.liquid_handling.rviz_backend.UniLiquidHandlerRvizBackend",
"channel_num": channel_num,
"total_height": 310,
"lh_device_id": f"pytest_prcxi_{channel_num}",
}
handler = LiquidHandlerAbstract(
backend=backend_cfg,
deck=deck,
simulator=True,
channel_num=channel_num,
total_height=310,
)
run(handler.setup())
handler.set_tiprack([tip_rack])
handler.support_touch_tip = False
context = SimulationContext(
handler=handler,
deck=deck,
tip_rack=tip_rack,
source_plate=source_plate,
target_plate=target_plate,
waste_plate=waste_plate,
channel_num=channel_num,
)
yield context
_stop_backend(handler)
def _pick_wells(plate: Plate, start: int, count: int) -> List[Well]:
wells = plate.children[start : start + count]
for well in wells:
_ensure_unilabos_extra(well)
return wells
def _assert_samples_match(sources: Sequence[Well], targets: Sequence[Well]) -> None:
for src, tgt in zip(sources, targets):
src_uuid = getattr(src, "unilabos_extra", {}).get("sample_uuid")
tgt_uuid = getattr(tgt, "unilabos_extra", {}).get("sample_uuid")
assert tgt_uuid == src_uuid
def test_transfer_liquid_single_channel_one_to_one(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
handler = prcxi_simulation.handler
sources = _pick_wells(prcxi_simulation.source_plate, start=0, count=3)
targets = _pick_wells(prcxi_simulation.target_plate, start=10, count=3)
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"single_{idx}")
offsets = _zero_offsets(max(len(sources), len(targets)))
run(
lh.transfer_liquid(
handler.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
tip_racks=[prcxi_simulation.tip_rack],
use_channels=[0],
asp_vols=[1, 2, 3],
dis_vols=[4, 5, 6],
mix_times=None, # 应该仍能执行(不 mix
asp_vols=[5.0, 6.0, 7.0],
dis_vols=[10.0, 11.0, 12.0],
offsets=offsets,
mix_times=None,
)
)
assert [c[0] for c in lh.calls].count("pick_up_tips") == 3
assert [c[0] for c in lh.calls].count("aspirate") == 3
assert [c[0] for c in lh.calls].count("dispense") == 3
assert [c[0] for c in lh.calls].count("discard_tips") == 3
# 每次 aspirate/dispense 都是单孔列表
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [sources[0]]
assert aspirates[0]["vols"] == [1.0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[2]["resources"] == [targets[2]]
assert dispenses[2]["vols"] == [6.0]
_assert_samples_match(sources, targets)
def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(16))
def test_transfer_liquid_single_channel_one_to_many(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
source = DummyContainer("S0")
target = DummyContainer("T0")
handler = prcxi_simulation.handler
source = _pick_wells(prcxi_simulation.source_plate, start=20, count=1)[0]
targets = _pick_wells(prcxi_simulation.target_plate, start=30, count=3)
_assign_sample_uuid(source, "one_to_many_source")
offsets = _zero_offsets(max(len(targets), 1))
run(
lh.transfer_liquid(
handler.transfer_liquid(
sources=[source],
targets=[target],
tip_racks=[],
targets=targets,
tip_racks=[prcxi_simulation.tip_rack],
use_channels=[0],
asp_vols=[5],
dis_vols=[5],
mix_stage="before",
asp_vols=60.0,
dis_vols=[15.0, 20.0, 25.0],
offsets=offsets,
mix_times=0,
)
)
for target in targets:
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "one_to_many_source"
def test_transfer_liquid_single_channel_many_to_one(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
handler = prcxi_simulation.handler
sources = _pick_wells(prcxi_simulation.source_plate, start=40, count=3)
target = _pick_wells(prcxi_simulation.target_plate, start=50, count=1)[0]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"many_to_one_{idx}")
offsets = _zero_offsets(max(len(sources), len([target])))
run(
handler.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[prcxi_simulation.tip_rack],
use_channels=[0],
asp_vols=[8.0, 9.0, 10.0],
dis_vols=1,
offsets=offsets,
mix_stage="after",
mix_times=1,
mix_vol=5,
)
)
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "many_to_one_2"
def test_transfer_liquid_eight_channel_batches(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 8:
pytest.skip("仅在八通道配置下运行")
handler = prcxi_simulation.handler
sources = _pick_wells(prcxi_simulation.source_plate, start=0, count=8)
targets = _pick_wells(prcxi_simulation.target_plate, start=16, count=8)
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"batch_{idx}")
offsets = _zero_offsets(len(targets))
use_channels = list(range(8))
asp_vols = [float(i + 1) * 2 for i in range(8)]
dis_vols = [float(i + 10) for i in range(8)]
run(
handler.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[prcxi_simulation.tip_rack],
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
offsets=offsets,
mix_times=0,
)
)
_assert_samples_match(sources, targets)
@pytest.mark.parametrize("mix_stage", ["before", "after", "both"])
def test_transfer_liquid_mix_stages(prcxi_simulation: SimulationContext, mix_stage: str):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
handler = prcxi_simulation.handler
target = _pick_wells(prcxi_simulation.target_plate, start=70, count=1)[0]
sources = _pick_wells(prcxi_simulation.source_plate, start=80, count=2)
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"mix_stage_{mix_stage}_{idx}")
run(
handler.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[prcxi_simulation.tip_rack],
use_channels=[0],
asp_vols=[4.0, 5.0],
dis_vols=1,
offsets=_zero_offsets(len(sources)),
mix_stage=mix_stage,
mix_times=2,
mix_vol=3,
)
)
names = [name for name, _ in lh.calls]
assert names.count("mix") == 1
assert names.index("mix") < names.index("aspirate")
# mix_stage 前后都应该保留最新源的 sample_uuid
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == f"mix_stage_{mix_stage}_1"
if prcxi_simulation.channel_num != 8:
pytest.skip("仅在八通道配置下运行")
handler = prcxi_simulation.handler
sources = _pick_wells(prcxi_simulation.source_plate, start=0, count=8)
targets = _pick_wells(prcxi_simulation.target_plate, start=16, count=8)
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"batch_{idx}")
offsets = _zero_offsets(len(targets))
def test_one_to_one_eight_channel_groups_by_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = list(range(1, 17))
dis_vols = list(range(101, 117))
use_channels = list(range(8))
asp_vols = [float(i + 1) * 2 for i in range(8)]
dis_vols = [float(i + 10) for i in range(8)]
run(
lh.transfer_liquid(
handler.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
tip_racks=[prcxi_simulation.tip_rack],
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0, # 触发逻辑但不 mix
)
)
# 16 个任务 -> 2 组,每组 8 通道一起做
assert [c[0] for c in lh.calls].count("pick_up_tips") == 2
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == 2
assert len(dispenses) == 2
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]]
assert dispenses[1]["resources"] == targets[8:16]
assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]]
def test_one_to_one_eight_channel_requires_multiple_of_8_targets():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(9)]
targets = [DummyContainer(f"T{i}") for i in range(9)]
with pytest.raises(ValueError, match="multiple of 8"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=[1] * 9,
dis_vols=[1] * 9,
mix_times=0,
)
)
def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(512))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = [i + 1 for i in range(16)]
dis_vols = [200 + i for i in range(16)]
asp_flow_rates = [0.1 * (i + 1) for i in range(16)]
dis_flow_rates = [0.2 * (i + 1) for i in range(16)]
offsets = [f"offset_{i}" for i in range(16)]
liquid_heights = [i * 0.5 for i in range(16)]
blow_out_air_volume = [i + 0.05 for i in range(16)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
liquid_height=liquid_heights,
blow_out_air_volume=blow_out_air_volume,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == len(dispenses) == 2
for batch_idx in range(2):
start = batch_idx * 8
end = start + 8
asp_call = aspirates[batch_idx]
dis_call = dispenses[batch_idx]
assert asp_call["resources"] == sources[start:end]
assert asp_call["flow_rates"] == asp_flow_rates[start:end]
assert asp_call["offsets"] == offsets[start:end]
assert asp_call["liquid_height"] == liquid_heights[start:end]
assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
assert dis_call["flow_rates"] == dis_flow_rates[start:end]
assert dis_call["offsets"] == offsets[start:end]
assert dis_call["liquid_height"] == liquid_heights[start:end]
assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
def test_one_to_one_eight_channel_handles_32_tasks_four_batches():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(1024))
sources = [DummyContainer(f"S{i}") for i in range(32)]
targets = [DummyContainer(f"T{i}") for i in range(32)]
asp_vols = [i + 1 for i in range(32)]
dis_vols = [300 + i for i in range(32)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0,
)
)
pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"]
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(pick_calls) == 4
assert len(aspirates) == len(dispenses) == 4
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[-1]["resources"] == sources[24:32]
assert dispenses[0]["resources"] == targets[0:8]
assert dispenses[-1]["resources"] == targets[24:32]
def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(3)]
dis_vols = [10, 20, 30] # sum=60
run(
lh.transfer_liquid(
sources=[source],
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60
dis_vols=dis_vols,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert len(aspirates) == 1
assert aspirates[0]["resources"] == [source]
assert aspirates[0]["vols"] == [60.0]
assert aspirates[0]["use_channels"] == [0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0]
def test_one_to_many_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(128))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(8)]
dis_vols = [i + 1 for i in range(8)]
run(
lh.transfer_liquid(
sources=[source],
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自)
dis_vols=dis_vols,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [source] * 8
assert aspirates[0]["vols"] == [float(v) for v in dis_vols]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[0]["resources"] == targets
assert dispenses[0]["vols"] == [float(v) for v in dis_vols]
def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=asp_vols,
dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol
mix_times=0,
)
)
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols]
assert all(d["resources"] == [target] for d in dispenses)
def test_many_to_one_single_channel_before_stage_mixes_target_once():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer("S0"), DummyContainer("S1")]
target = DummyContainer("T")
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=[5, 6],
dis_vols=1,
mix_stage="before",
mix_stage="after",
mix_times=2,
mix_vol=4,
mix_vol=3,
)
)
names = [name for name, _ in lh.calls]
assert names[0] == "mix"
assert names.count("mix") == 1
def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
dis_vols = [1, 2, 3]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=asp_vols,
dis_vols=dis_vols, # 比例模式
mix_times=0,
)
)
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols]
def test_many_to_one_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(8)]
target = DummyContainer("T")
asp_vols = [10 + i for i in range(8)]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert aspirates[0]["resources"] == sources
assert aspirates[0]["vols"] == [float(v) for v in asp_vols]
assert dispenses[0]["resources"] == [target] * 8
assert dispenses[0]["vols"] == [float(v) for v in asp_vols]
def test_transfer_liquid_mode_detection_unsupported_shape_raises():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer("S0"), DummyContainer("S1")]
targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")]
with pytest.raises(ValueError, match="Unsupported transfer mode"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=[1, 1],
dis_vols=[1, 1, 1],
mix_times=0,
)
)
_assert_samples_match(sources, targets)

View File

@@ -0,0 +1,547 @@
import asyncio
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Sequence, Tuple
import pytest
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
@dataclass(frozen=True)
class DummyContainer:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyContainer({self.name})"
@dataclass(frozen=True)
class DummyTipSpot:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyTipSpot({self.name})"
def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]:
"""Yield lists so code can safely call `tip.extend(next(self.current_tip))`."""
for i in range(n):
yield [DummyTipSpot(f"tip_{i}")]
class FakeLiquidHandler(LiquidHandlerAbstract):
"""不初始化真实 backend/deck仅用来记录 transfer_liquid 内部调用序列。"""
def __init__(self, channel_num: int = 8):
# 不调用 super().__init__避免真实硬件/后端依赖
self.channel_num = channel_num
self.support_touch_tip = True
self.current_tip = iter(make_tip_iter())
self.calls: List[Tuple[str, Any]] = []
async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs):
self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels}))
async def aspirate(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"aspirate",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def dispense(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"dispense",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def discard_tips(self, use_channels=None, *args, **kwargs):
# 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数)
self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None}))
async def custom_delay(self, seconds=0, msg=None):
self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg}))
async def touch_tip(self, targets):
# 原实现会访问 targets.get_size_x() 等;测试里只记录调用
self.calls.append(("touch_tip", {"targets": targets}))
def run(coro):
return asyncio.run(coro)
def test_one_to_one_single_channel_basic_calls():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(3)]
targets = [DummyContainer(f"T{i}") for i in range(3)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=[1, 2, 3],
dis_vols=[4, 5, 6],
mix_times=None, # 应该仍能执行(不 mix
)
)
assert [c[0] for c in lh.calls].count("pick_up_tips") == 3
assert [c[0] for c in lh.calls].count("aspirate") == 3
assert [c[0] for c in lh.calls].count("dispense") == 3
assert [c[0] for c in lh.calls].count("discard_tips") == 3
# 每次 aspirate/dispense 都是单孔列表
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [sources[0]]
assert aspirates[0]["vols"] == [1.0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[2]["resources"] == [targets[2]]
assert dispenses[2]["vols"] == [6.0]
def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(16))
source = DummyContainer("S0")
target = DummyContainer("T0")
run(
lh.transfer_liquid(
sources=[source],
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=[5],
dis_vols=[5],
mix_stage="before",
mix_times=1,
mix_vol=3,
)
)
aspirate_calls = [(idx, payload) for idx, (name, payload) in enumerate(lh.calls) if name == "aspirate"]
assert len(aspirate_calls) >= 2
mix_idx, mix_payload = aspirate_calls[0]
assert mix_payload["resources"] == [target]
assert mix_payload["vols"] == [3]
transfer_idx, transfer_payload = aspirate_calls[1]
assert transfer_payload["resources"] == [source]
assert mix_idx < transfer_idx
def test_one_to_one_eight_channel_groups_by_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = list(range(1, 17))
dis_vols = list(range(101, 117))
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0, # 触发逻辑但不 mix
)
)
# 16 个任务 -> 2 组,每组 8 通道一起做
assert [c[0] for c in lh.calls].count("pick_up_tips") == 2
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == 2
assert len(dispenses) == 2
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]]
assert dispenses[1]["resources"] == targets[8:16]
assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]]
def test_one_to_one_eight_channel_requires_multiple_of_8_targets():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(9)]
targets = [DummyContainer(f"T{i}") for i in range(9)]
with pytest.raises(ValueError, match="multiple of 8"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=[1] * 9,
dis_vols=[1] * 9,
mix_times=0,
)
)
def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(512))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = [i + 1 for i in range(16)]
dis_vols = [200 + i for i in range(16)]
asp_flow_rates = [0.1 * (i + 1) for i in range(16)]
dis_flow_rates = [0.2 * (i + 1) for i in range(16)]
offsets = [f"offset_{i}" for i in range(16)]
liquid_heights = [i * 0.5 for i in range(16)]
blow_out_air_volume = [i + 0.05 for i in range(16)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
liquid_height=liquid_heights,
blow_out_air_volume=blow_out_air_volume,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == len(dispenses) == 2
for batch_idx in range(2):
start = batch_idx * 8
end = start + 8
asp_call = aspirates[batch_idx]
dis_call = dispenses[batch_idx]
assert asp_call["resources"] == sources[start:end]
assert asp_call["flow_rates"] == asp_flow_rates[start:end]
assert asp_call["offsets"] == offsets[start:end]
assert asp_call["liquid_height"] == liquid_heights[start:end]
assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
assert dis_call["flow_rates"] == dis_flow_rates[start:end]
assert dis_call["offsets"] == offsets[start:end]
assert dis_call["liquid_height"] == liquid_heights[start:end]
assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
def test_one_to_one_eight_channel_handles_32_tasks_four_batches():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(1024))
sources = [DummyContainer(f"S{i}") for i in range(32)]
targets = [DummyContainer(f"T{i}") for i in range(32)]
asp_vols = [i + 1 for i in range(32)]
dis_vols = [300 + i for i in range(32)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0,
)
)
pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"]
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(pick_calls) == 4
assert len(aspirates) == len(dispenses) == 4
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[-1]["resources"] == sources[24:32]
assert dispenses[0]["resources"] == targets[0:8]
assert dispenses[-1]["resources"] == targets[24:32]
def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(3)]
dis_vols = [10, 20, 30] # sum=60
run(
lh.transfer_liquid(
sources=[source],
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60
dis_vols=dis_vols,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert len(aspirates) == 1
assert aspirates[0]["resources"] == [source]
assert aspirates[0]["vols"] == [60.0]
assert aspirates[0]["use_channels"] == [0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0]
def test_one_to_many_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(128))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(8)]
dis_vols = [i + 1 for i in range(8)]
run(
lh.transfer_liquid(
sources=[source],
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自)
dis_vols=dis_vols,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [source] * 8
assert aspirates[0]["vols"] == [float(v) for v in dis_vols]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[0]["resources"] == targets
assert dispenses[0]["vols"] == [float(v) for v in dis_vols]
def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=asp_vols,
dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol
mix_times=0,
)
)
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols]
assert all(d["resources"] == [target] for d in dispenses)
def test_many_to_one_single_channel_before_stage_mixes_target_once():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer("S0"), DummyContainer("S1")]
target = DummyContainer("T")
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=[5, 6],
dis_vols=1,
mix_stage="before",
mix_times=2,
mix_vol=4,
)
)
aspirate_calls = [(idx, payload) for idx, (name, payload) in enumerate(lh.calls) if name == "aspirate"]
assert len(aspirate_calls) >= 1
mix_idx, mix_payload = aspirate_calls[0]
assert mix_payload["resources"] == [target]
assert mix_payload["vols"] == [4]
# 第一個 mix 之後會真正開始吸 source
assert any(call["resources"] == [sources[0]] for _, call in aspirate_calls[1:])
def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
dis_vols = [1, 2, 3]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=asp_vols,
dis_vols=dis_vols, # 比例模式
mix_times=0,
)
)
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols]
def test_many_to_one_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(8)]
target = DummyContainer("T")
asp_vols = [10 + i for i in range(8)]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert aspirates[0]["resources"] == sources
assert aspirates[0]["vols"] == [float(v) for v in asp_vols]
assert dispenses[0]["resources"] == [target] * 8
assert dispenses[0]["vols"] == [float(v) for v in asp_vols]
def test_transfer_liquid_mode_detection_unsupported_shape_raises():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer("S0"), DummyContainer("S1")]
targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")]
with pytest.raises(ValueError, match="Unsupported transfer mode"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=[1, 1],
dis_vols=[1, 1, 1],
mix_times=0,
)
)
def test_mix_single_target_produces_matching_cycles():
lh = FakeLiquidHandler(channel_num=1)
target = DummyContainer("T_mix")
run(lh.mix(targets=[target], mix_time=2, mix_vol=5))
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == len(dispenses) == 2
assert all(call["resources"] == [target] for call in aspirates)
assert all(call["vols"] == [5] for call in aspirates)
assert all(call["resources"] == [target] for call in dispenses)
assert all(call["vols"] == [5] for call in dispenses)
def test_mix_multiple_targets_supports_per_target_offsets():
lh = FakeLiquidHandler(channel_num=1)
targets = [DummyContainer("T0"), DummyContainer("T1")]
offsets = ["left", "right"]
heights = [0.1, 0.2]
rates = [0.5, 1.0]
run(
lh.mix(
targets=targets,
mix_time=1,
mix_vol=3,
offsets=offsets,
height_to_bottom=heights,
mix_rate=rates,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert len(aspirates) == 2
assert aspirates[0]["resources"] == [targets[0]]
assert aspirates[0]["offsets"] == [offsets[0]]
assert aspirates[0]["liquid_height"] == [heights[0]]
assert aspirates[0]["flow_rates"] == [rates[0]]
assert aspirates[1]["resources"] == [targets[1]]
assert aspirates[1]["offsets"] == [offsets[1]]
assert aspirates[1]["liquid_height"] == [heights[1]]
assert aspirates[1]["flow_rates"] == [rates[1]]

View File

@@ -207,7 +207,14 @@ class LiquidHandlerMiddleware(LiquidHandler):
res_samples = []
res_volumes = []
for resource, volume, channel in zip(resources, vols, use_channels):
# 处理 use_channels 为 None 的情况(通常用于单通道操作)
if use_channels is None:
# 对于单通道操作,推断通道为 [0]
channels_to_use = [0] * len(resources)
else:
channels_to_use = use_channels
for resource, volume, channel in zip(resources, vols, channels_to_use):
res_samples.append({"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)})
res_volumes.append(volume)
self.pending_liquids_dict[channel] = {
@@ -920,6 +927,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
@@ -983,6 +991,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
@@ -1165,6 +1174,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.aspirate(
@@ -1199,6 +1209,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
@@ -1235,6 +1246,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.aspirate(
@@ -1271,6 +1283,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
@@ -1327,6 +1340,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[idx:idx + 1] if offsets and len(offsets) > idx else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
# 从源容器吸液(总体积)
@@ -1366,6 +1380,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[idx:idx+1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if touch_tip:
await self.touch_tip([target])
@@ -1401,6 +1416,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[i:i + 8] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
# 从源容器吸液8个通道都从同一个源但每个通道的吸液体积不同
@@ -1446,6 +1462,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if touch_tip:
@@ -1497,10 +1514,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
f"(matching `asp_vols`). Got length {len(dis_vols)}."
)
need_mix_after = mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0
defer_final_discard = need_mix_after or touch_tip
if len(use_channels) == 1:
# 单通道模式:多次吸液,一次分液
# 先混合前(如果需要)
# 如果需要 before mix先 pick up tip 并执行 mix
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.mix(
targets=[target],
mix_time=mix_times,
@@ -1508,8 +1534,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.discard_tips(use_channels=use_channels)
# 从每个源容器吸液并分液到目标容器
for idx, source in enumerate(sources):
tip = []
@@ -1527,10 +1556,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到目标容器
if use_proportional_mixing:
# 按不同比例混合:使用对应的 dis_vols
@@ -1546,7 +1575,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
dis_offset = offsets[0] if offsets and len(offsets) > 0 else None
dis_liquid_height = liquid_height[0] if liquid_height and len(liquid_height) > 0 else None
dis_blow_out = blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
await self.dispense(
resources=[target],
vols=[dis_vol],
@@ -1557,14 +1586,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
liquid_height=[dis_liquid_height] if dis_liquid_height is not None else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.discard_tips(use_channels=use_channels)
if not (defer_final_discard and idx == len(sources) - 1):
await self.discard_tips(use_channels=use_channels)
# 最后在目标容器中混合(如果需要)
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
if need_mix_after:
await self.mix(
targets=[target],
mix_time=mix_times,
@@ -1572,18 +1602,27 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if touch_tip:
await self.touch_tip([target])
if defer_final_discard:
await self.discard_tips(use_channels=use_channels)
elif len(use_channels) == 8:
# 8通道模式需要确保源数量是8的倍数
if len(sources) % 8 != 0:
raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.")
# 每次处理8个源
# 如果需要 before mix先 pick up tips 并执行 mix
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.mix(
targets=[target],
mix_time=mix_times,
@@ -1591,8 +1630,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.discard_tips([0,1,2,3,4,5,6,7])
for i in range(0, len(sources), 8):
tip = []
for _ in range(len(use_channels)):
@@ -1650,11 +1692,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.discard_tips([0,1,2,3,4,5,6,7])
if not (defer_final_discard and i + 8 >= len(sources)):
await self.discard_tips([0,1,2,3,4,5,6,7])
# 最后在目标容器中混合(如果需要)
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
if need_mix_after:
await self.mix(
targets=[target],
mix_time=mix_times,
@@ -1662,11 +1705,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if touch_tip:
await self.touch_tip([target])
if defer_final_discard:
await self.discard_tips([0,1,2,3,4,5,6,7])
# except Exception as e:
# traceback.print_exc()
# raise RuntimeError(f"Liquid addition failed: {e}") from e
@@ -1686,7 +1733,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
print(f"Waiting time: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}")
print(f"Time to finish: {time.strftime('%H:%M:%S', time.localtime(time.time() + seconds))}")
await self._ros_node.sleep(seconds)
# Use ROS node sleep if available, otherwise use asyncio.sleep
if hasattr(self, '_ros_node') and self._ros_node is not None:
await self._ros_node.sleep(seconds)
else:
import asyncio
await asyncio.sleep(seconds)
if msg:
print(f"Done: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}")
@@ -1725,27 +1777,59 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom: Optional[float] = None,
offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None,
use_channels: Optional[List[int]] = None,
none_keys: List[str] = [],
):
if mix_time is None: # No mixing required
if mix_time is None or mix_time <= 0: # No mixing required
return
"""Mix the liquid in the target wells."""
if mix_vol is None:
raise ValueError("`mix_vol` must be provided when `mix_time` is set.")
targets_list: List[Container] = list(targets)
if len(targets_list) == 0:
return
def _expand(value, count: int):
if value is None:
return [None] * count
if isinstance(value, (list, tuple)):
if len(value) != count:
raise ValueError("Length of per-target parameters must match targets.")
return list(value)
return [value] * count
offsets_list = _expand(offsets, len(targets_list))
heights_list = _expand(height_to_bottom, len(targets_list))
rates_list = _expand(mix_rate, len(targets_list))
for _ in range(mix_time):
await self.aspirate(
resources=[targets],
vols=[mix_vol],
flow_rates=[mix_rate] if mix_rate else None,
offsets=[offsets] if offsets else None,
liquid_height=[height_to_bottom] if height_to_bottom else None,
)
await self.custom_delay(seconds=1)
await self.dispense(
resources=[targets],
vols=[mix_vol],
flow_rates=[mix_rate] if mix_rate else None,
offsets=[offsets] if offsets else None,
liquid_height=[height_to_bottom] if height_to_bottom else None,
)
for idx, target in enumerate(targets_list):
offset_arg = (
[offsets_list[idx]] if offsets_list[idx] is not None else None
)
height_arg = (
[heights_list[idx]] if heights_list[idx] is not None else None
)
rate_arg = [rates_list[idx]] if rates_list[idx] is not None else None
await self.aspirate(
resources=[target],
vols=[mix_vol],
use_channels=use_channels,
flow_rates=rate_arg,
offsets=offset_arg,
liquid_height=height_arg,
)
await self.custom_delay(seconds=1)
await self.dispense(
resources=[target],
vols=[mix_vol],
use_channels=use_channels,
flow_rates=rate_arg,
offsets=offset_arg,
liquid_height=height_arg,
)
def iter_tips(self, tip_racks: Sequence[TipRack]) -> Iterator[Resource]:
"""Yield tips from a list of TipRacks one-by-one until depleted."""