3 Commits

Author SHA1 Message Date
Xuwznln
39bb7dc627 adjust with_children param 2025-11-03 16:31:37 +08:00
Xuwznln
0fda155f55 modify devices to use correct executor (sleep, create_task) 2025-11-03 15:49:11 +08:00
Xuwznln
6e3eacd2f0 support sleep and create_task in node 2025-11-03 15:42:12 +08:00
19 changed files with 855 additions and 617 deletions

View File

@@ -3,7 +3,8 @@
"""
import asyncio
from typing import Dict, Any, Optional, List
from typing import Dict, Any, List
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class SmartPumpController:
@@ -14,6 +15,8 @@ class SmartPumpController:
适用于实验室自动化系统中的液体处理任务。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = "smart_pump_01", port: str = "/dev/ttyUSB0"):
"""
初始化智能泵控制器
@@ -30,6 +33,9 @@ class SmartPumpController:
self.calibration_factor = 1.0
self.pump_mode = "continuous" # continuous, volume, rate
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def connect_device(self, timeout: int = 10) -> bool:
"""
连接到泵设备
@@ -90,7 +96,7 @@ class SmartPumpController:
pump_time = (volume / flow_rate) * 60 # 转换为秒
self.current_flow_rate = flow_rate
await asyncio.sleep(min(pump_time, 3.0)) # 模拟泵送过程
await self._ros_node.sleep(min(pump_time, 3.0)) # 模拟泵送过程
self.total_volume_pumped += volume
self.current_flow_rate = 0.0
@@ -170,6 +176,8 @@ class AdvancedTemperatureController:
适用于需要精确温度控制的化学反应和材料处理过程。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, controller_id: str = "temp_controller_01"):
"""
初始化温度控制器
@@ -185,6 +193,9 @@ class AdvancedTemperatureController:
self.pid_enabled = True
self.temperature_history: List[Dict] = []
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def set_target_temperature(self, temperature: float, rate: float = 10.0) -> bool:
"""
设置目标温度
@@ -238,7 +249,7 @@ class AdvancedTemperatureController:
}
)
await asyncio.sleep(step_time)
await self._ros_node.sleep(step_time)
# 保持历史记录不超过100条
if len(self.temperature_history) > 100:
@@ -330,6 +341,8 @@ class MultiChannelAnalyzer:
常用于光谱分析、电化学测量等应用场景。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, analyzer_id: str = "analyzer_01", channels: int = 8):
"""
初始化多通道分析仪
@@ -344,6 +357,9 @@ class MultiChannelAnalyzer:
self.is_measuring = False
self.sample_rate = 1000 # Hz
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def configure_channel(self, channel: int, enabled: bool = True, unit: str = "V") -> bool:
"""
配置通道
@@ -376,7 +392,7 @@ class MultiChannelAnalyzer:
# 模拟数据采集
measurements = []
for second in range(duration):
for _ in range(duration):
timestamp = asyncio.get_event_loop().time()
frame_data = {}
@@ -391,7 +407,7 @@ class MultiChannelAnalyzer:
measurements.append({"timestamp": timestamp, "data": frame_data})
await asyncio.sleep(1.0) # 每秒采集一次
await self._ros_node.sleep(1.0) # 每秒采集一次
self.is_measuring = False
@@ -465,6 +481,8 @@ class AutomatedDispenser:
集成称重功能,确保分配精度和重现性。
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, dispenser_id: str = "dispenser_01"):
"""
初始化自动分配器
@@ -479,6 +497,9 @@ class AutomatedDispenser:
self.container_capacity = 1000.0 # mL
self.precision_mode = True
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def move_to_position(self, x: float, y: float, z: float) -> bool:
"""
移动到指定位置
@@ -517,7 +538,7 @@ class AutomatedDispenser:
if viscosity == "high":
dispense_time *= 2 # 高粘度液体需要更长时间
await asyncio.sleep(min(dispense_time, 5.0)) # 最多等待5秒
await self._ros_node.sleep(min(dispense_time, 5.0)) # 最多等待5秒
self.dispensed_total += volume

View File

@@ -12,6 +12,7 @@ from serial import Serial
from serial.serialutil import SerialException
from unilabos.messages import Point3D
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class GrblCNCConnectionError(Exception):
@@ -32,6 +33,7 @@ class GrblCNCInfo:
class GrblCNCAsync:
_status: str = "Offline"
_position: Point3D = Point3D(x=0.0, y=0.0, z=0.0)
_ros_node: BaseROS2DeviceNode
def __init__(self, port: str, address: str = "1", limits: tuple[int, int, int, int, int, int] = (-150, 150, -200, 0, 0, 60)):
self.port = port
@@ -58,6 +60,9 @@ class GrblCNCAsync:
self._run_future: Optional[Future[Any]] = None
self._run_lock = Lock()
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def _read_all(self):
data = self._serial.read_until(b"\n")
data_decoded = data.decode()
@@ -148,7 +153,7 @@ class GrblCNCAsync:
try:
await self._query(command)
while True:
await asyncio.sleep(0.2) # Wait for 0.5 seconds before polling again
await self._ros_node.sleep(0.2) # Wait for 0.5 seconds before polling again
status = await self.get_status()
if "Idle" in status:
@@ -214,7 +219,7 @@ class GrblCNCAsync:
self._pose_number = i
self.pose_number_remaining = len(points) - i
await self.set_position(point)
await asyncio.sleep(0.5)
await self._ros_node.sleep(0.5)
self._step_number = -1
async def stop_operation(self):
@@ -235,7 +240,7 @@ class GrblCNCAsync:
async def open(self):
if self._read_task:
raise GrblCNCConnectionError
self._read_task = asyncio.create_task(self._read_loop())
self._read_task = self._ros_node.create_task(self._read_loop())
try:
await self.get_status()

View File

@@ -2,6 +2,8 @@ import time
import asyncio
from pydantic import BaseModel
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class Point3D(BaseModel):
x: float
@@ -14,9 +16,14 @@ def d(a: Point3D, b: Point3D) -> float:
class MockCNCAsync:
_ros_node: BaseROS2DeviceNode["MockCNCAsync"]
def __init__(self):
self._position: Point3D = Point3D(x=0.0, y=0.0, z=0.0)
self._status = "Idle"
def post_create(self, ros_node):
self._ros_node = ros_node
@property
def position(self) -> Point3D:
@@ -38,5 +45,5 @@ class MockCNCAsync:
self._position.x = current_pos.x + (position.x - current_pos.x) / 20 * (i+1)
self._position.y = current_pos.y + (position.y - current_pos.y) / 20 * (i+1)
self._position.z = current_pos.z + (position.z - current_pos.z) / 20 * (i+1)
await asyncio.sleep(move_time / 20)
await self._ros_node.sleep(move_time / 20)
self._status = "Idle"

File diff suppressed because it is too large Load Diff

View File

@@ -25,6 +25,8 @@ from pylabrobot.resources import (
Tip,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class LiquidHandlerMiddleware(LiquidHandler):
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8):
@@ -536,6 +538,7 @@ class LiquidHandlerMiddleware(LiquidHandler):
class LiquidHandlerAbstract(LiquidHandlerMiddleware):
"""Extended LiquidHandler with additional operations."""
support_touch_tip = True
_ros_node: BaseROS2DeviceNode
def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8):
"""Initialize a LiquidHandler.
@@ -548,8 +551,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
self.group_info = dict()
super().__init__(backend, deck, simulator, channel_num)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
@classmethod
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]):
def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]):
"""Set the liquid in a well."""
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
well.set_liquids([(liquid_name, volume)]) # type: ignore
@@ -1081,7 +1087,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
print(f"Waiting time: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}")
print(f"Time to finish: {time.strftime('%H:%M:%S', time.localtime(time.time() + seconds))}")
await asyncio.sleep(seconds)
await self._ros_node.sleep(seconds)
if msg:
print(f"Done: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}")

