diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..5c26f48 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,7 @@ +""" +测试包根目录。 + +让 `tests.*` 模块可以被正常 import(例如给 `unilabos` 下的测试入口使用)。 +""" + + diff --git a/tests/devices/__init__.py b/tests/devices/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/devices/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/devices/liquid_handling/__init__.py b/tests/devices/liquid_handling/__init__.py new file mode 100644 index 0000000..b16b30e --- /dev/null +++ b/tests/devices/liquid_handling/__init__.py @@ -0,0 +1,5 @@ +""" +液体处理设备相关测试。 +""" + + diff --git a/tests/devices/liquid_handling/test_transfer_liquid.py b/tests/devices/liquid_handling/test_transfer_liquid.py new file mode 100644 index 0000000..9896aac --- /dev/null +++ b/tests/devices/liquid_handling/test_transfer_liquid.py @@ -0,0 +1,505 @@ +import asyncio +from dataclasses import dataclass +from typing import Any, Iterable, List, Optional, Sequence, Tuple + +import pytest + +from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract + + +@dataclass(frozen=True) +class DummyContainer: + name: str + + def __repr__(self) -> str: # pragma: no cover + return f"DummyContainer({self.name})" + + +@dataclass(frozen=True) +class DummyTipSpot: + name: str + + def __repr__(self) -> str: # pragma: no cover + return f"DummyTipSpot({self.name})" + + +def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]: + """Yield lists so code can safely call `tip.extend(next(self.current_tip))`.""" + for i in range(n): + yield [DummyTipSpot(f"tip_{i}")] + + +class FakeLiquidHandler(LiquidHandlerAbstract): + """不初始化真实 backend/deck;仅用来记录 transfer_liquid 内部调用序列。""" + + def __init__(self, channel_num: int = 8): + # 不调用 super().__init__,避免真实硬件/后端依赖 + self.channel_num = channel_num + self.support_touch_tip = True + self.current_tip = iter(make_tip_iter()) + self.calls: List[Tuple[str, Any]] = [] + + async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs): + self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels})) + + async def aspirate( + self, + resources: Sequence[Any], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Any = None, + liquid_height: Any = None, + blow_out_air_volume: Any = None, + spread: str = "wide", + **backend_kwargs, + ): + self.calls.append( + ( + "aspirate", + { + "resources": list(resources), + "vols": list(vols), + "use_channels": list(use_channels) if use_channels is not None else None, + "flow_rates": list(flow_rates) if flow_rates is not None else None, + "offsets": list(offsets) if offsets is not None else None, + "liquid_height": list(liquid_height) if liquid_height is not None else None, + "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, + }, + ) + ) + + async def dispense( + self, + resources: Sequence[Any], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Any = None, + liquid_height: Any = None, + blow_out_air_volume: Any = None, + spread: str = "wide", + **backend_kwargs, + ): + self.calls.append( + ( + "dispense", + { + "resources": list(resources), + "vols": list(vols), + "use_channels": list(use_channels) if use_channels is not None else None, + "flow_rates": list(flow_rates) if flow_rates is not None else None, + "offsets": list(offsets) if offsets is not None else None, + "liquid_height": list(liquid_height) if liquid_height is not None else None, + "blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None, + }, + ) + ) + + async def discard_tips(self, use_channels=None, *args, **kwargs): + # 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数) + self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None})) + + async def custom_delay(self, seconds=0, msg=None): + self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg})) + + async def touch_tip(self, targets): + # 原实现会访问 targets.get_size_x() 等;测试里只记录调用 + self.calls.append(("touch_tip", {"targets": targets})) + + async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None): + self.calls.append( + ( + "mix", + { + "targets": targets, + "mix_time": mix_time, + "mix_vol": mix_vol, + }, + ) + ) + + +def run(coro): + return asyncio.run(coro) + + +def test_one_to_one_single_channel_basic_calls(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + targets = [DummyContainer(f"T{i}") for i in range(3)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=[1, 2, 3], + dis_vols=[4, 5, 6], + mix_times=None, # 应该仍能执行(不 mix) + ) + ) + + assert [c[0] for c in lh.calls].count("pick_up_tips") == 3 + assert [c[0] for c in lh.calls].count("aspirate") == 3 + assert [c[0] for c in lh.calls].count("dispense") == 3 + assert [c[0] for c in lh.calls].count("discard_tips") == 3 + + # 每次 aspirate/dispense 都是单孔列表 + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert aspirates[0]["resources"] == [sources[0]] + assert aspirates[0]["vols"] == [1.0] + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert dispenses[2]["resources"] == [targets[2]] + assert dispenses[2]["vols"] == [6.0] + + +def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(16)) + + source = DummyContainer("S0") + target = DummyContainer("T0") + + run( + lh.transfer_liquid( + sources=[source], + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=[5], + dis_vols=[5], + mix_stage="before", + mix_times=1, + mix_vol=3, + ) + ) + + names = [name for name, _ in lh.calls] + assert names.count("mix") == 1 + assert names.index("mix") < names.index("aspirate") + + +def test_one_to_one_eight_channel_groups_by_8(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(256)) + + sources = [DummyContainer(f"S{i}") for i in range(16)] + targets = [DummyContainer(f"T{i}") for i in range(16)] + asp_vols = list(range(1, 17)) + dis_vols = list(range(101, 117)) + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + mix_times=0, # 触发逻辑但不 mix + ) + ) + + # 16 个任务 -> 2 组,每组 8 通道一起做 + assert [c[0] for c in lh.calls].count("pick_up_tips") == 2 + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == 2 + assert len(dispenses) == 2 + + assert aspirates[0]["resources"] == sources[0:8] + assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]] + assert dispenses[1]["resources"] == targets[8:16] + assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]] + + +def test_one_to_one_eight_channel_requires_multiple_of_8_targets(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer(f"S{i}") for i in range(9)] + targets = [DummyContainer(f"T{i}") for i in range(9)] + + with pytest.raises(ValueError, match="multiple of 8"): + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=[1] * 9, + dis_vols=[1] * 9, + mix_times=0, + ) + ) + + +def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(512)) + + sources = [DummyContainer(f"S{i}") for i in range(16)] + targets = [DummyContainer(f"T{i}") for i in range(16)] + asp_vols = [i + 1 for i in range(16)] + dis_vols = [200 + i for i in range(16)] + asp_flow_rates = [0.1 * (i + 1) for i in range(16)] + dis_flow_rates = [0.2 * (i + 1) for i in range(16)] + offsets = [f"offset_{i}" for i in range(16)] + liquid_heights = [i * 0.5 for i in range(16)] + blow_out_air_volume = [i + 0.05 for i in range(16)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + asp_flow_rates=asp_flow_rates, + dis_flow_rates=dis_flow_rates, + offsets=offsets, + liquid_height=liquid_heights, + blow_out_air_volume=blow_out_air_volume, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(aspirates) == len(dispenses) == 2 + + for batch_idx in range(2): + start = batch_idx * 8 + end = start + 8 + asp_call = aspirates[batch_idx] + dis_call = dispenses[batch_idx] + assert asp_call["resources"] == sources[start:end] + assert asp_call["flow_rates"] == asp_flow_rates[start:end] + assert asp_call["offsets"] == offsets[start:end] + assert asp_call["liquid_height"] == liquid_heights[start:end] + assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end] + assert dis_call["flow_rates"] == dis_flow_rates[start:end] + assert dis_call["offsets"] == offsets[start:end] + assert dis_call["liquid_height"] == liquid_heights[start:end] + assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end] + + +def test_one_to_one_eight_channel_handles_32_tasks_four_batches(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(1024)) + + sources = [DummyContainer(f"S{i}") for i in range(32)] + targets = [DummyContainer(f"T{i}") for i in range(32)] + asp_vols = [i + 1 for i in range(32)] + dis_vols = [300 + i for i in range(32)] + + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=dis_vols, + mix_times=0, + ) + ) + + pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"] + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert len(pick_calls) == 4 + assert len(aspirates) == len(dispenses) == 4 + assert aspirates[0]["resources"] == sources[0:8] + assert aspirates[-1]["resources"] == sources[24:32] + assert dispenses[0]["resources"] == targets[0:8] + assert dispenses[-1]["resources"] == targets[24:32] + + +def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(64)) + + source = DummyContainer("SRC") + targets = [DummyContainer(f"T{i}") for i in range(3)] + dis_vols = [10, 20, 30] # sum=60 + + run( + lh.transfer_liquid( + sources=[source], + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60 + dis_vols=dis_vols, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert len(aspirates) == 1 + assert aspirates[0]["resources"] == [source] + assert aspirates[0]["vols"] == [60.0] + assert aspirates[0]["use_channels"] == [0] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0] + + +def test_one_to_many_eight_channel_basic(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(128)) + + source = DummyContainer("SRC") + targets = [DummyContainer(f"T{i}") for i in range(8)] + dis_vols = [i + 1 for i in range(8)] + + run( + lh.transfer_liquid( + sources=[source], + targets=targets, + tip_racks=[], + use_channels=list(range(8)), + asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自) + dis_vols=dis_vols, + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + assert aspirates[0]["resources"] == [source] * 8 + assert aspirates[0]["vols"] == [float(v) for v in dis_vols] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert dispenses[0]["resources"] == targets + assert dispenses[0]["vols"] == [float(v) for v in dis_vols] + + +def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + target = DummyContainer("T") + asp_vols = [5, 6, 7] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=asp_vols, + dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol + mix_times=0, + ) + ) + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols] + assert all(d["resources"] == [target] for d in dispenses) + + +def test_many_to_one_single_channel_before_stage_mixes_target_once(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer("S0"), DummyContainer("S1")] + target = DummyContainer("T") + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=[5, 6], + dis_vols=1, + mix_stage="before", + mix_times=2, + mix_vol=4, + ) + ) + + names = [name for name, _ in lh.calls] + assert names[0] == "mix" + assert names.count("mix") == 1 + + +def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source(): + lh = FakeLiquidHandler(channel_num=1) + lh.current_tip = iter(make_tip_iter(128)) + + sources = [DummyContainer(f"S{i}") for i in range(3)] + target = DummyContainer("T") + asp_vols = [5, 6, 7] + dis_vols = [1, 2, 3] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=[0], + asp_vols=asp_vols, + dis_vols=dis_vols, # 比例模式 + mix_times=0, + ) + ) + + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols] + + +def test_many_to_one_eight_channel_basic(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(256)) + + sources = [DummyContainer(f"S{i}") for i in range(8)] + target = DummyContainer("T") + asp_vols = [10 + i for i in range(8)] + + run( + lh.transfer_liquid( + sources=sources, + targets=[target], + tip_racks=[], + use_channels=list(range(8)), + asp_vols=asp_vols, + dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol + mix_times=0, + ) + ) + + aspirates = [payload for name, payload in lh.calls if name == "aspirate"] + dispenses = [payload for name, payload in lh.calls if name == "dispense"] + assert aspirates[0]["resources"] == sources + assert aspirates[0]["vols"] == [float(v) for v in asp_vols] + assert dispenses[0]["resources"] == [target] * 8 + assert dispenses[0]["vols"] == [float(v) for v in asp_vols] + + +def test_transfer_liquid_mode_detection_unsupported_shape_raises(): + lh = FakeLiquidHandler(channel_num=8) + lh.current_tip = iter(make_tip_iter(64)) + + sources = [DummyContainer("S0"), DummyContainer("S1")] + targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")] + + with pytest.raises(ValueError, match="Unsupported transfer mode"): + run( + lh.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=[], + use_channels=[0], + asp_vols=[1, 1], + dis_vols=[1, 1, 1], + mix_times=0, + ) + ) + diff --git a/tests/ros/msgs/test_runner.py b/tests/ros/msgs/test_runner.py index fe4cb09..02d352d 100644 --- a/tests/ros/msgs/test_runner.py +++ b/tests/ros/msgs/test_runner.py @@ -11,10 +11,10 @@ import os # 添加项目根目录到路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) -# 导入测试模块 -from test.ros.msgs.test_basic import TestBasicFunctionality -from test.ros.msgs.test_conversion import TestBasicConversion, TestMappingConversion -from test.ros.msgs.test_mapping import TestTypeMapping, TestFieldMapping +# 导入测试模块(统一从 tests 包获取) +from tests.ros.msgs.test_basic import TestBasicFunctionality +from tests.ros.msgs.test_conversion import TestBasicConversion, TestMappingConversion +from tests.ros.msgs.test_mapping import TestTypeMapping, TestFieldMapping def run_tests(): diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 8c44712..6da4f5f 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -1240,7 +1240,7 @@ class WebSocketClient(BaseCommunicationClient): }, } self.message_processor.send_message(message) - logger.debug(f"[WebSocketClient] Device status published: {device_id}.{property_name}") + logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") def publish_job_status( self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 96289ff..d02129c 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -1042,11 +1042,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): One or more TipRacks providing fresh tips. is_96_well Set *True* to use the 96‑channel head. + mix_stage + When to mix the target wells relative to dispensing. Default "none" means + no mixing occurs even if mix_times is provided. Use "before", "after", or + "both" to mix at the corresponding stage(s). + mix_times + Number of mix cycles. If *None* (default) no mixing occurs regardless of + mix_stage. """ # 确保 use_channels 有默认值 if use_channels is None: - use_channels = [0] if self.channel_num >= 1 else list(range(self.channel_num)) + # 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7) + use_channels = list(range(self.channel_num)) if self.channel_num > 0 else [0] if is_96_well: pass # This mode is not verified. @@ -1074,42 +1082,42 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if mix_times is not None: mix_times = int(mix_times) - # 识别传输模式 - num_sources = len(sources) - num_targets = len(targets) - - if num_sources == 1 and num_targets > 1: - # 模式1: 一对多 (1 source -> N targets) - await self._transfer_one_to_many( - sources[0], targets, tip_racks, use_channels, - asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, - offsets, touch_tip, liquid_height, blow_out_air_volume, - spread, mix_stage, mix_times, mix_vol, mix_rate, - mix_liquid_height, delays - ) - elif num_sources > 1 and num_targets == 1: - # 模式2: 多对一 (N sources -> 1 target) - await self._transfer_many_to_one( - sources, targets[0], tip_racks, use_channels, - asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, - offsets, touch_tip, liquid_height, blow_out_air_volume, - spread, mix_stage, mix_times, mix_vol, mix_rate, - mix_liquid_height, delays - ) - elif num_sources == num_targets: - # 模式3: 一对一 (N sources -> N targets) - 原有逻辑 - await self._transfer_one_to_one( - sources, targets, tip_racks, use_channels, - asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, - offsets, touch_tip, liquid_height, blow_out_air_volume, - spread, mix_stage, mix_times, mix_vol, mix_rate, - mix_liquid_height, delays - ) - else: - raise ValueError( - f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. " - "Supported modes: 1->N, N->1, or N->N." - ) + # 识别传输模式(mix_times 为 None 也应该能正常移液,只是不做 mix) + num_sources = len(sources) + num_targets = len(targets) + + if num_sources == 1 and num_targets > 1: + # 模式1: 一对多 (1 source -> N targets) + await self._transfer_one_to_many( + sources[0], targets, tip_racks, use_channels, + asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, + offsets, touch_tip, liquid_height, blow_out_air_volume, + spread, mix_stage, mix_times, mix_vol, mix_rate, + mix_liquid_height, delays + ) + elif num_sources > 1 and num_targets == 1: + # 模式2: 多对一 (N sources -> 1 target) + await self._transfer_many_to_one( + sources, targets[0], tip_racks, use_channels, + asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, + offsets, touch_tip, liquid_height, blow_out_air_volume, + spread, mix_stage, mix_times, mix_vol, mix_rate, + mix_liquid_height, delays + ) + elif num_sources == num_targets: + # 模式3: 一对一 (N sources -> N targets) + await self._transfer_one_to_one( + sources, targets, tip_racks, use_channels, + asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, + offsets, touch_tip, liquid_height, blow_out_air_volume, + spread, mix_stage, mix_times, mix_vol, mix_rate, + mix_liquid_height, delays + ) + else: + raise ValueError( + f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. " + "Supported modes: 1->N, N->1, or N->N." + ) async def _transfer_one_to_one( self, @@ -1149,6 +1157,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=[targets[_]], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + await self.aspirate( resources=[sources[_]], vols=[asp_vols[_]], @@ -1209,6 +1227,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=current_targets, + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + await self.aspirate( resources=current_reagent_sources, vols=current_asp_vols, @@ -1290,6 +1318,17 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + for idx, target in enumerate(targets): + await self.mix( + targets=[target], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[idx:idx + 1] if offsets and len(offsets) > idx else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + # 从源容器吸液(总体积) await self.aspirate( resources=[source], @@ -1354,6 +1393,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8 current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8 + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=current_targets, + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[i:i + 8] if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + # 从源容器吸液(8个通道都从同一个源,但每个通道的吸液体积不同) await self.aspirate( resources=[source] * 8, # 8个通道都从同一个源 @@ -1452,8 +1501,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): # 单通道模式:多次吸液,一次分液 # 先混合前(如果需要) if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: - # 注意:在吸液前混合源容器通常不常见,这里跳过 - pass + await self.mix( + targets=[target], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[0:1] if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) # 从每个源容器吸液并分液到目标容器 for idx, source in enumerate(sources): @@ -1528,6 +1583,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.") # 每次处理8个源 + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=[target], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[0:1] if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + for i in range(0, len(sources), 8): tip = [] for _ in range(len(use_channels)): diff --git a/unilabos/devices/liquid_handling/test_transfer_liquid.py b/unilabos/devices/liquid_handling/test_transfer_liquid.py new file mode 100644 index 0000000..f13980f --- /dev/null +++ b/unilabos/devices/liquid_handling/test_transfer_liquid.py @@ -0,0 +1,13 @@ +""" +说明: +这里放一个“入口文件”,方便在 `unilabos/devices/liquid_handling` 目录下直接找到 +`transfer_liquid` 的测试。 + +实际测试用例实现放在仓库标准测试目录: +`tests/devices/liquid_handling/test_transfer_liquid.py` +""" + +# 让 pytest 能从这里发现同一套测试(避免复制两份测试代码)。 +from tests.devices.liquid_handling.test_transfer_liquid import * # noqa: F401,F403 + + diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 7c53163..16aba6d 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -21,6 +21,7 @@ from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.service import Service from unilabos_msgs.action import SendCmd from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response +from unilabos.utils.decorator import get_topic_config, get_all_subscriptions from unilabos.resources.container import RegularContainer from unilabos.resources.graphio import ( @@ -48,7 +49,8 @@ from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import ( DeviceNodeResourceTracker, ResourceTreeSet, - ResourceTreeInstance, ResourceDictInstance, + ResourceTreeInstance, + ResourceDictInstance, ) from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator @@ -168,6 +170,7 @@ class PropertyPublisher: msg_type, initial_period: float = 5.0, print_publish=True, + qos: int = 10, ): self.node = node self.name = name @@ -175,10 +178,11 @@ class PropertyPublisher: self.get_method = get_method self.timer_period = initial_period self.print_publish = print_publish + self.qos = qos self._value = None try: - self.publisher_ = node.create_publisher(msg_type, f"{name}", 10) + self.publisher_ = node.create_publisher(msg_type, f"{name}", qos) except AttributeError as ex: self.node.lab_logger().error( f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}" @@ -186,7 +190,7 @@ class PropertyPublisher: self.timer = node.create_timer(self.timer_period, self.publish_property) self.__loop = get_event_loop() str_msg_type = str(msg_type)[8:-2] - self.node.lab_logger().trace(f"发布属性: {name}, 类型: {str_msg_type}, 周期: {initial_period}秒") + self.node.lab_logger().trace(f"发布属性: {name}, 类型: {str_msg_type}, 周期: {initial_period}秒, QoS: {qos}") def get_property(self): if asyncio.iscoroutinefunction(self.get_method): @@ -326,6 +330,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): continue self.create_ros_action_server(action_name, action_value_mapping) + # 创建订阅者(通过 @subscribe 装饰器) + self._topic_subscribers: Dict[str, Any] = {} + self._setup_decorated_subscribers() + # 创建线程池执行器 self._executor = ThreadPoolExecutor( max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}" @@ -1043,6 +1051,29 @@ class BaseROS2DeviceNode(Node, Generic[T]): def create_ros_publisher(self, attr_name, msg_type, initial_period=5.0): """创建ROS发布者""" + # 检测装饰器配置(支持 get_{attr_name} 方法和 @property) + topic_config = {} + + # 优先检测 get_{attr_name} 方法 + if hasattr(self.driver_instance, f"get_{attr_name}"): + getter_method = getattr(self.driver_instance, f"get_{attr_name}") + topic_config = get_topic_config(getter_method) + + # 如果没有配置,检测 @property 装饰的属性 + if not topic_config: + driver_class = type(self.driver_instance) + if hasattr(driver_class, attr_name): + class_attr = getattr(driver_class, attr_name) + if isinstance(class_attr, property) and class_attr.fget is not None: + topic_config = get_topic_config(class_attr.fget) + + # 使用装饰器配置或默认值 + cfg_period = topic_config.get("period") + cfg_print = topic_config.get("print_publish") + cfg_qos = topic_config.get("qos") + period: float = cfg_period if cfg_period is not None else initial_period + print_publish: bool = cfg_print if cfg_print is not None else self._print_publish + qos: int = cfg_qos if cfg_qos is not None else 10 # 获取属性值的方法 def get_device_attr(): @@ -1063,7 +1094,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().error(traceback.format_exc()) self._property_publishers[attr_name] = PropertyPublisher( - self, attr_name, get_device_attr, msg_type, initial_period, self._print_publish + self, attr_name, get_device_attr, msg_type, period, print_publish, qos ) def create_ros_action_server(self, action_name, action_value_mapping): @@ -1081,6 +1112,76 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") + def _setup_decorated_subscribers(self): + """扫描 driver_instance 中带有 @subscribe 装饰器的方法并创建订阅者""" + subscriptions = get_all_subscriptions(self.driver_instance) + + for method_name, method, config in subscriptions: + topic_template = config.get("topic") + msg_type = config.get("msg_type") + qos = config.get("qos", 10) + + if not topic_template: + self.lab_logger().warning(f"订阅方法 {method_name} 缺少 topic 配置,跳过") + continue + + # 如果没有指定 msg_type,尝试从类型注解推断 + if msg_type is None: + try: + hints = get_type_hints(method) + # 第一个参数是 self,第二个是 msg + param_names = list(hints.keys()) + if param_names: + msg_type = hints[param_names[0]] + except Exception: + pass + + if msg_type is None: + self.lab_logger().warning(f"订阅方法 {method_name} 缺少 msg_type 配置且无法从类型注解推断,跳过") + continue + + # 替换 topic 模板中的占位符 + topic = self._resolve_topic_template(topic_template) + + self.create_ros_subscriber(topic, msg_type, method, qos) + + def _resolve_topic_template(self, topic_template: str) -> str: + """ + 解析 topic 模板,替换占位符 + + 支持的占位符: + - {device_id}: 设备ID + - {namespace}: 完整命名空间 + """ + return topic_template.format( + device_id=self.device_id, + namespace=self.namespace, + ) + + def create_ros_subscriber(self, topic: str, msg_type, callback, qos: int = 10): + """ + 创建ROS订阅者 + + Args: + topic: Topic 名称 + msg_type: ROS 消息类型 + callback: 回调方法(会自动绑定到 driver_instance) + qos: QoS 深度配置 + """ + try: + subscription = self.create_subscription( + msg_type, + topic, + callback, + qos, + callback_group=self.callback_group, + ) + self._topic_subscribers[topic] = subscription + str_msg_type = str(msg_type)[8:-2] if str(msg_type).startswith(" str: + return "Idle" - 支持多种泵送模式,具有高精度流量控制和自动校准功能。 - 适用于实验室自动化系统中的液体处理任务。 - """ - - _ros_node: BaseROS2DeviceNode - - def __init__(self, device_id: str = "smart_pump_01", port: str = "/dev/ttyUSB0"): - """ - 初始化智能泵控制器 - - Args: - device_id: 设备唯一标识符 - port: 通信端口 - """ - self.device_id = device_id - self.port = port - self.is_connected = False - self.current_flow_rate = 0.0 - self.total_volume_pumped = 0.0 - self.calibration_factor = 1.0 - self.pump_mode = "continuous" # continuous, volume, rate - - def post_init(self, ros_node: BaseROS2DeviceNode): - self._ros_node = ros_node - - def connect_device(self, timeout: int = 10) -> bool: - """ - 连接到泵设备 - - Args: - timeout: 连接超时时间(秒) - - Returns: - bool: 连接是否成功 - """ - # 模拟连接过程 - self.is_connected = True + async def action(self, addr: str) -> bool: return True + + + + def disconnect_device(self) -> bool: """ 断开设备连接 diff --git a/unilabos/utils/decorator.py b/unilabos/utils/decorator.py index 77e473c..667f353 100644 --- a/unilabos/utils/decorator.py +++ b/unilabos/utils/decorator.py @@ -1,3 +1,9 @@ +from functools import wraps +from typing import Any, Callable, Optional, TypeVar + +F = TypeVar("F", bound=Callable[..., Any]) + + def singleton(cls): """ 单例装饰器 @@ -12,3 +18,167 @@ def singleton(cls): return get_instance + +def topic_config( + period: Optional[float] = None, + print_publish: Optional[bool] = None, + qos: Optional[int] = None, +) -> Callable[[F], F]: + """ + Topic发布配置装饰器 + + 用于装饰 get_{attr_name} 方法或 @property,控制对应属性的ROS topic发布行为。 + + Args: + period: 发布周期(秒)。None 表示使用默认值 5.0 + print_publish: 是否打印发布日志。None 表示使用节点默认配置 + qos: QoS深度配置。None 表示使用默认值 10 + + Example: + class MyDriver: + # 方式1: 装饰 get_{attr_name} 方法 + @topic_config(period=1.0, print_publish=False, qos=5) + def get_temperature(self): + return self._temperature + + # 方式2: 与 @property 连用(topic_config 放在下面) + @property + @topic_config(period=0.1) + def position(self): + return self._position + + Note: + 与 @property 连用时,@topic_config 必须放在 @property 下面, + 这样装饰器执行顺序为:先 topic_config 添加配置,再 property 包装。 + """ + + def decorator(func: F) -> F: + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + # 在函数上附加配置属性 (type: ignore 用于动态属性) + wrapper._topic_period = period # type: ignore[attr-defined] + wrapper._topic_print_publish = print_publish # type: ignore[attr-defined] + wrapper._topic_qos = qos # type: ignore[attr-defined] + wrapper._has_topic_config = True # type: ignore[attr-defined] + + return wrapper # type: ignore[return-value] + + return decorator + + +def get_topic_config(func) -> dict: + """ + 获取函数上的topic配置 + + Args: + func: 被装饰的函数 + + Returns: + 包含 period, print_publish, qos 的配置字典 + """ + if hasattr(func, "_has_topic_config") and getattr(func, "_has_topic_config", False): + return { + "period": getattr(func, "_topic_period", None), + "print_publish": getattr(func, "_topic_print_publish", None), + "qos": getattr(func, "_topic_qos", None), + } + return {} + + +def subscribe( + topic: str, + msg_type: Optional[type] = None, + qos: int = 10, +) -> Callable[[F], F]: + """ + Topic订阅装饰器 + + 用于装饰 driver 类中的方法,使其成为 ROS topic 的订阅回调。 + 当 ROS2DeviceNode 初始化时,会自动扫描并创建对应的订阅者。 + + Args: + topic: Topic 名称模板,支持以下占位符: + - {device_id}: 设备ID (如 "pump_1") + - {namespace}: 完整命名空间 (如 "/devices/pump_1") + msg_type: ROS 消息类型。如果为 None,需要在回调函数的类型注解中指定 + qos: QoS 深度配置,默认为 10 + + Example: + from std_msgs.msg import String, Float64 + + class MyDriver: + @subscribe(topic="/devices/{device_id}/set_speed", msg_type=Float64) + def on_speed_update(self, msg: Float64): + self._speed = msg.data + print(f"Speed updated to: {self._speed}") + + @subscribe(topic="{namespace}/command") + def on_command(self, msg: String): + # msg_type 可从类型注解推断 + self.execute_command(msg.data) + + Note: + - 回调方法的第一个参数是 self,第二个参数是收到的 ROS 消息 + - topic 中的占位符会在创建订阅时被实际值替换 + """ + + def decorator(func: F) -> F: + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + # 在函数上附加订阅配置 + wrapper._subscribe_topic = topic # type: ignore[attr-defined] + wrapper._subscribe_msg_type = msg_type # type: ignore[attr-defined] + wrapper._subscribe_qos = qos # type: ignore[attr-defined] + wrapper._has_subscribe = True # type: ignore[attr-defined] + + return wrapper # type: ignore[return-value] + + return decorator + + +def get_subscribe_config(func) -> dict: + """ + 获取函数上的订阅配置 + + Args: + func: 被装饰的函数 + + Returns: + 包含 topic, msg_type, qos 的配置字典 + """ + if hasattr(func, "_has_subscribe") and getattr(func, "_has_subscribe", False): + return { + "topic": getattr(func, "_subscribe_topic", None), + "msg_type": getattr(func, "_subscribe_msg_type", None), + "qos": getattr(func, "_subscribe_qos", 10), + } + return {} + + +def get_all_subscriptions(instance) -> list: + """ + 扫描实例的所有方法,获取带有 @subscribe 装饰器的方法及其配置 + + Args: + instance: 要扫描的实例 + + Returns: + 包含 (method_name, method, config) 元组的列表 + """ + subscriptions = [] + for attr_name in dir(instance): + if attr_name.startswith("_"): + continue + try: + attr = getattr(instance, attr_name) + if callable(attr): + config = get_subscribe_config(attr) + if config: + subscriptions.append((attr_name, attr, config)) + except Exception: + pass + return subscriptions