Compare commits

...

2 Commits

Author SHA1 Message Date
zhangshixiang
219a480c08 merge prcxi.py 2026-01-07 15:32:27 +08:00
zhangshixiang
e9f1a7bb44 Merge branch 'dev' into prcix9320 2026-01-07 15:30:42 +08:00
9 changed files with 618 additions and 442 deletions

View File

@@ -1,337 +1,505 @@
"""
PRCXI transfer_liquid 集成测试。
这些用例会启动 UniLiquidHandler RViz 仿真 backend需要同时满足
1. 安装 pylabrobot 依赖;
2. 设置环境变量 UNILAB_SIM_TEST=1
3. 具备 ROS 运行环境rviz_backend 会创建 ROS 节点)。
"""
import asyncio import asyncio
import os
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Sequence from typing import Any, Iterable, List, Optional, Sequence, Tuple
import pytest import pytest
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Deck, PRCXI9300Trash
from unilabos.devices.liquid_handling.prcxi.prcxi_labware import (
PRCXI_300ul_Tips,
PRCXI_BioER_96_wellplate,
)
pytestmark = pytest.mark.slow
try:
from pylabrobot.resources import Coordinate, Deck, Plate, TipRack, Well
except ImportError: # pragma: no cover - 测试环境缺少 pylabrobot 时直接跳过
Coordinate = Deck = Plate = TipRack = Well = None # type: ignore[assignment]
PYLABROBOT_AVAILABLE = False
else:
PYLABROBOT_AVAILABLE = True
SIM_ENV_VAR = "UNILAB_SIM_TEST"
@dataclass @dataclass(frozen=True)
class SimulationContext: class DummyContainer:
handler: LiquidHandlerAbstract name: str
deck: Deck
tip_rack: TipRack def __repr__(self) -> str: # pragma: no cover
source_plate: Plate return f"DummyContainer({self.name})"
target_plate: Plate
waste_plate: Plate
channel_num: int @dataclass(frozen=True)
class DummyTipSpot:
name: str
def __repr__(self) -> str: # pragma: no cover
return f"DummyTipSpot({self.name})"
def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]:
"""Yield lists so code can safely call `tip.extend(next(self.current_tip))`."""
for i in range(n):
yield [DummyTipSpot(f"tip_{i}")]
class FakeLiquidHandler(LiquidHandlerAbstract):
"""不初始化真实 backend/deck仅用来记录 transfer_liquid 内部调用序列。"""
def __init__(self, channel_num: int = 8):
# 不调用 super().__init__避免真实硬件/后端依赖
self.channel_num = channel_num
self.support_touch_tip = True
self.current_tip = iter(make_tip_iter())
self.calls: List[Tuple[str, Any]] = []
async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs):
self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels}))
async def aspirate(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"aspirate",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def dispense(
self,
resources: Sequence[Any],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
offsets: Any = None,
liquid_height: Any = None,
blow_out_air_volume: Any = None,
spread: str = "wide",
**backend_kwargs,
):
self.calls.append(
(
"dispense",
{
"resources": list(resources),
"vols": list(vols),
"use_channels": list(use_channels) if use_channels is not None else None,
"flow_rates": list(flow_rates) if flow_rates is not None else None,
"offsets": list(offsets) if offsets is not None else None,
"liquid_height": list(liquid_height) if liquid_height is not None else None,
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
},
)
)
async def discard_tips(self, use_channels=None, *args, **kwargs):
# 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数)
self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None}))
async def custom_delay(self, seconds=0, msg=None):
self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg}))
async def touch_tip(self, targets):
# 原实现会访问 targets.get_size_x() 等;测试里只记录调用
self.calls.append(("touch_tip", {"targets": targets}))
async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None):
self.calls.append(
(
"mix",
{
"targets": targets,
"mix_time": mix_time,
"mix_vol": mix_vol,
},
)
)
def run(coro): def run(coro):
return asyncio.run(coro) return asyncio.run(coro)
def _ensure_unilabos_extra(well: Well) -> None: def test_one_to_one_single_channel_basic_calls():
if not hasattr(well, "unilabos_extra") or well.unilabos_extra is None: lh = FakeLiquidHandler(channel_num=1)
well.unilabos_extra = {} # type: ignore[attr-defined] lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(3)]
targets = [DummyContainer(f"T{i}") for i in range(3)]
def _assign_sample_uuid(well: Well, value: str) -> None: run(
_ensure_unilabos_extra(well) lh.transfer_liquid(
well.unilabos_extra["sample_uuid"] = value # type: ignore[attr-defined]
def _zero_coordinate() -> Coordinate:
if hasattr(Coordinate, "zero"):
return Coordinate.zero()
return Coordinate(0, 0, 0)
def _zero_offsets(count: int) -> List[Coordinate]:
return [_zero_coordinate() for _ in range(count)]
def _build_simulation_deck() -> tuple[PRCXI9300Deck, TipRack, Plate, Plate, Plate, PRCXI9300Trash]:
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=542, size_y=374, size_z=50)
tip_rack = PRCXI_300ul_Tips("Tips")
source_plate = PRCXI_BioER_96_wellplate("SourcePlate")
target_plate = PRCXI_BioER_96_wellplate("TargetPlate")
waste_plate = PRCXI_BioER_96_wellplate("WastePlate")
trash = PRCXI9300Trash(name="trash", size_x=100, size_y=100, size_z=50)
deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0))
deck.assign_child_resource(source_plate, location=Coordinate(150, 0, 0))
deck.assign_child_resource(target_plate, location=Coordinate(300, 0, 0))
deck.assign_child_resource(waste_plate, location=Coordinate(450, 0, 0))
deck.assign_child_resource(trash, location=Coordinate(150, -120, 0))
return deck, tip_rack, source_plate, target_plate, waste_plate, trash
def _stop_backend(handler: LiquidHandlerAbstract) -> None:
try:
run(handler.backend.stop())
except Exception: # pragma: no cover - 如果 backend 已经停止
pass
simulate_handler = getattr(handler, "_simulate_handler", None)
if simulate_handler is not None and getattr(simulate_handler, "backend", None) is not None:
try:
run(simulate_handler.backend.stop())
except Exception: # pragma: no cover
pass
@pytest.fixture(params=[1, 8])
def prcxi_simulation(request) -> SimulationContext:
if not PYLABROBOT_AVAILABLE:
pytest.skip("pylabrobot is required for PRCXI simulation tests.")
if os.environ.get(SIM_ENV_VAR) != "1":
pytest.skip(f"Set {SIM_ENV_VAR}=1 to run PRCXI simulation tests.")
channel_num = request.param
deck, tip_rack, source_plate, target_plate, waste_plate, _trash = _build_simulation_deck()
backend_cfg = {
"type": "unilabos.devices.liquid_handling.rviz_backend.UniLiquidHandlerRvizBackend",
"channel_num": channel_num,
"total_height": 310,
"lh_device_id": f"pytest_prcxi_{channel_num}",
}
handler = LiquidHandlerAbstract(
backend=backend_cfg,
deck=deck,
simulator=True,
channel_num=channel_num,
total_height=310,
)
run(handler.setup())
handler.set_tiprack([tip_rack])
handler.support_touch_tip = False
context = SimulationContext(
handler=handler,
deck=deck,
tip_rack=tip_rack,
source_plate=source_plate,
target_plate=target_plate,
waste_plate=waste_plate,
channel_num=channel_num,
)
yield context
_stop_backend(handler)
def _pick_wells(plate: Plate, start: int, count: int) -> List[Well]:
wells = plate.children[start : start + count]
for well in wells:
_ensure_unilabos_extra(well)
return wells
def _assert_samples_match(sources: Sequence[Well], targets: Sequence[Well]) -> None:
for src, tgt in zip(sources, targets):
src_uuid = getattr(src, "unilabos_extra", {}).get("sample_uuid")
tgt_uuid = getattr(tgt, "unilabos_extra", {}).get("sample_uuid")
assert tgt_uuid == src_uuid
def test_transfer_liquid_single_channel_one_to_one(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 1:
pytest.skip("仅在单通道配置下运行")
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
sources = prcxi_simulation.source_plate[0:3]
targets = prcxi_simulation.target_plate["A4:A6"]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"single_{idx}")
offsets = _zero_offsets(max(len(sources), len(targets)))
result = run(
handler.transfer_liquid(
sources=sources, sources=sources,
targets=targets, targets=targets,
tip_racks=[prcxi_simulation.tip_rack], tip_racks=[],
use_channels=[0], use_channels=[0],
asp_vols=[5.0, 6.0, 7.0], asp_vols=[1, 2, 3],
dis_vols=[10.0, 11.0, 12.0], dis_vols=[4, 5, 6],
offsets=offsets, mix_times=None, # 应该仍能执行(不 mix
mix_times=None,
) )
) )
# assert result == """""" assert [c[0] for c in lh.calls].count("pick_up_tips") == 3
assert [c[0] for c in lh.calls].count("aspirate") == 3
assert [c[0] for c in lh.calls].count("dispense") == 3
assert [c[0] for c in lh.calls].count("discard_tips") == 3
_assert_samples_match(sources, targets) # 每次 aspirate/dispense 都是单孔列表
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [sources[0]]
assert aspirates[0]["vols"] == [1.0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[2]["resources"] == [targets[2]]
assert dispenses[2]["vols"] == [6.0]
def test_transfer_liquid_single_channel_one_to_many(prcxi_simulation: SimulationContext): def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate():
if prcxi_simulation.channel_num != 1: lh = FakeLiquidHandler(channel_num=1)
pytest.skip("仅在单通道配置下运行") lh.current_tip = iter(make_tip_iter(16))
handler = prcxi_simulation.handler source = DummyContainer("S0")
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: target = DummyContainer("T0")
_ensure_unilabos_extra(well)
source = prcxi_simulation.source_plate.children[0]
targets = prcxi_simulation.target_plate["A1:E1"]
_assign_sample_uuid(source, "one_to_many_source")
offsets = _zero_offsets(max(len(targets), 1))
run( run(
handler.transfer_liquid( lh.transfer_liquid(
sources=[source],
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=[5],
dis_vols=[5],
mix_stage="before",
mix_times=1,
mix_vol=3,
)
)
names = [name for name, _ in lh.calls]
assert names.count("mix") == 1
assert names.index("mix") < names.index("aspirate")
def test_one_to_one_eight_channel_groups_by_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = list(range(1, 17))
dis_vols = list(range(101, 117))
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0, # 触发逻辑但不 mix
)
)
# 16 个任务 -> 2 组,每组 8 通道一起做
assert [c[0] for c in lh.calls].count("pick_up_tips") == 2
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == 2
assert len(dispenses) == 2
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]]
assert dispenses[1]["resources"] == targets[8:16]
assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]]
def test_one_to_one_eight_channel_requires_multiple_of_8_targets():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer(f"S{i}") for i in range(9)]
targets = [DummyContainer(f"T{i}") for i in range(9)]
with pytest.raises(ValueError, match="multiple of 8"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=[1] * 9,
dis_vols=[1] * 9,
mix_times=0,
)
)
def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(512))
sources = [DummyContainer(f"S{i}") for i in range(16)]
targets = [DummyContainer(f"T{i}") for i in range(16)]
asp_vols = [i + 1 for i in range(16)]
dis_vols = [200 + i for i in range(16)]
asp_flow_rates = [0.1 * (i + 1) for i in range(16)]
dis_flow_rates = [0.2 * (i + 1) for i in range(16)]
offsets = [f"offset_{i}" for i in range(16)]
liquid_heights = [i * 0.5 for i in range(16)]
blow_out_air_volume = [i + 0.05 for i in range(16)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
asp_flow_rates=asp_flow_rates,
dis_flow_rates=dis_flow_rates,
offsets=offsets,
liquid_height=liquid_heights,
blow_out_air_volume=blow_out_air_volume,
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(aspirates) == len(dispenses) == 2
for batch_idx in range(2):
start = batch_idx * 8
end = start + 8
asp_call = aspirates[batch_idx]
dis_call = dispenses[batch_idx]
assert asp_call["resources"] == sources[start:end]
assert asp_call["flow_rates"] == asp_flow_rates[start:end]
assert asp_call["offsets"] == offsets[start:end]
assert asp_call["liquid_height"] == liquid_heights[start:end]
assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
assert dis_call["flow_rates"] == dis_flow_rates[start:end]
assert dis_call["offsets"] == offsets[start:end]
assert dis_call["liquid_height"] == liquid_heights[start:end]
assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
def test_one_to_one_eight_channel_handles_32_tasks_four_batches():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(1024))
sources = [DummyContainer(f"S{i}") for i in range(32)]
targets = [DummyContainer(f"T{i}") for i in range(32)]
asp_vols = [i + 1 for i in range(32)]
dis_vols = [300 + i for i in range(32)]
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=dis_vols,
mix_times=0,
)
)
pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"]
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert len(pick_calls) == 4
assert len(aspirates) == len(dispenses) == 4
assert aspirates[0]["resources"] == sources[0:8]
assert aspirates[-1]["resources"] == sources[24:32]
assert dispenses[0]["resources"] == targets[0:8]
assert dispenses[-1]["resources"] == targets[24:32]
def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(64))
source = DummyContainer("SRC")
targets = [DummyContainer(f"T{i}") for i in range(3)]
dis_vols = [10, 20, 30] # sum=60
run(
lh.transfer_liquid(
sources=[source], sources=[source],
targets=targets, targets=targets,
tip_racks=[prcxi_simulation.tip_rack], tip_racks=[],
use_channels=[0], use_channels=[0],
asp_vols=10.0, asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60
dis_vols=[2.0, 2.0, 2.0, 2.0, 2.0], dis_vols=dis_vols,
offsets=offsets,
mix_times=0, mix_times=0,
) )
) )
for target in targets: aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "one_to_many_source" assert len(aspirates) == 1
assert aspirates[0]["resources"] == [source]
assert aspirates[0]["vols"] == [60.0]
assert aspirates[0]["use_channels"] == [0]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0]
def test_transfer_liquid_single_channel_many_to_one(prcxi_simulation: SimulationContext): def test_one_to_many_eight_channel_basic():
if prcxi_simulation.channel_num != 1: lh = FakeLiquidHandler(channel_num=8)
pytest.skip("仅在单通道配置下运行") lh.current_tip = iter(make_tip_iter(128))
handler = prcxi_simulation.handler source = DummyContainer("SRC")
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: targets = [DummyContainer(f"T{i}") for i in range(8)]
_ensure_unilabos_extra(well) dis_vols = [i + 1 for i in range(8)]
sources = prcxi_simulation.source_plate[0:3]
target = prcxi_simulation.target_plate.children[4]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"many_to_one_{idx}")
offsets = _zero_offsets(max(len(sources), len([target])))
run( run(
handler.transfer_liquid( lh.transfer_liquid(
sources=sources, sources=[source],
targets=[target],
tip_racks=[prcxi_simulation.tip_rack],
use_channels=[0],
asp_vols=[8.0, 9.0, 10.0],
dis_vols=1,
offsets=offsets,
mix_stage="after",
mix_times=1,
mix_vol=5,
)
)
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "many_to_one_2"
def test_transfer_liquid_eight_channel_batches(prcxi_simulation: SimulationContext):
if prcxi_simulation.channel_num != 8:
pytest.skip("仅在八通道配置下运行")
handler = prcxi_simulation.handler
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children:
_ensure_unilabos_extra(well)
sources = prcxi_simulation.source_plate[0:8]
targets = prcxi_simulation.target_plate[16:24]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"batch_{idx}")
offsets = _zero_offsets(len(targets))
use_channels = list(range(8))
asp_vols = [float(i + 1) * 2 for i in range(8)]
dis_vols = [float(i + 10) for i in range(8)]
run(
handler.transfer_liquid(
sources=sources,
targets=targets, targets=targets,
tip_racks=[prcxi_simulation.tip_rack], tip_racks=[],
use_channels=use_channels, use_channels=list(range(8)),
asp_vols=asp_vols, asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自)
dis_vols=dis_vols, dis_vols=dis_vols,
offsets=offsets,
mix_times=0, mix_times=0,
) )
) )
_assert_samples_match(sources, targets) aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
assert aspirates[0]["resources"] == [source] * 8
assert aspirates[0]["vols"] == [float(v) for v in dis_vols]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert dispenses[0]["resources"] == targets
assert dispenses[0]["vols"] == [float(v) for v in dis_vols]
@pytest.mark.parametrize("mix_stage", ["before", "after", "both"]) def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default():
def test_transfer_liquid_mix_stages(prcxi_simulation: SimulationContext, mix_stage: str): lh = FakeLiquidHandler(channel_num=1)
if prcxi_simulation.channel_num != 1: lh.current_tip = iter(make_tip_iter(128))
pytest.skip("仅在单通道配置下运行")
handler = prcxi_simulation.handler sources = [DummyContainer(f"S{i}") for i in range(3)]
for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: target = DummyContainer("T")
_ensure_unilabos_extra(well) asp_vols = [5, 6, 7]
target = prcxi_simulation.target_plate[70]
sources = prcxi_simulation.source_plate[80:82]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"mix_stage_{mix_stage}_{idx}")
run( run(
handler.transfer_liquid( lh.transfer_liquid(
sources=sources, sources=sources,
targets=[target], targets=[target],
tip_racks=[prcxi_simulation.tip_rack], tip_racks=[],
use_channels=[0], use_channels=[0],
asp_vols=[4.0, 5.0], asp_vols=asp_vols,
dis_vols=1, dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol
offsets=_zero_offsets(len(sources)), mix_times=0,
mix_stage=mix_stage,
mix_times=2,
mix_vol=3,
) )
) )
# mix_stage 前后都应该保留最新源的 sample_uuid dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == f"mix_stage_{mix_stage}_1" assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols]
if prcxi_simulation.channel_num != 8: assert all(d["resources"] == [target] for d in dispenses)
pytest.skip("仅在八通道配置下运行")
handler = prcxi_simulation.handler
sources = prcxi_simulation.source_plate[0:8]
targets = prcxi_simulation.target_plate[16:24]
for idx, src in enumerate(sources):
_assign_sample_uuid(src, f"batch_{idx}")
offsets = _zero_offsets(len(targets))
use_channels = list(range(8)) def test_many_to_one_single_channel_before_stage_mixes_target_once():
asp_vols = [float(i + 1) * 2 for i in range(8)] lh = FakeLiquidHandler(channel_num=1)
dis_vols = [float(i + 10) for i in range(8)] lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer("S0"), DummyContainer("S1")]
target = DummyContainer("T")
run( run(
handler.transfer_liquid( lh.transfer_liquid(
sources=sources, sources=sources,
targets=targets, targets=[target],
tip_racks=[prcxi_simulation.tip_rack], tip_racks=[],
use_channels=use_channels, use_channels=[0],
asp_vols=asp_vols, asp_vols=[5, 6],
dis_vols=dis_vols, dis_vols=1,
offsets=offsets, mix_stage="before",
mix_stage="after",
mix_times=2, mix_times=2,
mix_vol=3, mix_vol=4,
) )
) )
_assert_samples_match(sources, targets) names = [name for name, _ in lh.calls]
assert names[0] == "mix"
assert names.count("mix") == 1
def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source():
lh = FakeLiquidHandler(channel_num=1)
lh.current_tip = iter(make_tip_iter(128))
sources = [DummyContainer(f"S{i}") for i in range(3)]
target = DummyContainer("T")
asp_vols = [5, 6, 7]
dis_vols = [1, 2, 3]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=[0],
asp_vols=asp_vols,
dis_vols=dis_vols, # 比例模式
mix_times=0,
)
)
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols]
def test_many_to_one_eight_channel_basic():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(256))
sources = [DummyContainer(f"S{i}") for i in range(8)]
target = DummyContainer("T")
asp_vols = [10 + i for i in range(8)]
run(
lh.transfer_liquid(
sources=sources,
targets=[target],
tip_racks=[],
use_channels=list(range(8)),
asp_vols=asp_vols,
dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol
mix_times=0,
)
)
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
assert aspirates[0]["resources"] == sources
assert aspirates[0]["vols"] == [float(v) for v in asp_vols]
assert dispenses[0]["resources"] == [target] * 8
assert dispenses[0]["vols"] == [float(v) for v in asp_vols]
def test_transfer_liquid_mode_detection_unsupported_shape_raises():
lh = FakeLiquidHandler(channel_num=8)
lh.current_tip = iter(make_tip_iter(64))
sources = [DummyContainer("S0"), DummyContainer("S1")]
targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")]
with pytest.raises(ValueError, match="Unsupported transfer mode"):
run(
lh.transfer_liquid(
sources=sources,
targets=targets,
tip_racks=[],
use_channels=[0],
asp_vols=[1, 1],
dis_vols=[1, 1, 1],
mix_times=0,
)
)