View File

@@ -30,6 +30,7 @@ from pylabrobot.liquid_handling.standard import (
from pylabrobot.resources import Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash
from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class PRCXIError(RuntimeError):
@@ -162,6 +163,10 @@ class PRCXI9300Handler(LiquidHandlerAbstract):
)
super().__init__(backend=self._unilabos_backend, deck=deck, simulator=simulator, channel_num=channel_num)
def post_init(self, ros_node: BaseROS2DeviceNode):
super().post_init(ros_node)
self._unilabos_backend.post_init(ros_node)
def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]):
return super().set_liquid(wells, liquid_names, volumes)
@@ -424,6 +429,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
_num_channels = 8 # 默认通道数为 8
_is_reset_ok = False
_ros_node: BaseROS2DeviceNode
@property
def is_reset_ok(self) -> bool:
@@ -456,6 +462,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self._execute_setup = setup
self.debug = debug
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def create_protocol(self, protocol_name):
self.protocol_name = protocol_name
self.steps_todo_list = []
@@ -500,7 +509,7 @@ class PRCXI9300Backend(LiquidHandlerBackend):
self.api_client.call("IAutomation", "Reset")
while not self.is_reset_ok:
print("Waiting for PRCXI9300 to reset...")
await asyncio.sleep(1)
await self._ros_node.sleep(1)
print("PRCXI9300 reset successfully.")
except ConnectionRefusedError as e:
raise RuntimeError(
@@ -533,7 +542,9 @@ class PRCXI9300Backend(LiquidHandlerBackend):
tipspot_index = tipspot.parent.children.index(tipspot)
tip_columns.append(tipspot_index // 8)
if len(set(tip_columns)) != 1:
raise ValueError("All pickups must be from the same tip column. Found different columns: " + str(tip_columns))
raise ValueError(
"All pickups must be from the same tip column. Found different columns: " + str(tip_columns)
)
PlateNo = plate_indexes[0] + 1
hole_col = tip_columns[0] + 1
hole_row = 1
@@ -1109,12 +1120,15 @@ class PRCXI9300Api:
"LiquidDispensingMethod": liquid_method,
}
class DefaultLayout:
def __init__(self, product_name: str = "PRCXI9300"):
self.labresource = {}
if product_name not in ["PRCXI9300", "PRCXI9320"]:
raise ValueError(f"Unsupported product_name: {product_name}. Only 'PRCXI9300' and 'PRCXI9320' are supported.")
raise ValueError(
f"Unsupported product_name: {product_name}. Only 'PRCXI9300' and 'PRCXI9320' are supported."
)
if product_name == "PRCXI9300":
self.rows = 2
@@ -1129,25 +1143,93 @@ class DefaultLayout:
self.layout = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
self.trash_slot = 16
self.waste_liquid_slot = 12
self.default_layout = {"MatrixId":f"{time.time()}","MatrixName":f"{time.time()}","MatrixCount":16,"WorkTablets":
[{"Number": 1, "Code": "T1", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 2, "Code": "T2", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 3, "Code": "T3", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 4, "Code": "T4", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 5, "Code": "T5", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 6, "Code": "T6", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 7, "Code": "T7", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 8, "Code": "T8", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 9, "Code": "T9", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 10, "Code": "T10", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 11, "Code": "T11", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 12, "Code": "T12", "Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0}}, # 这个设置成废液槽,用储液槽表示
{"Number": 13, "Code": "T13", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 14, "Code": "T14", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 15, "Code": "T15", "Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0}},
{"Number": 16, "Code": "T16", "Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0}} # 这个设置成垃圾桶,用储液槽表示
]
}
self.default_layout = {
"MatrixId": f"{time.time()}",
"MatrixName": f"{time.time()}",
"MatrixCount": 16,
"WorkTablets": [
{
"Number": 1,
"Code": "T1",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 2,
"Code": "T2",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 3,
"Code": "T3",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 4,
"Code": "T4",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 5,
"Code": "T5",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 6,
"Code": "T6",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 7,
"Code": "T7",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 8,
"Code": "T8",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 9,
"Code": "T9",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 10,
"Code": "T10",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 11,
"Code": "T11",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 12,
"Code": "T12",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
}, # 这个设置成废液槽,用储液槽表示
{
"Number": 13,
"Code": "T13",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 14,
"Code": "T14",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 15,
"Code": "T15",
"Material": {"uuid": "57b1e4711e9e4a32b529f3132fc5931f", "materialEnum": 0},
},
{
"Number": 16,
"Code": "T16",
"Material": {"uuid": "730067cf07ae43849ddf4034299030e9", "materialEnum": 0},
}, # 这个设置成垃圾桶,用储液槽表示
],
}
def get_layout(self) -> Dict[str, Any]:
return {
@@ -1155,7 +1237,7 @@ class DefaultLayout:
"columns": self.columns,
"layout": self.layout,
"trash_slot": self.trash_slot,
"waste_liquid_slot": self.waste_liquid_slot
"waste_liquid_slot": self.waste_liquid_slot,
}
def get_trash_slot(self) -> int:
@@ -1178,17 +1260,19 @@ class DefaultLayout:
reserved_positions = {12, 16}
available_positions = [i for i in range(1, 17) if i not in reserved_positions]
# 计算总需求
# 计算总需求
total_needed = sum(count for _, _, count in needs)
if total_needed > len(available_positions):
raise ValueError(f"需要 {total_needed} 个位置,但只有 {len(available_positions)} 个可用位置排除位置12和16")
raise ValueError(
f"需要 {total_needed} 个位置,但只有 {len(available_positions)} 个可用位置排除位置12和16"
)
# 依次分配位置
current_pos = 0
for reagent_name, material_name, count in needs:
material_uuid = self.labresource[material_name]['uuid']
material_enum = self.labresource[material_name]['materialEnum']
material_uuid = self.labresource[material_name]["uuid"]
material_enum = self.labresource[material_name]["materialEnum"]
for _ in range(count):
if current_pos >= len(available_positions):
@@ -1196,17 +1280,18 @@ class DefaultLayout:
position = available_positions[current_pos]
# 找到对应的tablet并更新
for tablet in self.default_layout['WorkTablets']:
if tablet['Number'] == position:
tablet['Material']['uuid'] = material_uuid
tablet['Material']['materialEnum'] = material_enum
layout_list.append(dict(reagent_name=reagent_name, material_name=material_name, positions=position))
for tablet in self.default_layout["WorkTablets"]:
if tablet["Number"] == position:
tablet["Material"]["uuid"] = material_uuid
tablet["Material"]["materialEnum"] = material_enum
layout_list.append(
dict(reagent_name=reagent_name, material_name=material_name, positions=position)
)
break
current_pos += 1
return self.default_layout, layout_list
if __name__ == "__main__":
# Example usage
# 1. 用导出的json给每个T1 T2板子设定相应的物料如果是孔板和枪头盒要对应区分
@@ -1302,10 +1387,7 @@ if __name__ == "__main__":
# # # plate2.set_well_liquids(plate_2_liquids)
# handler = PRCXI9300Handler(deck=deck, host="10.181.214.132", port=9999,
# handler = PRCXI9300Handler(deck=deck, host="10.181.214.132", port=9999,
# timeout=10.0, setup=False, debug=False,
# simulator=True,
# matrix_id="71593",
@@ -1391,10 +1473,7 @@ if __name__ == "__main__":
# # input("Press Enter to continue...") # Wait for user input before proceeding
# # print("PRCXI9300Handler initialized with deck and host settings.")
### 9320 ###
### 9320 ###
deck = PRCXI9300Deck(name="PRCXI_Deck", size_x=100, size_y=100, size_z=100)
@@ -1412,12 +1491,15 @@ if __name__ == "__main__":
new_plate: PRCXI9300Container = PRCXI9300Container.deserialize(well_containers)
return new_plate
def get_tip_rack(name: str, child_prefix: str="tip") -> PRCXI9300Container:
def get_tip_rack(name: str, child_prefix: str = "tip") -> PRCXI9300Container:
tip_racks = opentrons_96_tiprack_10ul(name).serialize()
tip_rack = PRCXI9300Container(
name=name, size_x=50, size_y=50, size_z=10, category="tip_rack", ordering=collections.OrderedDict({
k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()
})
name=name,
size_x=50,
size_y=50,
size_z=10,
category="tip_rack",
ordering=collections.OrderedDict({k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()}),
)
tip_rack_serialized = tip_rack.serialize()
tip_rack_serialized["parent_name"] = deck.name
@@ -1629,6 +1711,7 @@ if __name__ == "__main__":
)
backend: PRCXI9300Backend = handler.backend
from pylabrobot.resources import set_volume_tracking
set_volume_tracking(enabled=True)
# res = backend.api_client.get_all_materials()
asyncio.run(handler.setup()) # Initialize the handler and setup the connection
@@ -1640,10 +1723,10 @@ if __name__ == "__main__":
for well in plate13.get_all_items():
# well_pos = well.name.split("_")[1] # 走一行
# if well_pos.startswith("A"):
if well.name.startswith("PlateT13"): # 走整个Plate
# if well_pos.startswith("A"):
if well.name.startswith("PlateT13"): # 走整个Plate
asyncio.run(handler.dispense([well], [0.01], [0]))
# asyncio.run(handler.dispense([plate10.get_item("H12")], [1], [0]))
# asyncio.run(handler.dispense([plate13.get_item("A1")], [1], [0]))
# asyncio.run(handler.dispense([plate14.get_item("C5")], [1], [0]))
@@ -1652,26 +1735,25 @@ if __name__ == "__main__":
asyncio.run(handler.run_protocol())
time.sleep(5)
os._exit(0)
# 第一种情景:一个孔往多个孔加液
# 第一种情景:一个孔往多个孔加液
# plate_2_liquids = handler.set_group("water", [plate2.children[0]], [300])
# plate5_liquids = handler.set_group("master_mix", plate5.children[:23], [100]*23)
# 第二个情景:多个孔往多个孔加液(但是个数得对应)
plate_2_liquids = handler.set_group("water", plate2.children[:23], [300]*23)
plate5_liquids = handler.set_group("master_mix", plate5.children[:23], [100]*23)
# 第二个情景:多个孔往多个孔加液(但是个数得对应)
plate_2_liquids = handler.set_group("water", plate2.children[:23], [300] * 23)
plate5_liquids = handler.set_group("master_mix", plate5.children[:23], [100] * 23)
# plate11.set_well_liquids([("Water", 100) if (i % 8 == 0 and i // 8 < 6) else (None, 100) for i in range(96)]) # Set liquids for every 8 wells in plate8
# plate11.set_well_liquids([("Water", 100) if (i % 8 == 0 and i // 8 < 6) else (None, 100) for i in range(96)]) # Set liquids for every 8 wells in plate8
# A = tree_to_list([resource_plr_to_ulab(deck)])
# # with open("deck.json", "w", encoding="utf-8") as f:
# # json.dump(A, f, indent=4, ensure_ascii=False)
# A = tree_to_list([resource_plr_to_ulab(deck)])
# # with open("deck.json", "w", encoding="utf-8") as f:
# # json.dump(A, f, indent=4, ensure_ascii=False)
# print(plate11.get_well(0).tracker.get_used_volume())
# Initialize the backend and setup the connection
# print(plate11.get_well(0).tracker.get_used_volume())
# Initialize the backend and setup the connection
asyncio.run(handler.transfer_group("water", "master_mix", 10)) # Reset tip tracking
# asyncio.run(handler.pick_up_tips([plate8.children[8]],[0]))
# print(plate8.children[8])
# asyncio.run(handler.run_protocol())
@@ -1685,121 +1767,118 @@ if __name__ == "__main__":
# print(plate1.children[0])
# asyncio.run(handler.discard_tips([0]))
# asyncio.run(handler.add_liquid(
# asp_vols=[10]*7,
# dis_vols=[10]*7,
# reagent_sources=plate11.children[:7],
# targets=plate1.children[2:9],
# use_channels=[0],
# flow_rates=[None] * 7,
# offsets=[Coordinate(0, 0, 0)] * 7,
# liquid_height=[None] * 7,
# blow_out_air_volume=[None] * 2,
# delays=None,
# mix_time=3,
# mix_vol=5,
# spread="custom",
# ))
# asyncio.run(handler.add_liquid(
# asp_vols=[10]*7,
# dis_vols=[10]*7,
# reagent_sources=plate11.children[:7],
# targets=plate1.children[2:9],
# use_channels=[0],
# flow_rates=[None] * 7,
# offsets=[Coordinate(0, 0, 0)] * 7,
# liquid_height=[None] * 7,
# blow_out_air_volume=[None] * 2,
# delays=None,
# mix_time=3,
# mix_vol=5,
# spread="custom",
# ))
# asyncio.run(handler.run_protocol()) # Run the protocol
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[10]*2,
# # # dis_vols=[10]*2,
# # # sources=plate11.children[:2],
# # # targets=plate11.children[-2:],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # mix_times=3,
# # # mix_vol=5,
# # # spread="wide",
# # # tip_racks=[plate8]
# # # ))
# # # asyncio.run(handler.remove_liquid(
# # # vols=[10]*2,
# # # sources=plate11.children[:2],
# # # waste_liquid=plate11.children[43],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # spread="wide"
# # # ))
# # asyncio.run(handler.run_protocol())
# # # asyncio.run(handler.discard_tips())
# # # asyncio.run(handler.mix(well_containers.children[:8
# # # ], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
# # #print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[10]*2,
# # # dis_vols=[10]*2,
# # # sources=plate11.children[:2],
# # # targets=plate11.children[-2:],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # mix_times=3,
# # # mix_vol=5,
# # # spread="wide",
# # # tip_racks=[plate8]
# # # ))
# # # asyncio.run(handler.remove_liquid(
# # # vols=[10]*2,
# # # sources=plate11.children[:2],
# # # waste_liquid=plate11.children[43],
# # # use_channels=[0],
# # # offsets=[Coordinate(0, 0, 0)] * 4,
# # # liquid_height=[None] * 2,
# # # blow_out_air_volume=[None] * 2,
# # # delays=None,
# # # spread="wide"
# # # ))
# # asyncio.run(handler.run_protocol())
# # # asyncio.run(handler.discard_tips())
# # # asyncio.run(handler.mix(well_containers.children[:8
# # # ], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100))
# # #print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # asyncio.run(handler.remove_liquid(
# # # vols=[100]*16,
# # # sources=well_containers.children[-16:],
# # # waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # flow_rates=[None] * 32,
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # spread="wide",
# # # ))
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[100]*16,
# # # dis_vols=[100]*16,
# # # tip_racks=[tip_rack],
# # # sources=well_containers.children[-16:],
# # # targets=well_containers.children[:16],
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # asp_flow_rates=[None] * 16,
# # # dis_flow_rates=[None] * 16,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # mix_times=3,
# # # mix_vol=50,
# # # spread="wide",
# # # ))
# # print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # input("pick_up_tips add step")
#asyncio.run(handler.run_protocol()) # Run the protocol
# # # input("Running protocol...")
# # # input("Press Enter to continue...") # Wait for user input before proceeding
# # # print("PRCXI9300Handler initialized with deck and host settings.")
# 一些推荐版位组合的测试样例:
# 一些推荐版位组合的测试样例:
# # # asyncio.run(handler.remove_liquid(
# # # vols=[100]*16,
# # # sources=well_containers.children[-16:],
# # # waste_liquid=well_containers.children[:16], # 这个有些奇怪,但是好像也只能这么写
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # flow_rates=[None] * 32,
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # spread="wide",
# # # ))
# # # asyncio.run(handler.transfer_liquid(
# # # asp_vols=[100]*16,
# # # dis_vols=[100]*16,
# # # tip_racks=[tip_rack],
# # # sources=well_containers.children[-16:],
# # # targets=well_containers.children[:16],
# # # use_channels=[0, 1, 2, 3, 4, 5, 6, 7],
# # # offsets=[Coordinate(0, 0, 0)] * 32,
# # # asp_flow_rates=[None] * 16,
# # # dis_flow_rates=[None] * 16,
# # # liquid_height=[None] * 32,
# # # blow_out_air_volume=[None] * 32,
# # # mix_times=3,
# # # mix_vol=50,
# # # spread="wide",
# # # ))
# # print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info
# # # input("pick_up_tips add step")
# asyncio.run(handler.run_protocol()) # Run the protocol
# # # input("Running protocol...")
# # # input("Press Enter to continue...") # Wait for user input before proceeding
# # # print("PRCXI9300Handler initialized with deck and host settings.")
# 一些推荐版位组合的测试样例:
# 一些推荐版位组合的测试样例:
with open("prcxi_material.json", "r") as f:
material_info = json.load(f)
layout = DefaultLayout("PRCXI9320")
layout.add_lab_resource(material_info)
MatrixLayout_1, dict_1 = layout.recommend_layout([
("reagent_1", "96 细胞培养皿", 3),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 7),
("reagent_4", "10μL加长 Tip头", 1),
])
MatrixLayout_1, dict_1 = layout.recommend_layout(
[
("reagent_1", "96 细胞培养皿", 3),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 7),
("reagent_4", "10μL加长 Tip头", 1),
]
)
print(dict_1)
MatrixLayout_2, dict_2 = layout.recommend_layout([
("reagent_1", "96深孔板", 4),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 1),
("reagent_4", "10μL加长 Tip头", 1),
])
MatrixLayout_2, dict_2 = layout.recommend_layout(
[
("reagent_1", "96深孔板", 4),
("reagent_2", "12道储液槽", 1),
("reagent_3", "200μL Tip头", 1),
("reagent_4", "10μL加长 Tip头", 1),
]
)
# with open("prcxi_material.json", "r") as f:
# material_info = json.load(f)

