Merge branch 'dev' into prcix9320

This commit is contained in:
zhangshixiang
2026-01-07 15:30:42 +08:00
parent ead43b2bc1
commit e9f1a7bb44
8 changed files with 608 additions and 379 deletions

View File

@@ -1,337 +1,505 @@
"""
PRCXI transfer_liquid 集成测试。
这些用例会启动 UniLiquidHandler RViz 仿真 backend需要同时满足
1. 安装 pylabrobot 依赖;
2. 设置环境变量 UNILAB_SIM_TEST=1
3. 具备 ROS 运行环境rviz_backend 会创建 ROS 节点)。
"""
import asyncio
import os
from dataclasses import dataclass
from typing import List, Sequence
from typing import Any, Iterable, List, Optional, Sequence, Tuple
import pytest
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Deck, PRCXI9300Trash
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import (
PRCXI_300ul_Tips,
PRCXI_BioER_96_wellplate,
)
pytestmark = pytest.mark.slow
try:
from pylabrobot.resources import Coordinate, Deck, Plate, TipRack, Well
except ImportError: # pragma: no cover - 测试环境缺少 pylabrobot 时直接跳过
Coordinate = Deck = Plate = TipRack = Well = None # type: ignore[assignment]
PYLABROBOT_AVAILABLE = False
else:
PYLABROBOT_AVAILABLE = True
SIM_ENV_VAR = "UNILAB_SIM_TEST"
@dataclass
class SimulationContext:
handler: LiquidHandlerAbstract
deck: Deck
tip_rack: TipRack
source_plate: Plate
target_plate: Plate
waste_plate: Plate
channel_num: int
@dataclass(frozen=True)
class DummyContainer:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyContainer({self.name})"
@dataclass(frozen=True)
class DummyTipSpot:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyTipSpot({self.name})"
def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]:
"""Yield lists so code can safely call `tip.extend(next(self.current_tip))`."""
for i in range(n):
yield [DummyTipSpot(f"tip_{i}")]
class FakeLiquidHandler(LiquidHandlerAbstract):
"""不初始化真实 backend/deck仅用来记录 transfer_liquid 内部调用序列。"""
def __init__(self, channel_num: int = 8):
# 不调用 super().__init__避免真实硬件/后端依赖
self.channel_num = channel_num
self.support_touch_tip = True
self.current_tip = iter(make_tip_iter())
self.calls: List[Tuple[str, Any]] = []
async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs):
self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels}))
async def aspirate(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"aspirate",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def dispense(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"dispense",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def discard_tips(self, use_channels=None, *args, **kwargs):
# 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数)
self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None}))
async def custom_delay(self, seconds=0, msg=None):
self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg}))
async def touch_tip(self, targets):
# 原实现会访问 targets.get_size_x() 等;测试里只记录调用
self.calls.append(("touch_tip", {"targets": targets}))
async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None):
self.calls.append(
(
"mix",
{
"targets": targets,
"mix_time": mix_time,
"mix_vol": mix_vol,
},
)
)
def run(coro):
return asyncio.run(coro)
def _ensure_unilabos_extra(well: Well) -> None:
if not hasattr(well, "unilabos_extra") or well.unilabos_extra is None:
well.unilabos_extra = {} # type: ignore[attr-defined]
def test_one_to_one_single_channel_basic_calls():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(3)]
targets = [DummyContainer(f"T{i}") for i in range(3)]
def _assign_sample_uuid(well: Well, value: str) -> None:
_ensure_unilabos_extra(well)
well.unilabos_extra["sample_uuid"] = value # type: ignore[attr-defined]
def _zero_coordinate() -> Coordinate:
if hasattr(Coordinate, "zero"):
return Coordinate.zero()
return Coordinate(0, 0, 0)
def _zero_offsets(count: int) -> List[Coordinate]:
return [_zero_coordinate() for _ in range(count)]
def _build_simulation_deck() -> tuple[PRCXI9300Deck, TipRack, Plate, Plate, Plate, PRCXI9300Trash]:
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=542, size_y=374, size_z=50)
tip_rack = PRCXI_300ul_Tips("Tips")
source_plate = PRCXI_BioER_96_wellplate("SourcePlate")
target_plate = PRCXI_BioER_96_wellplate("TargetPlate")
waste_plate = PRCXI_BioER_96_wellplate("WastePlate")
trash = PRCXI9300Trash(name="trash", size_x=100, size_y=100, size_z=50)
deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
deck.assign_child_resource(source_plate, location=Coordinate(150, 0, 0))
deck.assign_child_resource(target_plate, location=Coordinate(300, 0, 0))
deck.assign_child_resource(waste_plate, location=Coordinate(450, 0, 0))
deck.assign_child_resource(trash, location=Coordinate(150, -120, 0))
return deck, tip_rack, source_plate, target_plate, waste_plate, trash
def _stop_backend(handler: LiquidHandlerAbstract) -> None:
try:
run(handler.backend.stop())
except Exception: # pragma: no cover - 如果 backend 已经停止
pass
simulate_handler = getattr(handler, "_simulate_handler", None)
if simulate_handler is not None and getattr(simulate_handler, "backend", None) is not None:
try:
run(simulate_handler.backend.stop())
except Exception: # pragma: no cover
pass
@pytest.fixture(params=[1, 8])
def prcxi_simulation(request) -> SimulationContext:
if not PYLABROBOT_AVAILABLE:
pytest.skip("pylabrobot is required for PRCXI simulation tests.")
if os.environ.get(SIM_ENV_VAR) != "1":
pytest.skip(f"Set {SIM_ENV_VAR}=1 to run PRCXI simulation tests.")
channel_num = request.param
deck, tip_rack, source_plate, target_plate, waste_plate, _trash = _build_simulation_deck()
backend_cfg = {
"type": "unilabos.devices.liquid_handling.rviz_backend.UniLiquidHandlerRvizBackend",
"channel_num": channel_num,
"total_height": 310,
"lh_device_id": f"pytest_prcxi_{channel_num}",
}
handler = LiquidHandlerAbstract(
backend=backend_cfg,
deck=deck,
simulator=True,
channel_num=channel_num,
total_height=310,
)
run(handler.setup())
handler.set_tiprack([tip_rack])
handler.support_touch_tip = False
context = SimulationContext(
handler=handler,
deck=deck,
tip_rack=tip_rack,
source_plate=source_plate,
target_plate=target_plate,
waste_plate=waste_plate,
channel_num=channel_num,
)
yield context
_stop_backend(handler)
def _pick_wells(plate: Plate, start: int, count: int) -> List[Well]:
wells = plate.children[start : start + count]
for well in wells:
_ensure_unilabos_extra(well)
return wells
def _assert_samples_match(sources: Sequence[Well], targets: Sequence[Well]) -> None:
for src, tgt in zip(sources, targets):
src_uuid = getattr(src, "unilabos_extra", {}).get("sample_uuid")
tgt_uuid = getattr(tgt, "unilabos_extra", {}).get("sample_uuid")
assert tgt_uuid == src_uuid
def test_transfer_liquid_single_channel_one_to_one(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
sources = prcxi_simulation.source_plate[0:3]
targets = prcxi_simulation.target_plate["A4:A6"]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"single_{idx}")
offsets = _zero_offsets(max(len(sources), len(targets)))
result = run(
handler.transfer_liquid(
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[prcxi_simulation.tip_rack],
tip_racks=[],
use_channels=[0],
asp_vols=[5.0, 6.0, 7.0],
dis_vols=[10.0, 11.0, 12.0],
offsets=offsets,
mix_times=None,
asp_vols=[1, 2, 3],
dis_vols=[4, 5, 6],
mix_times=None, # 应该仍能执行(不 mix
)
)
# assert result == """"""
assert [c[0] for c in lh.calls].count("pick_up_tips") == 3
assert [c[0] for c in lh.calls].count("aspirate") == 3
assert [c[0] for c in lh.calls].count("dispense") == 3
assert [c[0] for c in lh.calls].count("discard_tips") == 3
_assert_samples_match(sources, targets)
# 每次 aspirate/dispense 都是单孔列表
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [sources[0]]
assert aspirates[0]["vols"] == [1.0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[2]["resources"] == [targets[2]]
assert dispenses[2]["vols"] == [6.0]
def test_transfer_liquid_single_channel_one_to_many(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(16))
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
source = prcxi_simulation.source_plate.children[0]
targets = prcxi_simulation.target_plate["A1:E1"]
_assign_sample_uuid(source, "one_to_many_source")
offsets = _zero_offsets(max(len(targets), 1))
source = DummyContainer("S0")
target = DummyContainer("T0")
run(
handler.transfer_liquid(
lh.transfer_liquid(
sources=[source],
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=[5],
dis_vols=[5],
mix_stage="before",
mix_times=1,
mix_vol=3,
)
)
names = [name for name, _ in lh.calls]
assert names.count("mix") == 1
assert names.index("mix") < names.index("aspirate")
def test_one_to_one_eight_channel_groups_by_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = list(range(1, 17))
dis_vols = list(range(101, 117))
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0, # 触发逻辑但不 mix
)
)
# 16 个任务 -> 2 组,每组 8 通道一起做
assert [c[0] for c in lh.calls].count("pick_up_tips") == 2
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == 2
assert len(dispenses) == 2
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]]
assert dispenses[1]["resources"] == targets[8:16]
assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]]
def test_one_to_one_eight_channel_requires_multiple_of_8_targets():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(9)]
targets = [DummyContainer(f"T{i}") for i in range(9)]
with pytest.raises(ValueError, match="multiple of 8"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=[1] * 9,
dis_vols=[1] * 9,
mix_times=0,
)
)
def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(512))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = [i + 1 for i in range(16)]
dis_vols = [200 + i for i in range(16)]
asp_flow_rates = [0.1 * (i + 1) for i in range(16)]
dis_flow_rates = [0.2 * (i + 1) for i in range(16)]
offsets = [f"offset_{i}" for i in range(16)]
liquid_heights = [i * 0.5 for i in range(16)]
blow_out_air_volume = [i + 0.05 for i in range(16)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
liquid_height=liquid_heights,
blow_out_air_volume=blow_out_air_volume,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == len(dispenses) == 2
for batch_idx in range(2):
start = batch_idx * 8
end = start + 8
asp_call = aspirates[batch_idx]
dis_call = dispenses[batch_idx]
assert asp_call["resources"] == sources[start:end]
assert asp_call["flow_rates"] == asp_flow_rates[start:end]
assert asp_call["offsets"] == offsets[start:end]
assert asp_call["liquid_height"] == liquid_heights[start:end]
assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
assert dis_call["flow_rates"] == dis_flow_rates[start:end]
assert dis_call["offsets"] == offsets[start:end]
assert dis_call["liquid_height"] == liquid_heights[start:end]
assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
def test_one_to_one_eight_channel_handles_32_tasks_four_batches():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(1024))
sources = [DummyContainer(f"S{i}") for i in range(32)]
targets = [DummyContainer(f"T{i}") for i in range(32)]
asp_vols = [i + 1 for i in range(32)]
dis_vols = [300 + i for i in range(32)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0,
)
)
pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"]
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(pick_calls) == 4
assert len(aspirates) == len(dispenses) == 4
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[-1]["resources"] == sources[24:32]
assert dispenses[0]["resources"] == targets[0:8]
assert dispenses[-1]["resources"] == targets[24:32]
def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(3)]
dis_vols = [10, 20, 30] # sum=60
run(
lh.transfer_liquid(
sources=[source],
targets=targets,
tip_racks=[prcxi_simulation.tip_rack],
tip_racks=[],
use_channels=[0],
asp_vols=10.0,
dis_vols=[2.0, 2.0, 2.0, 2.0, 2.0],
offsets=offsets,
asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60
dis_vols=dis_vols,
mix_times=0,
)
)
for target in targets:
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "one_to_many_source"
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert len(aspirates) == 1
assert aspirates[0]["resources"] == [source]
assert aspirates[0]["vols"] == [60.0]
assert aspirates[0]["use_channels"] == [0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0]
def test_transfer_liquid_single_channel_many_to_one(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
def test_one_to_many_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(128))
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
sources = prcxi_simulation.source_plate[0:3]
target = prcxi_simulation.target_plate.children[4]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"many_to_one_{idx}")
offsets = _zero_offsets(max(len(sources), len([target])))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(8)]
dis_vols = [i + 1 for i in range(8)]
run(
handler.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[prcxi_simulation.tip_rack],
use_channels=[0],
asp_vols=[8.0, 9.0, 10.0],
dis_vols=1,
offsets=offsets,
mix_stage="after",
mix_times=1,
mix_vol=5,
)
)
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "many_to_one_2"
def test_transfer_liquid_eight_channel_batches(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 8:
pytest.skip("仅在八通道配置下运行")
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
sources = prcxi_simulation.source_plate[0:8]
targets = prcxi_simulation.target_plate[16:24]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"batch_{idx}")
offsets = _zero_offsets(len(targets))
use_channels = list(range(8))
asp_vols = [float(i + 1) * 2 for i in range(8)]
dis_vols = [float(i + 10) for i in range(8)]
run(
handler.transfer_liquid(
sources=sources,
lh.transfer_liquid(
sources=[source],
targets=targets,
tip_racks=[prcxi_simulation.tip_rack],
use_channels=use_channels,
asp_vols=asp_vols,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自)
dis_vols=dis_vols,
offsets=offsets,
mix_times=0,
)
)
_assert_samples_match(sources, targets)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [source] * 8
assert aspirates[0]["vols"] == [float(v) for v in dis_vols]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[0]["resources"] == targets
assert dispenses[0]["vols"] == [float(v) for v in dis_vols]
@pytest.mark.parametrize("mix_stage", ["before", "after", "both"])
def test_transfer_liquid_mix_stages(prcxi_simulation: SimulationContext, mix_stage: str):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
target = prcxi_simulation.target_plate[70]
sources = prcxi_simulation.source_plate[80:82]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"mix_stage_{mix_stage}_{idx}")
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
run(
handler.transfer_liquid(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[prcxi_simulation.tip_rack],
tip_racks=[],
use_channels=[0],
asp_vols=[4.0, 5.0],
dis_vols=1,
offsets=_zero_offsets(len(sources)),
mix_stage=mix_stage,
mix_times=2,
mix_vol=3,
asp_vols=asp_vols,
dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol
mix_times=0,
)
)
# mix_stage 前后都应该保留最新源的 sample_uuid
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == f"mix_stage_{mix_stage}_1"
if prcxi_simulation.channel_num != 8:
pytest.skip("仅在八通道配置下运行")
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols]
assert all(d["resources"] == [target] for d in dispenses)
handler = prcxi_simulation.handler
sources = prcxi_simulation.source_plate[0:8]
targets = prcxi_simulation.target_plate[16:24]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"batch_{idx}")
offsets = _zero_offsets(len(targets))
use_channels = list(range(8))
asp_vols = [float(i + 1) * 2 for i in range(8)]
dis_vols = [float(i + 10) for i in range(8)]
def test_many_to_one_single_channel_before_stage_mixes_target_once():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer("S0"), DummyContainer("S1")]
target = DummyContainer("T")
run(
handler.transfer_liquid(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[prcxi_simulation.tip_rack],
use_channels=use_channels,
asp_vols=asp_vols,
dis_vols=dis_vols,
offsets=offsets,
mix_stage="after",
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=[5, 6],
dis_vols=1,
mix_stage="before",
mix_times=2,
mix_vol=3,
mix_vol=4,
)
)
_assert_samples_match(sources, targets)
names = [name for name, _ in lh.calls]
assert names[0] == "mix"
assert names.count("mix") == 1
def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
dis_vols = [1, 2, 3]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=asp_vols,
dis_vols=dis_vols, # 比例模式
mix_times=0,
)
)
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols]
def test_many_to_one_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(8)]
target = DummyContainer("T")
asp_vols = [10 + i for i in range(8)]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert aspirates[0]["resources"] == sources
assert aspirates[0]["vols"] == [float(v) for v in asp_vols]
assert dispenses[0]["resources"] == [target] * 8
assert dispenses[0]["vols"] == [float(v) for v in asp_vols]
def test_transfer_liquid_mode_detection_unsupported_shape_raises():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer("S0"), DummyContainer("S1")]
targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")]
with pytest.raises(ValueError, match="Unsupported transfer mode"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=[1, 1],
dis_vols=[1, 1, 1],
mix_times=0,
)
)

