mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-04 13:25:13 +00:00
Merge remote-tracking branch 'origin/dev' into prcix9320
This commit is contained in:
7
tests/__init__.py
Normal file
7
tests/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
"""
|
||||||
|
测试包根目录。
|
||||||
|
|
||||||
|
让 `tests.*` 模块可以被正常 import(例如给 `unilabos` 下的测试入口使用)。
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
1
tests/devices/__init__.py
Normal file
1
tests/devices/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
|
||||||
5
tests/devices/liquid_handling/__init__.py
Normal file
5
tests/devices/liquid_handling/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"""
|
||||||
|
液体处理设备相关测试。
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
505
tests/devices/liquid_handling/test_transfer_liquid.py
Normal file
505
tests/devices/liquid_handling/test_transfer_liquid.py
Normal file
@@ -0,0 +1,505 @@
|
|||||||
|
import asyncio
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Iterable, List, Optional, Sequence, Tuple
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class DummyContainer:
|
||||||
|
name: str
|
||||||
|
|
||||||
|
def __repr__(self) -> str: # pragma: no cover
|
||||||
|
return f"DummyContainer({self.name})"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class DummyTipSpot:
|
||||||
|
name: str
|
||||||
|
|
||||||
|
def __repr__(self) -> str: # pragma: no cover
|
||||||
|
return f"DummyTipSpot({self.name})"
|
||||||
|
|
||||||
|
|
||||||
|
def make_tip_iter(n: int = 256) -> Iterable[List[DummyTipSpot]]:
|
||||||
|
"""Yield lists so code can safely call `tip.extend(next(self.current_tip))`."""
|
||||||
|
for i in range(n):
|
||||||
|
yield [DummyTipSpot(f"tip_{i}")]
|
||||||
|
|
||||||
|
|
||||||
|
class FakeLiquidHandler(LiquidHandlerAbstract):
|
||||||
|
"""不初始化真实 backend/deck;仅用来记录 transfer_liquid 内部调用序列。"""
|
||||||
|
|
||||||
|
def __init__(self, channel_num: int = 8):
|
||||||
|
# 不调用 super().__init__,避免真实硬件/后端依赖
|
||||||
|
self.channel_num = channel_num
|
||||||
|
self.support_touch_tip = True
|
||||||
|
self.current_tip = iter(make_tip_iter())
|
||||||
|
self.calls: List[Tuple[str, Any]] = []
|
||||||
|
|
||||||
|
async def pick_up_tips(self, tip_spots, use_channels=None, offsets=None, **backend_kwargs):
|
||||||
|
self.calls.append(("pick_up_tips", {"tips": list(tip_spots), "use_channels": use_channels}))
|
||||||
|
|
||||||
|
async def aspirate(
|
||||||
|
self,
|
||||||
|
resources: Sequence[Any],
|
||||||
|
vols: List[float],
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
flow_rates: Optional[List[Optional[float]]] = None,
|
||||||
|
offsets: Any = None,
|
||||||
|
liquid_height: Any = None,
|
||||||
|
blow_out_air_volume: Any = None,
|
||||||
|
spread: str = "wide",
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
self.calls.append(
|
||||||
|
(
|
||||||
|
"aspirate",
|
||||||
|
{
|
||||||
|
"resources": list(resources),
|
||||||
|
"vols": list(vols),
|
||||||
|
"use_channels": list(use_channels) if use_channels is not None else None,
|
||||||
|
"flow_rates": list(flow_rates) if flow_rates is not None else None,
|
||||||
|
"offsets": list(offsets) if offsets is not None else None,
|
||||||
|
"liquid_height": list(liquid_height) if liquid_height is not None else None,
|
||||||
|
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def dispense(
|
||||||
|
self,
|
||||||
|
resources: Sequence[Any],
|
||||||
|
vols: List[float],
|
||||||
|
use_channels: Optional[List[int]] = None,
|
||||||
|
flow_rates: Optional[List[Optional[float]]] = None,
|
||||||
|
offsets: Any = None,
|
||||||
|
liquid_height: Any = None,
|
||||||
|
blow_out_air_volume: Any = None,
|
||||||
|
spread: str = "wide",
|
||||||
|
**backend_kwargs,
|
||||||
|
):
|
||||||
|
self.calls.append(
|
||||||
|
(
|
||||||
|
"dispense",
|
||||||
|
{
|
||||||
|
"resources": list(resources),
|
||||||
|
"vols": list(vols),
|
||||||
|
"use_channels": list(use_channels) if use_channels is not None else None,
|
||||||
|
"flow_rates": list(flow_rates) if flow_rates is not None else None,
|
||||||
|
"offsets": list(offsets) if offsets is not None else None,
|
||||||
|
"liquid_height": list(liquid_height) if liquid_height is not None else None,
|
||||||
|
"blow_out_air_volume": list(blow_out_air_volume) if blow_out_air_volume is not None else None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def discard_tips(self, use_channels=None, *args, **kwargs):
|
||||||
|
# 有的分支是 discard_tips(use_channels=[0]),有的分支是 discard_tips([0..7])(位置参数)
|
||||||
|
self.calls.append(("discard_tips", {"use_channels": list(use_channels) if use_channels is not None else None}))
|
||||||
|
|
||||||
|
async def custom_delay(self, seconds=0, msg=None):
|
||||||
|
self.calls.append(("custom_delay", {"seconds": seconds, "msg": msg}))
|
||||||
|
|
||||||
|
async def touch_tip(self, targets):
|
||||||
|
# 原实现会访问 targets.get_size_x() 等;测试里只记录调用
|
||||||
|
self.calls.append(("touch_tip", {"targets": targets}))
|
||||||
|
|
||||||
|
async def mix(self, targets, mix_time=None, mix_vol=None, height_to_bottom=None, offsets=None, mix_rate=None, none_keys=None):
|
||||||
|
self.calls.append(
|
||||||
|
(
|
||||||
|
"mix",
|
||||||
|
{
|
||||||
|
"targets": targets,
|
||||||
|
"mix_time": mix_time,
|
||||||
|
"mix_vol": mix_vol,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def run(coro):
|
||||||
|
return asyncio.run(coro)
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_one_single_channel_basic_calls():
|
||||||
|
lh = FakeLiquidHandler(channel_num=1)
|
||||||
|
lh.current_tip = iter(make_tip_iter(64))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(3)]
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(3)]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=[1, 2, 3],
|
||||||
|
dis_vols=[4, 5, 6],
|
||||||
|
mix_times=None, # 应该仍能执行(不 mix)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert [c[0] for c in lh.calls].count("pick_up_tips") == 3
|
||||||
|
assert [c[0] for c in lh.calls].count("aspirate") == 3
|
||||||
|
assert [c[0] for c in lh.calls].count("dispense") == 3
|
||||||
|
assert [c[0] for c in lh.calls].count("discard_tips") == 3
|
||||||
|
|
||||||
|
# 每次 aspirate/dispense 都是单孔列表
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
assert aspirates[0]["resources"] == [sources[0]]
|
||||||
|
assert aspirates[0]["vols"] == [1.0]
|
||||||
|
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert dispenses[2]["resources"] == [targets[2]]
|
||||||
|
assert dispenses[2]["vols"] == [6.0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_one_single_channel_before_stage_mixes_prior_to_aspirate():
|
||||||
|
lh = FakeLiquidHandler(channel_num=1)
|
||||||
|
lh.current_tip = iter(make_tip_iter(16))
|
||||||
|
|
||||||
|
source = DummyContainer("S0")
|
||||||
|
target = DummyContainer("T0")
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=[source],
|
||||||
|
targets=[target],
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=[5],
|
||||||
|
dis_vols=[5],
|
||||||
|
mix_stage="before",
|
||||||
|
mix_times=1,
|
||||||
|
mix_vol=3,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
names = [name for name, _ in lh.calls]
|
||||||
|
assert names.count("mix") == 1
|
||||||
|
assert names.index("mix") < names.index("aspirate")
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_one_eight_channel_groups_by_8():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(256))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(16)]
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(16)]
|
||||||
|
asp_vols = list(range(1, 17))
|
||||||
|
dis_vols = list(range(101, 117))
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=list(range(8)),
|
||||||
|
asp_vols=asp_vols,
|
||||||
|
dis_vols=dis_vols,
|
||||||
|
mix_times=0, # 触发逻辑但不 mix
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# 16 个任务 -> 2 组,每组 8 通道一起做
|
||||||
|
assert [c[0] for c in lh.calls].count("pick_up_tips") == 2
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert len(aspirates) == 2
|
||||||
|
assert len(dispenses) == 2
|
||||||
|
|
||||||
|
assert aspirates[0]["resources"] == sources[0:8]
|
||||||
|
assert aspirates[0]["vols"] == [float(v) for v in asp_vols[0:8]]
|
||||||
|
assert dispenses[1]["resources"] == targets[8:16]
|
||||||
|
assert dispenses[1]["vols"] == [float(v) for v in dis_vols[8:16]]
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_one_eight_channel_requires_multiple_of_8_targets():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(64))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(9)]
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(9)]
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="multiple of 8"):
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=list(range(8)),
|
||||||
|
asp_vols=[1] * 9,
|
||||||
|
dis_vols=[1] * 9,
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_one_eight_channel_parameter_lists_are_chunked_per_8():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(512))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(16)]
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(16)]
|
||||||
|
asp_vols = [i + 1 for i in range(16)]
|
||||||
|
dis_vols = [200 + i for i in range(16)]
|
||||||
|
asp_flow_rates = [0.1 * (i + 1) for i in range(16)]
|
||||||
|
dis_flow_rates = [0.2 * (i + 1) for i in range(16)]
|
||||||
|
offsets = [f"offset_{i}" for i in range(16)]
|
||||||
|
liquid_heights = [i * 0.5 for i in range(16)]
|
||||||
|
blow_out_air_volume = [i + 0.05 for i in range(16)]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=list(range(8)),
|
||||||
|
asp_vols=asp_vols,
|
||||||
|
dis_vols=dis_vols,
|
||||||
|
asp_flow_rates=asp_flow_rates,
|
||||||
|
dis_flow_rates=dis_flow_rates,
|
||||||
|
offsets=offsets,
|
||||||
|
liquid_height=liquid_heights,
|
||||||
|
blow_out_air_volume=blow_out_air_volume,
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert len(aspirates) == len(dispenses) == 2
|
||||||
|
|
||||||
|
for batch_idx in range(2):
|
||||||
|
start = batch_idx * 8
|
||||||
|
end = start + 8
|
||||||
|
asp_call = aspirates[batch_idx]
|
||||||
|
dis_call = dispenses[batch_idx]
|
||||||
|
assert asp_call["resources"] == sources[start:end]
|
||||||
|
assert asp_call["flow_rates"] == asp_flow_rates[start:end]
|
||||||
|
assert asp_call["offsets"] == offsets[start:end]
|
||||||
|
assert asp_call["liquid_height"] == liquid_heights[start:end]
|
||||||
|
assert asp_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
|
||||||
|
assert dis_call["flow_rates"] == dis_flow_rates[start:end]
|
||||||
|
assert dis_call["offsets"] == offsets[start:end]
|
||||||
|
assert dis_call["liquid_height"] == liquid_heights[start:end]
|
||||||
|
assert dis_call["blow_out_air_volume"] == blow_out_air_volume[start:end]
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_one_eight_channel_handles_32_tasks_four_batches():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(1024))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(32)]
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(32)]
|
||||||
|
asp_vols = [i + 1 for i in range(32)]
|
||||||
|
dis_vols = [300 + i for i in range(32)]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=list(range(8)),
|
||||||
|
asp_vols=asp_vols,
|
||||||
|
dis_vols=dis_vols,
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
pick_calls = [name for name, _ in lh.calls if name == "pick_up_tips"]
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert len(pick_calls) == 4
|
||||||
|
assert len(aspirates) == len(dispenses) == 4
|
||||||
|
assert aspirates[0]["resources"] == sources[0:8]
|
||||||
|
assert aspirates[-1]["resources"] == sources[24:32]
|
||||||
|
assert dispenses[0]["resources"] == targets[0:8]
|
||||||
|
assert dispenses[-1]["resources"] == targets[24:32]
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_many_single_channel_aspirates_total_when_asp_vol_too_small():
|
||||||
|
lh = FakeLiquidHandler(channel_num=1)
|
||||||
|
lh.current_tip = iter(make_tip_iter(64))
|
||||||
|
|
||||||
|
source = DummyContainer("SRC")
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(3)]
|
||||||
|
dis_vols = [10, 20, 30] # sum=60
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=[source],
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=10, # 小于 sum(dis_vols) -> 应吸 60
|
||||||
|
dis_vols=dis_vols,
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
assert len(aspirates) == 1
|
||||||
|
assert aspirates[0]["resources"] == [source]
|
||||||
|
assert aspirates[0]["vols"] == [60.0]
|
||||||
|
assert aspirates[0]["use_channels"] == [0]
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert [d["vols"][0] for d in dispenses] == [10.0, 20.0, 30.0]
|
||||||
|
|
||||||
|
|
||||||
|
def test_one_to_many_eight_channel_basic():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(128))
|
||||||
|
|
||||||
|
source = DummyContainer("SRC")
|
||||||
|
targets = [DummyContainer(f"T{i}") for i in range(8)]
|
||||||
|
dis_vols = [i + 1 for i in range(8)]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=[source],
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=list(range(8)),
|
||||||
|
asp_vols=999, # one-to-many 8ch 会按 dis_vols 吸(每通道各自)
|
||||||
|
dis_vols=dis_vols,
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
assert aspirates[0]["resources"] == [source] * 8
|
||||||
|
assert aspirates[0]["vols"] == [float(v) for v in dis_vols]
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert dispenses[0]["resources"] == targets
|
||||||
|
assert dispenses[0]["vols"] == [float(v) for v in dis_vols]
|
||||||
|
|
||||||
|
|
||||||
|
def test_many_to_one_single_channel_standard_dispense_equals_asp_by_default():
|
||||||
|
lh = FakeLiquidHandler(channel_num=1)
|
||||||
|
lh.current_tip = iter(make_tip_iter(128))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(3)]
|
||||||
|
target = DummyContainer("T")
|
||||||
|
asp_vols = [5, 6, 7]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=[target],
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=asp_vols,
|
||||||
|
dis_vols=1, # many-to-one 允许标量;非比例模式下实际每次分液=对应 asp_vol
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert [d["vols"][0] for d in dispenses] == [float(v) for v in asp_vols]
|
||||||
|
assert all(d["resources"] == [target] for d in dispenses)
|
||||||
|
|
||||||
|
|
||||||
|
def test_many_to_one_single_channel_before_stage_mixes_target_once():
|
||||||
|
lh = FakeLiquidHandler(channel_num=1)
|
||||||
|
lh.current_tip = iter(make_tip_iter(128))
|
||||||
|
|
||||||
|
sources = [DummyContainer("S0"), DummyContainer("S1")]
|
||||||
|
target = DummyContainer("T")
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=[target],
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=[5, 6],
|
||||||
|
dis_vols=1,
|
||||||
|
mix_stage="before",
|
||||||
|
mix_times=2,
|
||||||
|
mix_vol=4,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
names = [name for name, _ in lh.calls]
|
||||||
|
assert names[0] == "mix"
|
||||||
|
assert names.count("mix") == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_many_to_one_single_channel_proportional_mixing_uses_dis_vols_per_source():
|
||||||
|
lh = FakeLiquidHandler(channel_num=1)
|
||||||
|
lh.current_tip = iter(make_tip_iter(128))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(3)]
|
||||||
|
target = DummyContainer("T")
|
||||||
|
asp_vols = [5, 6, 7]
|
||||||
|
dis_vols = [1, 2, 3]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=[target],
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=asp_vols,
|
||||||
|
dis_vols=dis_vols, # 比例模式
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert [d["vols"][0] for d in dispenses] == [float(v) for v in dis_vols]
|
||||||
|
|
||||||
|
|
||||||
|
def test_many_to_one_eight_channel_basic():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(256))
|
||||||
|
|
||||||
|
sources = [DummyContainer(f"S{i}") for i in range(8)]
|
||||||
|
target = DummyContainer("T")
|
||||||
|
asp_vols = [10 + i for i in range(8)]
|
||||||
|
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=[target],
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=list(range(8)),
|
||||||
|
asp_vols=asp_vols,
|
||||||
|
dis_vols=999, # 非比例模式下每通道分液=对应 asp_vol
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
aspirates = [payload for name, payload in lh.calls if name == "aspirate"]
|
||||||
|
dispenses = [payload for name, payload in lh.calls if name == "dispense"]
|
||||||
|
assert aspirates[0]["resources"] == sources
|
||||||
|
assert aspirates[0]["vols"] == [float(v) for v in asp_vols]
|
||||||
|
assert dispenses[0]["resources"] == [target] * 8
|
||||||
|
assert dispenses[0]["vols"] == [float(v) for v in asp_vols]
|
||||||
|
|
||||||
|
|
||||||
|
def test_transfer_liquid_mode_detection_unsupported_shape_raises():
|
||||||
|
lh = FakeLiquidHandler(channel_num=8)
|
||||||
|
lh.current_tip = iter(make_tip_iter(64))
|
||||||
|
|
||||||
|
sources = [DummyContainer("S0"), DummyContainer("S1")]
|
||||||
|
targets = [DummyContainer("T0"), DummyContainer("T1"), DummyContainer("T2")]
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Unsupported transfer mode"):
|
||||||
|
run(
|
||||||
|
lh.transfer_liquid(
|
||||||
|
sources=sources,
|
||||||
|
targets=targets,
|
||||||
|
tip_racks=[],
|
||||||
|
use_channels=[0],
|
||||||
|
asp_vols=[1, 1],
|
||||||
|
dis_vols=[1, 1, 1],
|
||||||
|
mix_times=0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
@@ -11,10 +11,10 @@ import os
|
|||||||
# 添加项目根目录到路径
|
# 添加项目根目录到路径
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
|
||||||
|
|
||||||
# 导入测试模块
|
# 导入测试模块(统一从 tests 包获取)
|
||||||
from test.ros.msgs.test_basic import TestBasicFunctionality
|
from tests.ros.msgs.test_basic import TestBasicFunctionality
|
||||||
from test.ros.msgs.test_conversion import TestBasicConversion, TestMappingConversion
|
from tests.ros.msgs.test_conversion import TestBasicConversion, TestMappingConversion
|
||||||
from test.ros.msgs.test_mapping import TestTypeMapping, TestFieldMapping
|
from tests.ros.msgs.test_mapping import TestTypeMapping, TestFieldMapping
|
||||||
|
|
||||||
|
|
||||||
def run_tests():
|
def run_tests():
|
||||||
|
|||||||
@@ -1240,7 +1240,7 @@ class WebSocketClient(BaseCommunicationClient):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
self.message_processor.send_message(message)
|
self.message_processor.send_message(message)
|
||||||
logger.debug(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
|
logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}")
|
||||||
|
|
||||||
def publish_job_status(
|
def publish_job_status(
|
||||||
self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None
|
self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None
|
||||||
|
|||||||
@@ -1042,11 +1042,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
One or more TipRacks providing fresh tips.
|
One or more TipRacks providing fresh tips.
|
||||||
is_96_well
|
is_96_well
|
||||||
Set *True* to use the 96‑channel head.
|
Set *True* to use the 96‑channel head.
|
||||||
|
mix_stage
|
||||||
|
When to mix the target wells relative to dispensing. Default "none" means
|
||||||
|
no mixing occurs even if mix_times is provided. Use "before", "after", or
|
||||||
|
"both" to mix at the corresponding stage(s).
|
||||||
|
mix_times
|
||||||
|
Number of mix cycles. If *None* (default) no mixing occurs regardless of
|
||||||
|
mix_stage.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# 确保 use_channels 有默认值
|
# 确保 use_channels 有默认值
|
||||||
if use_channels is None:
|
if use_channels is None:
|
||||||
use_channels = [0] if self.channel_num >= 1 else list(range(self.channel_num))
|
# 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7)
|
||||||
|
use_channels = list(range(self.channel_num)) if self.channel_num > 0 else [0]
|
||||||
|
|
||||||
if is_96_well:
|
if is_96_well:
|
||||||
pass # This mode is not verified.
|
pass # This mode is not verified.
|
||||||
@@ -1074,42 +1082,42 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
if mix_times is not None:
|
if mix_times is not None:
|
||||||
mix_times = int(mix_times)
|
mix_times = int(mix_times)
|
||||||
|
|
||||||
# 识别传输模式
|
# 识别传输模式(mix_times 为 None 也应该能正常移液,只是不做 mix)
|
||||||
num_sources = len(sources)
|
num_sources = len(sources)
|
||||||
num_targets = len(targets)
|
num_targets = len(targets)
|
||||||
|
|
||||||
if num_sources == 1 and num_targets > 1:
|
if num_sources == 1 and num_targets > 1:
|
||||||
# 模式1: 一对多 (1 source -> N targets)
|
# 模式1: 一对多 (1 source -> N targets)
|
||||||
await self._transfer_one_to_many(
|
await self._transfer_one_to_many(
|
||||||
sources[0], targets, tip_racks, use_channels,
|
sources[0], targets, tip_racks, use_channels,
|
||||||
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
|
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
|
||||||
offsets, touch_tip, liquid_height, blow_out_air_volume,
|
offsets, touch_tip, liquid_height, blow_out_air_volume,
|
||||||
spread, mix_stage, mix_times, mix_vol, mix_rate,
|
spread, mix_stage, mix_times, mix_vol, mix_rate,
|
||||||
mix_liquid_height, delays
|
mix_liquid_height, delays
|
||||||
)
|
)
|
||||||
elif num_sources > 1 and num_targets == 1:
|
elif num_sources > 1 and num_targets == 1:
|
||||||
# 模式2: 多对一 (N sources -> 1 target)
|
# 模式2: 多对一 (N sources -> 1 target)
|
||||||
await self._transfer_many_to_one(
|
await self._transfer_many_to_one(
|
||||||
sources, targets[0], tip_racks, use_channels,
|
sources, targets[0], tip_racks, use_channels,
|
||||||
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
|
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
|
||||||
offsets, touch_tip, liquid_height, blow_out_air_volume,
|
offsets, touch_tip, liquid_height, blow_out_air_volume,
|
||||||
spread, mix_stage, mix_times, mix_vol, mix_rate,
|
spread, mix_stage, mix_times, mix_vol, mix_rate,
|
||||||
mix_liquid_height, delays
|
mix_liquid_height, delays
|
||||||
)
|
)
|
||||||
elif num_sources == num_targets:
|
elif num_sources == num_targets:
|
||||||
# 模式3: 一对一 (N sources -> N targets) - 原有逻辑
|
# 模式3: 一对一 (N sources -> N targets)
|
||||||
await self._transfer_one_to_one(
|
await self._transfer_one_to_one(
|
||||||
sources, targets, tip_racks, use_channels,
|
sources, targets, tip_racks, use_channels,
|
||||||
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
|
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
|
||||||
offsets, touch_tip, liquid_height, blow_out_air_volume,
|
offsets, touch_tip, liquid_height, blow_out_air_volume,
|
||||||
spread, mix_stage, mix_times, mix_vol, mix_rate,
|
spread, mix_stage, mix_times, mix_vol, mix_rate,
|
||||||
mix_liquid_height, delays
|
mix_liquid_height, delays
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
|
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
|
||||||
"Supported modes: 1->N, N->1, or N->N."
|
"Supported modes: 1->N, N->1, or N->N."
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _transfer_one_to_one(
|
async def _transfer_one_to_one(
|
||||||
self,
|
self,
|
||||||
@@ -1149,6 +1157,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
tip.extend(next(self.current_tip))
|
tip.extend(next(self.current_tip))
|
||||||
await self.pick_up_tips(tip)
|
await self.pick_up_tips(tip)
|
||||||
|
|
||||||
|
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||||||
|
await self.mix(
|
||||||
|
targets=[targets[_]],
|
||||||
|
mix_time=mix_times,
|
||||||
|
mix_vol=mix_vol,
|
||||||
|
offsets=offsets if offsets else None,
|
||||||
|
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||||||
|
mix_rate=mix_rate if mix_rate else None,
|
||||||
|
)
|
||||||
|
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=[sources[_]],
|
resources=[sources[_]],
|
||||||
vols=[asp_vols[_]],
|
vols=[asp_vols[_]],
|
||||||
@@ -1209,6 +1227,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
|
||||||
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
|
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
|
||||||
|
|
||||||
|
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||||||
|
await self.mix(
|
||||||
|
targets=current_targets,
|
||||||
|
mix_time=mix_times,
|
||||||
|
mix_vol=mix_vol,
|
||||||
|
offsets=offsets if offsets else None,
|
||||||
|
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||||||
|
mix_rate=mix_rate if mix_rate else None,
|
||||||
|
)
|
||||||
|
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=current_reagent_sources,
|
resources=current_reagent_sources,
|
||||||
vols=current_asp_vols,
|
vols=current_asp_vols,
|
||||||
@@ -1290,6 +1318,17 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
tip.extend(next(self.current_tip))
|
tip.extend(next(self.current_tip))
|
||||||
await self.pick_up_tips(tip)
|
await self.pick_up_tips(tip)
|
||||||
|
|
||||||
|
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||||||
|
for idx, target in enumerate(targets):
|
||||||
|
await self.mix(
|
||||||
|
targets=[target],
|
||||||
|
mix_time=mix_times,
|
||||||
|
mix_vol=mix_vol,
|
||||||
|
offsets=offsets[idx:idx + 1] if offsets and len(offsets) > idx else None,
|
||||||
|
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||||||
|
mix_rate=mix_rate if mix_rate else None,
|
||||||
|
)
|
||||||
|
|
||||||
# 从源容器吸液(总体积)
|
# 从源容器吸液(总体积)
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=[source],
|
resources=[source],
|
||||||
@@ -1354,6 +1393,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8
|
current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8
|
||||||
current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8
|
current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8
|
||||||
|
|
||||||
|
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||||||
|
await self.mix(
|
||||||
|
targets=current_targets,
|
||||||
|
mix_time=mix_times,
|
||||||
|
mix_vol=mix_vol,
|
||||||
|
offsets=offsets[i:i + 8] if offsets else None,
|
||||||
|
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||||||
|
mix_rate=mix_rate if mix_rate else None,
|
||||||
|
)
|
||||||
|
|
||||||
# 从源容器吸液(8个通道都从同一个源,但每个通道的吸液体积不同)
|
# 从源容器吸液(8个通道都从同一个源,但每个通道的吸液体积不同)
|
||||||
await self.aspirate(
|
await self.aspirate(
|
||||||
resources=[source] * 8, # 8个通道都从同一个源
|
resources=[source] * 8, # 8个通道都从同一个源
|
||||||
@@ -1452,8 +1501,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
# 单通道模式:多次吸液,一次分液
|
# 单通道模式:多次吸液,一次分液
|
||||||
# 先混合前(如果需要)
|
# 先混合前(如果需要)
|
||||||
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||||||
# 注意:在吸液前混合源容器通常不常见,这里跳过
|
await self.mix(
|
||||||
pass
|
targets=[target],
|
||||||
|
mix_time=mix_times,
|
||||||
|
mix_vol=mix_vol,
|
||||||
|
offsets=offsets[0:1] if offsets else None,
|
||||||
|
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||||||
|
mix_rate=mix_rate if mix_rate else None,
|
||||||
|
)
|
||||||
|
|
||||||
# 从每个源容器吸液并分液到目标容器
|
# 从每个源容器吸液并分液到目标容器
|
||||||
for idx, source in enumerate(sources):
|
for idx, source in enumerate(sources):
|
||||||
@@ -1528,6 +1583,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
|||||||
raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.")
|
raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.")
|
||||||
|
|
||||||
# 每次处理8个源
|
# 每次处理8个源
|
||||||
|
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
|
||||||
|
await self.mix(
|
||||||
|
targets=[target],
|
||||||
|
mix_time=mix_times,
|
||||||
|
mix_vol=mix_vol,
|
||||||
|
offsets=offsets[0:1] if offsets else None,
|
||||||
|
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
|
||||||
|
mix_rate=mix_rate if mix_rate else None,
|
||||||
|
)
|
||||||
|
|
||||||
for i in range(0, len(sources), 8):
|
for i in range(0, len(sources), 8):
|
||||||
tip = []
|
tip = []
|
||||||
for _ in range(len(use_channels)):
|
for _ in range(len(use_channels)):
|
||||||
|
|||||||
13
unilabos/devices/liquid_handling/test_transfer_liquid.py
Normal file
13
unilabos/devices/liquid_handling/test_transfer_liquid.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
"""
|
||||||
|
说明:
|
||||||
|
这里放一个“入口文件”,方便在 `unilabos/devices/liquid_handling` 目录下直接找到
|
||||||
|
`transfer_liquid` 的测试。
|
||||||
|
|
||||||
|
实际测试用例实现放在仓库标准测试目录:
|
||||||
|
`tests/devices/liquid_handling/test_transfer_liquid.py`
|
||||||
|
"""
|
||||||
|
|
||||||
|
# 让 pytest 能从这里发现同一套测试(避免复制两份测试代码)。
|
||||||
|
from tests.devices.liquid_handling.test_transfer_liquid import * # noqa: F401,F403
|
||||||
|
|
||||||
|
|
||||||
@@ -21,6 +21,7 @@ from rclpy.callback_groups import ReentrantCallbackGroup
|
|||||||
from rclpy.service import Service
|
from rclpy.service import Service
|
||||||
from unilabos_msgs.action import SendCmd
|
from unilabos_msgs.action import SendCmd
|
||||||
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
|
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
|
||||||
|
from unilabos.utils.decorator import get_topic_config, get_all_subscriptions
|
||||||
|
|
||||||
from unilabos.resources.container import RegularContainer
|
from unilabos.resources.container import RegularContainer
|
||||||
from unilabos.resources.graphio import (
|
from unilabos.resources.graphio import (
|
||||||
@@ -48,7 +49,8 @@ from unilabos_msgs.msg import Resource # type: ignore
|
|||||||
from unilabos.ros.nodes.resource_tracker import (
|
from unilabos.ros.nodes.resource_tracker import (
|
||||||
DeviceNodeResourceTracker,
|
DeviceNodeResourceTracker,
|
||||||
ResourceTreeSet,
|
ResourceTreeSet,
|
||||||
ResourceTreeInstance, ResourceDictInstance,
|
ResourceTreeInstance,
|
||||||
|
ResourceDictInstance,
|
||||||
)
|
)
|
||||||
from unilabos.ros.x.rclpyx import get_event_loop
|
from unilabos.ros.x.rclpyx import get_event_loop
|
||||||
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
|
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
|
||||||
@@ -168,6 +170,7 @@ class PropertyPublisher:
|
|||||||
msg_type,
|
msg_type,
|
||||||
initial_period: float = 5.0,
|
initial_period: float = 5.0,
|
||||||
print_publish=True,
|
print_publish=True,
|
||||||
|
qos: int = 10,
|
||||||
):
|
):
|
||||||
self.node = node
|
self.node = node
|
||||||
self.name = name
|
self.name = name
|
||||||
@@ -175,10 +178,11 @@ class PropertyPublisher:
|
|||||||
self.get_method = get_method
|
self.get_method = get_method
|
||||||
self.timer_period = initial_period
|
self.timer_period = initial_period
|
||||||
self.print_publish = print_publish
|
self.print_publish = print_publish
|
||||||
|
self.qos = qos
|
||||||
|
|
||||||
self._value = None
|
self._value = None
|
||||||
try:
|
try:
|
||||||
self.publisher_ = node.create_publisher(msg_type, f"{name}", 10)
|
self.publisher_ = node.create_publisher(msg_type, f"{name}", qos)
|
||||||
except AttributeError as ex:
|
except AttributeError as ex:
|
||||||
self.node.lab_logger().error(
|
self.node.lab_logger().error(
|
||||||
f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}"
|
f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}"
|
||||||
@@ -186,7 +190,7 @@ class PropertyPublisher:
|
|||||||
self.timer = node.create_timer(self.timer_period, self.publish_property)
|
self.timer = node.create_timer(self.timer_period, self.publish_property)
|
||||||
self.__loop = get_event_loop()
|
self.__loop = get_event_loop()
|
||||||
str_msg_type = str(msg_type)[8:-2]
|
str_msg_type = str(msg_type)[8:-2]
|
||||||
self.node.lab_logger().trace(f"发布属性: {name}, 类型: {str_msg_type}, 周期: {initial_period}秒")
|
self.node.lab_logger().trace(f"发布属性: {name}, 类型: {str_msg_type}, 周期: {initial_period}秒, QoS: {qos}")
|
||||||
|
|
||||||
def get_property(self):
|
def get_property(self):
|
||||||
if asyncio.iscoroutinefunction(self.get_method):
|
if asyncio.iscoroutinefunction(self.get_method):
|
||||||
@@ -326,6 +330,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
continue
|
continue
|
||||||
self.create_ros_action_server(action_name, action_value_mapping)
|
self.create_ros_action_server(action_name, action_value_mapping)
|
||||||
|
|
||||||
|
# 创建订阅者(通过 @subscribe 装饰器)
|
||||||
|
self._topic_subscribers: Dict[str, Any] = {}
|
||||||
|
self._setup_decorated_subscribers()
|
||||||
|
|
||||||
# 创建线程池执行器
|
# 创建线程池执行器
|
||||||
self._executor = ThreadPoolExecutor(
|
self._executor = ThreadPoolExecutor(
|
||||||
max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}"
|
max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}"
|
||||||
@@ -1043,6 +1051,29 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
|
|
||||||
def create_ros_publisher(self, attr_name, msg_type, initial_period=5.0):
|
def create_ros_publisher(self, attr_name, msg_type, initial_period=5.0):
|
||||||
"""创建ROS发布者"""
|
"""创建ROS发布者"""
|
||||||
|
# 检测装饰器配置(支持 get_{attr_name} 方法和 @property)
|
||||||
|
topic_config = {}
|
||||||
|
|
||||||
|
# 优先检测 get_{attr_name} 方法
|
||||||
|
if hasattr(self.driver_instance, f"get_{attr_name}"):
|
||||||
|
getter_method = getattr(self.driver_instance, f"get_{attr_name}")
|
||||||
|
topic_config = get_topic_config(getter_method)
|
||||||
|
|
||||||
|
# 如果没有配置,检测 @property 装饰的属性
|
||||||
|
if not topic_config:
|
||||||
|
driver_class = type(self.driver_instance)
|
||||||
|
if hasattr(driver_class, attr_name):
|
||||||
|
class_attr = getattr(driver_class, attr_name)
|
||||||
|
if isinstance(class_attr, property) and class_attr.fget is not None:
|
||||||
|
topic_config = get_topic_config(class_attr.fget)
|
||||||
|
|
||||||
|
# 使用装饰器配置或默认值
|
||||||
|
cfg_period = topic_config.get("period")
|
||||||
|
cfg_print = topic_config.get("print_publish")
|
||||||
|
cfg_qos = topic_config.get("qos")
|
||||||
|
period: float = cfg_period if cfg_period is not None else initial_period
|
||||||
|
print_publish: bool = cfg_print if cfg_print is not None else self._print_publish
|
||||||
|
qos: int = cfg_qos if cfg_qos is not None else 10
|
||||||
|
|
||||||
# 获取属性值的方法
|
# 获取属性值的方法
|
||||||
def get_device_attr():
|
def get_device_attr():
|
||||||
@@ -1063,7 +1094,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
self.lab_logger().error(traceback.format_exc())
|
self.lab_logger().error(traceback.format_exc())
|
||||||
|
|
||||||
self._property_publishers[attr_name] = PropertyPublisher(
|
self._property_publishers[attr_name] = PropertyPublisher(
|
||||||
self, attr_name, get_device_attr, msg_type, initial_period, self._print_publish
|
self, attr_name, get_device_attr, msg_type, period, print_publish, qos
|
||||||
)
|
)
|
||||||
|
|
||||||
def create_ros_action_server(self, action_name, action_value_mapping):
|
def create_ros_action_server(self, action_name, action_value_mapping):
|
||||||
@@ -1081,6 +1112,76 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
|
|
||||||
self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}")
|
self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}")
|
||||||
|
|
||||||
|
def _setup_decorated_subscribers(self):
|
||||||
|
"""扫描 driver_instance 中带有 @subscribe 装饰器的方法并创建订阅者"""
|
||||||
|
subscriptions = get_all_subscriptions(self.driver_instance)
|
||||||
|
|
||||||
|
for method_name, method, config in subscriptions:
|
||||||
|
topic_template = config.get("topic")
|
||||||
|
msg_type = config.get("msg_type")
|
||||||
|
qos = config.get("qos", 10)
|
||||||
|
|
||||||
|
if not topic_template:
|
||||||
|
self.lab_logger().warning(f"订阅方法 {method_name} 缺少 topic 配置,跳过")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 如果没有指定 msg_type,尝试从类型注解推断
|
||||||
|
if msg_type is None:
|
||||||
|
try:
|
||||||
|
hints = get_type_hints(method)
|
||||||
|
# 第一个参数是 self,第二个是 msg
|
||||||
|
param_names = list(hints.keys())
|
||||||
|
if param_names:
|
||||||
|
msg_type = hints[param_names[0]]
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if msg_type is None:
|
||||||
|
self.lab_logger().warning(f"订阅方法 {method_name} 缺少 msg_type 配置且无法从类型注解推断,跳过")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 替换 topic 模板中的占位符
|
||||||
|
topic = self._resolve_topic_template(topic_template)
|
||||||
|
|
||||||
|
self.create_ros_subscriber(topic, msg_type, method, qos)
|
||||||
|
|
||||||
|
def _resolve_topic_template(self, topic_template: str) -> str:
|
||||||
|
"""
|
||||||
|
解析 topic 模板,替换占位符
|
||||||
|
|
||||||
|
支持的占位符:
|
||||||
|
- {device_id}: 设备ID
|
||||||
|
- {namespace}: 完整命名空间
|
||||||
|
"""
|
||||||
|
return topic_template.format(
|
||||||
|
device_id=self.device_id,
|
||||||
|
namespace=self.namespace,
|
||||||
|
)
|
||||||
|
|
||||||
|
def create_ros_subscriber(self, topic: str, msg_type, callback, qos: int = 10):
|
||||||
|
"""
|
||||||
|
创建ROS订阅者
|
||||||
|
|
||||||
|
Args:
|
||||||
|
topic: Topic 名称
|
||||||
|
msg_type: ROS 消息类型
|
||||||
|
callback: 回调方法(会自动绑定到 driver_instance)
|
||||||
|
qos: QoS 深度配置
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
subscription = self.create_subscription(
|
||||||
|
msg_type,
|
||||||
|
topic,
|
||||||
|
callback,
|
||||||
|
qos,
|
||||||
|
callback_group=self.callback_group,
|
||||||
|
)
|
||||||
|
self._topic_subscribers[topic] = subscription
|
||||||
|
str_msg_type = str(msg_type)[8:-2] if str(msg_type).startswith("<class") else str(msg_type)
|
||||||
|
self.lab_logger().trace(f"订阅Topic: {topic}, 类型: {str_msg_type}, QoS: {qos}")
|
||||||
|
except Exception as ex:
|
||||||
|
self.lab_logger().error(f"创建订阅者 {topic} 失败,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}")
|
||||||
|
|
||||||
def get_real_function(self, instance, attr_name):
|
def get_real_function(self, instance, attr_name):
|
||||||
if hasattr(instance.__class__, attr_name):
|
if hasattr(instance.__class__, attr_name):
|
||||||
obj = getattr(instance.__class__, attr_name)
|
obj = getattr(instance.__class__, attr_name)
|
||||||
@@ -1180,6 +1281,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
|||||||
if asyncio.iscoroutinefunction(ACTION):
|
if asyncio.iscoroutinefunction(ACTION):
|
||||||
try:
|
try:
|
||||||
self.lab_logger().trace(f"异步执行动作 {ACTION}")
|
self.lab_logger().trace(f"异步执行动作 {ACTION}")
|
||||||
|
|
||||||
def _handle_future_exception(fut: Future):
|
def _handle_future_exception(fut: Future):
|
||||||
nonlocal execution_error, execution_success, action_return_value
|
nonlocal execution_error, execution_success, action_return_value
|
||||||
try:
|
try:
|
||||||
@@ -1552,6 +1654,7 @@ class ROS2DeviceNode:
|
|||||||
这个类封装了设备类实例和ROS2节点的功能,提供ROS2接口。
|
这个类封装了设备类实例和ROS2节点的功能,提供ROS2接口。
|
||||||
它不继承设备类,而是通过代理模式访问设备类的属性和方法。
|
它不继承设备类,而是通过代理模式访问设备类的属性和方法。
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def safe_task_wrapper(trace_callback, func, **kwargs):
|
async def safe_task_wrapper(trace_callback, func, **kwargs):
|
||||||
try:
|
try:
|
||||||
@@ -1574,7 +1677,9 @@ class ROS2DeviceNode:
|
|||||||
error(f"异步任务 {func.__name__} 获取结果失败")
|
error(f"异步任务 {func.__name__} 获取结果失败")
|
||||||
error(traceback.format_exc())
|
error(traceback.format_exc())
|
||||||
|
|
||||||
future = rclpy.get_global_executor().create_task(ROS2DeviceNode.safe_task_wrapper(inner_trace_callback, func, **kwargs))
|
future = rclpy.get_global_executor().create_task(
|
||||||
|
ROS2DeviceNode.safe_task_wrapper(inner_trace_callback, func, **kwargs)
|
||||||
|
)
|
||||||
if trace_error:
|
if trace_error:
|
||||||
future.add_done_callback(_handle_future_exception)
|
future.add_done_callback(_handle_future_exception)
|
||||||
return future
|
return future
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
],
|
],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_chiller",
|
"class": "virtual_heatchill",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -49,7 +49,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_filter",
|
"class": "virtual_filter",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -80,7 +80,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_heater",
|
"class": "virtual_heatchill",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -108,7 +108,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_pump",
|
"class": "virtual_transfer_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -147,7 +147,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_rotavap",
|
"class": "virtual_rotavap",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -175,7 +175,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_separator",
|
"class": "virtual_separator",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -213,7 +213,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_solenoid_valve",
|
"class": "virtual_solenoid_valve",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -233,7 +233,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_stirrer_new",
|
"class": "virtual_stirrer",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -261,7 +261,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_stirrer",
|
"class": "virtual_stirrer",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
@@ -289,7 +289,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_vacuum",
|
"class": "virtual_vacuum_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_chiller",
|
"class": "virtual_heatchill",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_filter",
|
"class": "virtual_filter",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_heater",
|
"class": "virtual_heatchill",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_pump",
|
"class": "virtual_transfer_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_rotavap",
|
"class": "virtual_rotavap",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_separator",
|
"class": "virtual_separator",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_solenoid_valve",
|
"class": "virtual_solenoid_valve",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_stirrer",
|
"class": "virtual_stirrer",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_stirrer_new",
|
"class": "virtual_stirrer",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": null,
|
"parent": null,
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "mock_vacuum",
|
"class": "virtual_vacuum_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 620.6111111111111,
|
"x": 620.6111111111111,
|
||||||
"y": 171,
|
"y": 171,
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": "CentrifugeTestStation",
|
"parent": "CentrifugeTestStation",
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "virtual_pump",
|
"class": "virtual_transfer_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 520.6111111111111,
|
"x": 520.6111111111111,
|
||||||
"y": 300,
|
"y": 300,
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": "FilterTestStation",
|
"parent": "FilterTestStation",
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "virtual_pump",
|
"class": "virtual_transfer_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 520.6111111111111,
|
"x": 520.6111111111111,
|
||||||
"y": 300,
|
"y": 300,
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": "HeatChillTestStation",
|
"parent": "HeatChillTestStation",
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "virtual_pump",
|
"class": "virtual_transfer_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 520.6111111111111,
|
"x": 520.6111111111111,
|
||||||
"y": 300,
|
"y": 300,
|
||||||
|
|||||||
@@ -32,7 +32,7 @@
|
|||||||
"children": [],
|
"children": [],
|
||||||
"parent": "StirTestStation",
|
"parent": "StirTestStation",
|
||||||
"type": "device",
|
"type": "device",
|
||||||
"class": "virtual_pump",
|
"class": "virtual_transfer_pump",
|
||||||
"position": {
|
"position": {
|
||||||
"x": 520.6111111111111,
|
"x": 520.6111111111111,
|
||||||
"y": 300,
|
"y": 300,
|
||||||
|
|||||||
@@ -7,49 +7,18 @@ from typing import Dict, Any, List
|
|||||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
|
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
|
||||||
|
|
||||||
|
|
||||||
class SmartPumpController:
|
class AnyDevice:
|
||||||
"""
|
@property
|
||||||
智能泵控制器
|
def status(self) -> str:
|
||||||
|
return "Idle"
|
||||||
|
|
||||||
支持多种泵送模式,具有高精度流量控制和自动校准功能。
|
async def action(self, addr: str) -> bool:
|
||||||
适用于实验室自动化系统中的液体处理任务。
|
|
||||||
"""
|
|
||||||
|
|
||||||
_ros_node: BaseROS2DeviceNode
|
|
||||||
|
|
||||||
def __init__(self, device_id: str = "smart_pump_01", port: str = "/dev/ttyUSB0"):
|
|
||||||
"""
|
|
||||||
初始化智能泵控制器
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_id: 设备唯一标识符
|
|
||||||
port: 通信端口
|
|
||||||
"""
|
|
||||||
self.device_id = device_id
|
|
||||||
self.port = port
|
|
||||||
self.is_connected = False
|
|
||||||
self.current_flow_rate = 0.0
|
|
||||||
self.total_volume_pumped = 0.0
|
|
||||||
self.calibration_factor = 1.0
|
|
||||||
self.pump_mode = "continuous" # continuous, volume, rate
|
|
||||||
|
|
||||||
def post_init(self, ros_node: BaseROS2DeviceNode):
|
|
||||||
self._ros_node = ros_node
|
|
||||||
|
|
||||||
def connect_device(self, timeout: int = 10) -> bool:
|
|
||||||
"""
|
|
||||||
连接到泵设备
|
|
||||||
|
|
||||||
Args:
|
|
||||||
timeout: 连接超时时间(秒)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: 连接是否成功
|
|
||||||
"""
|
|
||||||
# 模拟连接过程
|
|
||||||
self.is_connected = True
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def disconnect_device(self) -> bool:
|
def disconnect_device(self) -> bool:
|
||||||
"""
|
"""
|
||||||
断开设备连接
|
断开设备连接
|
||||||
|
|||||||
@@ -1,3 +1,9 @@
|
|||||||
|
from functools import wraps
|
||||||
|
from typing import Any, Callable, Optional, TypeVar
|
||||||
|
|
||||||
|
F = TypeVar("F", bound=Callable[..., Any])
|
||||||
|
|
||||||
|
|
||||||
def singleton(cls):
|
def singleton(cls):
|
||||||
"""
|
"""
|
||||||
单例装饰器
|
单例装饰器
|
||||||
@@ -12,3 +18,167 @@ def singleton(cls):
|
|||||||
|
|
||||||
return get_instance
|
return get_instance
|
||||||
|
|
||||||
|
|
||||||
|
def topic_config(
|
||||||
|
period: Optional[float] = None,
|
||||||
|
print_publish: Optional[bool] = None,
|
||||||
|
qos: Optional[int] = None,
|
||||||
|
) -> Callable[[F], F]:
|
||||||
|
"""
|
||||||
|
Topic发布配置装饰器
|
||||||
|
|
||||||
|
用于装饰 get_{attr_name} 方法或 @property,控制对应属性的ROS topic发布行为。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
period: 发布周期(秒)。None 表示使用默认值 5.0
|
||||||
|
print_publish: 是否打印发布日志。None 表示使用节点默认配置
|
||||||
|
qos: QoS深度配置。None 表示使用默认值 10
|
||||||
|
|
||||||
|
Example:
|
||||||
|
class MyDriver:
|
||||||
|
# 方式1: 装饰 get_{attr_name} 方法
|
||||||
|
@topic_config(period=1.0, print_publish=False, qos=5)
|
||||||
|
def get_temperature(self):
|
||||||
|
return self._temperature
|
||||||
|
|
||||||
|
# 方式2: 与 @property 连用(topic_config 放在下面)
|
||||||
|
@property
|
||||||
|
@topic_config(period=0.1)
|
||||||
|
def position(self):
|
||||||
|
return self._position
|
||||||
|
|
||||||
|
Note:
|
||||||
|
与 @property 连用时,@topic_config 必须放在 @property 下面,
|
||||||
|
这样装饰器执行顺序为:先 topic_config 添加配置,再 property 包装。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(func: F) -> F:
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
# 在函数上附加配置属性 (type: ignore 用于动态属性)
|
||||||
|
wrapper._topic_period = period # type: ignore[attr-defined]
|
||||||
|
wrapper._topic_print_publish = print_publish # type: ignore[attr-defined]
|
||||||
|
wrapper._topic_qos = qos # type: ignore[attr-defined]
|
||||||
|
wrapper._has_topic_config = True # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
return wrapper # type: ignore[return-value]
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
def get_topic_config(func) -> dict:
|
||||||
|
"""
|
||||||
|
获取函数上的topic配置
|
||||||
|
|
||||||
|
Args:
|
||||||
|
func: 被装饰的函数
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含 period, print_publish, qos 的配置字典
|
||||||
|
"""
|
||||||
|
if hasattr(func, "_has_topic_config") and getattr(func, "_has_topic_config", False):
|
||||||
|
return {
|
||||||
|
"period": getattr(func, "_topic_period", None),
|
||||||
|
"print_publish": getattr(func, "_topic_print_publish", None),
|
||||||
|
"qos": getattr(func, "_topic_qos", None),
|
||||||
|
}
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def subscribe(
|
||||||
|
topic: str,
|
||||||
|
msg_type: Optional[type] = None,
|
||||||
|
qos: int = 10,
|
||||||
|
) -> Callable[[F], F]:
|
||||||
|
"""
|
||||||
|
Topic订阅装饰器
|
||||||
|
|
||||||
|
用于装饰 driver 类中的方法,使其成为 ROS topic 的订阅回调。
|
||||||
|
当 ROS2DeviceNode 初始化时,会自动扫描并创建对应的订阅者。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
topic: Topic 名称模板,支持以下占位符:
|
||||||
|
- {device_id}: 设备ID (如 "pump_1")
|
||||||
|
- {namespace}: 完整命名空间 (如 "/devices/pump_1")
|
||||||
|
msg_type: ROS 消息类型。如果为 None,需要在回调函数的类型注解中指定
|
||||||
|
qos: QoS 深度配置,默认为 10
|
||||||
|
|
||||||
|
Example:
|
||||||
|
from std_msgs.msg import String, Float64
|
||||||
|
|
||||||
|
class MyDriver:
|
||||||
|
@subscribe(topic="/devices/{device_id}/set_speed", msg_type=Float64)
|
||||||
|
def on_speed_update(self, msg: Float64):
|
||||||
|
self._speed = msg.data
|
||||||
|
print(f"Speed updated to: {self._speed}")
|
||||||
|
|
||||||
|
@subscribe(topic="{namespace}/command")
|
||||||
|
def on_command(self, msg: String):
|
||||||
|
# msg_type 可从类型注解推断
|
||||||
|
self.execute_command(msg.data)
|
||||||
|
|
||||||
|
Note:
|
||||||
|
- 回调方法的第一个参数是 self,第二个参数是收到的 ROS 消息
|
||||||
|
- topic 中的占位符会在创建订阅时被实际值替换
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(func: F) -> F:
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
# 在函数上附加订阅配置
|
||||||
|
wrapper._subscribe_topic = topic # type: ignore[attr-defined]
|
||||||
|
wrapper._subscribe_msg_type = msg_type # type: ignore[attr-defined]
|
||||||
|
wrapper._subscribe_qos = qos # type: ignore[attr-defined]
|
||||||
|
wrapper._has_subscribe = True # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
return wrapper # type: ignore[return-value]
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
def get_subscribe_config(func) -> dict:
|
||||||
|
"""
|
||||||
|
获取函数上的订阅配置
|
||||||
|
|
||||||
|
Args:
|
||||||
|
func: 被装饰的函数
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含 topic, msg_type, qos 的配置字典
|
||||||
|
"""
|
||||||
|
if hasattr(func, "_has_subscribe") and getattr(func, "_has_subscribe", False):
|
||||||
|
return {
|
||||||
|
"topic": getattr(func, "_subscribe_topic", None),
|
||||||
|
"msg_type": getattr(func, "_subscribe_msg_type", None),
|
||||||
|
"qos": getattr(func, "_subscribe_qos", 10),
|
||||||
|
}
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def get_all_subscriptions(instance) -> list:
|
||||||
|
"""
|
||||||
|
扫描实例的所有方法,获取带有 @subscribe 装饰器的方法及其配置
|
||||||
|
|
||||||
|
Args:
|
||||||
|
instance: 要扫描的实例
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含 (method_name, method, config) 元组的列表
|
||||||
|
"""
|
||||||
|
subscriptions = []
|
||||||
|
for attr_name in dir(instance):
|
||||||
|
if attr_name.startswith("_"):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
attr = getattr(instance, attr_name)
|
||||||
|
if callable(attr):
|
||||||
|
config = get_subscribe_config(attr)
|
||||||
|
if config:
|
||||||
|
subscriptions.append((attr_name, attr, config))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return subscriptions
|
||||||
|
|||||||
Reference in New Issue
Block a user