View File

@@ -8,6 +8,8 @@ import serial.tools.list_ports
from serial import Serial
from serial.serialutil import SerialException
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class RunzeSyringePumpMode(Enum):
Normal = 0
@@ -77,6 +79,8 @@ class RunzeSyringePumpInfo:
class RunzeSyringePumpAsync:
_ros_node: BaseROS2DeviceNode
def __init__(self, port: str, address: str = "1", volume: float = 25000, mode: RunzeSyringePumpMode = None):
self.port = port
self.address = address
@@ -102,6 +106,9 @@ class RunzeSyringePumpAsync:
self._run_future: Optional[Future[Any]] = None
self._run_lock = Lock()
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
def _adjust_total_steps(self):
self.total_steps = 6000 if self.mode == RunzeSyringePumpMode.Normal else 48000
self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000
@@ -182,7 +189,7 @@ class RunzeSyringePumpAsync:
try:
await self._query(command)
while True:
await asyncio.sleep(0.5) # Wait for 0.5 seconds before polling again
await self._ros_node.sleep(0.5) # Wait for 0.5 seconds before polling again
status = await self.query_device_status()
if status == '`':
@@ -364,7 +371,7 @@ class RunzeSyringePumpAsync:
if self._read_task:
raise RunzeSyringePumpConnectionError
self._read_task = asyncio.create_task(self._read_loop())
self._read_task = self._ros_node.create_task(self._read_loop())
try:
await self.query_device_status()