View File

@@ -418,7 +418,7 @@ def main():
# 如果从远端获取了物料信息,则与本地物料进行同步 # 如果从远端获取了物料信息,则与本地物料进行同步
if request_startup_json and "nodes" in request_startup_json: if request_startup_json and "nodes" in request_startup_json:
print_status("开始同步远端物料到本地...", "info") print_status("开始同步远端物料到本地...", "info")
remote_tree_set = ResourceTreeSet.from_raw_list(request_startup_json["nodes"]) remote_tree_set = ResourceTreeSet.from_raw_dict_list(request_startup_json["nodes"])
resource_tree_set.merge_remote_resources(remote_tree_set) resource_tree_set.merge_remote_resources(remote_tree_set)
print_status("远端物料同步完成", "info") print_status("远端物料同步完成", "info")

View File

@@ -579,6 +579,8 @@ class MessageProcessor:
elif message_type == "session_id": elif message_type == "session_id":
self.session_id = message_data.get("session_id") self.session_id = message_data.get("session_id")
logger.info(f"[MessageProcessor] Session ID: {self.session_id}") logger.info(f"[MessageProcessor] Session ID: {self.session_id}")
elif message_type == "request_reload":
await self._handle_request_reload(message_data)
else: else:
logger.debug(f"[MessageProcessor] Unknown message type: {message_type}") logger.debug(f"[MessageProcessor] Unknown message type: {message_type}")
@@ -888,6 +890,20 @@ class MessageProcessor:
) )
thread.start() thread.start()
async def _handle_request_reload(self, data: Dict[str, Any]):
"""
处理重载请求
当LabGo发送request_reload时重新发送设备注册信息
"""
reason = data.get("reason", "unknown")
logger.info(f"[MessageProcessor] Received reload request, reason: {reason}")
# 重新发送host_node_ready信息
if self.websocket_client:
self.websocket_client.publish_host_ready()
logger.info("[MessageProcessor] Re-sent host_node_ready after reload request")
async def _send_action_state_response( async def _send_action_state_response(
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int
): ):
@@ -1282,7 +1298,7 @@ class WebSocketClient(BaseCommunicationClient):
self.message_processor.send_message(message) self.message_processor.send_message(message)
job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name) job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
logger.debug(f"[WebSocketClient] Job status published: {job_log} - {status}") logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}")
def send_ping(self, ping_id: str, timestamp: float) -> None: def send_ping(self, ping_id: str, timestamp: float) -> None:
"""发送ping消息""" """发送ping消息"""
@@ -1313,17 +1329,55 @@ class WebSocketClient(BaseCommunicationClient):
logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}") logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}")
def publish_host_ready(self) -> None: def publish_host_ready(self) -> None:
"""发布host_node ready信号""" """发布host_node ready信号,包含设备和动作信息"""
if self.is_disabled or not self.is_connected(): if self.is_disabled or not self.is_connected():
logger.debug("[WebSocketClient] Not connected, cannot publish host ready signal") logger.debug("[WebSocketClient] Not connected, cannot publish host ready signal")
return return
# 收集设备信息
devices = []
machine_name = BasicConfig.machine_name
try:
host_node = HostNode.get_instance(0)
if host_node:
# 获取设备信息
for device_id, namespace in host_node.devices_names.items():
device_key = f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}"
is_online = device_key in host_node._online_devices
# 获取设备的动作信息
actions = {}
for action_id, client in host_node._action_clients.items():
# action_id 格式: /namespace/device_id/action_name
if device_id in action_id:
action_name = action_id.split("/")[-1]
actions[action_name] = {
"action_path": action_id,
"action_type": str(type(client).__name__),
}
devices.append({
"device_id": device_id,
"namespace": namespace,
"device_key": device_key,
"is_online": is_online,
"machine_name": host_node.device_machine_names.get(device_id, machine_name),
"actions": actions,
})
logger.info(f"[WebSocketClient] Collected {len(devices)} devices for host_ready")
except Exception as e:
logger.warning(f"[WebSocketClient] Error collecting device info: {e}")
message = { message = {
"action": "host_node_ready", "action": "host_node_ready",
"data": { "data": {
"status": "ready", "status": "ready",
"timestamp": time.time(), "timestamp": time.time(),
"machine_name": machine_name,
"devices": devices,
}, },
} }
self.message_processor.send_message(message) self.message_processor.send_message(message)
logger.info("[WebSocketClient] Host node ready signal published") logger.info(f"[WebSocketClient] Host node ready signal published with {len(devices)} devices")

