diff --git a/tests/devices/liquid_handling/test_transfer_liquid.py b/tests/devices/liquid_handling/test_transfer_liquid.py index e429ce2..9896aac 100644 --- a/tests/devices/liquid_handling/test_transfer_liquid.py +++ b/tests/devices/liquid_handling/test_transfer_liquid.py @@ -1,337 +1,505 @@ -""" -PRCXI transfer_liquid 集成测试。 - -这些用例会启动 UniLiquidHandler RViz 仿真 backend,需要同时满足: -1. 安装 pylabrobot 依赖; -2. 设置环境变量 UNILAB_SIM_TEST=1; -3. 具备 ROS 运行环境(rviz_backend 会创建 ROS 节点)。 -""" import asyncio -import os from dataclasses import dataclass -from typing import List, Sequence +from typing import Any, Iterable, List, Optional, Sequence, Tuple import pytest from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract -from unilabos.devices.liquid_handling.prcxi.prcxi import PRCXI9300Deck, PRCXI9300Trash -from unilabos.devices.liquid_handling.prcxi.prcxi_labware import ( - PRCXI_300ul_Tips, - PRCXI_BioER_96_wellplate, -) - -pytestmark = pytest.mark.slow - -try: - from pylabrobot.resources import Coordinate, Deck, Plate, TipRack, Well -except ImportError: # pragma: no cover - 测试环境缺少 pylabrobot 时直接跳过 - Coordinate = Deck = Plate = TipRack = Well = None # type: ignore[assignment] - PYLABROBOT_AVAILABLE = False -else: - PYLABROBOT_AVAILABLE = True - -SIM_ENV_VAR = "UNILAB_SIM_TEST" -@dataclass -class SimulationContext: - handler: LiquidHandlerAbstract - deck: Deck - tip_rack: TipRack - source_plate: Plate - target_plate: Plate - waste_plate: Plate - channel_num: int +@dataclass(frozen=True) +class DummyContainer: + name: str + + def __repr__(self) -> str: # pragma: no cover + return f"DummyContainer({self.name})" + + +@dataclass(frozen=True) +class DummyTipSpot: + name: str + + def __repr__(self) -> str: # pragma: no cover + return f"DummyTipSpot({self.name})" + + +def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]: + """Yield lists so code can safely call `tip.extend(next(self.current_tip))`.""" + for i in range(n): + yield [DummyTipSpot(f"tip_{i}")] + + +class FakeLiquidHandler(LiquidHandlerAbstract): + """不初始化真实 backend/deck;仅用来记录 transfer_liquid 内部调用序列。""" + + def __init__(self, channel_num: int = 8): + # 不调用 super().__init__,避免真实硬件/后端依赖 + self.channel_num = channel_num + self.support_touch_tip = True + self.current_tip = iter(make_tip_iter()) + self.calls: List[Tuple[str, Any]] = [] + + async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs): + self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels})) + + async def aspirate( + self, + resources: Sequence[Any], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Any = None, + liquid_height: Any = None, + blow_out_air_volume: Any = None, + spread: str = "wide", + **backend_kwargs, + ): + self.calls.append( + ( + "aspirate", + { + "resources": list(resources), + "vols": list(vols), + "use_channels": list(use_channels) if use_channels is not None else None, + "flow_rates": list(flow_rates) if flow_rates is not None else None, + "offsets": list(offsets) if offsets is not None else None, + "liquid_height": list(liquid_height) if liquid_height is not None else None, + "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, + }, + ) + ) + + async def dispense( + self, + resources: Sequence[Any], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Any = None, + liquid_height: Any = None, + blow_out_air_volume: Any = None, + spread: str = "wide", + **backend_kwargs, + ): + self.calls.append( + ( + "dispense", + { + "resources": list(resources), + "vols": list(vols), + "use_channels": list(use_channels) if use_channels is not None else None, + "flow_rates": list(flow_rates) if flow_rates is not None else None, + "offsets": list(offsets) if offsets is not None else None, + "liquid_height": list(liquid_height) if liquid_height is not None else None, + "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, + }, + ) + ) + + async def discard_tips(self, use_channels=None, *args, **kwargs): + # 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数) + self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None})) + + async def custom_delay(self, seconds=0, msg=None): + self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg})) + + async def touch_tip(self, targets): + # 原实现会访问 targets.get_size_x() 等;测试里只记录调用 + self.calls.append(("touch_tip", {"targets": targets})) + + async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None): + self.calls.append( + ( + "mix", + { + "targets": targets, + "mix_time": mix_time, + "mix_vol": mix_vol, + }, + ) + ) def run(coro): return asyncio.run(coro) -def _ensure_unilabos_extra(well: Well) -> None: - if not hasattr(well, "unilabos_extra") or well.unilabos_extra is None: - well.unilabos_extra = {} # type: ignore[attr-defined] +def test_one_to_one_single_channel_basic_calls(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(64)) + sources = [DummyContainer(f"S{i}") for i in range(3)] + targets = [DummyContainer(f"T{i}") for i in range(3)] -def _assign_sample_uuid(well: Well, value: str) -> None: - _ensure_unilabos_extra(well) - well.unilabos_extra["sample_uuid"] = value # type: ignore[attr-defined] - - -def _zero_coordinate() -> Coordinate: - if hasattr(Coordinate, "zero"): - return Coordinate.zero() - return Coordinate(0, 0, 0) - - -def _zero_offsets(count: int) -> List[Coordinate]: - return [_zero_coordinate() for _ in range(count)] - - -def _build_simulation_deck() -> tuple[PRCXI9300Deck, TipRack, Plate, Plate, Plate, PRCXI9300Trash]: - deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=542, size_y=374, size_z=50) - tip_rack = PRCXI_300ul_Tips("Tips") - source_plate = PRCXI_BioER_96_wellplate("SourcePlate") - target_plate = PRCXI_BioER_96_wellplate("TargetPlate") - waste_plate = PRCXI_BioER_96_wellplate("WastePlate") - trash = PRCXI9300Trash(name="trash", size_x=100, size_y=100, size_z=50) - deck.assign_child_resource(tip_rack, location=Coordinate(0, 0, 0)) - deck.assign_child_resource(source_plate, location=Coordinate(150, 0, 0)) - deck.assign_child_resource(target_plate, location=Coordinate(300, 0, 0)) - deck.assign_child_resource(waste_plate, location=Coordinate(450, 0, 0)) - deck.assign_child_resource(trash, location=Coordinate(150, -120, 0)) - return deck, tip_rack, source_plate, target_plate, waste_plate, trash - - -def _stop_backend(handler: LiquidHandlerAbstract) -> None: - try: - run(handler.backend.stop()) - except Exception: # pragma: no cover - 如果 backend 已经停止 - pass - simulate_handler = getattr(handler, "_simulate_handler", None) - if simulate_handler is not None and getattr(simulate_handler, "backend", None) is not None: - try: - run(simulate_handler.backend.stop()) - except Exception: # pragma: no cover - pass - - -@pytest.fixture(params=[1, 8]) -def prcxi_simulation(request) -> SimulationContext: - if not PYLABROBOT_AVAILABLE: - pytest.skip("pylabrobot is required for PRCXI simulation tests.") - if os.environ.get(SIM_ENV_VAR) != "1": - pytest.skip(f"Set {SIM_ENV_VAR}=1 to run PRCXI simulation tests.") - - channel_num = request.param - deck, tip_rack, source_plate, target_plate, waste_plate, _trash = _build_simulation_deck() - backend_cfg = { - "type": "unilabos.devices.liquid_handling.rviz_backend.UniLiquidHandlerRvizBackend", - "channel_num": channel_num, - "total_height": 310, - "lh_device_id": f"pytest_prcxi_{channel_num}", - } - handler = LiquidHandlerAbstract( - backend=backend_cfg, - deck=deck, - simulator=True, - channel_num=channel_num, - total_height=310, - ) - run(handler.setup()) - handler.set_tiprack([tip_rack]) - handler.support_touch_tip = False - - context = SimulationContext( - handler=handler, - deck=deck, - tip_rack=tip_rack, - source_plate=source_plate, - target_plate=target_plate, - waste_plate=waste_plate, - channel_num=channel_num, - ) - - yield context - - _stop_backend(handler) - - -def _pick_wells(plate: Plate, start: int, count: int) -> List[Well]: - wells = plate.children[start : start + count] - for well in wells: - _ensure_unilabos_extra(well) - return wells - - -def _assert_samples_match(sources: Sequence[Well], targets: Sequence[Well]) -> None: - for src, tgt in zip(sources, targets): - src_uuid = getattr(src, "unilabos_extra", {}).get("sample_uuid") - tgt_uuid = getattr(tgt, "unilabos_extra", {}).get("sample_uuid") - assert tgt_uuid == src_uuid - - -def test_transfer_liquid_single_channel_one_to_one(prcxi_simulation: SimulationContext): - if prcxi_simulation.channel_num != 1: - pytest.skip("仅在单通道配置下运行") - - handler = prcxi_simulation.handler - for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: - _ensure_unilabos_extra(well) - sources = prcxi_simulation.source_plate[0:3] - targets = prcxi_simulation.target_plate["A4:A6"] - for idx, src in enumerate(sources): - _assign_sample_uuid(src, f"single_{idx}") - offsets = _zero_offsets(max(len(sources), len(targets))) - - result = run( - handler.transfer_liquid( + run( + lh.transfer_liquid( sources=sources, targets=targets, - tip_racks=[prcxi_simulation.tip_rack], + tip_racks=[], use_channels=[0], - asp_vols=[5.0, 6.0, 7.0], - dis_vols=[10.0, 11.0, 12.0], - offsets=offsets, - mix_times=None, + asp_vols=[1, 2, 3], + dis_vols=[4, 5, 6], + mix_times=None, # 应该仍能执行(不 mix) ) ) - # assert result == """""" + assert [c[0] for c in lh.calls].count("pick_up_tips") == 3 + assert [c[0] for c in lh.calls].count("aspirate") == 3 + assert [c[0] for c in lh.calls].count("dispense") == 3 + assert [c[0] for c in lh.calls].count("discard_tips") == 3 - _assert_samples_match(sources, targets) + # 每次 aspirate/dispense 都是单孔列表 + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert aspirates[0]["resources"] == [sources[0]] + assert aspirates[0]["vols"] == [1.0] + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert dispenses[2]["resources"] == [targets[2]] + assert dispenses[2]["vols"] == [6.0] -def test_transfer_liquid_single_channel_one_to_many(prcxi_simulation: SimulationContext): - if prcxi_simulation.channel_num != 1: - pytest.skip("仅在单通道配置下运行") +def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(16)) - handler = prcxi_simulation.handler - for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: - _ensure_unilabos_extra(well) - source = prcxi_simulation.source_plate.children[0] - targets = prcxi_simulation.target_plate["A1:E1"] - _assign_sample_uuid(source, "one_to_many_source") - offsets = _zero_offsets(max(len(targets), 1)) + source = DummyContainer("S0") + target = DummyContainer("T0") run( - handler.transfer_liquid( + lh.transfer_liquid( + sources=[source], + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=[5], + dis_vols=[5], + mix_stage="before", + mix_times=1, + mix_vol=3, + ) + ) + + names = [name for name, _ in lh.calls] + assert names.count("mix") == 1 + assert names.index("mix") < names.index("aspirate") + + +def test_one_to_one_eight_channel_groups_by_8(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(256)) + + sources = [DummyContainer(f"S{i}") for i in range(16)] + targets = [DummyContainer(f"T{i}") for i in range(16)] + asp_vols = list(range(1, 17)) + dis_vols = list(range(101, 117)) + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + mix_times=0, # 触发逻辑但不 mix + ) + ) + + # 16 个任务 -> 2 组,每组 8 通道一起做 + assert [c[0] for c in lh.calls].count("pick_up_tips") == 2 + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == 2 + assert len(dispenses) == 2 + + assert aspirates[0]["resources"] == sources[0:8] + assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]] + assert dispenses[1]["resources"] == targets[8:16] + assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]] + + +def test_one_to_one_eight_channel_requires_multiple_of_8_targets(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer(f"S{i}") for i in range(9)] + targets = [DummyContainer(f"T{i}") for i in range(9)] + + with pytest.raises(ValueError, match="multiple of 8"): + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=[1] * 9, + dis_vols=[1] * 9, + mix_times=0, + ) + ) + + +def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(512)) + + sources = [DummyContainer(f"S{i}") for i in range(16)] + targets = [DummyContainer(f"T{i}") for i in range(16)] + asp_vols = [i + 1 for i in range(16)] + dis_vols = [200 + i for i in range(16)] + asp_flow_rates = [0.1 * (i + 1) for i in range(16)] + dis_flow_rates = [0.2 * (i + 1) for i in range(16)] + offsets = [f"offset_{i}" for i in range(16)] + liquid_heights = [i * 0.5 for i in range(16)] + blow_out_air_volume = [i + 0.05 for i in range(16)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + asp_flow_rates=asp_flow_rates, + dis_flow_rates=dis_flow_rates, + offsets=offsets, + liquid_height=liquid_heights, + blow_out_air_volume=blow_out_air_volume, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == len(dispenses) == 2 + + for batch_idx in range(2): + start = batch_idx * 8 + end = start + 8 + asp_call = aspirates[batch_idx] + dis_call = dispenses[batch_idx] + assert asp_call["resources"] == sources[start:end] + assert asp_call["flow_rates"] == asp_flow_rates[start:end] + assert asp_call["offsets"] == offsets[start:end] + assert asp_call["liquid_height"] == liquid_heights[start:end] + assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end] + assert dis_call["flow_rates"] == dis_flow_rates[start:end] + assert dis_call["offsets"] == offsets[start:end] + assert dis_call["liquid_height"] == liquid_heights[start:end] + assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end] + + +def test_one_to_one_eight_channel_handles_32_tasks_four_batches(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(1024)) + + sources = [DummyContainer(f"S{i}") for i in range(32)] + targets = [DummyContainer(f"T{i}") for i in range(32)] + asp_vols = [i + 1 for i in range(32)] + dis_vols = [300 + i for i in range(32)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + mix_times=0, + ) + ) + + pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"] + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(pick_calls) == 4 + assert len(aspirates) == len(dispenses) == 4 + assert aspirates[0]["resources"] == sources[0:8] + assert aspirates[-1]["resources"] == sources[24:32] + assert dispenses[0]["resources"] == targets[0:8] + assert dispenses[-1]["resources"] == targets[24:32] + + +def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(64)) + + source = DummyContainer("SRC") + targets = [DummyContainer(f"T{i}") for i in range(3)] + dis_vols = [10, 20, 30] # sum=60 + + run( + lh.transfer_liquid( sources=[source], targets=targets, - tip_racks=[prcxi_simulation.tip_rack], + tip_racks=[], use_channels=[0], - asp_vols=10.0, - dis_vols=[2.0, 2.0, 2.0, 2.0, 2.0], - offsets=offsets, + asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60 + dis_vols=dis_vols, mix_times=0, ) ) - for target in targets: - assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "one_to_many_source" + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert len(aspirates) == 1 + assert aspirates[0]["resources"] == [source] + assert aspirates[0]["vols"] == [60.0] + assert aspirates[0]["use_channels"] == [0] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0] -def test_transfer_liquid_single_channel_many_to_one(prcxi_simulation: SimulationContext): - if prcxi_simulation.channel_num != 1: - pytest.skip("仅在单通道配置下运行") +def test_one_to_many_eight_channel_basic(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(128)) - handler = prcxi_simulation.handler - for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: - _ensure_unilabos_extra(well) - sources = prcxi_simulation.source_plate[0:3] - target = prcxi_simulation.target_plate.children[4] - for idx, src in enumerate(sources): - _assign_sample_uuid(src, f"many_to_one_{idx}") - offsets = _zero_offsets(max(len(sources), len([target]))) + source = DummyContainer("SRC") + targets = [DummyContainer(f"T{i}") for i in range(8)] + dis_vols = [i + 1 for i in range(8)] run( - handler.transfer_liquid( - sources=sources, - targets=[target], - tip_racks=[prcxi_simulation.tip_rack], - use_channels=[0], - asp_vols=[8.0, 9.0, 10.0], - dis_vols=1, - offsets=offsets, - mix_stage="after", - mix_times=1, - mix_vol=5, - ) - ) - - assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == "many_to_one_2" - - -def test_transfer_liquid_eight_channel_batches(prcxi_simulation: SimulationContext): - if prcxi_simulation.channel_num != 8: - pytest.skip("仅在八通道配置下运行") - - handler = prcxi_simulation.handler - for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: - _ensure_unilabos_extra(well) - sources = prcxi_simulation.source_plate[0:8] - targets = prcxi_simulation.target_plate[16:24] - for idx, src in enumerate(sources): - _assign_sample_uuid(src, f"batch_{idx}") - offsets = _zero_offsets(len(targets)) - - use_channels = list(range(8)) - asp_vols = [float(i + 1) * 2 for i in range(8)] - dis_vols = [float(i + 10) for i in range(8)] - - run( - handler.transfer_liquid( - sources=sources, + lh.transfer_liquid( + sources=[source], targets=targets, - tip_racks=[prcxi_simulation.tip_rack], - use_channels=use_channels, - asp_vols=asp_vols, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自) dis_vols=dis_vols, - offsets=offsets, mix_times=0, ) ) - _assert_samples_match(sources, targets) + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert aspirates[0]["resources"] == [source] * 8 + assert aspirates[0]["vols"] == [float(v) for v in dis_vols] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert dispenses[0]["resources"] == targets + assert dispenses[0]["vols"] == [float(v) for v in dis_vols] -@pytest.mark.parametrize("mix_stage", ["before", "after", "both"]) -def test_transfer_liquid_mix_stages(prcxi_simulation: SimulationContext, mix_stage: str): - if prcxi_simulation.channel_num != 1: - pytest.skip("仅在单通道配置下运行") +def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) - handler = prcxi_simulation.handler - for well in prcxi_simulation.source_plate.children + prcxi_simulation.target_plate.children: - _ensure_unilabos_extra(well) - target = prcxi_simulation.target_plate[70] - sources = prcxi_simulation.source_plate[80:82] - for idx, src in enumerate(sources): - _assign_sample_uuid(src, f"mix_stage_{mix_stage}_{idx}") + sources = [DummyContainer(f"S{i}") for i in range(3)] + target = DummyContainer("T") + asp_vols = [5, 6, 7] run( - handler.transfer_liquid( + lh.transfer_liquid( sources=sources, targets=[target], - tip_racks=[prcxi_simulation.tip_rack], + tip_racks=[], use_channels=[0], - asp_vols=[4.0, 5.0], - dis_vols=1, - offsets=_zero_offsets(len(sources)), - mix_stage=mix_stage, - mix_times=2, - mix_vol=3, + asp_vols=asp_vols, + dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol + mix_times=0, ) ) - # mix_stage 前后都应该保留最新源的 sample_uuid - assert getattr(target, "unilabos_extra", {}).get("sample_uuid") == f"mix_stage_{mix_stage}_1" - if prcxi_simulation.channel_num != 8: - pytest.skip("仅在八通道配置下运行") + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols] + assert all(d["resources"] == [target] for d in dispenses) - handler = prcxi_simulation.handler - sources = prcxi_simulation.source_plate[0:8] - targets = prcxi_simulation.target_plate[16:24] - for idx, src in enumerate(sources): - _assign_sample_uuid(src, f"batch_{idx}") - offsets = _zero_offsets(len(targets)) - use_channels = list(range(8)) - asp_vols = [float(i + 1) * 2 for i in range(8)] - dis_vols = [float(i + 10) for i in range(8)] +def test_many_to_one_single_channel_before_stage_mixes_target_once(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer("S0"), DummyContainer("S1")] + target = DummyContainer("T") run( - handler.transfer_liquid( + lh.transfer_liquid( sources=sources, - targets=targets, - tip_racks=[prcxi_simulation.tip_rack], - use_channels=use_channels, - asp_vols=asp_vols, - dis_vols=dis_vols, - offsets=offsets, - mix_stage="after", + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=[5, 6], + dis_vols=1, + mix_stage="before", mix_times=2, - mix_vol=3, + mix_vol=4, ) ) - _assert_samples_match(sources, targets) + names = [name for name, _ in lh.calls] + assert names[0] == "mix" + assert names.count("mix") == 1 + + +def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + target = DummyContainer("T") + asp_vols = [5, 6, 7] + dis_vols = [1, 2, 3] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=asp_vols, + dis_vols=dis_vols, # 比例模式 + mix_times=0, + ) + ) + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols] + + +def test_many_to_one_eight_channel_basic(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(256)) + + sources = [DummyContainer(f"S{i}") for i in range(8)] + target = DummyContainer("T") + asp_vols = [10 + i for i in range(8)] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert aspirates[0]["resources"] == sources + assert aspirates[0]["vols"] == [float(v) for v in asp_vols] + assert dispenses[0]["resources"] == [target] * 8 + assert dispenses[0]["vols"] == [float(v) for v in asp_vols] + + +def test_transfer_liquid_mode_detection_unsupported_shape_raises(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer("S0"), DummyContainer("S1")] + targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")] + + with pytest.raises(ValueError, match="Unsupported transfer mode"): + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=[1, 1], + dis_vols=[1, 1, 1], + mix_times=0, + ) + ) + diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 2a2facd..3af50cb 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -418,7 +418,7 @@ def main(): # 如果从远端获取了物料信息,则与本地物料进行同步 if request_startup_json and "nodes" in request_startup_json: print_status("开始同步远端物料到本地...", "info") - remote_tree_set = ResourceTreeSet.from_raw_list(request_startup_json["nodes"]) + remote_tree_set = ResourceTreeSet.from_raw_dict_list(request_startup_json["nodes"]) resource_tree_set.merge_remote_resources(remote_tree_set) print_status("远端物料同步完成", "info") diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 6da4f5f..23f139d 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -579,6 +579,8 @@ class MessageProcessor: elif message_type == "session_id": self.session_id = message_data.get("session_id") logger.info(f"[MessageProcessor] Session ID: {self.session_id}") + elif message_type == "request_reload": + await self._handle_request_reload(message_data) else: logger.debug(f"[MessageProcessor] Unknown message type: {message_type}") @@ -888,6 +890,20 @@ class MessageProcessor: ) thread.start() + async def _handle_request_reload(self, data: Dict[str, Any]): + """ + 处理重载请求 + + 当LabGo发送request_reload时,重新发送设备注册信息 + """ + reason = data.get("reason", "unknown") + logger.info(f"[MessageProcessor] Received reload request, reason: {reason}") + + # 重新发送host_node_ready信息 + if self.websocket_client: + self.websocket_client.publish_host_ready() + logger.info("[MessageProcessor] Re-sent host_node_ready after reload request") + async def _send_action_state_response( self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int ): @@ -1282,7 +1298,7 @@ class WebSocketClient(BaseCommunicationClient): self.message_processor.send_message(message) job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name) - logger.debug(f"[WebSocketClient] Job status published: {job_log} - {status}") + logger.trace(f"[WebSocketClient] Job status published: {job_log} - {status}") def send_ping(self, ping_id: str, timestamp: float) -> None: """发送ping消息""" @@ -1313,17 +1329,55 @@ class WebSocketClient(BaseCommunicationClient): logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}") def publish_host_ready(self) -> None: - """发布host_node ready信号""" + """发布host_node ready信号,包含设备和动作信息""" if self.is_disabled or not self.is_connected(): logger.debug("[WebSocketClient] Not connected, cannot publish host ready signal") return + # 收集设备信息 + devices = [] + machine_name = BasicConfig.machine_name + + try: + host_node = HostNode.get_instance(0) + if host_node: + # 获取设备信息 + for device_id, namespace in host_node.devices_names.items(): + device_key = f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}" + is_online = device_key in host_node._online_devices + + # 获取设备的动作信息 + actions = {} + for action_id, client in host_node._action_clients.items(): + # action_id 格式: /namespace/device_id/action_name + if device_id in action_id: + action_name = action_id.split("/")[-1] + actions[action_name] = { + "action_path": action_id, + "action_type": str(type(client).__name__), + } + + devices.append({ + "device_id": device_id, + "namespace": namespace, + "device_key": device_key, + "is_online": is_online, + "machine_name": host_node.device_machine_names.get(device_id, machine_name), + "actions": actions, + }) + + logger.info(f"[WebSocketClient] Collected {len(devices)} devices for host_ready") + except Exception as e: + logger.warning(f"[WebSocketClient] Error collecting device info: {e}") + message = { "action": "host_node_ready", "data": { "status": "ready", "timestamp": time.time(), + "machine_name": machine_name, + "devices": devices, }, } self.message_processor.send_message(message) - logger.info("[WebSocketClient] Host node ready signal published") + logger.info(f"[WebSocketClient] Host node ready signal published with {len(devices)} devices") diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 4541cb0..8744b45 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -134,7 +134,7 @@ def canonicalize_nodes_data( parent_instance.children.append(current_instance) # 第五步:创建 ResourceTreeSet - resource_tree_set = ResourceTreeSet.from_nested_list(standardized_instances) + resource_tree_set = ResourceTreeSet.from_nested_instance_list(standardized_instances) return resource_tree_set diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 16aba6d..e0b1b9a 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -5,7 +5,8 @@ import json import threading import time import traceback -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union +from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \ + Tuple from concurrent.futures import ThreadPoolExecutor import asyncio @@ -362,78 +363,82 @@ class BaseROS2DeviceNode(Node, Generic[T]): return res async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response): + from pylabrobot.resources.resource import Resource as ResourcePLR + from pylabrobot.resources.deck import Deck + from pylabrobot.resources import Coordinate + from pylabrobot.resources import Plate # 物料传输到对应的node节点 - rclient = self.create_client(ResourceAdd, "/resources/add") - rclient.wait_for_service() - rclient2 = self.create_client(ResourceAdd, "/resources/add") - rclient2.wait_for_service() - request = ResourceAdd.Request() - request2 = ResourceAdd.Request() + client = self._resource_clients["c2s_update_resource_tree"] + request = SerialCommand.Request() + request2 = SerialCommand.Request() command_json = json.loads(req.command) namespace = command_json["namespace"] bind_parent_id = command_json["bind_parent_id"] edge_device_id = command_json["edge_device_id"] location = command_json["bind_location"] other_calling_param = command_json["other_calling_param"] - resources = command_json["resource"] + input_resources = command_json["resource"] initialize_full = other_calling_param.pop("initialize_full", False) # 用来增加液体 ADD_LIQUID_TYPE = other_calling_param.pop("ADD_LIQUID_TYPE", []) - LIQUID_VOLUME = other_calling_param.pop("LIQUID_VOLUME", []) - LIQUID_INPUT_SLOT = other_calling_param.pop("LIQUID_INPUT_SLOT", []) + LIQUID_VOLUME: List[float] = other_calling_param.pop("LIQUID_VOLUME", []) + LIQUID_INPUT_SLOT: List[int] = other_calling_param.pop("LIQUID_INPUT_SLOT", []) slot = other_calling_param.pop("slot", "-1") - resource = None - if slot != "-1": # slot为负数的时候采用assign方法 + if slot != -1: # slot为负数的时候采用assign方法 other_calling_param["slot"] = slot - # 本地拿到这个物料,可能需要先做初始化? - if isinstance(resources, list): - if ( - len(resources) == 1 and isinstance(resources[0], list) and not initialize_full - ): # 取消,不存在的情况 - # 预先initialize过,以整组的形式传入 - request.resources = [convert_to_ros_msg(Resource, resource_) for resource_ in resources[0]] - elif initialize_full: - resources = initialize_resources(resources) - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources] - else: - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources] - else: - if initialize_full: - resources = initialize_resources([resources]) - request.resources = [convert_to_ros_msg(Resource, resources)] - if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1: - container_instance = request.resources[0] - container_query_dict: dict = resources + # 本地拿到这个物料,可能需要先做初始化 + if isinstance(input_resources, list) and initialize_full: + input_resources = initialize_resources(input_resources) + elif initialize_full: + input_resources = initialize_resources([input_resources]) + rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources) + parent_resource = None + if bind_parent_id != self.node_name: + parent_resource = self.resource_tracker.figure_resource( + {"name": bind_parent_id} + ) + for r in rts.root_nodes: + # noinspection PyUnresolvedReferences + r.res_content.parent_uuid = parent_resource.unilabos_uuid + + if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer): + # noinspection PyTypeChecker + container_instance: RegularContainer = rts.root_nodes[0] found_resources = self.resource_tracker.figure_resource( - {"id": container_query_dict["name"]}, try_mode=True + {"id": container_instance.name}, try_mode=True ) if not len(found_resources): self.resource_tracker.add_resource(container_instance) - logger.info(f"添加物料{container_query_dict['name']}到资源跟踪器") + logger.info(f"添加物料{container_instance.name}到资源跟踪器") else: assert ( len(found_resources) == 1 - ), f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统" - resource = found_resources[0] - if isinstance(resource, Resource): - regular_container = RegularContainer(resource.id) - regular_container.ulr_resource = resource - regular_container.ulr_resource_data.update(json.loads(container_instance.data)) - logger.info(f"更新物料{container_query_dict['name']}的数据{resource.data} ULR") - elif isinstance(resource, dict): - if "data" not in resource: - resource["data"] = {} - resource["data"].update(json.loads(container_instance.data)) - request.resources[0].name = resource["name"] - logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict") + ), f"找到多个同名物料: {container_instance.name}, 请检查物料系统" + found_resource = found_resources[0] + if isinstance(found_resource, RegularContainer): + logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") + found_resource.state.update(json.loads(container_instance.state)) + elif isinstance(found_resource, dict): + raise ValueError("已不支持 字典 版本的RegularContainer") else: logger.info( - f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}" + f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}" ) - response: ResourceAdd.Response = await rclient.call_async(request) - # 应该先add_resource了 + # noinspection PyUnresolvedReferences + request.command = json.dumps({ + "action": "add", + "data": { + "data": rts.dump(), + "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", + "first_add": True, + }, + }) + tree_response: SerialCommand.Response = await client.call_async(request) + uuid_maps = json.loads(tree_response.response) + self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) + self.lab_logger().info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes") final_response = { - "created_resources": [ROS2MessageInstance(i).get_python_dict() for i in request.resources], + "created_resources": rts.dump(), "liquid_input_resources": [], } res.response = json.dumps(final_response) @@ -458,59 +463,60 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) res.response = get_result_info_str(traceback.format_exc(), False, {}) return res - # 接下来该根据bind_parent_id进行assign了,目前只有plr可以进行assign,不然没有办法输入到物料系统中 - if bind_parent_id != self.node_name: - resource = self.resource_tracker.figure_resource( - {"name": bind_parent_id} - ) # 拿到父节点,进行具体assign等操作 - # request.resources = [convert_to_ros_msg(Resource, resources)] - try: - from pylabrobot.resources.resource import Resource as ResourcePLR - from pylabrobot.resources.deck import Deck - from pylabrobot.resources import Coordinate - from pylabrobot.resources import OTDeck - from pylabrobot.resources import Plate - - contain_model = not isinstance(resource, Deck) - if isinstance(resource, ResourcePLR): - # resources.list() - plr_instance = ResourceTreeSet.from_raw_list(resources).to_plr_resources()[0] - # resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources})) - # plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model) - + if len(rts.root_nodes) == 1 and parent_resource is not None: + plr_instance = rts.to_plr_resources()[0] if isinstance(plr_instance, Plate): - empty_liquid_info_in = [(None, 0)] * plr_instance.num_items + empty_liquid_info_in: List[Tuple[Optional[str], float]] = [(None, 0)] * plr_instance.num_items + if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1: + ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT) + LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT) + self.lab_logger().warning(f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个") for liquid_type, liquid_volume, liquid_input_slot in zip( ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT ): empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume) plr_instance.set_well_liquids(empty_liquid_info_in) - input_wells_ulr = [ - convert_to_ros_msg( - Resource, - resource_plr_to_ulab(plr_instance.get_well(LIQUID_INPUT_SLOT), with_children=False), - ) - for r in LIQUID_INPUT_SLOT - ] - final_response["liquid_input_resources"] = [ - ROS2MessageInstance(i).get_python_dict() for i in input_wells_ulr - ] + try: + # noinspection PyProtectedMember + keys = list(plr_instance._ordering.keys()) + for ind, r in enumerate(LIQUID_INPUT_SLOT[:]): + if isinstance(r, int): + # noinspection PyTypeChecker + LIQUID_INPUT_SLOT[ind] = keys[r] + input_wells = [plr_instance.get_well(r) for r in LIQUID_INPUT_SLOT] + except AttributeError: + # 按照id回去失败,回退到children + input_wells = [] + for r in LIQUID_INPUT_SLOT: + input_wells.append(plr_instance.children[r]) + final_response["liquid_input_resources"] = ResourceTreeSet.from_plr_resources(input_wells).dump() res.response = json.dumps(final_response) - if isinstance(resource, OTDeck) and "slot" in other_calling_param: + if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param: other_calling_param["slot"] = int(other_calling_param["slot"]) - resource.assign_child_at_slot(plr_instance, **other_calling_param) + parent_resource.assign_child_at_slot(plr_instance, **other_calling_param) else: - _discard_slot = other_calling_param.pop("slot", "-1") - resource.assign_child_resource( + _discard_slot = other_calling_param.pop("slot", -1) + parent_resource.assign_child_resource( plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param, ) - request2.resources = [ - convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)]) - ] - rclient2.call(request2) + # 调整了液体以及Deck之后要重新Assign + # noinspection PyUnresolvedReferences + request.command = json.dumps({ + "action": "add", + "data": { + "data": ResourceTreeSet.from_plr_resources([parent_resource]).dump(), + "mount_uuid": parent_resource.parent.unilabos_uuid if parent_resource.parent is not None else self.uuid, + "first_add": False, + }, + }) + tree_response: SerialCommand.Response = await client.call_async(request) + uuid_maps = json.loads(tree_response.response) + self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) + self._lab_logger.info(f"Resource tree added. UUID mapping: {len(uuid_maps)} nodes") + # 这里created_resources不包含parent_resource # 发送给ResourceMeshManager action_client = ActionClient( self, @@ -521,7 +527,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): goal = SendCmd.Goal() goal.command = json.dumps( { - "resources": resources, + "resources": input_resources, "bind_parent_id": bind_parent_id, } ) @@ -614,7 +620,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) ) # type: ignore raw_nodes = json.loads(response.response) - tree_set = ResourceTreeSet.from_raw_list(raw_nodes) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_nodes) self.lab_logger().debug(f"获取资源结果: {len(tree_set.trees)} 个资源树") return tree_set @@ -642,7 +648,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): raw_data = json.loads(response.response) # 转换为 PLR 资源 - tree_set = ResourceTreeSet.from_raw_list(raw_data) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) plr_resource = tree_set.to_plr_resources()[0] self.lab_logger().debug(f"获取资源 {resource_id} 成功") return plr_resource @@ -1523,7 +1529,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): raw_data = json.loads(response.response) # 转换为 PLR 资源 - tree_set = ResourceTreeSet.from_raw_list(raw_data) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) plr_resource = tree_set.to_plr_resources()[0] # 通过资源跟踪器获取本地实例 diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index ebc4492..971b6dd 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -45,6 +45,7 @@ from unilabos.ros.nodes.resource_tracker import ( ) from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid +from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot @@ -180,7 +181,7 @@ class HostNode(BaseROS2DeviceNode): for plr_resource in ResourceTreeSet([tree]).to_plr_resources(): self._resource_tracker.add_resource(plr_resource) except Exception as ex: - self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!") + warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!") except Exception as ex: logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}") # 初始化Node基类,传递空参数覆盖列表 @@ -455,10 +456,10 @@ class HostNode(BaseROS2DeviceNode): async def create_resource( self, - device_id: str, + device_id: DeviceSlot, res_id: str, class_name: str, - parent: str, + parent: ResourceSlot, bind_locations: Point, liquid_input_slot: list[int] = [], liquid_type: list[str] = [], @@ -805,7 +806,7 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}") if goal_status != GoalStatus.STATUS_CANCELED: - self.lab_logger().debug(f"[Host Node] Result data: {result_data}") + self.lab_logger().trace(f"[Host Node] Result data: {result_data}") # 清理 _goals 中的记录 if job_id in self._goals: diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 7325dda..91cd0ef 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -244,7 +244,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): r ) # type: ignore raw_data = json.loads(response.response) - tree_set = ResourceTreeSet.from_raw_list(raw_data) + tree_set = ResourceTreeSet.from_raw_dict_list(raw_data) target = tree_set.dump() protocol_kwargs[k] = target[0][0] if v == "unilabos_msgs/Resource" else target except Exception as ex: diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index aaa8079..09e7749 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -523,7 +523,7 @@ class ResourceTreeSet(object): return plr_resources @classmethod - def from_raw_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet": + def from_raw_dict_list(cls, raw_list: List[Dict[str, Any]]) -> "ResourceTreeSet": """ 从原始字典列表创建 ResourceTreeSet,自动建立 parent-children 关系 @@ -573,10 +573,10 @@ class ResourceTreeSet(object): parent_instance.children.append(instance) # 第四步:使用 from_nested_list 创建 ResourceTreeSet - return cls.from_nested_list(instances) + return cls.from_nested_instance_list(instances) @classmethod - def from_nested_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet": + def from_nested_instance_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet": """ 从扁平化的资源列表创建ResourceTreeSet,自动按根节点分组 @@ -785,7 +785,7 @@ class ResourceTreeSet(object): """ nested_lists = [] for tree_data in data: - nested_lists.extend(ResourceTreeSet.from_raw_list(tree_data).trees) + nested_lists.extend(ResourceTreeSet.from_raw_dict_list(tree_data).trees) return cls(nested_lists) @@ -965,7 +965,7 @@ class DeviceNodeResourceTracker(object): if current_uuid in self.uuid_to_resources: self.uuid_to_resources.pop(current_uuid) self.uuid_to_resources[new_uuid] = res - logger.debug(f"更新uuid: {current_uuid} -> {new_uuid}") + logger.trace(f"更新uuid: {current_uuid} -> {new_uuid}") replaced = 1 return replaced