View File

@@ -3,9 +3,13 @@ import logging
import time as time_module
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualCentrifuge:
"""Virtual centrifuge device - 简化版,只保留核心功能"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
@@ -32,6 +36,9 @@ class VirtualCentrifuge:
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual centrifuge"""
@@ -132,7 +139,7 @@ class VirtualCentrifuge:
break
# 每秒更新一次
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 离心完成
self.data.update({

View File

@@ -2,9 +2,13 @@ import asyncio
import logging
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualColumn:
"""Virtual column device for RunColumn protocol 🏛️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and 'id' in kwargs:
@@ -28,6 +32,9 @@ class VirtualColumn:
print(f"🏛️ === 虚拟色谱柱 {self.device_id} 已创建 === ✨")
print(f"📏 柱参数: 流速={self._max_flow_rate}mL/min | 长度={self._column_length}cm | 直径={self._column_diameter}cm 🔬")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual column 🚀"""
self.logger.info(f"🔧 初始化虚拟色谱柱 {self.device_id}")
@@ -101,7 +108,7 @@ class VirtualColumn:
step_time = separation_time / steps
for i in range(steps):
await asyncio.sleep(step_time)
await self._ros_node.sleep(step_time)
progress = (i + 1) / steps * 100
volume_processed = (i + 1) * 5.0 # 假设每步处理5mL

View File

@@ -4,70 +4,76 @@ import time as time_module
from typing import Dict, Any, Optional
from unilabos.compile.utils.vessel_parser import get_vessel
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualFilter:
"""Virtual filter device - 完全按照 Filter.action 规范 🌊"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
if device_id is None and 'id' in kwargs:
device_id = kwargs.pop('id')
if config is None and 'config' in kwargs:
config = kwargs.pop('config')
if device_id is None and "id" in kwargs:
device_id = kwargs.pop("id")
if config is None and "config" in kwargs:
config = kwargs.pop("config")
self.device_id = device_id or "unknown_filter"
self.config = config or {}
self.logger = logging.getLogger(f"VirtualFilter.{self.device_id}")
self.data = {}
# 从config或kwargs中获取配置参数
self.port = self.config.get('port') or kwargs.get('port', 'VIRTUAL')
self._max_temp = self.config.get('max_temp') or kwargs.get('max_temp', 100.0)
self._max_stir_speed = self.config.get('max_stir_speed') or kwargs.get('max_stir_speed', 1000.0)
self._max_volume = self.config.get('max_volume') or kwargs.get('max_volume', 500.0)
self.port = self.config.get("port") or kwargs.get("port", "VIRTUAL")
self._max_temp = self.config.get("max_temp") or kwargs.get("max_temp", 100.0)
self._max_stir_speed = self.config.get("max_stir_speed") or kwargs.get("max_stir_speed", 1000.0)
self._max_volume = self.config.get("max_volume") or kwargs.get("max_volume", 500.0)
# 处理其他kwargs参数
skip_keys = {'port', 'max_temp', 'max_stir_speed', 'max_volume'}
skip_keys = {"port", "max_temp", "max_stir_speed", "max_volume"}
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual filter 🚀"""
self.logger.info(f"🔧 初始化虚拟过滤器 {self.device_id}")
# 按照 Filter.action 的 feedback 字段初始化
self.data.update({
"status": "Idle",
"progress": 0.0, # Filter.action feedback
"current_temp": 25.0, # Filter.action feedback
"filtered_volume": 0.0, # Filter.action feedback
"message": "Ready for filtration"
})
self.data.update(
{
"status": "Idle",
"progress": 0.0, # Filter.action feedback
"current_temp": 25.0, # Filter.action feedback
"filtered_volume": 0.0, # Filter.action feedback
"message": "Ready for filtration",
}
)
self.logger.info(f"✅ 过滤器 {self.device_id} 初始化完成 🌊")
return True
async def cleanup(self) -> bool:
"""Cleanup virtual filter 🧹"""
self.logger.info(f"🧹 清理虚拟过滤器 {self.device_id} 🔚")
self.data.update({
"status": "Offline"
})
self.data.update({"status": "Offline"})
self.logger.info(f"✅ 过滤器 {self.device_id} 清理完成 💤")
return True
async def filter(
self,
self,
vessel: dict,
filtrate_vessel: dict = {},
stir: bool = False,
stir_speed: float = 300.0,
temp: float = 25.0,
continue_heatchill: bool = False,
volume: float = 0.0
stir: bool = False,
stir_speed: float = 300.0,
temp: float = 25.0,
continue_heatchill: bool = False,
volume: float = 0.0,
) -> bool:
"""Execute filter action - 完全按照 Filter.action 参数 🌊"""
vessel_id, _ = get_vessel(vessel)
@@ -79,59 +85,52 @@ class VirtualFilter:
temp = 25.0 # 0度自动设置为室温
self.logger.info(f"🌡️ 温度自动调整: {original_temp}°C → {temp}°C (室温) 🏠")
elif temp < 4.0:
temp = 4.0 # 小于4度自动设置为4度
temp = 4.0 # 小于4度自动设置为4度
self.logger.info(f"🌡️ 温度自动调整: {original_temp}°C → {temp}°C (最低温度) ❄️")
self.logger.info(f"🌊 开始过滤操作: {vessel_id}{filtrate_vessel_id} 🚰")
self.logger.info(f" 🌪️ 搅拌: {stir} ({stir_speed} RPM)")
self.logger.info(f" 🌡️ 温度: {temp}°C")
self.logger.info(f" 💧 体积: {volume}mL")
self.logger.info(f" 🔥 保持加热: {continue_heatchill}")
# 验证参数
if temp > self._max_temp or temp < 4.0:
error_msg = f"🌡️ 温度 {temp}°C 超出范围 (4-{self._max_temp}°C) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error: 温度超出范围 ⚠️",
"message": error_msg
})
self.data.update({"status": f"Error: 温度超出范围 ⚠️", "message": error_msg})
return False
if stir and stir_speed > self._max_stir_speed:
error_msg = f"🌪️ 搅拌速度 {stir_speed} RPM 超出范围 (0-{self._max_stir_speed} RPM) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error: 搅拌速度超出范围 ⚠️",
"message": error_msg
})
self.data.update({"status": f"Error: 搅拌速度超出范围 ⚠️", "message": error_msg})
return False
if volume > self._max_volume:
error_msg = f"💧 过滤体积 {volume} mL 超出范围 (0-{self._max_volume} mL) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error",
"message": error_msg
})
self.data.update({"status": f"Error", "message": error_msg})
return False
# 开始过滤
filter_volume = volume if volume > 0 else 50.0
self.logger.info(f"🚀 开始过滤 {filter_volume}mL 液体 💧")
self.data.update({
"status": f"Running",
"current_temp": temp,
"filtered_volume": 0.0,
"progress": 0.0,
"message": f"🚀 Starting filtration: {vessel_id}{filtrate_vessel_id}"
})
self.data.update(
{
"status": f"Running",
"current_temp": temp,
"filtered_volume": 0.0,
"progress": 0.0,
"message": f"🚀 Starting filtration: {vessel_id}{filtrate_vessel_id}",
}
)
try:
# 过滤过程 - 实时更新进度
start_time = time_module.time()
# 根据体积和搅拌估算过滤时间
base_time = filter_volume / 5.0 # 5mL/s 基础速度
if stir:
@@ -140,78 +139,79 @@ class VirtualFilter:
if temp > 50.0:
base_time *= 0.7 # 高温加速过滤
self.logger.info(f"🔥 高温加速过滤预计时间减少30% ⚡")
filter_time = max(base_time, 10.0) # 最少10秒
self.logger.info(f"⏱️ 预计过滤时间: {filter_time:.1f}秒 ⌛")
while True:
current_time = time_module.time()
elapsed = current_time - start_time
remaining = max(0, filter_time - elapsed)
progress = min(100.0, (elapsed / filter_time) * 100)
current_filtered = (progress / 100.0) * filter_volume
# 更新状态 - 按照 Filter.action feedback 字段
status_msg = f"🌊 过滤中: {vessel}"
if stir:
status_msg += f" | 🌪️ 搅拌: {stir_speed} RPM"
status_msg += f" | 🌡️ {temp}°C | 📊 {progress:.1f}% | 💧 已过滤: {current_filtered:.1f}mL"
self.data.update({
"progress": progress, # Filter.action feedback
"current_temp": temp, # Filter.action feedback
"filtered_volume": current_filtered, # Filter.action feedback
"status": "Running",
"message": f"🌊 Filtering: {progress:.1f}% complete, {current_filtered:.1f}mL filtered"
})
self.data.update(
{
"progress": progress, # Filter.action feedback
"current_temp": temp, # Filter.action feedback
"filtered_volume": current_filtered, # Filter.action feedback
"status": "Running",
"message": f"🌊 Filtering: {progress:.1f}% complete, {current_filtered:.1f}mL filtered",
}
)
# 进度日志每25%打印一次)
if progress >= 25 and progress % 25 < 1:
self.logger.info(f"📊 过滤进度: {progress:.0f}% | 💧 {current_filtered:.1f}mL 完成 ✨")
if remaining <= 0:
break
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 过滤完成
final_temp = temp if continue_heatchill else 25.0
final_status = f"✅ 过滤完成: {vessel} | 💧 {filter_volume}mL → {filtrate_vessel}"
if continue_heatchill:
final_status += " | 🔥 继续加热搅拌"
self.logger.info(f"🔥 继续保持加热搅拌状态 🌪️")
self.data.update({
"status": final_status,
"progress": 100.0, # Filter.action feedback
"current_temp": final_temp, # Filter.action feedback
"filtered_volume": filter_volume, # Filter.action feedback
"message": f"✅ Filtration completed: {filter_volume}mL filtered from {vessel_id}"
})
self.data.update(
{
"status": final_status,
"progress": 100.0, # Filter.action feedback
"current_temp": final_temp, # Filter.action feedback
"filtered_volume": filter_volume, # Filter.action feedback
"message": f"✅ Filtration completed: {filter_volume}mL filtered from {vessel_id}",
}
)
self.logger.info(f"🎉 过滤完成! 💧 {filter_volume}mL 从 {vessel_id} 过滤到 {filtrate_vessel_id}")
self.logger.info(f"📊 最终状态: 温度 {final_temp}°C | 进度 100% | 体积 {filter_volume}mL 🏁")
return True
except Exception as e:
error_msg = f"过滤过程中发生错误: {str(e)} 💥"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"Error",
"message": f"❌ Filtration failed: {str(e)}"
})
self.data.update({"status": f"Error", "message": f"❌ Filtration failed: {str(e)}"})
return False
# === 核心状态属性 - 按照 Filter.action feedback 字段 ===
@property
def status(self) -> str:
return self.data.get("status", "❓ Unknown")
@property
def progress(self) -> float:
"""Filter.action feedback 字段 📊"""
return self.data.get("progress", 0.0)
@property
def current_temp(self) -> float:
"""Filter.action feedback 字段 🌡️"""
@@ -230,15 +230,15 @@ class VirtualFilter:
@property
def message(self) -> str:
return self.data.get("message", "")
@property
def max_temp(self) -> float:
return self._max_temp
@property
def max_stir_speed(self) -> float:
return self._max_stir_speed
@property
def max_volume(self) -> float:
return self._max_volume
return self._max_volume