View File

@@ -71,7 +71,16 @@ class PRCXI9300Deck(Deck):
def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs): def __init__(self, name: str, size_x: float, size_y: float, size_z: float, **kwargs):
super().__init__(name, size_x, size_y, size_z) super().__init__(name, size_x, size_y, size_z)
self.slots = [None] * 6 # PRCXI 9300 有 6 个槽位 self.slots = [None] * 16 # PRCXI 9300/9320 最大16 个槽位
self.slot_locations = [Coordinate(0, 0, 0)] * 16
def assign_child_at_slot(self, resource: Resource, slot: int, reassign: bool = False) -> None:
if self.slots[slot - 1] is not None and not reassign:
raise ValueError(f"Spot {slot} is already occupied")
self.slots[slot - 1] = resource
super().assign_child_resource(resource, location=self.slot_locations[slot - 1])
class PRCXI9300Container(Plate): class PRCXI9300Container(Plate):
"""PRCXI 9300 的专用 Container 类,继承自 Plate用于槽位定位和未知模块。 """PRCXI 9300 的专用 Container 类,继承自 Plate用于槽位定位和未知模块。
@@ -87,81 +96,19 @@ class PRCXI9300Container(Plate):
category: str, category: str,
ordering: collections.OrderedDict, ordering: collections.OrderedDict,
model: Optional[str] = None, model: Optional[str] = None,
material_info: Optional[Dict[str, Any]] = None,
ordering_layout: str = "col-major",
**kwargs, **kwargs,
): ):
super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model) super().__init__(name, size_x, size_y, size_z, category=category, ordering=ordering, model=model)
self._unilabos_state = {} self._unilabos_state = {}
self.sites = kwargs.get("sites", [])
self.sites = create_homogeneous_resources(
klass=ResourceHolder,
locations=[Coordinate(0, 0, 0)],
resource_size_x=size_x,
resource_size_y=size_y,
resource_size_z=size_z,
name_prefix=name,
)[0]
# 为 ItemizedCarrier 添加 _unilabos_state 属性,以便与其他 PRCXI 组件兼容
sites_resource = ItemizedCarrier(
name=name+"_sites",
sites={name: self.sites},
size_x=size_x,
size_y=size_y,
size_z=size_z,
category="warehouse",
model=model,
)
sites_resource._unilabos_state = {} # 添加 _unilabos_state 属性
if material_info:
sites_resource._unilabos_state["Material"] = material_info
self.assign_child_resource(sites_resource, location=self.sites.location)
# 保存排序方式供graphio.py的坐标映射使用
# 使用独立属性避免与父类的layout冲突
self.ordering_layout = ordering_layout
def serialize(self) -> dict:
"""序列化时保存 ordering_layout 属性"""
data = super().serialize()
data['ordering_layout'] = self.ordering_layout
return data
def load_state(self, state: Dict[str, Any]) -> None: def load_state(self, state: Dict[str, Any]) -> None:
"""从给定的状态加载工作台信息。""" """从给定的状态加载工作台信息。"""
super().load_state(state) super().load_state(state)
self._unilabos_state = state self._unilabos_state = state
def get_site(self) -> ResourceHolder:
"""获取容器的站点"""
return self.sites
def add_resource_to_site(self, resource) -> None:
"""向站点添加资源"""
self.sites.assign_child_resource(resource)
def get_resource_at_site(self):
"""获取站点上的资源"""
return self.sites.children[0] if self.sites.children else None
def serialize_state(self) -> Dict[str, Dict[str, Any]]: def serialize_state(self) -> Dict[str, Dict[str, Any]]:
data = super().serialize_state() data = super().serialize_state()
data.update(self._unilabos_state) data.update(self._unilabos_state)
# 避免序列化 ResourceHolder 对象
if hasattr(self, 'sites') and self.sites:
# 创建 sites 的可序列化版本
if hasattr(self.sites, '__class__') and 'pylabrobot' in str(self.sites.__class__.__module__):
data['sites'] = {
"__pylabrobot_object__": True,
"class": self.sites.__class__.__name__,
"module": self.sites.__class__.__module__,
"name": getattr(self.sites, 'name', str(self.sites))
}
else:
data['sites'] = self.sites
return data return data
class PRCXI9300Plate(Plate): class PRCXI9300Plate(Plate):
""" """

