diff --git a/docs/user_guide/best_practice.md b/docs/user_guide/best_practice.md index 767dc4d..499ee9e 100644 --- a/docs/user_guide/best_practice.md +++ b/docs/user_guide/best_practice.md @@ -452,8 +452,9 @@ unilab --ak your_ak --sk your_sk -g test/experiments/mock_devices/mock_all.json **操作步骤:** 1. 将两个 `container` 拖拽到 `workstation` 中 -2. 将 `virtual_transfer_pump` 拖拽到 `workstation` 中 -3. 在画布上连接它们(建立父子关系) +2. 将 `virtual_multiway_valve` 拖拽到 `workstation` 中 +3. 将 `virtual_transfer_pump` 拖拽到 `workstation` 中 +4. 在画布上连接它们(建立父子关系) ![设备连接](image/links.png) diff --git a/docs/user_guide/image/links.png b/docs/user_guide/image/links.png index 7e5e2bb..c5fc245 100644 Binary files a/docs/user_guide/image/links.png and b/docs/user_guide/image/links.png differ diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index 7215fc5..d2dd497 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -95,8 +95,29 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: return total_volume -def is_integrated_pump(node_name): - return "pump" in node_name and "valve" in node_name +def is_integrated_pump(node_class: str, node_name: str = "") -> bool: + """ + 判断是否为泵阀一体设备 + """ + class_lower = (node_class or "").lower() + name_lower = (node_name or "").lower() + + if "pump" not in class_lower and "pump" not in name_lower: + return False + + integrated_markers = [ + "valve", + "pump_valve", + "pumpvalve", + "integrated", + "transfer_pump", + ] + + for marker in integrated_markers: + if marker in class_lower or marker in name_lower: + return True + + return False def find_connected_pump(G, valve_node): @@ -186,7 +207,9 @@ def build_pump_valve_maps(G, pump_backbone): debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") for node in filtered_backbone: - if is_integrated_pump(G.nodes[node]["class"]): + node_data = G.nodes.get(node, {}) + node_class = node_data.get("class", "") or "" + if is_integrated_pump(node_class, node): pumps_from_node[node] = node valve_from_node[node] = node debug_print(f" - 集成泵-阀: {node}") diff --git a/unilabos/devices/virtual/virtual_transferpump.py b/unilabos/devices/virtual/virtual_transferpump.py index 7b8eea8..2d3c9d8 100644 --- a/unilabos/devices/virtual/virtual_transferpump.py +++ b/unilabos/devices/virtual/virtual_transferpump.py @@ -15,35 +15,35 @@ class VirtualPumpMode(Enum): class VirtualTransferPump: """虚拟转移泵类 - 模拟泵的基本功能,无需实际硬件 🚰""" - + _ros_node: BaseROS2DeviceNode - + def __init__(self, device_id: str = None, config: dict = None, **kwargs): """ 初始化虚拟转移泵 - + Args: device_id: 设备ID config: 配置字典,包含max_volume, port等参数 **kwargs: 其他参数,确保兼容性 """ self.device_id = device_id or "virtual_transfer_pump" - + # 从config或kwargs中获取参数,确保类型正确 if config: - self.max_volume = float(config.get('max_volume', 25.0)) - self.port = config.get('port', 'VIRTUAL') + self.max_volume = float(config.get("max_volume", 25.0)) + self.port = config.get("port", "VIRTUAL") else: - self.max_volume = float(kwargs.get('max_volume', 25.0)) - self.port = kwargs.get('port', 'VIRTUAL') - - self._transfer_rate = float(kwargs.get('transfer_rate', 0)) - self.mode = kwargs.get('mode', VirtualPumpMode.Normal) - + self.max_volume = float(kwargs.get("max_volume", 25.0)) + self.port = kwargs.get("port", "VIRTUAL") + + self._transfer_rate = float(kwargs.get("transfer_rate", 0)) + self.mode = kwargs.get("mode", VirtualPumpMode.Normal) + # 状态变量 - 确保都是正确类型 self._status = "Idle" self._position = 0.0 # float - self._max_velocity = 5.0 # float + self._max_velocity = 5.0 # float self._current_volume = 0.0 # float # 🚀 新增:快速模式设置 - 大幅缩短执行时间 @@ -52,14 +52,16 @@ class VirtualTransferPump: self._fast_dispense_time = 1.0 # 快速喷射时间(秒) self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}") - + print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨") - print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s") + print( + f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s" + ) print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}") - + def post_init(self, ros_node: BaseROS2DeviceNode): self._ros_node = ros_node - + async def initialize(self) -> bool: """初始化虚拟泵 🚀""" self.logger.info(f"🔧 初始化虚拟转移泵 {self.device_id} ✨") @@ -68,33 +70,33 @@ class VirtualTransferPump: self._current_volume = 0.0 self.logger.info(f"✅ 转移泵 {self.device_id} 初始化完成 🚰") return True - + async def cleanup(self) -> bool: """清理虚拟泵 🧹""" self.logger.info(f"🧹 清理虚拟转移泵 {self.device_id} 🔚") self._status = "Idle" self.logger.info(f"✅ 转移泵 {self.device_id} 清理完成 💤") return True - + # 基本属性 @property def status(self) -> str: return self._status - + @property def position(self) -> float: """当前柱塞位置 (ml) 📍""" return self._position - + @property def current_volume(self) -> float: """当前注射器中的体积 (ml) 💧""" return self._current_volume - + @property def max_velocity(self) -> float: return self._max_velocity - + @property def transfer_rate(self) -> float: return self._transfer_rate @@ -103,17 +105,17 @@ class VirtualTransferPump: """设置最大速度 (ml/s) 🌊""" self._max_velocity = max(0.1, min(50.0, velocity)) # 限制在合理范围内 self.logger.info(f"🌊 设置最大速度为 {self._max_velocity} mL/s") - + def get_status(self) -> str: """获取泵状态 📋""" return self._status - + async def _simulate_operation(self, duration: float): """模拟操作延时 ⏱️""" self._status = "Busy" await self._ros_node.sleep(duration) self._status = "Idle" - + def _calculate_duration(self, volume: float, velocity: float = None) -> float: """ 计算操作持续时间 ⏰ @@ -121,10 +123,10 @@ class VirtualTransferPump: """ if velocity is None: velocity = self._max_velocity - + # 📊 计算理论时间(用于日志显示) theoretical_duration = abs(volume) / velocity - + # 🚀 如果启用快速模式,使用固定的快速时间 if self._fast_mode: # 根据操作类型选择快速时间 @@ -132,13 +134,13 @@ class VirtualTransferPump: actual_duration = self._fast_move_time else: # 很小的操作 actual_duration = 0.5 - + self.logger.debug(f"⚡ 快速模式: 理论时间 {theoretical_duration:.2f}s → 实际时间 {actual_duration:.2f}s") return actual_duration else: # 正常模式使用理论时间 return theoretical_duration - + def _calculate_display_duration(self, volume: float, velocity: float = None) -> float: """ 计算显示用的持续时间(用于日志) 📊 @@ -147,16 +149,16 @@ class VirtualTransferPump: if velocity is None: velocity = self._max_velocity return abs(volume) / velocity - + # 新的set_position方法 - 专门用于SetPumpPosition动作 async def set_position(self, position: float, max_velocity: float = None): """ 移动到绝对位置 - 专门用于SetPumpPosition动作 🎯 - + Args: position (float): 目标位置 (ml) max_velocity (float): 移动速度 (ml/s) - + Returns: dict: 符合SetPumpPosition.action定义的结果 """ @@ -164,19 +166,19 @@ class VirtualTransferPump: # 验证并转换参数 target_position = float(position) velocity = float(max_velocity) if max_velocity is not None else self._max_velocity - + # 限制位置在有效范围内 target_position = max(0.0, min(float(self.max_volume), target_position)) - + # 计算移动距离 volume_to_move = abs(target_position - self._position) - + # 📊 计算显示用的时间(用于日志) display_duration = self._calculate_display_duration(volume_to_move, velocity) - + # ⚡ 计算实际执行时间(快速模式) actual_duration = self._calculate_duration(volume_to_move, velocity) - + # 🎯 确定操作类型和emoji if target_position > self._position: operation_type = "吸液" @@ -187,28 +189,34 @@ class VirtualTransferPump: else: operation_type = "保持" operation_emoji = "📍" - + self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}") - self.logger.info(f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)") + self.logger.info( + f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)" + ) self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + # 🚀 模拟移动过程 if volume_to_move > 0.01: # 只有当移动距离足够大时才显示进度 start_position = self._position steps = 5 if actual_duration > 0.5 else 2 # 根据实际时间调整步数 step_duration = actual_duration / steps - + self.logger.info(f"🚀 开始{operation_type}... {operation_emoji}") - + for i in range(steps + 1): # 计算当前位置和进度 progress = (i / steps) * 100 if steps > 0 else 100 - current_pos = start_position + (target_position - start_position) * (i / steps) if steps > 0 else target_position - + current_pos = ( + start_position + (target_position - start_position) * (i / steps) + if steps > 0 + else target_position + ) + # 更新状态 if i < steps: self._status = f"{operation_type}中" @@ -216,10 +224,10 @@ class VirtualTransferPump: else: self._status = "Idle" status_emoji = "✅" - + self._position = current_pos self._current_volume = current_pos - + # 显示进度(每25%或最后一步) if i == 0: self.logger.debug(f" 🔄 {operation_type}开始: {progress:.0f}%") @@ -227,7 +235,7 @@ class VirtualTransferPump: self.logger.debug(f" 🔄 {operation_type}进度: {progress:.0f}%") elif i == steps: self.logger.info(f" ✅ {operation_type}完成: {progress:.0f}% | 当前位置: {current_pos:.2f}mL") - + # 等待一小步时间 if i < steps and step_duration > 0: await self._ros_node.sleep(step_duration) @@ -236,25 +244,27 @@ class VirtualTransferPump: self._position = target_position self._current_volume = target_position self.logger.info(f" 📍 微调完成: {target_position:.2f}mL") - + # 确保最终位置准确 self._position = target_position self._current_volume = target_position self._status = "Idle" - + # 📊 最终状态日志 if volume_to_move > 0.01: - self.logger.info(f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") - + self.logger.info( + f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL" + ) + # 返回符合action定义的结果 return { "success": True, "message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})", "final_position": self._position, "final_volume": self._current_volume, - "operation_type": operation_type + "operation_type": operation_type, } - + except Exception as e: error_msg = f"❌ 设置位置失败: {str(e)}" self.logger.error(error_msg) @@ -262,134 +272,136 @@ class VirtualTransferPump: "success": False, "message": error_msg, "final_position": self._position, - "final_volume": self._current_volume + "final_volume": self._current_volume, } - + # 其他泵操作方法 async def pull_plunger(self, volume: float, velocity: float = None): """ 拉取柱塞(吸液) 📥 - + Args: volume (float): 要拉取的体积 (ml) velocity (float): 拉取速度 (ml/s) """ new_position = min(self.max_volume, self._position + volume) actual_volume = new_position - self._position - + if actual_volume <= 0: self.logger.warning("⚠️ 无法吸液 - 已达到最大容量") return - + display_duration = self._calculate_display_duration(actual_volume, velocity) actual_duration = self._calculate_duration(actual_volume, velocity) - + self.logger.info(f"📥 开始吸液: {actual_volume:.2f}mL") self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + await self._simulate_operation(actual_duration) - + self._position = new_position self._current_volume = new_position - + self.logger.info(f"✅ 吸液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") async def push_plunger(self, volume: float, velocity: float = None): """ 推出柱塞(排液) 📤 - + Args: volume (float): 要推出的体积 (ml) velocity (float): 推出速度 (ml/s) """ new_position = max(0, self._position - volume) actual_volume = self._position - new_position - + if actual_volume <= 0: self.logger.warning("⚠️ 无法排液 - 已达到最小容量") return - + display_duration = self._calculate_display_duration(actual_volume, velocity) actual_duration = self._calculate_duration(actual_volume, velocity) - + self.logger.info(f"📤 开始排液: {actual_volume:.2f}mL") self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + await self._simulate_operation(actual_duration) - + self._position = new_position self._current_volume = new_position - + self.logger.info(f"✅ 排液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") # 便捷操作方法 async def aspirate(self, volume: float, velocity: float = None): """吸液操作 📥""" await self.pull_plunger(volume, velocity) - + async def dispense(self, volume: float, velocity: float = None): """排液操作 📤""" await self.push_plunger(volume, velocity) - + async def transfer(self, volume: float, aspirate_velocity: float = None, dispense_velocity: float = None): """转移操作(先吸后排) 🔄""" self.logger.info(f"🔄 开始转移操作: {volume:.2f}mL") - + # 吸液 await self.aspirate(volume, aspirate_velocity) - + # 短暂停顿 self.logger.debug("⏸️ 短暂停顿...") await self._ros_node.sleep(0.1) - + # 排液 await self.dispense(volume, dispense_velocity) - + async def empty_syringe(self, velocity: float = None): """清空注射器""" await self.set_position(0, velocity) - + async def fill_syringe(self, velocity: float = None): """充满注射器""" await self.set_position(self.max_volume, velocity) - + async def stop_operation(self): """停止当前操作""" self._status = "Idle" self.logger.info("Operation stopped") - + # 状态查询方法 def get_position(self) -> float: """获取当前位置""" return self._position - + def get_current_volume(self) -> float: """获取当前体积""" return self._current_volume - + def get_remaining_capacity(self) -> float: """获取剩余容量""" return self.max_volume - self._current_volume - + def is_empty(self) -> bool: """检查是否为空""" return self._current_volume <= 0.01 # 允许小量误差 - + def is_full(self) -> bool: """检查是否已满""" return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差 - + def __str__(self): - return f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" - + return ( + f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" + ) + def __repr__(self): return self.__str__() @@ -398,20 +410,20 @@ class VirtualTransferPump: async def demo(): """虚拟泵使用示例""" pump = VirtualTransferPump("demo_pump", {"max_volume": 50.0}) - + await pump.initialize() - + print(f"Initial state: {pump}") - + # 测试set_position方法 result = await pump.set_position(10.0, max_velocity=2.0) print(f"Set position result: {result}") print(f"After setting position to 10ml: {pump}") - + # 吸液测试 await pump.aspirate(5.0, velocity=2.0) print(f"After aspirating 5ml: {pump}") - + # 清空测试 result = await pump.set_position(0.0) print(f"Empty result: {result}") diff --git a/unilabos/devices/virtual/workbench.py b/unilabos/devices/virtual/workbench.py index 7a8e145..f3ad912 100644 --- a/unilabos/devices/virtual/workbench.py +++ b/unilabos/devices/virtual/workbench.py @@ -11,6 +11,7 @@ Virtual Workbench Device - 模拟工作台设备 注意:调用来自线程池,使用 threading.Lock 进行同步 """ + import logging import time from typing import Dict, Any, Optional @@ -22,12 +23,15 @@ from typing_extensions import TypedDict from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode from unilabos.utils.decorator import not_action +from unilabos.resources.resource_tracker import SampleUUIDsType # ============ TypedDict 返回类型定义 ============ + class MoveToHeatingStationResult(TypedDict): """move_to_heating_station 返回类型""" + success: bool station_id: int material_id: str @@ -37,6 +41,7 @@ class MoveToHeatingStationResult(TypedDict): class StartHeatingResult(TypedDict): """start_heating 返回类型""" + success: bool station_id: int material_id: str @@ -46,6 +51,7 @@ class StartHeatingResult(TypedDict): class MoveToOutputResult(TypedDict): """move_to_output 返回类型""" + success: bool station_id: int material_id: str @@ -53,6 +59,7 @@ class MoveToOutputResult(TypedDict): class PrepareMaterialsResult(TypedDict): """prepare_materials 返回类型 - 批量准备物料""" + success: bool count: int material_1: int # 物料编号1 @@ -65,8 +72,10 @@ class PrepareMaterialsResult(TypedDict): # ============ 状态枚举 ============ + class HeatingStationState(Enum): """加热台状态枚举""" + IDLE = "idle" # 空闲 OCCUPIED = "occupied" # 已放置物料,等待加热 HEATING = "heating" # 加热中 @@ -75,6 +84,7 @@ class HeatingStationState(Enum): class ArmState(Enum): """机械臂状态枚举""" + IDLE = "idle" # 空闲 BUSY = "busy" # 工作中 @@ -82,6 +92,7 @@ class ArmState(Enum): @dataclass class HeatingStation: """加热台数据结构""" + station_id: int state: HeatingStationState = HeatingStationState.IDLE current_material: Optional[str] = None # 当前物料 (如 "A1", "A2") @@ -137,8 +148,7 @@ class VirtualWorkbench: # 加热台状态 (station_id -> HeatingStation) - 立即初始化,不依赖initialize() self._heating_stations: Dict[int, HeatingStation] = { - i: HeatingStation(station_id=i) - for i in range(1, self.NUM_HEATING_STATIONS + 1) + i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1) } self._stations_lock = RLock() # 可重入锁,保护加热台状态 @@ -178,14 +188,16 @@ class VirtualWorkbench: station.heating_progress = 0.0 # 初始化状态 - self.data.update({ - "status": "Ready", - "arm_state": ArmState.IDLE.value, - "arm_current_task": None, - "heating_stations": self._get_stations_status(), - "active_tasks_count": 0, - "message": "工作台就绪", - }) + self.data.update( + { + "status": "Ready", + "arm_state": ArmState.IDLE.value, + "arm_current_task": None, + "heating_stations": self._get_stations_status(), + "active_tasks_count": 0, + "message": "工作台就绪", + } + ) self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪") return True @@ -204,12 +216,14 @@ class VirtualWorkbench: with self._tasks_lock: self._active_tasks.clear() - self.data.update({ - "status": "Offline", - "arm_state": ArmState.IDLE.value, - "heating_stations": {}, - "message": "工作台已关闭", - }) + self.data.update( + { + "status": "Offline", + "arm_state": ArmState.IDLE.value, + "heating_stations": {}, + "message": "工作台已关闭", + } + ) return True def _get_stations_status(self) -> Dict[int, Dict[str, Any]]: @@ -227,12 +241,14 @@ class VirtualWorkbench: def _update_data_status(self, message: Optional[str] = None): """更新状态数据""" - self.data.update({ - "arm_state": self._arm_state.value, - "arm_current_task": self._arm_current_task, - "heating_stations": self._get_stations_status(), - "active_tasks_count": len(self._active_tasks), - }) + self.data.update( + { + "arm_state": self._arm_state.value, + "arm_current_task": self._arm_current_task, + "heating_stations": self._get_stations_status(), + "active_tasks_count": len(self._active_tasks), + } + ) if message: self.data["message"] = message @@ -280,6 +296,7 @@ class VirtualWorkbench: def prepare_materials( self, + sample_uuids: SampleUUIDsType, count: int = 5, ) -> PrepareMaterialsResult: """ @@ -297,10 +314,7 @@ class VirtualWorkbench: # 生成物料列表 A1 - A{count} materials = [i for i in range(1, count + 1)] - self.logger.info( - f"[准备物料] 生成 {count} 个物料: " - f"A1-A{count} -> material_1~material_{count}" - ) + self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}") return { "success": True, @@ -315,6 +329,7 @@ class VirtualWorkbench: def move_to_heating_station( self, + sample_uuids: SampleUUIDsType, material_number: int, ) -> MoveToHeatingStationResult: """ @@ -407,6 +422,7 @@ class VirtualWorkbench: def start_heating( self, + sample_uuids: SampleUUIDsType, station_id: int, material_number: int, ) -> StartHeatingResult: @@ -503,6 +519,7 @@ class VirtualWorkbench: def move_to_output( self, + sample_uuids: SampleUUIDsType, station_id: int, material_number: int, ) -> MoveToOutputResult: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 952a502..89df756 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -412,16 +412,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): else: for r in rts.root_nodes: r.res_content.parent_uuid = self.uuid - - if ( - len(LIQUID_INPUT_SLOT) - and LIQUID_INPUT_SLOT[0] == -1 - and len(rts.root_nodes) == 1 - and isinstance(rts.root_nodes[0], RegularContainer) - ): + rts_plr_instances = rts.to_plr_resources() + if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer): # noinspection PyTypeChecker - container_instance: RegularContainer = rts.root_nodes[0] - found_resources = self.resource_tracker.figure_resource({"id": container_instance.name}, try_mode=True) + container_instance: RegularContainer = rts_plr_instances[0] + found_resources = self.resource_tracker.figure_resource({"name": container_instance.name}, try_mode=True) if not len(found_resources): self.resource_tracker.add_resource(container_instance) logger.info(f"添加物料{container_instance.name}到资源跟踪器") @@ -430,7 +425,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): found_resource = found_resources[0] if isinstance(found_resource, RegularContainer): logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") - found_resource.state.update(json.loads(container_instance.state)) + found_resource.state.update(container_instance.state) elif isinstance(found_resource, dict): raise ValueError("已不支持 字典 版本的RegularContainer") else: @@ -443,7 +438,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): "action": "add", "data": { "data": rts.dump(), - "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", + "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else self.uuid, "first_add": False, }, } @@ -1538,11 +1533,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): if isinstance(rs, list): for r in rs: res = self.resource_tracker.parent_resource(r) # 获取 resource 对象 + if res is None: + res = rs + if id(res) not in seen: + seen.add(id(res)) + unique_resources.append(res) else: res = self.resource_tracker.parent_resource(rs) - if id(res) not in seen: - seen.add(id(res)) - unique_resources.append(res) + if res is None: + res = rs + if id(res) not in seen: + seen.add(id(res)) + unique_resources.append(res) # 使用新的资源树接口 if unique_resources: diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index f30e33b..0d62579 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -340,6 +340,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False) # 获取父资源 res = self.resource_tracker.parent_resource(plr) + if res is None: + res = plr if id(res) not in seen: seen.add(id(res)) unique_resources.append(res) diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 35aca5e..47e7533 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -52,7 +52,8 @@ class DeviceClassCreator(Generic[T]): if self.device_instance is not None: for c in self.children: if c.res_content.type != "device": - self.resource_tracker.add_resource(c.get_plr_nested_dict()) + res = ResourceTreeSet([ResourceTreeInstance(c)]).to_plr_resources()[0] + self.resource_tracker.add_resource(res) def create_instance(self, data: Dict[str, Any]) -> T: """ @@ -119,7 +120,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): # return resource, source_type def _process_resource_references( - self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None + self, data: Any, processed_child_names: Optional[Dict[str, Any]], to_dict=False, states=None, prefix_path="", name_to_uuid=None ) -> Any: """ 递归处理资源引用,替换_resource_child_name对应的资源 @@ -164,6 +165,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): states[prefix_path] = resource_instance.serialize_all_state() return serialized else: + processed_child_names[child_name] = resource_instance self.resource_tracker.add_resource(resource_instance) # 立即设置UUID,state已经在resource_ulab_to_plr中处理过了 if name_to_uuid: @@ -182,12 +184,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): result = {} for key, value in data.items(): new_prefix = f"{prefix_path}.{key}" if prefix_path else key - result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid) + result[key] = self._process_resource_references(value, processed_child_names, to_dict, states, new_prefix, name_to_uuid) return result elif isinstance(data, list): return [ - self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) + self._process_resource_references(item, processed_child_names, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) for i, item in enumerate(data) ] @@ -234,7 +236,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): # 首先处理资源引用 states = {} processed_data = self._process_resource_references( - data, to_dict=True, states=states, name_to_uuid=name_to_uuid + data, {}, to_dict=True, states=states, name_to_uuid=name_to_uuid ) try: @@ -270,7 +272,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): arg_value = spec_args[param_name].annotation data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") - processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid) + processed_child_names = {} + processed_data = self._process_resource_references(data, processed_child_names, to_dict=False, name_to_uuid=name_to_uuid) + for child_name, resource_instance in processed_data.items(): + for ind, name in enumerate([child.res_content.name for child in self.children]): + if name == child_name: + self.children.pop(ind) self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用,调用的自身的attach_resource except Exception as e: logger.error(f"PyLabRobot创建实例失败: {e}") @@ -342,9 +349,10 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children - for child in self.children: - if child.res_content.type != "device": - self.resource_tracker.add_resource(child.get_plr_nested_dict()) + # super(WorkstationNodeCreator, self).create_instance(data)的时候会attach + # for child in self.children: + # if child.res_content.type != "device": + # self.resource_tracker.add_resource(child.get_plr_nested_dict()) deck_dict = data.get("deck") if deck_dict: from pylabrobot.resources import Deck, Resource