View File

@@ -3,9 +3,13 @@ import logging
import time as time_module # 重命名time模块避免与参数冲突
from typing import Dict, Any
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualHeatChill:
"""Virtual heat chill device for HeatChillProtocol testing 🌡️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and 'id' in kwargs:
@@ -35,6 +39,9 @@ class VirtualHeatChill:
print(f"🌡️ === 虚拟温控设备 {self.device_id} 已创建 === ✨")
print(f"🔥 温度范围: {self._min_temp}°C ~ {self._max_temp}°C | 🌪️ 最大搅拌: {self._max_stir_speed} RPM")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual heat chill 🚀"""
self.logger.info(f"🔧 初始化虚拟温控设备 {self.device_id}")
@@ -177,7 +184,7 @@ class VirtualHeatChill:
break
# 等待1秒后再次检查
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 操作完成
final_stir_info = f" | 🌪️ 搅拌: {stir_speed} RPM" if stir else ""

View File

@@ -3,13 +3,19 @@ import logging
import time as time_module
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
def debug_print(message):
"""调试输出 🔍"""
print(f"🌪️ [ROTAVAP] {message}", flush=True)
class VirtualRotavap:
"""Virtual rotary evaporator device - 简化版,只保留核心功能 🌪️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and "id" in kwargs:
@@ -38,56 +44,65 @@ class VirtualRotavap:
print(f"🌪️ === 虚拟旋转蒸发仪 {self.device_id} 已创建 === ✨")
print(f"🔥 温度范围: 10°C ~ {self._max_temp}°C | 🌀 转速范围: 10 ~ {self._max_rotation_speed} RPM")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual rotary evaporator 🚀"""
self.logger.info(f"🔧 初始化虚拟旋转蒸发仪 {self.device_id}")
# 只保留核心状态
self.data.update({
"status": "🏠 待机中",
"rotavap_state": "Ready", # Ready, Evaporating, Completed, Error
"current_temp": 25.0,
"target_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0, # 大气压
"evaporated_volume": 0.0,
"progress": 0.0,
"remaining_time": 0.0,
"message": "🌪️ Ready for evaporation"
})
self.data.update(
{
"status": "🏠 待机中",
"rotavap_state": "Ready", # Ready, Evaporating, Completed, Error
"current_temp": 25.0,
"target_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0, # 大气压
"evaporated_volume": 0.0,
"progress": 0.0,
"remaining_time": 0.0,
"message": "🌪️ Ready for evaporation",
}
)
self.logger.info(f"✅ 旋转蒸发仪 {self.device_id} 初始化完成 🌪️")
self.logger.info(f"📊 设备规格: 温度范围 10°C ~ {self._max_temp}°C | 转速范围 10 ~ {self._max_rotation_speed} RPM")
self.logger.info(
f"📊 设备规格: 温度范围 10°C ~ {self._max_temp}°C | 转速范围 10 ~ {self._max_rotation_speed} RPM"
)
return True
async def cleanup(self) -> bool:
"""Cleanup virtual rotary evaporator 🧹"""
self.logger.info(f"🧹 清理虚拟旋转蒸发仪 {self.device_id} 🔚")
self.data.update({
"status": "💤 离线",
"rotavap_state": "Offline",
"current_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": "💤 System offline"
})
self.data.update(
{
"status": "💤 离线",
"rotavap_state": "Offline",
"current_temp": 25.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": "💤 System offline",
}
)
self.logger.info(f"✅ 旋转蒸发仪 {self.device_id} 清理完成 💤")
return True
async def evaporate(
self,
vessel: str,
pressure: float = 0.1,
temp: float = 60.0,
self,
vessel: str,
pressure: float = 0.1,
temp: float = 60.0,
time: float = 180.0,
stir_speed: float = 100.0,
solvent: str = "",
**kwargs
**kwargs,
) -> bool:
"""Execute evaporate action - 简化版 🌪️"""
# 🔧 新增确保time参数是数值类型
if isinstance(time, str):
try:
@@ -98,31 +113,31 @@ class VirtualRotavap:
elif not isinstance(time, (int, float)):
self.logger.error(f"❌ 时间参数类型无效: {type(time)}使用默认值180.0秒")
time = 180.0
# 确保time是float类型; 并加速
time = float(time) / 10.0
# 🔧 简化处理如果vessel就是设备自己直接操作
if vessel == self.device_id:
debug_print(f"🎯 在设备 {self.device_id} 上直接执行蒸发操作")
actual_vessel = self.device_id
else:
actual_vessel = vessel
# 参数预处理
if solvent:
self.logger.info(f"🧪 识别到溶剂: {solvent}")
# 根据溶剂调整参数
solvent_lower = solvent.lower()
if any(s in solvent_lower for s in ['water', 'aqueous']):
if any(s in solvent_lower for s in ["water", "aqueous"]):
temp = max(temp, 80.0)
pressure = max(pressure, 0.2)
self.logger.info(f"💧 水系溶剂:调整参数 → 温度 {temp}°C, 压力 {pressure} bar")
elif any(s in solvent_lower for s in ['ethanol', 'methanol', 'acetone']):
elif any(s in solvent_lower for s in ["ethanol", "methanol", "acetone"]):
temp = min(temp, 50.0)
pressure = min(pressure, 0.05)
self.logger.info(f"⚡ 易挥发溶剂:调整参数 → 温度 {temp}°C, 压力 {pressure} bar")
self.logger.info(f"🌪️ 开始蒸发操作: {actual_vessel}")
self.logger.info(f" 🥽 容器: {actual_vessel}")
self.logger.info(f" 🌡️ 温度: {temp}°C")
@@ -131,126 +146,140 @@ class VirtualRotavap:
self.logger.info(f" 🌀 转速: {stir_speed} RPM")
if solvent:
self.logger.info(f" 🧪 溶剂: {solvent}")
# 验证参数
if temp > self._max_temp or temp < 10.0:
error_msg = f"🌡️ 温度 {temp}°C 超出范围 (10-{self._max_temp}°C) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 错误: 温度超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg
})
self.data.update(
{
"status": f"❌ 错误: 温度超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg,
}
)
return False
if stir_speed > self._max_rotation_speed or stir_speed < 10.0:
error_msg = f"🌀 旋转速度 {stir_speed} RPM 超出范围 (10-{self._max_rotation_speed} RPM) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 错误: 转速超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg
})
self.data.update(
{
"status": f"❌ 错误: 转速超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg,
}
)
return False
if pressure < 0.01 or pressure > 1.0:
error_msg = f"💨 真空度 {pressure} bar 超出范围 (0.01-1.0 bar) ⚠️"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 错误: 压力超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg
})
self.data.update(
{
"status": f"❌ 错误: 压力超出范围",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": error_msg,
}
)
return False
# 开始蒸发 - 🔧 现在time已经确保是float类型
self.logger.info(f"🚀 启动蒸发程序! 预计用时 {time/60:.1f}分钟 ⏱️")
self.data.update({
"status": f"🌪️ 蒸发中: {actual_vessel}",
"rotavap_state": "Evaporating",
"current_temp": temp,
"target_temp": temp,
"rotation_speed": stir_speed,
"vacuum_pressure": pressure,
"remaining_time": time,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": f"🌪️ Evaporating {actual_vessel} at {temp}°C, {pressure} bar, {stir_speed} RPM"
})
self.data.update(
{
"status": f"🌪️ 蒸发中: {actual_vessel}",
"rotavap_state": "Evaporating",
"current_temp": temp,
"target_temp": temp,
"rotation_speed": stir_speed,
"vacuum_pressure": pressure,
"remaining_time": time,
"progress": 0.0,
"evaporated_volume": 0.0,
"message": f"🌪️ Evaporating {actual_vessel} at {temp}°C, {pressure} bar, {stir_speed} RPM",
}
)
try:
# 蒸发过程 - 实时更新进度
start_time = time_module.time()
total_time = time
last_logged_progress = 0
while True:
current_time = time_module.time()
elapsed = current_time - start_time
remaining = max(0, total_time - elapsed)
progress = min(100.0, (elapsed / total_time) * 100)
# 模拟蒸发体积 - 根据溶剂类型调整
if solvent and any(s in solvent.lower() for s in ['water', 'aqueous']):
if solvent and any(s in solvent.lower() for s in ["water", "aqueous"]):
evaporated_vol = progress * 0.6 # 水系溶剂蒸发慢
elif solvent and any(s in solvent.lower() for s in ['ethanol', 'methanol', 'acetone']):
elif solvent and any(s in solvent.lower() for s in ["ethanol", "methanol", "acetone"]):
evaporated_vol = progress * 1.0 # 易挥发溶剂蒸发快
else:
evaporated_vol = progress * 0.8 # 默认蒸发量
# 🔧 更新状态 - 确保包含所有必需字段
status_msg = f"🌪️ 蒸发中: {actual_vessel} | 🌡️ {temp}°C | 💨 {pressure} bar | 🌀 {stir_speed} RPM | 📊 {progress:.1f}% | ⏰ 剩余: {remaining:.0f}s"
self.data.update({
"remaining_time": remaining,
"progress": progress,
"evaporated_volume": evaporated_vol,
"current_temp": temp,
"status": status_msg,
"message": f"🌪️ Evaporating: {progress:.1f}% complete, 💧 {evaporated_vol:.1f}mL evaporated, ⏰ {remaining:.0f}s remaining"
})
self.data.update(
{
"remaining_time": remaining,
"progress": progress,
"evaporated_volume": evaporated_vol,
"current_temp": temp,
"status": status_msg,
"message": f"🌪️ Evaporating: {progress:.1f}% complete, 💧 {evaporated_vol:.1f}mL evaporated, ⏰ {remaining:.0f}s remaining",
}
)
# 进度日志每25%打印一次)
if progress >= 25 and int(progress) % 25 == 0 and int(progress) != last_logged_progress:
self.logger.info(f"📊 蒸发进度: {progress:.0f}% | 💧 已蒸发: {evaporated_vol:.1f}mL | ⏰ 剩余: {remaining:.0f}s ✨")
self.logger.info(
f"📊 蒸发进度: {progress:.0f}% | 💧 已蒸发: {evaporated_vol:.1f}mL | ⏰ 剩余: {remaining:.0f}s ✨"
)
last_logged_progress = int(progress)
# 时间到了,退出循环
if remaining <= 0:
break
# 每秒更新一次
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
# 蒸发完成
if solvent and any(s in solvent.lower() for s in ['water', 'aqueous']):
if solvent and any(s in solvent.lower() for s in ["water", "aqueous"]):
final_evaporated = 60.0 # 水系溶剂
elif solvent and any(s in solvent.lower() for s in ['ethanol', 'methanol', 'acetone']):
elif solvent and any(s in solvent.lower() for s in ["ethanol", "methanol", "acetone"]):
final_evaporated = 100.0 # 易挥发溶剂
else:
final_evaporated = 80.0 # 默认
self.data.update({
"status": f"✅ 蒸发完成: {actual_vessel} | 💧 蒸发量: {final_evaporated:.1f}mL",
"rotavap_state": "Completed",
"evaporated_volume": final_evaporated,
"progress": 100.0,
"current_temp": temp,
"remaining_time": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"✅ Evaporation completed: {final_evaporated}mL evaporated from {actual_vessel}"
})
self.data.update(
{
"status": f"✅ 蒸发完成: {actual_vessel} | 💧 蒸发量: {final_evaporated:.1f}mL",
"rotavap_state": "Completed",
"evaporated_volume": final_evaporated,
"progress": 100.0,
"current_temp": temp,
"remaining_time": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"✅ Evaporation completed: {final_evaporated}mL evaporated from {actual_vessel}",
}
)
self.logger.info(f"🎉 蒸发操作完成! ✨")
self.logger.info(f"📊 蒸发结果:")
@@ -262,24 +291,26 @@ class VirtualRotavap:
self.logger.info(f" ⏱️ 总用时: {total_time:.0f}s")
if solvent:
self.logger.info(f" 🧪 处理溶剂: {solvent} 🏁")
return True
except Exception as e:
# 出错处理
error_msg = f"蒸发过程中发生错误: {str(e)} 💥"
self.logger.error(f"{error_msg}")
self.data.update({
"status": f"❌ 蒸发错误: {str(e)}",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"❌ Evaporation failed: {str(e)}"
})
self.data.update(
{
"status": f"❌ 蒸发错误: {str(e)}",
"rotavap_state": "Error",
"current_temp": 25.0,
"progress": 0.0,
"evaporated_volume": 0.0,
"rotation_speed": 0.0,
"vacuum_pressure": 1.0,
"message": f"❌ Evaporation failed: {str(e)}",
}
)
return False
# === 核心状态属性 ===