View File

@@ -418,7 +418,7 @@ def main():
# 如果从远端获取了物料信息,则与本地物料进行同步
if request_startup_json and "nodes" in request_startup_json:
print_status("开始同步远端物料到本地...", "info")
remote_tree_set = ResourceTreeSet.from_raw_list(request_startup_json["nodes"])
remote_tree_set = ResourceTreeSet.from_raw_dict_list(request_startup_json["nodes"])
resource_tree_set.merge_remote_resources(remote_tree_set)
print_status("远端物料同步完成", "info")

View File

@@ -579,6 +579,8 @@ class MessageProcessor:
elif message_type == "session_id":
self.session_id = message_data.get("session_id")
logger.info(f"[MessageProcessor] Session ID: {self.session_id}")
elif message_type == "request_reload":
await self._handle_request_reload(message_data)
else:
logger.debug(f"[MessageProcessor] Unknown message type: {message_type}")
@@ -888,6 +890,20 @@ class MessageProcessor:
)
thread.start()
async def _handle_request_reload(self, data: Dict[str, Any]):
"""
处理重载请求
当LabGo发送request_reload时重新发送设备注册信息
"""
reason = data.get("reason", "unknown")
logger.info(f"[MessageProcessor] Received reload request, reason: {reason}")
# 重新发送host_node_ready信息
if self.websocket_client:
self.websocket_client.publish_host_ready()
logger.info("[MessageProcessor] Re-sent host_node_ready after reload request")
async def _send_action_state_response(
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int
):
@@ -1282,7 +1298,7 @@ class WebSocketClient(BaseCommunicationClient):
self.message_processor.send_message(message)
job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
logger.debug(f"[WebSocketClient] Job status published: {job_log} - {status}")
logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}")
def send_ping(self, ping_id: str, timestamp: float) -> None:
"""发送ping消息"""
@@ -1313,17 +1329,55 @@ class WebSocketClient(BaseCommunicationClient):
logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}")
def publish_host_ready(self) -> None:
"""发布host_node ready信号"""
"""发布host_node ready信号,包含设备和动作信息"""
if self.is_disabled or not self.is_connected():
logger.debug("[WebSocketClient] Not connected, cannot publish host ready signal")
return
# 收集设备信息
devices = []
machine_name = BasicConfig.machine_name
try:
host_node = HostNode.get_instance(0)
if host_node:
# 获取设备信息
for device_id, namespace in host_node.devices_names.items():
device_key = f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}"
is_online = device_key in host_node._online_devices
# 获取设备的动作信息
actions = {}
for action_id, client in host_node._action_clients.items():
# action_id 格式: /namespace/device_id/action_name
if device_id in action_id:
action_name = action_id.split("/")[-1]
actions[action_name] = {
"action_path": action_id,
"action_type": str(type(client).__name__),
}
devices.append({
"device_id": device_id,
"namespace": namespace,
"device_key": device_key,
"is_online": is_online,
"machine_name": host_node.device_machine_names.get(device_id, machine_name),
"actions": actions,
})
logger.info(f"[WebSocketClient] Collected {len(devices)} devices for host_ready")
except Exception as e:
logger.warning(f"[WebSocketClient] Error collecting device info: {e}")
message = {
"action": "host_node_ready",
"data": {
"status": "ready",
"timestamp": time.time(),
"machine_name": machine_name,
"devices": devices,
},
}
self.message_processor.send_message(message)
logger.info("[WebSocketClient] Host node ready signal published")
logger.info(f"[WebSocketClient] Host node ready signal published with {len(devices)} devices")