View File

@@ -134,7 +134,7 @@ def canonicalize_nodes_data(
parent_instance.children.append(current_instance) parent_instance.children.append(current_instance)
# 第五步:创建 ResourceTreeSet # 第五步:创建 ResourceTreeSet
resource_tree_set = ResourceTreeSet.from_nested_list(standardized_instances) resource_tree_set = ResourceTreeSet.from_nested_instance_list(standardized_instances)
return resource_tree_set return resource_tree_set

View File

@@ -5,7 +5,8 @@ import json
import threading import threading
import time import time
import traceback import traceback
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \
Tuple
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import asyncio import asyncio
@@ -362,78 +363,82 @@ class BaseROS2DeviceNode(Node, Generic[T]):
return res return res
async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response): async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response):
from pylabrobot.resources.resource import Resource as ResourcePLR
from pylabrobot.resources.deck import Deck
from pylabrobot.resources import Coordinate
from pylabrobot.resources import Plate
# 物料传输到对应的node节点 # 物料传输到对应的node节点
rclient = self.create_client(ResourceAdd, "/resources/add") client = self._resource_clients["c2s_update_resource_tree"]
rclient.wait_for_service() request = SerialCommand.Request()
rclient2 = self.create_client(ResourceAdd, "/resources/add") request2 = SerialCommand.Request()
rclient2.wait_for_service()
request = ResourceAdd.Request()
request2 = ResourceAdd.Request()
command_json = json.loads(req.command) command_json = json.loads(req.command)
namespace = command_json["namespace"] namespace = command_json["namespace"]
bind_parent_id = command_json["bind_parent_id"] bind_parent_id = command_json["bind_parent_id"]
edge_device_id = command_json["edge_device_id"] edge_device_id = command_json["edge_device_id"]
location = command_json["bind_location"] location = command_json["bind_location"]
other_calling_param = command_json["other_calling_param"] other_calling_param = command_json["other_calling_param"]
resources = command_json["resource"] input_resources = command_json["resource"]
initialize_full = other_calling_param.pop("initialize_full", False) initialize_full = other_calling_param.pop("initialize_full", False)
# 用来增加液体 # 用来增加液体
ADD_LIQUID_TYPE = other_calling_param.pop("ADD_LIQUID_TYPE", []) ADD_LIQUID_TYPE = other_calling_param.pop("ADD_LIQUID_TYPE", [])
LIQUID_VOLUME = other_calling_param.pop("LIQUID_VOLUME", []) LIQUID_VOLUME: List[float] = other_calling_param.pop("LIQUID_VOLUME", [])
LIQUID_INPUT_SLOT = other_calling_param.pop("LIQUID_INPUT_SLOT", []) LIQUID_INPUT_SLOT: List[int] = other_calling_param.pop("LIQUID_INPUT_SLOT", [])
slot = other_calling_param.pop("slot", "-1") slot = other_calling_param.pop("slot", "-1")
resource = None if slot != -1: # slot为负数的时候采用assign方法
if slot != "-1": # slot为负数的时候采用assign方法
other_calling_param["slot"] = slot other_calling_param["slot"] = slot
# 本地拿到这个物料,可能需要先做初始化? # 本地拿到这个物料,可能需要先做初始化
if isinstance(resources, list): if isinstance(input_resources, list) and initialize_full:
if ( input_resources = initialize_resources(input_resources)
len(resources) == 1 and isinstance(resources[0], list) and not initialize_full elif initialize_full:
): # 取消,不存在的情况 input_resources = initialize_resources([input_resources])
# 预先initialize过以整组的形式传入 rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources)
request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]] parent_resource = None
elif initialize_full: if bind_parent_id != self.node_name:
resources = initialize_resources(resources) parent_resource = self.resource_tracker.figure_resource(
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources] {"name": bind_parent_id}
else: )
request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources] for r in rts.root_nodes:
else: # noinspection PyUnresolvedReferences
if initialize_full: r.res_content.parent_uuid = parent_resource.unilabos_uuid
resources = initialize_resources([resources])
request.resources = [convert_to_ros_msg(Resource, resources)] if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer):
if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1: # noinspection PyTypeChecker
container_instance = request.resources[0] container_instance: RegularContainer = rts.root_nodes[0]
container_query_dict: dict = resources
found_resources = self.resource_tracker.figure_resource( found_resources = self.resource_tracker.figure_resource(
{"id": container_query_dict["name"]}, try_mode=True {"id": container_instance.name}, try_mode=True
) )
if not len(found_resources): if not len(found_resources):
self.resource_tracker.add_resource(container_instance) self.resource_tracker.add_resource(container_instance)
logger.info(f"添加物料{container_query_dict['name']}到资源跟踪器") logger.info(f"添加物料{container_instance.name}到资源跟踪器")
else: else:
assert ( assert (
len(found_resources) == 1 len(found_resources) == 1
), f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统" ), f"找到多个同名物料: {container_instance.name}, 请检查物料系统"
resource = found_resources[0] found_resource = found_resources[0]
if isinstance(resource, Resource): if isinstance(found_resource, RegularContainer):
regular_container = RegularContainer(resource.id) logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}")
regular_container.ulr_resource = resource found_resource.state.update(json.loads(container_instance.state))
regular_container.ulr_resource_data.update(json.loads(container_instance.data)) elif isinstance(found_resource, dict):
logger.info(f"更新物料{container_query_dict['name']}的数据{resource.data} ULR") raise ValueError("已不支持 字典 版本的RegularContainer")
elif isinstance(resource, dict):
if "data" not in resource:
resource["data"] = {}
resource["data"].update(json.loads(container_instance.data))
request.resources[0].name = resource["name"]
logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict")
else: else:
logger.info( logger.info(
f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}" f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}"
) )
response: ResourceAdd.Response = await rclient.call_async(request) # noinspection PyUnresolvedReferences
# 应该先add_resource了 request.command = json.dumps({
"action": "add",
"data": {
"data": rts.dump(),
"mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "",
"first_add": True,
},
})
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
self.lab_logger().info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
final_response = { final_response = {
"created_resources": [ROS2MessageInstance(i).get_python_dict() for i in request.resources], "created_resources": rts.dump(),
"liquid_input_resources": [], "liquid_input_resources": [],
} }
res.response = json.dumps(final_response) res.response = json.dumps(final_response)
@@ -458,59 +463,60 @@ class BaseROS2DeviceNode(Node, Generic[T]):
) )
res.response = get_result_info_str(traceback.format_exc(), False, {}) res.response = get_result_info_str(traceback.format_exc(), False, {})
return res return res
# 接下来该根据bind_parent_id进行assign了目前只有plr可以进行assign不然没有办法输入到物料系统中
if bind_parent_id != self.node_name:
resource = self.resource_tracker.figure_resource(
{"name": bind_parent_id}
) # 拿到父节点进行具体assign等操作
# request.resources = [convert_to_ros_msg(Resource, resources)]
try: try:
from pylabrobot.resources.resource import Resource as ResourcePLR if len(rts.root_nodes) == 1 and parent_resource is not None:
from pylabrobot.resources.deck import Deck plr_instance = rts.to_plr_resources()[0]
from pylabrobot.resources import Coordinate
from pylabrobot.resources import OTDeck
from pylabrobot.resources import Plate
contain_model = not isinstance(resource, Deck)
if isinstance(resource, ResourcePLR):
# resources.list()
plr_instance = ResourceTreeSet.from_raw_list(resources).to_plr_resources()[0]
# resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources}))
# plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model)
if isinstance(plr_instance, Plate): if isinstance(plr_instance, Plate):
empty_liquid_info_in = [(None, 0)] * plr_instance.num_items empty_liquid_info_in: List[Tuple[Optional[str], float]] = [(None, 0)] * plr_instance.num_items
if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1:
ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT)
LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT)
self.lab_logger().warning(f"增加液体资源时数量为1自动补全为 {len(LIQUID_INPUT_SLOT)}")
for liquid_type, liquid_volume, liquid_input_slot in zip( for liquid_type, liquid_volume, liquid_input_slot in zip(
ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT
): ):
empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume) empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume)
plr_instance.set_well_liquids(empty_liquid_info_in) plr_instance.set_well_liquids(empty_liquid_info_in)
input_wells_ulr = [ try:
convert_to_ros_msg( # noinspection PyProtectedMember
Resource, keys = list(plr_instance._ordering.keys())
resource_plr_to_ulab(plr_instance.get_well(LIQUID_INPUT_SLOT), with_children=False), for ind, r in enumerate(LIQUID_INPUT_SLOT[:]):
) if isinstance(r, int):
for r in LIQUID_INPUT_SLOT # noinspection PyTypeChecker
] LIQUID_INPUT_SLOT[ind] = keys[r]
final_response["liquid_input_resources"] = [ input_wells = [plr_instance.get_well(r) for r in LIQUID_INPUT_SLOT]
ROS2MessageInstance(i).get_python_dict() for i in input_wells_ulr except AttributeError:
] # 按照id回去失败回退到children
input_wells = []
for r in LIQUID_INPUT_SLOT:
input_wells.append(plr_instance.children[r])
final_response["liquid_input_resources"] = ResourceTreeSet.from_plr_resources(input_wells).dump()
res.response = json.dumps(final_response) res.response = json.dumps(final_response)
if isinstance(resource, OTDeck) and "slot" in other_calling_param: if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param:
other_calling_param["slot"] = int(other_calling_param["slot"]) other_calling_param["slot"] = int(other_calling_param["slot"])
resource.assign_child_at_slot(plr_instance, **other_calling_param) parent_resource.assign_child_at_slot(plr_instance, **other_calling_param)
else: else:
_discard_slot = other_calling_param.pop("slot", "-1") _discard_slot = other_calling_param.pop("slot", -1)
resource.assign_child_resource( parent_resource.assign_child_resource(
plr_instance, plr_instance,
Coordinate(location["x"], location["y"], location["z"]), Coordinate(location["x"], location["y"], location["z"]),
**other_calling_param, **other_calling_param,
) )
request2.resources = [ # 调整了液体以及Deck之后要重新Assign
convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)]) # noinspection PyUnresolvedReferences
] request.command = json.dumps({
rclient2.call(request2) "action": "add",
"data": {
"data": ResourceTreeSet.from_plr_resources([parent_resource]).dump(),
"mount_uuid": parent_resource.parent.unilabos_uuid if parent_resource.parent is not None else self.uuid,
"first_add": False,
},
})
tree_response: SerialCommand.Response = await client.call_async(request)
uuid_maps = json.loads(tree_response.response)
self.resource_tracker.loop_update_uuid(input_resources, uuid_maps)
self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes")
# 这里created_resources不包含parent_resource
# 发送给ResourceMeshManager # 发送给ResourceMeshManager
action_client = ActionClient( action_client = ActionClient(
self, self,
@@ -521,7 +527,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
goal = SendCmd.Goal() goal = SendCmd.Goal()
goal.command = json.dumps( goal.command = json.dumps(
{ {
"resources": resources, "resources": input_resources,
"bind_parent_id": bind_parent_id, "bind_parent_id": bind_parent_id,
} }
) )
@@ -614,7 +620,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
) )
) # type: ignore ) # type: ignore
raw_nodes = json.loads(response.response) raw_nodes = json.loads(response.response)
tree_set = ResourceTreeSet.from_raw_list(raw_nodes) tree_set = ResourceTreeSet.from_raw_dict_list(raw_nodes)
self.lab_logger().debug(f"获取资源结果: {len(tree_set.trees)} 个资源树") self.lab_logger().debug(f"获取资源结果: {len(tree_set.trees)} 个资源树")
return tree_set return tree_set
@@ -642,7 +648,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raw_data = json.loads(response.response) raw_data = json.loads(response.response)
# 转换为 PLR 资源 # 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(raw_data) tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
plr_resource = tree_set.to_plr_resources()[0] plr_resource = tree_set.to_plr_resources()[0]
self.lab_logger().debug(f"获取资源 {resource_id} 成功") self.lab_logger().debug(f"获取资源 {resource_id} 成功")
return plr_resource return plr_resource
@@ -1523,7 +1529,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raw_data = json.loads(response.response) raw_data = json.loads(response.response)
# 转换为 PLR 资源 # 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(raw_data) tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
plr_resource = tree_set.to_plr_resources()[0] plr_resource = tree_set.to_plr_resources()[0]
# 通过资源跟踪器获取本地实例 # 通过资源跟踪器获取本地实例