View File

@@ -2,9 +2,13 @@ import asyncio
import logging
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualSeparator:
"""Virtual separator device for SeparateProtocol testing"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs):
# 处理可能的不同调用方式
@@ -35,6 +39,9 @@ class VirtualSeparator:
for key, value in kwargs.items():
if key not in skip_keys and not hasattr(self, key):
setattr(self, key, value)
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual separator"""
@@ -119,14 +126,14 @@ class VirtualSeparator:
for repeat in range(repeats):
# 搅拌阶段
for progress in range(0, 51, 10):
await asyncio.sleep(simulation_time / (repeats * 10))
await self._ros_node.sleep(simulation_time / (repeats * 10))
overall_progress = ((repeat * 100) + (progress * 0.5)) / repeats
self.data["progress"] = overall_progress
self.data["message"] = f"{repeat+1}次分离 - 搅拌中 ({progress}%)"
# 静置分相阶段
for progress in range(50, 101, 10):
await asyncio.sleep(simulation_time / (repeats * 10))
await self._ros_node.sleep(simulation_time / (repeats * 10))
overall_progress = ((repeat * 100) + (progress * 0.5)) / repeats
self.data["progress"] = overall_progress
self.data["message"] = f"{repeat+1}次分离 - 静置分相中 ({progress}%)"