View File

@@ -134,7 +134,7 @@ def canonicalize_nodes_data(
parent_instance.children.append(current_instance)
# 第五步:创建 ResourceTreeSet
resource_tree_set = ResourceTreeSet.from_nested_list(standardized_instances)
resource_tree_set = ResourceTreeSet.from_nested_instance_list(standardized_instances)
return resource_tree_set

View File

@@ -5,7 +5,8 @@ import json
import threading
import time
import traceback
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \
Tuple
from concurrent.futures import ThreadPoolExecutor
import asyncio
@@ -362,78 +363,82 @@ class BaseROS2DeviceNode(Node, Generic[T]):
return res
async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response):
from pylabrobot.resources.resource import Resource as ResourcePLR
from pylabrobot.resources.deck import Deck
from pylabrobot.resources import Coordinate
from pylabrobot.resources import Plate
# 物料传输到对应的node节点
rclient = self.create_client(ResourceAdd, "/resources/add")
rclient.wait_for_service()
rclient2 = self.create_client(ResourceAdd, "/resources/add")
rclient2.wait_for_service()
request = ResourceAdd.Request()
request2 = ResourceAdd.Request()
client = self._resource_clients["c2s_update_resource_tree"]
request = SerialCommand.Request()
request2 = SerialCommand.Request()
command_json = json.loads(req.command)
namespace = command_json["namespace"]
bind_parent_id = command_json["bind_parent_id"]
edge_device_id = command_json["edge_device_id"]
location = command_json["bind_location"]
other_calling_param = command_json["other_calling_param"]
resources = command_json["resource"]
input_resources = command_json["resource"]
initialize_full = other_calling_param.pop("initialize_full", False)
# 用来增加液体
ADD_LIQUID_TYPE = other_calling_param.pop("ADD_LIQUID_TYPE", [])
LIQUID_VOLUME = other_calling_param.pop("LIQUID_VOLUME", [])
LIQUID_INPUT_SLOT = other_calling_param.pop("LIQUID_INPUT_SLOT", [])
LIQUID_VOLUME: List[float] = other_calling_param.pop("LIQUID_VOLUME", [])
LIQUID_INPUT_SLOT: List[int] = other_calling_param.pop("LIQUID_INPUT_SLOT", [])
slot = other_calling_param.pop("slot", "-1")
resource = None
if slot != "-1": # slot为负数的时候采用assign方法
if slot != -1: # slot为负数的时候采用assign方法
other_calling_param["slot"] = slot
# 本地拿到这个物料,可能需要先做初始化?
if isinstance(resources, list):
if (
len(resources) == 1 and isinstance(resources[0], list) and not initialize_full
): # 取消,不存在的情况
# 预先initialize过以整组的形式传入
request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]]
elif initialize_full:
resources = initialize_resources(resources)
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources]
else:
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources]
else:
if initialize_full:
resources = initialize_resources([resources])
request.resources = [convert_to_ros_msg(Resource, resources)]
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1:
container_instance = request.resources[0]
container_query_dict: dict = resources
# 本地拿到这个物料,可能需要先做初始化
if isinstance(input_resources, list) and initialize_full:
input_resources = initialize_resources(input_resources)
elif initialize_full:
input_resources = initialize_resources([input_resources])
rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources)
parent_resource = None
if bind_parent_id != self.node_name:
parent_resource = self.resource_tracker.figure_resource(
{"name": bind_parent_id}
)
for r in rts.root_nodes:
# noinspection PyUnresolvedReferences
r.res_content.parent_uuid = parent_resource.unilabos_uuid
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer):
# noinspection PyTypeChecker
container_instance: RegularContainer = rts.root_nodes[0]
found_resources = self.resource_tracker.figure_resource(
{"id": container_query_dict["name"]}, try_mode=True
{"id": container_instance.name}, try_mode=True
)
if not len(found_resources):
self.resource_tracker.add_resource(container_instance)
logger.info(f"添加物料{container_query_dict['name']}到资源跟踪器")
logger.info(f"添加物料{container_instance.name}到资源跟踪器")
else:
assert (
len(found_resources) == 1
), f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统"
resource = found_resources[0]
if isinstance(resource, Resource):
regular_container = RegularContainer(resource.id)
regular_container.ulr_resource = resource
regular_container.ulr_resource_data.update(json.loads(container_instance.data))
logger.info(f"更新物料{container_query_dict['name']}的数据{resource.data} ULR")
elif isinstance(resource, dict):
if "data" not in resource:
resource["data"] = {}
resource["data"].update(json.loads(container_instance.data))
request.resources[0].name = resource["name"]
logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict")
), f"找到多个同名物料: {container_instance.name}, 请检查物料系统"
found_resource = found_resources[0]
if isinstance(found_resource, RegularContainer):
logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}")
found_resource.state.update(json.loads(container_instance.state))
elif isinstance(found_resource, dict):
raise ValueError("已不支持 字典 版本的RegularContainer")
else:
logger.info(
f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}"
f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
)
response: ResourceAdd.Response = await rclient.call_async(request)
# 应该先add_resource了
# noinspection PyUnresolvedReferences
request.command = json.dumps({
"action": "add",
"data": {
"data": rts.dump(),
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "",
"first_add": True,
},
})
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
self.lab_logger().info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
final_response = {
"created_resources": [ROS2MessageInstance(i).get_python_dict() for i in request.resources],
"created_resources": rts.dump(),
"liquid_input_resources": [],
}
res.response = json.dumps(final_response)
@@ -458,59 +463,60 @@ class BaseROS2DeviceNode(Node, Generic[T]):
)
res.response = get_result_info_str(traceback.format_exc(), False, {})
return res
# 接下来该根据bind_parent_id进行assign了目前只有plr可以进行assign不然没有办法输入到物料系统中
if bind_parent_id != self.node_name:
resource = self.resource_tracker.figure_resource(
{"name": bind_parent_id}
) # 拿到父节点进行具体assign等操作
# request.resources = [convert_to_ros_msg(Resource, resources)]
try:
from pylabrobot.resources.resource import Resource as ResourcePLR
from pylabrobot.resources.deck import Deck
from pylabrobot.resources import Coordinate
from pylabrobot.resources import OTDeck
from pylabrobot.resources import Plate
contain_model = not isinstance(resource, Deck)
if isinstance(resource, ResourcePLR):
# resources.list()
plr_instance = ResourceTreeSet.from_raw_list(resources).to_plr_resources()[0]
# resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources}))
# plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model)
if len(rts.root_nodes) == 1 and parent_resource is not None:
plr_instance = rts.to_plr_resources()[0]
if isinstance(plr_instance, Plate):
empty_liquid_info_in = [(None, 0)] * plr_instance.num_items
empty_liquid_info_in: List[Tuple[Optional[str], float]] = [(None, 0)] * plr_instance.num_items
if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1:
ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT)
LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT)
self.lab_logger().warning(f"增加液体资源时数量为1自动补全为 {len(LIQUID_INPUT_SLOT)}")
for liquid_type, liquid_volume, liquid_input_slot in zip(
ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT
):
empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume)
plr_instance.set_well_liquids(empty_liquid_info_in)
input_wells_ulr = [
convert_to_ros_msg(
Resource,
resource_plr_to_ulab(plr_instance.get_well(LIQUID_INPUT_SLOT), with_children=False),
)
for r in LIQUID_INPUT_SLOT
]
final_response["liquid_input_resources"] = [
ROS2MessageInstance(i).get_python_dict() for i in input_wells_ulr
]
try:
# noinspection PyProtectedMember
keys = list(plr_instance._ordering.keys())
for ind, r in enumerate(LIQUID_INPUT_SLOT[:]):
if isinstance(r, int):
# noinspection PyTypeChecker
LIQUID_INPUT_SLOT[ind] = keys[r]
input_wells = [plr_instance.get_well(r) for r in LIQUID_INPUT_SLOT]
except AttributeError:
# 按照id回去失败回退到children
input_wells = []
for r in LIQUID_INPUT_SLOT:
input_wells.append(plr_instance.children[r])
final_response["liquid_input_resources"] = ResourceTreeSet.from_plr_resources(input_wells).dump()
res.response = json.dumps(final_response)
if isinstance(resource, OTDeck) and "slot" in other_calling_param:
if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param:
other_calling_param["slot"] = int(other_calling_param["slot"])
resource.assign_child_at_slot(plr_instance, **other_calling_param)
parent_resource.assign_child_at_slot(plr_instance, **other_calling_param)
else:
_discard_slot = other_calling_param.pop("slot", "-1")
resource.assign_child_resource(
_discard_slot = other_calling_param.pop("slot", -1)
parent_resource.assign_child_resource(
plr_instance,
Coordinate(location["x"], location["y"], location["z"]),
**other_calling_param,
)
request2.resources = [
convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)])
]
rclient2.call(request2)
# 调整了液体以及Deck之后要重新Assign
# noinspection PyUnresolvedReferences
request.command = json.dumps({
"action": "add",
"data": {
"data": ResourceTreeSet.from_plr_resources([parent_resource]).dump(),
"mount_uuid": parent_resource.parent.unilabos_uuid if parent_resource.parent is not None else self.uuid,
"first_add": False,
},
})
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
# 这里created_resources不包含parent_resource
# 发送给ResourceMeshManager
action_client = ActionClient(
self,
@@ -521,7 +527,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
goal = SendCmd.Goal()
goal.command = json.dumps(
{
"resources": resources,
"resources": input_resources,
"bind_parent_id": bind_parent_id,
}
)
@@ -614,7 +620,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
)
) # type: ignore
raw_nodes = json.loads(response.response)
tree_set = ResourceTreeSet.from_raw_list(raw_nodes)
tree_set = ResourceTreeSet.from_raw_dict_list(raw_nodes)
self.lab_logger().debug(f"获取资源结果: {len(tree_set.trees)} 个资源树")
return tree_set
@@ -642,7 +648,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raw_data = json.loads(response.response)
# 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(raw_data)
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
plr_resource = tree_set.to_plr_resources()[0]
self.lab_logger().debug(f"获取资源 {resource_id} 成功")
return plr_resource
@@ -1523,7 +1529,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raw_data = json.loads(response.response)
# 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(raw_data)
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
plr_resource = tree_set.to_plr_resources()[0]
# 通过资源跟踪器获取本地实例