View File

@@ -45,6 +45,7 @@ from unilabos.ros.nodes.resource_tracker import (
) )
from unilabos.utils import logger from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.log import warning
from unilabos.utils.type_check import serialize_result_info from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
@@ -180,7 +181,7 @@ class HostNode(BaseROS2DeviceNode):
for plr_resource in ResourceTreeSet([tree]).to_plr_resources(): for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource) self._resource_tracker.add_resource(plr_resource)
except Exception as ex: except Exception as ex:
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!") warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
except Exception as ex: except Exception as ex:
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}") logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
# 初始化Node基类传递空参数覆盖列表 # 初始化Node基类传递空参数覆盖列表
@@ -455,10 +456,10 @@ class HostNode(BaseROS2DeviceNode):
async def create_resource( async def create_resource(
self, self,
device_id: str, device_id: DeviceSlot,
res_id: str, res_id: str,
class_name: str, class_name: str,
parent: str, parent: ResourceSlot,
bind_locations: Point, bind_locations: Point,
liquid_input_slot: list[int] = [], liquid_input_slot: list[int] = [],
liquid_type: list[str] = [], liquid_type: list[str] = [],
@@ -805,7 +806,7 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}") self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}")
if goal_status != GoalStatus.STATUS_CANCELED: if goal_status != GoalStatus.STATUS_CANCELED:
self.lab_logger().debug(f"[Host Node] Result data: {result_data}") self.lab_logger().trace(f"[Host Node] Result data: {result_data}")
# 清理 _goals 中的记录 # 清理 _goals 中的记录
if job_id in self._goals: if job_id in self._goals:

View File

@@ -244,7 +244,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode):
r r
) # type: ignore ) # type: ignore
raw_data = json.loads(response.response) raw_data = json.loads(response.response)
tree_set = ResourceTreeSet.from_raw_list(raw_data) tree_set = ResourceTreeSet.from_raw_dict_list(raw_data)
target = tree_set.dump() target = tree_set.dump()
protocol_kwargs[k] = target[0][0] if v == "unilabos_msgs/Resource" else target protocol_kwargs[k] = target[0][0] if v == "unilabos_msgs/Resource" else target
except Exception as ex: except Exception as ex:

View File

@@ -523,7 +523,7 @@ class ResourceTreeSet(object):
return plr_resources return plr_resources
@classmethod @classmethod
def from_raw_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet": def from_raw_dict_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet":
""" """
从原始字典列表创建 ResourceTreeSet自动建立 parent-children 关系 从原始字典列表创建 ResourceTreeSet自动建立 parent-children 关系
@@ -573,10 +573,10 @@ class ResourceTreeSet(object):
parent_instance.children.append(instance) parent_instance.children.append(instance)
# 第四步:使用 from_nested_list 创建 ResourceTreeSet # 第四步:使用 from_nested_list 创建 ResourceTreeSet
return cls.from_nested_list(instances) return cls.from_nested_instance_list(instances)
@classmethod @classmethod
def from_nested_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet": def from_nested_instance_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet":
""" """
从扁平化的资源列表创建ResourceTreeSet自动按根节点分组 从扁平化的资源列表创建ResourceTreeSet自动按根节点分组
@@ -785,7 +785,7 @@ class ResourceTreeSet(object):
""" """
nested_lists = [] nested_lists = []
for tree_data in data: for tree_data in data:
nested_lists.extend(ResourceTreeSet.from_raw_list(tree_data).trees) nested_lists.extend(ResourceTreeSet.from_raw_dict_list(tree_data).trees)
return cls(nested_lists) return cls(nested_lists)
@@ -965,7 +965,7 @@ class DeviceNodeResourceTracker(object):
if current_uuid in self.uuid_to_resources: if current_uuid in self.uuid_to_resources:
self.uuid_to_resources.pop(current_uuid) self.uuid_to_resources.pop(current_uuid)
self.uuid_to_resources[new_uuid] = res self.uuid_to_resources[new_uuid] = res
logger.debug(f"更新uuid: {current_uuid} -> {new_uuid}") logger.trace(f"更新uuid: {current_uuid} -> {new_uuid}")
replaced = 1 replaced = 1
return replaced return replaced