View File

@@ -2,11 +2,16 @@ import time
import asyncio
from typing import Union
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualSolenoidValve:
"""
虚拟电磁阀门 - 简单的开关型阀门,只有开启和关闭两个状态
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: dict = None, **kwargs):
# 从配置中获取参数,提供默认值
if config is None:
@@ -21,6 +26,9 @@ class VirtualSolenoidValve:
self._status = "Idle"
self._valve_state = "Closed" # "Open" or "Closed"
self._is_open = False
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化设备"""
@@ -63,7 +71,7 @@ class VirtualSolenoidValve:
self._status = "Busy"
# 模拟阀门响应时间
await asyncio.sleep(self.response_time)
await self._ros_node.sleep(self.response_time)
# 处理不同的命令格式
if isinstance(command, str):

View File

@@ -3,6 +3,8 @@ import logging
import re
from typing import Dict, Any, Optional
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualSolidDispenser:
"""
虚拟固体粉末加样器 - 用于处理 Add Protocol 中的固体试剂添加 ⚗️
@@ -13,6 +15,8 @@ class VirtualSolidDispenser:
- 简单反馈:成功/失败 + 消息 📊
"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
self.device_id = device_id or "virtual_solid_dispenser"
self.config = config or {}
@@ -32,6 +36,9 @@ class VirtualSolidDispenser:
print(f"⚗️ === 虚拟固体分配器 {self.device_id} 创建成功! === ✨")
print(f"📊 设备规格: 最大容量 {self.max_capacity}g | 精度 {self.precision}g 🎯")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化固体加样器 🚀"""
self.logger.info(f"🔧 初始化固体分配器 {self.device_id}")
@@ -263,7 +270,7 @@ class VirtualSolidDispenser:
for i in range(steps):
progress = (i + 1) / steps * 100
await asyncio.sleep(step_time)
await self._ros_node.sleep(step_time)
if i % 2 == 0: # 每隔一步显示进度
self.logger.debug(f"📊 加样进度: {progress:.0f}% | {amount_emoji} 正在分配 {reagent}...")