View File

@@ -45,6 +45,7 @@ from unilabos.ros.nodes.resource_tracker import (
)
from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
@@ -180,7 +181,7 @@ class HostNode(BaseROS2DeviceNode):
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
except Exception as ex:
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
# 初始化Node基类传递空参数覆盖列表
@@ -455,10 +456,10 @@ class HostNode(BaseROS2DeviceNode):
async def create_resource(
self,
device_id: str,
device_id: DeviceSlot,
res_id: str,
class_name: str,
parent: str,
parent: ResourceSlot,
bind_locations: Point,
liquid_input_slot: list[int] = [],
liquid_type: list[str] = [],
@@ -805,7 +806,7 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}")
if goal_status != GoalStatus.STATUS_CANCELED:
self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
self.lab_logger().trace(f"[Host Node] Result data: {result_data}")
# 清理 _goals 中的记录
if job_id in self._goals:

View File

@@ -244,7 +244,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
r
) # type: ignore
raw_data = json.loads(response.response)
tree_set = ResourceTreeSet.from_raw_list(raw_data)
tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
target = tree_set.dump()
protocol_kwargs[k] = target[0][0] if v == "unilabos_msgs/Resource" else target
except Exception as ex:

View File

@@ -523,7 +523,7 @@ class ResourceTreeSet(object):
return plr_resources
@classmethod
def from_raw_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet":
def from_raw_dict_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet":
"""
从原始字典列表创建 ResourceTreeSet自动建立 parent-children 关系
@@ -573,10 +573,10 @@ class ResourceTreeSet(object):
parent_instance.children.append(instance)
# 第四步:使用 from_nested_list 创建 ResourceTreeSet
return cls.from_nested_list(instances)
return cls.from_nested_instance_list(instances)
@classmethod
def from_nested_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet":
def from_nested_instance_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet":
"""
从扁平化的资源列表创建ResourceTreeSet自动按根节点分组
@@ -785,7 +785,7 @@ class ResourceTreeSet(object):
"""
nested_lists = []
for tree_data in data:
nested_lists.extend(ResourceTreeSet.from_raw_list(tree_data).trees)
nested_lists.extend(ResourceTreeSet.from_raw_dict_list(tree_data).trees)
return cls(nested_lists)
@@ -965,7 +965,7 @@ class DeviceNodeResourceTracker(object):
if current_uuid in self.uuid_to_resources:
self.uuid_to_resources.pop(current_uuid)
self.uuid_to_resources[new_uuid] = res
logger.debug(f"更新uuid: {current_uuid} -> {new_uuid}")
logger.trace(f"更新uuid: {current_uuid} -> {new_uuid}")
replaced = 1
return replaced