View File

@@ -3,9 +3,13 @@ import logging
import time as time_module
from typing import Dict, Any
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualStirrer:
"""Virtual stirrer device for StirProtocol testing - 功能完整版 🌪️"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs):
# 处理可能的不同调用方式
if device_id is None and 'id' in kwargs:
@@ -34,6 +38,9 @@ class VirtualStirrer:
print(f"🌪️ === 虚拟搅拌器 {self.device_id} 已创建 === ✨")
print(f"🔧 速度范围: {self._min_speed} ~ {self._max_speed} RPM | 📱 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""Initialize virtual stirrer 🚀"""
self.logger.info(f"🔧 初始化虚拟搅拌器 {self.device_id}")
@@ -134,7 +141,7 @@ class VirtualStirrer:
if remaining <= 0:
break
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
self.logger.info(f"✅ 搅拌阶段完成! 🌪️ {stir_speed} RPM × {stir_time}s")
@@ -176,7 +183,7 @@ class VirtualStirrer:
if remaining <= 0:
break
await asyncio.sleep(1.0)
await self._ros_node.sleep(1.0)
self.logger.info(f"✅ 沉降阶段完成! 🛑 静置 {settling_time}s")

View File

@@ -4,6 +4,8 @@ from enum import Enum
from typing import Union, Optional
import logging
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
class VirtualPumpMode(Enum):
Normal = 0
@@ -14,6 +16,8 @@ class VirtualPumpMode(Enum):
class VirtualTransferPump:
"""虚拟转移泵类 - 模拟泵的基本功能,无需实际硬件 🚰"""
_ros_node: BaseROS2DeviceNode
def __init__(self, device_id: str = None, config: dict = None, **kwargs):
"""
初始化虚拟转移泵
@@ -53,6 +57,9 @@ class VirtualTransferPump:
print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s")
print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}")
def post_init(self, ros_node: BaseROS2DeviceNode):
self._ros_node = ros_node
async def initialize(self) -> bool:
"""初始化虚拟泵 🚀"""
self.logger.info(f"🔧 初始化虚拟转移泵 {self.device_id}")
@@ -104,7 +111,7 @@ class VirtualTransferPump:
async def _simulate_operation(self, duration: float):
"""模拟操作延时 ⏱️"""
self._status = "Busy"
await asyncio.sleep(duration)
await self._ros_node.sleep(duration)
self._status = "Idle"
def _calculate_duration(self, volume: float, velocity: float = None) -> float:
@@ -223,7 +230,7 @@ class VirtualTransferPump:
# 等待一小步时间
if i < steps and step_duration > 0:
await asyncio.sleep(step_duration)
await self._ros_node.sleep(step_duration)
else:
# 移动距离很小,直接完成
self._position = target_position
@@ -341,7 +348,7 @@ class VirtualTransferPump:
# 短暂停顿
self.logger.debug("⏸️ 短暂停顿...")
await asyncio.sleep(0.1)
await self._ros_node.sleep(0.1)
# 排液
await self.dispense(volume, dispense_velocity)

View File

@@ -53,7 +53,7 @@ from unilabos.ros.nodes.resource_tracker import (
)
from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
from rclpy.task import Task
from rclpy.task import Task, Future
from unilabos.utils.import_manager import default_manager
from unilabos.utils.log import info, debug, warning, error, critical, logger, trace
from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str
@@ -555,6 +555,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
rclpy.get_global_executor().add_node(self)
self.lab_logger().debug(f"ROS节点初始化完成")
async def sleep(self, rel_time: float, callback_group=None):
if callback_group is None:
callback_group = self.callback_group
await ROS2DeviceNode.async_wait_for(self, rel_time, callback_group)
@classmethod
async def create_task(cls, func, trace_error=True, **kwargs) -> Task:
return ROS2DeviceNode.run_async_func(func, trace_error, **kwargs)
async def update_resource(self, resources: List["ResourcePLR"]):
r = SerialCommand.Request()
tree_set = ResourceTreeSet.from_plr_resources(resources)
@@ -647,7 +656,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
].call_async(
SerialCommand.Request(
command=json.dumps(
{"data": {"data": resources_uuid, "with_children": False}, "action": "get"}
{"data": {"data": resources_uuid, "with_children": True if action == "add" else "update"}, "action": "get"}
)
)
) # type: ignore
@@ -1399,6 +1408,14 @@ class ROS2DeviceNode:
future.add_done_callback(_handle_future_exception)
return future
@classmethod
async def async_wait_for(cls, node: Node, wait_time: float, callback_group=None):
future = Future()
timer = node.create_timer(wait_time, lambda : future.set_result(None), callback_group=callback_group, clock=node.get_clock())
await future
timer.cancel()
node.destroy_timer(timer)
@property
def driver_instance(self):
return self._driver_instance

View File

@@ -18,7 +18,8 @@ from unilabos_msgs.srv import (
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand, ResourceGet,
SerialCommand,
ResourceGet,
) # type: ignore
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID