diff --git a/docs/user_guide/best_practice.md b/docs/user_guide/best_practice.md index 767dc4d..499ee9e 100644 --- a/docs/user_guide/best_practice.md +++ b/docs/user_guide/best_practice.md @@ -452,8 +452,9 @@ unilab --ak your_ak --sk your_sk -g test/experiments/mock_devices/mock_all.json **操作步骤:** 1. 将两个 `container` 拖拽到 `workstation` 中 -2. 将 `virtual_transfer_pump` 拖拽到 `workstation` 中 -3. 在画布上连接它们(建立父子关系) +2. 将 `virtual_multiway_valve` 拖拽到 `workstation` 中 +3. 将 `virtual_transfer_pump` 拖拽到 `workstation` 中 +4. 在画布上连接它们(建立父子关系) ![设备连接](image/links.png) diff --git a/docs/user_guide/image/links.png b/docs/user_guide/image/links.png index 7e5e2bb..c5fc245 100644 Binary files a/docs/user_guide/image/links.png and b/docs/user_guide/image/links.png differ diff --git a/unilabos/app/model.py b/unilabos/app/model.py index 6f40e73..f80ce35 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -54,6 +54,7 @@ class JobAddReq(BaseModel): action_type: str = Field( examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default="" ) + sample_material: dict = Field(examples=[{"string": "string"}], description="sample uuid to material uuid") action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict) task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") diff --git a/unilabos/app/web/controller.py b/unilabos/app/web/controller.py index 9b0f1ff..b2be584 100644 --- a/unilabos/app/web/controller.py +++ b/unilabos/app/web/controller.py @@ -327,6 +327,7 @@ def job_add(req: JobAddReq) -> JobData: queue_item, action_type=action_type, action_kwargs=action_args, + sample_material=req.sample_material, server_info=server_info, ) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index ea8c8e7..61d337a 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -540,7 +540,7 @@ class MessageProcessor: try: message_str = json.dumps(msg, ensure_ascii=False) await self.websocket.send(message_str) - logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 + # logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 except Exception as e: logger.error(f"[MessageProcessor] Failed to send message: {str(e)}") logger.error(traceback.format_exc()) @@ -652,6 +652,8 @@ class MessageProcessor: async def _handle_job_start(self, data: Dict[str, Any]): """处理job_start消息""" try: + if not data.get("sample_material"): + data["sample_material"] = {} req = JobAddReq(**data) job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action) @@ -683,6 +685,7 @@ class MessageProcessor: queue_item, action_type=req.action_type, action_kwargs=req.action_args, + sample_material=req.sample_material, server_info=req.server_info, ) @@ -1294,7 +1297,7 @@ class WebSocketClient(BaseCommunicationClient): }, } self.message_processor.send_message(message) - logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") + # logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") def publish_job_status( self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index 7215fc5..d2dd497 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -95,8 +95,29 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: return total_volume -def is_integrated_pump(node_name): - return "pump" in node_name and "valve" in node_name +def is_integrated_pump(node_class: str, node_name: str = "") -> bool: + """ + 判断是否为泵阀一体设备 + """ + class_lower = (node_class or "").lower() + name_lower = (node_name or "").lower() + + if "pump" not in class_lower and "pump" not in name_lower: + return False + + integrated_markers = [ + "valve", + "pump_valve", + "pumpvalve", + "integrated", + "transfer_pump", + ] + + for marker in integrated_markers: + if marker in class_lower or marker in name_lower: + return True + + return False def find_connected_pump(G, valve_node): @@ -186,7 +207,9 @@ def build_pump_valve_maps(G, pump_backbone): debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") for node in filtered_backbone: - if is_integrated_pump(G.nodes[node]["class"]): + node_data = G.nodes.get(node, {}) + node_class = node_data.get("class", "") or "" + if is_integrated_pump(node_class, node): pumps_from_node[node] = node valve_from_node[node] = node debug_print(f" - 集成泵-阀: {node}") diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index c397845..c6958db 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -27,7 +27,12 @@ from typing_extensions import TypedDict from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend from unilabos.registry.placeholder_type import ResourceSlot -from unilabos.resources.resource_tracker import ResourceTreeSet, ResourceDict +from unilabos.resources.resource_tracker import ( + ResourceTreeSet, + ResourceDict, + EXTRA_SAMPLE_UUID, + EXTRA_UNILABOS_SAMPLE_UUID, +) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode @@ -241,7 +246,7 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples.append({"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)}) res_volumes.append(volume) self.pending_liquids_dict[channel] = { - "sample_uuid": resource.unilabos_extra.get("sample_uuid", None), + EXTRA_SAMPLE_UUID: sample_uuid_value, "volume": volume, } return SimpleReturn(samples=res_samples, volumes=res_volumes) @@ -283,10 +288,10 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] for resource, volume, channel in zip(resources, vols, use_channels): - res_uuid = self.pending_liquids_dict[channel]["sample_uuid"] + res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID] self.pending_liquids_dict[channel]["volume"] -= volume - resource.unilabos_extra["sample_uuid"] = res_uuid - res_samples.append({"name": resource.name, "sample_uuid": res_uuid}) + resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid + res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid}) res_volumes.append(volume) return SimpleReturn(samples=res_samples, volumes=res_volumes) @@ -691,16 +696,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): ) def set_liquid_from_plate( - self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float] + self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float] ) -> SetLiquidFromPlateReturn: """Set the liquid in wells of a plate by well names (e.g., A1, A2, B3). 如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。 """ - if isinstance(plate, list): # 未来移除 - plate = plate[0] assert issubclass(plate.__class__, Plate), "plate must be a Plate" - plate: Plate = cast(Plate, plate) + plate: Plate = cast(Plate, cast(Resource, plate)) # 根据 well_names 获取对应的 Well 对象 wells = [plate.get_well(name) for name in well_names] res_volumes = [] diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 7ff22eb..e97aa7e 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -803,7 +803,7 @@ class PRCXI9300Handler(LiquidHandlerAbstract): return super().set_liquid(wells, liquid_names, volumes) def set_liquid_from_plate( - self, plate: List[ResourceSlot], well_names: list[str], liquid_names: list[str], volumes: list[float] + self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float] ) -> SetLiquidFromPlateReturn: return super().set_liquid_from_plate(plate, well_names, liquid_names, volumes) diff --git a/unilabos/devices/virtual/virtual_transferpump.py b/unilabos/devices/virtual/virtual_transferpump.py index 7b8eea8..2d3c9d8 100644 --- a/unilabos/devices/virtual/virtual_transferpump.py +++ b/unilabos/devices/virtual/virtual_transferpump.py @@ -15,35 +15,35 @@ class VirtualPumpMode(Enum): class VirtualTransferPump: """虚拟转移泵类 - 模拟泵的基本功能,无需实际硬件 🚰""" - + _ros_node: BaseROS2DeviceNode - + def __init__(self, device_id: str = None, config: dict = None, **kwargs): """ 初始化虚拟转移泵 - + Args: device_id: 设备ID config: 配置字典,包含max_volume, port等参数 **kwargs: 其他参数,确保兼容性 """ self.device_id = device_id or "virtual_transfer_pump" - + # 从config或kwargs中获取参数,确保类型正确 if config: - self.max_volume = float(config.get('max_volume', 25.0)) - self.port = config.get('port', 'VIRTUAL') + self.max_volume = float(config.get("max_volume", 25.0)) + self.port = config.get("port", "VIRTUAL") else: - self.max_volume = float(kwargs.get('max_volume', 25.0)) - self.port = kwargs.get('port', 'VIRTUAL') - - self._transfer_rate = float(kwargs.get('transfer_rate', 0)) - self.mode = kwargs.get('mode', VirtualPumpMode.Normal) - + self.max_volume = float(kwargs.get("max_volume", 25.0)) + self.port = kwargs.get("port", "VIRTUAL") + + self._transfer_rate = float(kwargs.get("transfer_rate", 0)) + self.mode = kwargs.get("mode", VirtualPumpMode.Normal) + # 状态变量 - 确保都是正确类型 self._status = "Idle" self._position = 0.0 # float - self._max_velocity = 5.0 # float + self._max_velocity = 5.0 # float self._current_volume = 0.0 # float # 🚀 新增:快速模式设置 - 大幅缩短执行时间 @@ -52,14 +52,16 @@ class VirtualTransferPump: self._fast_dispense_time = 1.0 # 快速喷射时间(秒) self.logger = logging.getLogger(f"VirtualTransferPump.{self.device_id}") - + print(f"🚰 === 虚拟转移泵 {self.device_id} 已创建 === ✨") - print(f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s") + print( + f"💨 快速模式: {'启用' if self._fast_mode else '禁用'} | 移动时间: {self._fast_move_time}s | 喷射时间: {self._fast_dispense_time}s" + ) print(f"📊 最大容量: {self.max_volume}mL | 端口: {self.port}") - + def post_init(self, ros_node: BaseROS2DeviceNode): self._ros_node = ros_node - + async def initialize(self) -> bool: """初始化虚拟泵 🚀""" self.logger.info(f"🔧 初始化虚拟转移泵 {self.device_id} ✨") @@ -68,33 +70,33 @@ class VirtualTransferPump: self._current_volume = 0.0 self.logger.info(f"✅ 转移泵 {self.device_id} 初始化完成 🚰") return True - + async def cleanup(self) -> bool: """清理虚拟泵 🧹""" self.logger.info(f"🧹 清理虚拟转移泵 {self.device_id} 🔚") self._status = "Idle" self.logger.info(f"✅ 转移泵 {self.device_id} 清理完成 💤") return True - + # 基本属性 @property def status(self) -> str: return self._status - + @property def position(self) -> float: """当前柱塞位置 (ml) 📍""" return self._position - + @property def current_volume(self) -> float: """当前注射器中的体积 (ml) 💧""" return self._current_volume - + @property def max_velocity(self) -> float: return self._max_velocity - + @property def transfer_rate(self) -> float: return self._transfer_rate @@ -103,17 +105,17 @@ class VirtualTransferPump: """设置最大速度 (ml/s) 🌊""" self._max_velocity = max(0.1, min(50.0, velocity)) # 限制在合理范围内 self.logger.info(f"🌊 设置最大速度为 {self._max_velocity} mL/s") - + def get_status(self) -> str: """获取泵状态 📋""" return self._status - + async def _simulate_operation(self, duration: float): """模拟操作延时 ⏱️""" self._status = "Busy" await self._ros_node.sleep(duration) self._status = "Idle" - + def _calculate_duration(self, volume: float, velocity: float = None) -> float: """ 计算操作持续时间 ⏰ @@ -121,10 +123,10 @@ class VirtualTransferPump: """ if velocity is None: velocity = self._max_velocity - + # 📊 计算理论时间(用于日志显示) theoretical_duration = abs(volume) / velocity - + # 🚀 如果启用快速模式,使用固定的快速时间 if self._fast_mode: # 根据操作类型选择快速时间 @@ -132,13 +134,13 @@ class VirtualTransferPump: actual_duration = self._fast_move_time else: # 很小的操作 actual_duration = 0.5 - + self.logger.debug(f"⚡ 快速模式: 理论时间 {theoretical_duration:.2f}s → 实际时间 {actual_duration:.2f}s") return actual_duration else: # 正常模式使用理论时间 return theoretical_duration - + def _calculate_display_duration(self, volume: float, velocity: float = None) -> float: """ 计算显示用的持续时间(用于日志) 📊 @@ -147,16 +149,16 @@ class VirtualTransferPump: if velocity is None: velocity = self._max_velocity return abs(volume) / velocity - + # 新的set_position方法 - 专门用于SetPumpPosition动作 async def set_position(self, position: float, max_velocity: float = None): """ 移动到绝对位置 - 专门用于SetPumpPosition动作 🎯 - + Args: position (float): 目标位置 (ml) max_velocity (float): 移动速度 (ml/s) - + Returns: dict: 符合SetPumpPosition.action定义的结果 """ @@ -164,19 +166,19 @@ class VirtualTransferPump: # 验证并转换参数 target_position = float(position) velocity = float(max_velocity) if max_velocity is not None else self._max_velocity - + # 限制位置在有效范围内 target_position = max(0.0, min(float(self.max_volume), target_position)) - + # 计算移动距离 volume_to_move = abs(target_position - self._position) - + # 📊 计算显示用的时间(用于日志) display_duration = self._calculate_display_duration(volume_to_move, velocity) - + # ⚡ 计算实际执行时间(快速模式) actual_duration = self._calculate_duration(volume_to_move, velocity) - + # 🎯 确定操作类型和emoji if target_position > self._position: operation_type = "吸液" @@ -187,28 +189,34 @@ class VirtualTransferPump: else: operation_type = "保持" operation_emoji = "📍" - + self.logger.info(f"🎯 SET_POSITION: {operation_type} {operation_emoji}") - self.logger.info(f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)") + self.logger.info( + f" 📍 位置: {self._position:.2f}mL → {target_position:.2f}mL (移动 {volume_to_move:.2f}mL)" + ) self.logger.info(f" 🌊 速度: {velocity:.2f} mL/s") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + # 🚀 模拟移动过程 if volume_to_move > 0.01: # 只有当移动距离足够大时才显示进度 start_position = self._position steps = 5 if actual_duration > 0.5 else 2 # 根据实际时间调整步数 step_duration = actual_duration / steps - + self.logger.info(f"🚀 开始{operation_type}... {operation_emoji}") - + for i in range(steps + 1): # 计算当前位置和进度 progress = (i / steps) * 100 if steps > 0 else 100 - current_pos = start_position + (target_position - start_position) * (i / steps) if steps > 0 else target_position - + current_pos = ( + start_position + (target_position - start_position) * (i / steps) + if steps > 0 + else target_position + ) + # 更新状态 if i < steps: self._status = f"{operation_type}中" @@ -216,10 +224,10 @@ class VirtualTransferPump: else: self._status = "Idle" status_emoji = "✅" - + self._position = current_pos self._current_volume = current_pos - + # 显示进度(每25%或最后一步) if i == 0: self.logger.debug(f" 🔄 {operation_type}开始: {progress:.0f}%") @@ -227,7 +235,7 @@ class VirtualTransferPump: self.logger.debug(f" 🔄 {operation_type}进度: {progress:.0f}%") elif i == steps: self.logger.info(f" ✅ {operation_type}完成: {progress:.0f}% | 当前位置: {current_pos:.2f}mL") - + # 等待一小步时间 if i < steps and step_duration > 0: await self._ros_node.sleep(step_duration) @@ -236,25 +244,27 @@ class VirtualTransferPump: self._position = target_position self._current_volume = target_position self.logger.info(f" 📍 微调完成: {target_position:.2f}mL") - + # 确保最终位置准确 self._position = target_position self._current_volume = target_position self._status = "Idle" - + # 📊 最终状态日志 if volume_to_move > 0.01: - self.logger.info(f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") - + self.logger.info( + f"🎉 SET_POSITION 完成! 📍 最终位置: {self._position:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL" + ) + # 返回符合action定义的结果 return { "success": True, "message": f"✅ 成功移动到位置 {self._position:.2f}mL ({operation_type})", "final_position": self._position, "final_volume": self._current_volume, - "operation_type": operation_type + "operation_type": operation_type, } - + except Exception as e: error_msg = f"❌ 设置位置失败: {str(e)}" self.logger.error(error_msg) @@ -262,134 +272,136 @@ class VirtualTransferPump: "success": False, "message": error_msg, "final_position": self._position, - "final_volume": self._current_volume + "final_volume": self._current_volume, } - + # 其他泵操作方法 async def pull_plunger(self, volume: float, velocity: float = None): """ 拉取柱塞(吸液) 📥 - + Args: volume (float): 要拉取的体积 (ml) velocity (float): 拉取速度 (ml/s) """ new_position = min(self.max_volume, self._position + volume) actual_volume = new_position - self._position - + if actual_volume <= 0: self.logger.warning("⚠️ 无法吸液 - 已达到最大容量") return - + display_duration = self._calculate_display_duration(actual_volume, velocity) actual_duration = self._calculate_duration(actual_volume, velocity) - + self.logger.info(f"📥 开始吸液: {actual_volume:.2f}mL") self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + await self._simulate_operation(actual_duration) - + self._position = new_position self._current_volume = new_position - + self.logger.info(f"✅ 吸液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") async def push_plunger(self, volume: float, velocity: float = None): """ 推出柱塞(排液) 📤 - + Args: volume (float): 要推出的体积 (ml) velocity (float): 推出速度 (ml/s) """ new_position = max(0, self._position - volume) actual_volume = self._position - new_position - + if actual_volume <= 0: self.logger.warning("⚠️ 无法排液 - 已达到最小容量") return - + display_duration = self._calculate_display_duration(actual_volume, velocity) actual_duration = self._calculate_duration(actual_volume, velocity) - + self.logger.info(f"📤 开始排液: {actual_volume:.2f}mL") self.logger.info(f" 📍 位置: {self._position:.2f}mL → {new_position:.2f}mL") self.logger.info(f" ⏰ 预计时间: {display_duration:.2f}s") - + if self._fast_mode: self.logger.info(f" ⚡ 快速模式: 实际用时 {actual_duration:.2f}s") - + await self._simulate_operation(actual_duration) - + self._position = new_position self._current_volume = new_position - + self.logger.info(f"✅ 排液完成: {actual_volume:.2f}mL | 💧 当前体积: {self._current_volume:.2f}mL") # 便捷操作方法 async def aspirate(self, volume: float, velocity: float = None): """吸液操作 📥""" await self.pull_plunger(volume, velocity) - + async def dispense(self, volume: float, velocity: float = None): """排液操作 📤""" await self.push_plunger(volume, velocity) - + async def transfer(self, volume: float, aspirate_velocity: float = None, dispense_velocity: float = None): """转移操作(先吸后排) 🔄""" self.logger.info(f"🔄 开始转移操作: {volume:.2f}mL") - + # 吸液 await self.aspirate(volume, aspirate_velocity) - + # 短暂停顿 self.logger.debug("⏸️ 短暂停顿...") await self._ros_node.sleep(0.1) - + # 排液 await self.dispense(volume, dispense_velocity) - + async def empty_syringe(self, velocity: float = None): """清空注射器""" await self.set_position(0, velocity) - + async def fill_syringe(self, velocity: float = None): """充满注射器""" await self.set_position(self.max_volume, velocity) - + async def stop_operation(self): """停止当前操作""" self._status = "Idle" self.logger.info("Operation stopped") - + # 状态查询方法 def get_position(self) -> float: """获取当前位置""" return self._position - + def get_current_volume(self) -> float: """获取当前体积""" return self._current_volume - + def get_remaining_capacity(self) -> float: """获取剩余容量""" return self.max_volume - self._current_volume - + def is_empty(self) -> bool: """检查是否为空""" return self._current_volume <= 0.01 # 允许小量误差 - + def is_full(self) -> bool: """检查是否已满""" return self._current_volume >= (self.max_volume - 0.01) # 允许小量误差 - + def __str__(self): - return f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" - + return ( + f"VirtualTransferPump({self.device_id}: {self._current_volume:.2f}/{self.max_volume} ml, {self._status})" + ) + def __repr__(self): return self.__str__() @@ -398,20 +410,20 @@ class VirtualTransferPump: async def demo(): """虚拟泵使用示例""" pump = VirtualTransferPump("demo_pump", {"max_volume": 50.0}) - + await pump.initialize() - + print(f"Initial state: {pump}") - + # 测试set_position方法 result = await pump.set_position(10.0, max_velocity=2.0) print(f"Set position result: {result}") print(f"After setting position to 10ml: {pump}") - + # 吸液测试 await pump.aspirate(5.0, velocity=2.0) print(f"After aspirating 5ml: {pump}") - + # 清空测试 result = await pump.set_position(0.0) print(f"Empty result: {result}") diff --git a/unilabos/devices/virtual/workbench.py b/unilabos/devices/virtual/workbench.py new file mode 100644 index 0000000..d20885f --- /dev/null +++ b/unilabos/devices/virtual/workbench.py @@ -0,0 +1,742 @@ +""" +Virtual Workbench Device - 模拟工作台设备 +包含: +- 1个机械臂 (每次操作3s, 独占锁) +- 3个加热台 (每次加热10s, 可并行) + +工作流程: +1. A1-A5 物料同时启动,竞争机械臂 +2. 机械臂将物料移动到空闲加热台 +3. 加热完成后,机械臂将物料移动到C1-C5 + +注意:调用来自线程池,使用 threading.Lock 进行同步 +""" + +import logging +import time +from typing import Dict, Any, Optional, List +from dataclasses import dataclass +from enum import Enum +from threading import Lock, RLock + +from typing_extensions import TypedDict + +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode +from unilabos.utils.decorator import not_action +from unilabos.resources.resource_tracker import SampleUUIDsType, LabSample, RETURN_UNILABOS_SAMPLES + + +# ============ TypedDict 返回类型定义 ============ + + +class MoveToHeatingStationResult(TypedDict): + """move_to_heating_station 返回类型""" + + success: bool + station_id: int + material_id: str + material_number: int + message: str + unilabos_samples: List[LabSample] + + +class StartHeatingResult(TypedDict): + """start_heating 返回类型""" + + success: bool + station_id: int + material_id: str + material_number: int + message: str + unilabos_samples: List[LabSample] + + +class MoveToOutputResult(TypedDict): + """move_to_output 返回类型""" + + success: bool + station_id: int + material_id: str + unilabos_samples: List[LabSample] + + +class PrepareMaterialsResult(TypedDict): + """prepare_materials 返回类型 - 批量准备物料""" + + success: bool + count: int + material_1: int # 物料编号1 + material_2: int # 物料编号2 + material_3: int # 物料编号3 + material_4: int # 物料编号4 + material_5: int # 物料编号5 + message: str + unilabos_samples: List[LabSample] + + +# ============ 状态枚举 ============ + + +class HeatingStationState(Enum): + """加热台状态枚举""" + + IDLE = "idle" # 空闲 + OCCUPIED = "occupied" # 已放置物料,等待加热 + HEATING = "heating" # 加热中 + COMPLETED = "completed" # 加热完成,等待取走 + + +class ArmState(Enum): + """机械臂状态枚举""" + + IDLE = "idle" # 空闲 + BUSY = "busy" # 工作中 + + +@dataclass +class HeatingStation: + """加热台数据结构""" + + station_id: int + state: HeatingStationState = HeatingStationState.IDLE + current_material: Optional[str] = None # 当前物料 (如 "A1", "A2") + material_number: Optional[int] = None # 物料编号 (1-5) + heating_start_time: Optional[float] = None + heating_progress: float = 0.0 + + +class VirtualWorkbench: + """ + Virtual Workbench Device - 虚拟工作台设备 + + 模拟一个包含1个机械臂和3个加热台的工作站 + - 机械臂操作耗时3秒,同一时间只能执行一个操作 + - 加热台加热耗时10秒,3个加热台可并行工作 + + 工作流: + 1. 物料A1-A5并发启动(线程池),竞争机械臂使用权 + 2. 获取机械臂后,查找空闲加热台 + 3. 机械臂将物料放入加热台,开始加热 + 4. 加热完成后,机械臂将物料移动到目标位置Cn + """ + + _ros_node: BaseROS2DeviceNode + + # 配置常量 + ARM_OPERATION_TIME: float = 3.0 # 机械臂操作时间(秒) + HEATING_TIME: float = 10.0 # 加热时间(秒) + NUM_HEATING_STATIONS: int = 3 # 加热台数量 + + def __init__(self, device_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None, **kwargs): + # 处理可能的不同调用方式 + if device_id is None and "id" in kwargs: + device_id = kwargs.pop("id") + if config is None and "config" in kwargs: + config = kwargs.pop("config") + + self.device_id = device_id or "virtual_workbench" + self.config = config or {} + + self.logger = logging.getLogger(f"VirtualWorkbench.{self.device_id}") + self.data: Dict[str, Any] = {} + + # 从config中获取可配置参数 + self.ARM_OPERATION_TIME = float(self.config.get("arm_operation_time", 3.0)) + self.HEATING_TIME = float(self.config.get("heating_time", 10.0)) + self.NUM_HEATING_STATIONS = int(self.config.get("num_heating_stations", 3)) + + # 机械臂状态和锁 (使用threading.Lock) + self._arm_lock = Lock() + self._arm_state = ArmState.IDLE + self._arm_current_task: Optional[str] = None + + # 加热台状态 (station_id -> HeatingStation) - 立即初始化,不依赖initialize() + self._heating_stations: Dict[int, HeatingStation] = { + i: HeatingStation(station_id=i) for i in range(1, self.NUM_HEATING_STATIONS + 1) + } + self._stations_lock = RLock() # 可重入锁,保护加热台状态 + + # 任务追踪 + self._active_tasks: Dict[str, Dict[str, Any]] = {} # material_id -> task_info + self._tasks_lock = Lock() + + # 处理其他kwargs参数 + skip_keys = {"arm_operation_time", "heating_time", "num_heating_stations"} + for key, value in kwargs.items(): + if key not in skip_keys and not hasattr(self, key): + setattr(self, key, value) + + self.logger.info(f"=== 虚拟工作台 {self.device_id} 已创建 ===") + self.logger.info( + f"机械臂操作时间: {self.ARM_OPERATION_TIME}s | " + f"加热时间: {self.HEATING_TIME}s | " + f"加热台数量: {self.NUM_HEATING_STATIONS}" + ) + + @not_action + def post_init(self, ros_node: BaseROS2DeviceNode): + """ROS节点初始化后回调""" + self._ros_node = ros_node + + @not_action + def initialize(self) -> bool: + """初始化虚拟工作台""" + self.logger.info(f"初始化虚拟工作台 {self.device_id}") + + # 重置加热台状态 (已在__init__中创建,这里重置为初始状态) + with self._stations_lock: + for station in self._heating_stations.values(): + station.state = HeatingStationState.IDLE + station.current_material = None + station.material_number = None + station.heating_progress = 0.0 + + # 初始化状态 + self.data.update( + { + "status": "Ready", + "arm_state": ArmState.IDLE.value, + "arm_current_task": None, + "heating_stations": self._get_stations_status(), + "active_tasks_count": 0, + "message": "工作台就绪", + } + ) + + self.logger.info(f"工作台初始化完成: {self.NUM_HEATING_STATIONS}个加热台就绪") + return True + + @not_action + def cleanup(self) -> bool: + """清理虚拟工作台""" + self.logger.info(f"清理虚拟工作台 {self.device_id}") + + self._arm_state = ArmState.IDLE + self._arm_current_task = None + + with self._stations_lock: + self._heating_stations.clear() + + with self._tasks_lock: + self._active_tasks.clear() + + self.data.update( + { + "status": "Offline", + "arm_state": ArmState.IDLE.value, + "heating_stations": {}, + "message": "工作台已关闭", + } + ) + return True + + def _get_stations_status(self) -> Dict[int, Dict[str, Any]]: + """获取所有加热台状态""" + with self._stations_lock: + return { + station_id: { + "state": station.state.value, + "current_material": station.current_material, + "material_number": station.material_number, + "heating_progress": station.heating_progress, + } + for station_id, station in self._heating_stations.items() + } + + def _update_data_status(self, message: Optional[str] = None): + """更新状态数据""" + self.data.update( + { + "arm_state": self._arm_state.value, + "arm_current_task": self._arm_current_task, + "heating_stations": self._get_stations_status(), + "active_tasks_count": len(self._active_tasks), + } + ) + if message: + self.data["message"] = message + + def _find_available_heating_station(self) -> Optional[int]: + """查找空闲的加热台 + + Returns: + 空闲加热台ID,如果没有则返回None + """ + with self._stations_lock: + for station_id, station in self._heating_stations.items(): + if station.state == HeatingStationState.IDLE: + return station_id + return None + + def _acquire_arm(self, task_description: str) -> bool: + """获取机械臂使用权(阻塞直到获取) + + Args: + task_description: 任务描述,用于日志 + + Returns: + 是否成功获取 + """ + self.logger.info(f"[{task_description}] 等待获取机械臂...") + + # 阻塞等待获取锁 + self._arm_lock.acquire() + + self._arm_state = ArmState.BUSY + self._arm_current_task = task_description + self._update_data_status(f"机械臂执行: {task_description}") + + self.logger.info(f"[{task_description}] 成功获取机械臂使用权") + return True + + def _release_arm(self): + """释放机械臂""" + task = self._arm_current_task + self._arm_state = ArmState.IDLE + self._arm_current_task = None + self._arm_lock.release() + self._update_data_status(f"机械臂已释放 (完成: {task})") + self.logger.info(f"机械臂已释放 (完成: {task})") + + def prepare_materials( + self, + sample_uuids: SampleUUIDsType, + count: int = 5, + ) -> PrepareMaterialsResult: + """ + 批量准备物料 - 虚拟起始节点 + + 作为工作流的起始节点,生成指定数量的物料编号供后续节点使用。 + 输出5个handle (material_1 ~ material_5),分别对应实验1~5。 + + Args: + count: 待生成的物料数量,默认5 (生成 A1-A5) + + Returns: + PrepareMaterialsResult: 包含 material_1 ~ material_5 用于传递给 move_to_heating_station + """ + # 生成物料列表 A1 - A{count} + materials = [i for i in range(1, count + 1)] + + self.logger.info(f"[准备物料] 生成 {count} 个物料: " f"A1-A{count} -> material_1~material_{count}") + + return { + "success": True, + "count": count, + "material_1": materials[0] if len(materials) > 0 else 0, + "material_2": materials[1] if len(materials) > 1 else 0, + "material_3": materials[2] if len(materials) > 2 else 0, + "material_4": materials[3] if len(materials) > 3 else 0, + "material_5": materials[4] if len(materials) > 4 else 0, + "message": f"已准备 {count} 个物料: A1-A{count}", + "unilabos_samples": [LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for sample_uuid, content in sample_uuids.items()] + } + + def move_to_heating_station( + self, + sample_uuids: SampleUUIDsType, + material_number: int, + ) -> MoveToHeatingStationResult: + """ + 将物料从An位置移动到加热台 + + 多线程并发调用时,会竞争机械臂使用权,并自动查找空闲加热台 + + Args: + material_number: 物料编号 (1-5) + + Returns: + MoveToHeatingStationResult: 包含 station_id, material_number 等用于传递给下一个节点 + """ + # 根据物料编号生成物料ID + material_id = f"A{material_number}" + task_desc = f"移动{material_id}到加热台" + self.logger.info(f"[任务] {task_desc} - 开始执行") + + # 记录任务 + with self._tasks_lock: + self._active_tasks[material_id] = { + "status": "waiting_for_arm", + "start_time": time.time(), + } + + try: + # 步骤1: 等待获取机械臂使用权(竞争) + with self._tasks_lock: + self._active_tasks[material_id]["status"] = "waiting_for_arm" + self._acquire_arm(task_desc) + + # 步骤2: 查找空闲加热台 + with self._tasks_lock: + self._active_tasks[material_id]["status"] = "finding_station" + station_id = None + + # 循环等待直到找到空闲加热台 + while station_id is None: + station_id = self._find_available_heating_station() + if station_id is None: + self.logger.info(f"[{material_id}] 没有空闲加热台,等待中...") + # 释放机械臂,等待后重试 + self._release_arm() + time.sleep(0.5) + self._acquire_arm(task_desc) + + # 步骤3: 占用加热台 - 立即标记为OCCUPIED,防止其他任务选择同一加热台 + with self._stations_lock: + self._heating_stations[station_id].state = HeatingStationState.OCCUPIED + self._heating_stations[station_id].current_material = material_id + self._heating_stations[station_id].material_number = material_number + + # 步骤4: 模拟机械臂移动操作 (3秒) + with self._tasks_lock: + self._active_tasks[material_id]["status"] = "arm_moving" + self._active_tasks[material_id]["assigned_station"] = station_id + self.logger.info(f"[{material_id}] 机械臂正在移动到加热台{station_id}...") + + time.sleep(self.ARM_OPERATION_TIME) + + # 步骤5: 放入加热台完成 + self._update_data_status(f"{material_id}已放入加热台{station_id}") + self.logger.info(f"[{material_id}] 已放入加热台{station_id} (用时{self.ARM_OPERATION_TIME}s)") + + # 释放机械臂 + self._release_arm() + + with self._tasks_lock: + self._active_tasks[material_id]["status"] = "placed_on_station" + + return { + "success": True, + "station_id": station_id, + "material_id": material_id, + "material_number": material_number, + "message": f"{material_id}已成功移动到加热台{station_id}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + except Exception as e: + self.logger.error(f"[{material_id}] 移动失败: {str(e)}") + if self._arm_lock.locked(): + self._release_arm() + return { + "success": False, + "station_id": -1, + "material_id": material_id, + "material_number": material_number, + "message": f"移动失败: {str(e)}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + def start_heating( + self, + sample_uuids: SampleUUIDsType, + station_id: int, + material_number: int, + ) -> StartHeatingResult: + """ + 启动指定加热台的加热程序 + + Args: + station_id: 加热台ID (1-3),从 move_to_heating_station 的 handle 传入 + material_number: 物料编号,从 move_to_heating_station 的 handle 传入 + + Returns: + StartHeatingResult: 包含 station_id, material_number 等用于传递给下一个节点 + """ + self.logger.info(f"[加热台{station_id}] 开始加热") + + if station_id not in self._heating_stations: + return { + "success": False, + "station_id": station_id, + "material_id": "", + "material_number": material_number, + "message": f"无效的加热台ID: {station_id}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + with self._stations_lock: + station = self._heating_stations[station_id] + + if station.current_material is None: + return { + "success": False, + "station_id": station_id, + "material_id": "", + "material_number": material_number, + "message": f"加热台{station_id}上没有物料", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + if station.state == HeatingStationState.HEATING: + return { + "success": False, + "station_id": station_id, + "material_id": station.current_material, + "material_number": material_number, + "message": f"加热台{station_id}已经在加热中", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + material_id = station.current_material + + # 开始加热 + station.state = HeatingStationState.HEATING + station.heating_start_time = time.time() + station.heating_progress = 0.0 + + with self._tasks_lock: + if material_id in self._active_tasks: + self._active_tasks[material_id]["status"] = "heating" + + self._update_data_status(f"加热台{station_id}开始加热{material_id}") + + # 模拟加热过程 (10秒) + start_time = time.time() + while True: + elapsed = time.time() - start_time + progress = min(100.0, (elapsed / self.HEATING_TIME) * 100) + + with self._stations_lock: + self._heating_stations[station_id].heating_progress = progress + + self._update_data_status(f"加热台{station_id}加热中: {progress:.1f}%") + + if elapsed >= self.HEATING_TIME: + break + + time.sleep(1.0) + + # 加热完成 + with self._stations_lock: + self._heating_stations[station_id].state = HeatingStationState.COMPLETED + self._heating_stations[station_id].heating_progress = 100.0 + + with self._tasks_lock: + if material_id in self._active_tasks: + self._active_tasks[material_id]["status"] = "heating_completed" + + self._update_data_status(f"加热台{station_id}加热完成") + self.logger.info(f"[加热台{station_id}] {material_id}加热完成 (用时{self.HEATING_TIME}s)") + + return { + "success": True, + "station_id": station_id, + "material_id": material_id, + "material_number": material_number, + "message": f"加热台{station_id}加热完成", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + def move_to_output( + self, + sample_uuids: SampleUUIDsType, + station_id: int, + material_number: int, + ) -> MoveToOutputResult: + """ + 将物料从加热台移动到输出位置Cn + + Args: + station_id: 加热台ID (1-3),从 start_heating 的 handle 传入 + material_number: 物料编号,从 start_heating 的 handle 传入,用于确定输出位置 Cn + + Returns: + MoveToOutputResult: 包含执行结果 + """ + output_number = material_number # 物料编号决定输出位置 + + if station_id not in self._heating_stations: + return { + "success": False, + "station_id": station_id, + "material_id": "", + "output_position": f"C{output_number}", + "message": f"无效的加热台ID: {station_id}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + with self._stations_lock: + station = self._heating_stations[station_id] + material_id = station.current_material + + if material_id is None: + return { + "success": False, + "station_id": station_id, + "material_id": "", + "output_position": f"C{output_number}", + "message": f"加热台{station_id}上没有物料", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + if station.state != HeatingStationState.COMPLETED: + return { + "success": False, + "station_id": station_id, + "material_id": material_id, + "output_position": f"C{output_number}", + "message": f"加热台{station_id}尚未完成加热 (当前状态: {station.state.value})", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + output_position = f"C{output_number}" + task_desc = f"从加热台{station_id}移动{material_id}到{output_position}" + self.logger.info(f"[任务] {task_desc}") + + try: + with self._tasks_lock: + if material_id in self._active_tasks: + self._active_tasks[material_id]["status"] = "waiting_for_arm_output" + + # 获取机械臂 + self._acquire_arm(task_desc) + + with self._tasks_lock: + if material_id in self._active_tasks: + self._active_tasks[material_id]["status"] = "arm_moving_to_output" + + # 模拟机械臂操作 (3秒) + self.logger.info(f"[{material_id}] 机械臂正在从加热台{station_id}取出并移动到{output_position}...") + time.sleep(self.ARM_OPERATION_TIME) + + # 清空加热台 + with self._stations_lock: + self._heating_stations[station_id].state = HeatingStationState.IDLE + self._heating_stations[station_id].current_material = None + self._heating_stations[station_id].material_number = None + self._heating_stations[station_id].heating_progress = 0.0 + self._heating_stations[station_id].heating_start_time = None + + # 释放机械臂 + self._release_arm() + + # 任务完成 + with self._tasks_lock: + if material_id in self._active_tasks: + self._active_tasks[material_id]["status"] = "completed" + self._active_tasks[material_id]["end_time"] = time.time() + + self._update_data_status(f"{material_id}已移动到{output_position}") + self.logger.info(f"[{material_id}] 已成功移动到{output_position} (用时{self.ARM_OPERATION_TIME}s)") + + return { + "success": True, + "station_id": station_id, + "material_id": material_id, + "output_position": output_position, + "message": f"{material_id}已成功移动到{output_position}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + except Exception as e: + self.logger.error(f"移动到输出位置失败: {str(e)}") + if self._arm_lock.locked(): + self._release_arm() + return { + "success": False, + "station_id": station_id, + "material_id": "", + "output_position": output_position, + "message": f"移动失败: {str(e)}", + "unilabos_samples": [ + LabSample(sample_uuid=sample_uuid, oss_path="", extra={"material_uuid": content} if isinstance(content, str) else content.serialize()) for + sample_uuid, content in sample_uuids.items()] + } + + # ============ 状态属性 ============ + + @property + def status(self) -> str: + return self.data.get("status", "Unknown") + + @property + def arm_state(self) -> str: + return self._arm_state.value + + @property + def arm_current_task(self) -> str: + return self._arm_current_task or "" + + @property + def heating_station_1_state(self) -> str: + with self._stations_lock: + station = self._heating_stations.get(1) + return station.state.value if station else "unknown" + + @property + def heating_station_1_material(self) -> str: + with self._stations_lock: + station = self._heating_stations.get(1) + return station.current_material or "" if station else "" + + @property + def heating_station_1_progress(self) -> float: + with self._stations_lock: + station = self._heating_stations.get(1) + return station.heating_progress if station else 0.0 + + @property + def heating_station_2_state(self) -> str: + with self._stations_lock: + station = self._heating_stations.get(2) + return station.state.value if station else "unknown" + + @property + def heating_station_2_material(self) -> str: + with self._stations_lock: + station = self._heating_stations.get(2) + return station.current_material or "" if station else "" + + @property + def heating_station_2_progress(self) -> float: + with self._stations_lock: + station = self._heating_stations.get(2) + return station.heating_progress if station else 0.0 + + @property + def heating_station_3_state(self) -> str: + with self._stations_lock: + station = self._heating_stations.get(3) + return station.state.value if station else "unknown" + + @property + def heating_station_3_material(self) -> str: + with self._stations_lock: + station = self._heating_stations.get(3) + return station.current_material or "" if station else "" + + @property + def heating_station_3_progress(self) -> float: + with self._stations_lock: + station = self._heating_stations.get(3) + return station.heating_progress if station else 0.0 + + @property + def active_tasks_count(self) -> int: + with self._tasks_lock: + return len(self._active_tasks) + + @property + def message(self) -> str: + return self.data.get("message", "") diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index d32dc0b..f8744af 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -9592,7 +9592,7 @@ liquid_handler.prcxi: well_names: null handles: input: - - data_key: plate + - data_key: '@this.0@@@plate' data_source: handle data_type: resource handler_key: input_plate @@ -9627,81 +9627,78 @@ liquid_handler.prcxi: type: string type: array plate: - items: - properties: - category: + properties: + category: + type: string + children: + items: type: string - children: - items: - type: string - type: array - config: - type: string - data: - type: string - id: - type: string - name: - type: string - parent: - type: string - pose: - properties: - orientation: - properties: - w: - type: number - x: - type: number - y: - type: number - z: - type: number - required: - - x - - y - - z - - w - title: orientation - type: object - position: - properties: - x: - type: number - y: - type: number - z: - type: number - required: - - x - - y - - z - title: position - type: object - required: - - position - - orientation - title: pose - type: object - sample_id: - type: string - type: - type: string - required: - - id - - name - - sample_id - - children - - parent - - type - - category - - pose - - config - - data - title: plate - type: object + type: array + config: + type: string + data: + type: string + id: + type: string + name: + type: string + parent: + type: string + pose: + properties: + orientation: + properties: + w: + type: number + x: + type: number + y: + type: number + z: + type: number + required: + - x + - y + - z + - w + title: orientation + type: object + position: + properties: + x: + type: number + y: + type: number + z: + type: number + required: + - x + - y + - z + title: position + type: object + required: + - position + - orientation + title: pose + type: object + sample_id: + type: string + type: + type: string + required: + - id + - name + - sample_id + - children + - parent + - type + - category + - pose + - config + - data title: plate - type: array + type: object volumes: items: type: number @@ -9717,17 +9714,207 @@ liquid_handler.prcxi: - volumes type: object result: + $defs: + ResourceDict: + properties: + class: + description: Resource class name + title: Class + type: string + config: + additionalProperties: true + description: Resource configuration + title: Config + type: object + data: + additionalProperties: true + description: 'Resource data, eg: container liquid data' + title: Data + type: object + description: + default: '' + description: Resource description + title: Description + type: string + extra: + additionalProperties: true + description: 'Extra data, eg: slot index' + title: Extra + type: object + icon: + default: '' + description: Resource icon + title: Icon + type: string + id: + description: Resource ID + title: Id + type: string + model: + additionalProperties: true + description: Resource model + title: Model + type: object + name: + description: Resource name + title: Name + type: string + parent: + anyOf: + - $ref: '#/$defs/ResourceDict' + - type: 'null' + default: null + description: Parent resource object + parent_uuid: + anyOf: + - type: string + - type: 'null' + default: null + description: Parent resource uuid + title: Parent Uuid + pose: + $ref: '#/$defs/ResourceDictPosition' + description: Resource position + schema: + additionalProperties: true + description: Resource schema + title: Schema + type: object + type: + anyOf: + - const: device + type: string + - type: string + description: Resource type + title: Type + uuid: + description: Resource UUID + title: Uuid + type: string + required: + - id + - uuid + - name + - type + - class + - config + - data + - extra + title: ResourceDict + type: object + ResourceDictPosition: + properties: + cross_section_type: + default: rectangle + description: Cross section type + enum: + - rectangle + - circle + - rounded_rectangle + title: Cross Section Type + type: string + layout: + default: x-y + description: Resource layout + enum: + - 2d + - x-y + - z-y + - x-z + title: Layout + type: string + position: + $ref: '#/$defs/ResourceDictPositionObject' + description: Resource position + position3d: + $ref: '#/$defs/ResourceDictPositionObject' + description: Resource position in 3D space + rotation: + $ref: '#/$defs/ResourceDictPositionObject' + description: Resource rotation + scale: + $ref: '#/$defs/ResourceDictPositionScale' + description: Resource scale + size: + $ref: '#/$defs/ResourceDictPositionSize' + description: Resource size + title: ResourceDictPosition + type: object + ResourceDictPositionObject: + properties: + x: + default: 0.0 + description: X coordinate + title: X + type: number + y: + default: 0.0 + description: Y coordinate + title: Y + type: number + z: + default: 0.0 + description: Z coordinate + title: Z + type: number + title: ResourceDictPositionObject + type: object + ResourceDictPositionScale: + properties: + x: + default: 0.0 + description: x scale + title: X + type: number + y: + default: 0.0 + description: y scale + title: Y + type: number + z: + default: 0.0 + description: z scale + title: Z + type: number + title: ResourceDictPositionScale + type: object + ResourceDictPositionSize: + properties: + depth: + default: 0.0 + description: Depth + title: Depth + type: number + height: + default: 0.0 + description: Height + title: Height + type: number + width: + default: 0.0 + description: Width + title: Width + type: number + title: ResourceDictPositionSize + type: object properties: plate: - items: {} + items: + items: + $ref: '#/$defs/ResourceDict' + type: array title: Plate type: array volumes: - items: {} + items: + type: number title: Volumes type: array wells: - items: {} + items: + items: + $ref: '#/$defs/ResourceDict' + type: array title: Wells type: array required: diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index c38655c..e44b745 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -5835,6 +5835,25 @@ virtual_workbench: - material_number type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: move_to_heating_station 返回类型 properties: material_id: @@ -5853,12 +5872,18 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - station_id - material_id - material_number - message + - unilabos_samples title: MoveToHeatingStationResult type: object required: @@ -5903,6 +5928,25 @@ virtual_workbench: - material_number type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: move_to_output 返回类型 properties: material_id: @@ -5914,10 +5958,16 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - station_id - material_id + - unilabos_samples title: MoveToOutputResult type: object required: @@ -5972,6 +6022,25 @@ virtual_workbench: required: [] type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: prepare_materials 返回类型 - 批量准备物料 properties: count: @@ -5998,6 +6067,11 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - count @@ -6007,6 +6081,7 @@ virtual_workbench: - material_4 - material_5 - message + - unilabos_samples title: PrepareMaterialsResult type: object required: @@ -6062,6 +6137,25 @@ virtual_workbench: - material_number type: object result: + $defs: + LabSample: + properties: + extra: + additionalProperties: true + title: Extra + type: object + oss_path: + title: Oss Path + type: string + sample_uuid: + title: Sample Uuid + type: string + required: + - sample_uuid + - oss_path + - extra + title: LabSample + type: object description: start_heating 返回类型 properties: material_id: @@ -6079,12 +6173,18 @@ virtual_workbench: success: title: Success type: boolean + unilabos_samples: + items: + $ref: '#/$defs/LabSample' + title: Unilabos Samples + type: array required: - success - station_id - material_id - material_number - message + - unilabos_samples title: StartHeatingResult type: object required: diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index 61b9a90..92e5c3a 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -5,6 +5,8 @@ from pydantic import BaseModel, field_serializer, field_validator, ValidationErr from pydantic import Field from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union +from typing_extensions import TypedDict + from unilabos.resources.plr_additional_res_reg import register from unilabos.utils.log import logger @@ -14,6 +16,26 @@ if TYPE_CHECKING: EXTRA_CLASS = "unilabos_resource_class" +EXTRA_SAMPLE_UUID = "sample_uuid" +EXTRA_UNILABOS_SAMPLE_UUID = "unilabos_sample_uuid" + +# 函数参数名常量 - 用于自动注入 sample_uuids 列表 +PARAM_SAMPLE_UUIDS = "sample_uuids" + +# JSON Command 中的系统参数字段名 +JSON_UNILABOS_PARAM = "unilabos_param" + +# 返回值中的 samples 字段名 +RETURN_UNILABOS_SAMPLES = "unilabos_samples" + +# sample_uuids 参数类型 (用于 virtual bench 等设备添加 sample_uuids 参数) +SampleUUIDsType = Dict[str, Optional["PLRResource"]] + + +class LabSample(TypedDict): + sample_uuid: str + oss_path: str + extra: Dict[str, Any] class ResourceDictPositionSize(BaseModel): @@ -529,6 +551,7 @@ class ResourceTreeSet(object): plr_resource = sub_cls.deserialize(plr_dict, allow_marshal=True) from pylabrobot.resources import Coordinate from pylabrobot.serializer import deserialize + location = cast(Coordinate, deserialize(plr_dict["location"])) plr_resource.location = location plr_resource.load_all_state(all_states) diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 6e6a098..89df756 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -1,12 +1,23 @@ -from ast import Try import inspect import io import json import threading import time import traceback -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING, Union, \ - Tuple +from typing import ( + get_type_hints, + TypeVar, + Generic, + Dict, + Any, + Type, + TypedDict, + Optional, + List, + TYPE_CHECKING, + Union, + Tuple, +) from concurrent.futures import ThreadPoolExecutor import asyncio @@ -49,8 +60,10 @@ from unilabos.resources.resource_tracker import ( ResourceTreeSet, ResourceTreeInstance, ResourceDictInstance, + EXTRA_SAMPLE_UUID, + PARAM_SAMPLE_UUIDS, + JSON_UNILABOS_PARAM, ) -from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from rclpy.task import Task, Future from unilabos.utils.import_manager import default_manager @@ -186,7 +199,7 @@ class PropertyPublisher: f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}" ) self.timer = node.create_timer(self.timer_period, self.publish_property) - self.__loop = get_event_loop() + self.__loop = ROS2DeviceNode.get_asyncio_loop() str_msg_type = str(msg_type)[8:-2] self.node.lab_logger().trace(f"发布属性: {name}, 类型: {str_msg_type}, 周期: {initial_period}秒, QoS: {qos}") @@ -218,14 +231,15 @@ class PropertyPublisher: def publish_property(self): try: - self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") + # self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") value = self.get_property() if self.print_publish: - self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") + pass + # self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") if value is not None: msg = convert_to_ros_msg(self.msg_type, value) self.publisher_.publish(msg) - self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") + # self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") except Exception as e: self.node.lab_logger().error( f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}" @@ -363,6 +377,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): from pylabrobot.resources.deck import Deck from pylabrobot.resources import Coordinate from pylabrobot.resources import Plate + # 物料传输到对应的node节点 client = self._resource_clients["c2s_update_resource_tree"] request = SerialCommand.Request() @@ -390,33 +405,27 @@ class BaseROS2DeviceNode(Node, Generic[T]): rts: ResourceTreeSet = ResourceTreeSet.from_raw_dict_list(input_resources) parent_resource = None if bind_parent_id != self.node_name: - parent_resource = self.resource_tracker.figure_resource( - {"name": bind_parent_id} - ) + parent_resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) for r in rts.root_nodes: # noinspection PyUnresolvedReferences r.res_content.parent_uuid = parent_resource.unilabos_uuid else: for r in rts.root_nodes: r.res_content.parent_uuid = self.uuid - - if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1 and len(rts.root_nodes) == 1 and isinstance(rts.root_nodes[0], RegularContainer): + rts_plr_instances = rts.to_plr_resources() + if len(rts.root_nodes) == 1 and isinstance(rts_plr_instances[0], RegularContainer): # noinspection PyTypeChecker - container_instance: RegularContainer = rts.root_nodes[0] - found_resources = self.resource_tracker.figure_resource( - {"id": container_instance.name}, try_mode=True - ) + container_instance: RegularContainer = rts_plr_instances[0] + found_resources = self.resource_tracker.figure_resource({"name": container_instance.name}, try_mode=True) if not len(found_resources): self.resource_tracker.add_resource(container_instance) logger.info(f"添加物料{container_instance.name}到资源跟踪器") else: - assert ( - len(found_resources) == 1 - ), f"找到多个同名物料: {container_instance.name}, 请检查物料系统" + assert len(found_resources) == 1, f"找到多个同名物料: {container_instance.name}, 请检查物料系统" found_resource = found_resources[0] if isinstance(found_resource, RegularContainer): logger.info(f"更新物料{container_instance.name}的数据{found_resource.state}") - found_resource.state.update(json.loads(container_instance.state)) + found_resource.state.update(container_instance.state) elif isinstance(found_resource, dict): raise ValueError("已不支持 字典 版本的RegularContainer") else: @@ -424,14 +433,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"更新物料{container_instance.name}出现不支持的数据类型{type(found_resource)} {found_resource}" ) # noinspection PyUnresolvedReferences - request.command = json.dumps({ - "action": "add", - "data": { - "data": rts.dump(), - "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else "", - "first_add": False, - }, - }) + request.command = json.dumps( + { + "action": "add", + "data": { + "data": rts.dump(), + "mount_uuid": parent_resource.unilabos_uuid if parent_resource is not None else self.uuid, + "first_add": False, + }, + } + ) tree_response: SerialCommand.Response = await client.call_async(request) uuid_maps = json.loads(tree_response.response) plr_instances = rts.to_plr_resources() @@ -473,7 +484,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): if len(ADD_LIQUID_TYPE) == 1 and len(LIQUID_VOLUME) == 1 and len(LIQUID_INPUT_SLOT) > 1: ADD_LIQUID_TYPE = ADD_LIQUID_TYPE * len(LIQUID_INPUT_SLOT) LIQUID_VOLUME = LIQUID_VOLUME * len(LIQUID_INPUT_SLOT) - self.lab_logger().warning(f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个") + self.lab_logger().warning( + f"增加液体资源时,数量为1,自动补全为 {len(LIQUID_INPUT_SLOT)} 个" + ) for liquid_type, liquid_volume, liquid_input_slot in zip( ADD_LIQUID_TYPE, LIQUID_VOLUME, LIQUID_INPUT_SLOT ): @@ -492,9 +505,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): input_wells = [] for r in LIQUID_INPUT_SLOT: input_wells.append(plr_instance.children[r]) - final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources(input_wells).dump() + final_response["liquid_input_resource_tree"] = ResourceTreeSet.from_plr_resources( + input_wells + ).dump() res.response = json.dumps(final_response) - if issubclass(parent_resource.__class__, Deck) and hasattr(parent_resource, "assign_child_at_slot") and "slot" in other_calling_param: + if ( + issubclass(parent_resource.__class__, Deck) + and hasattr(parent_resource, "assign_child_at_slot") + and "slot" in other_calling_param + ): other_calling_param["slot"] = int(other_calling_param["slot"]) parent_resource.assign_child_at_slot(plr_instance, **other_calling_param) else: @@ -509,14 +528,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): rts_with_parent = ResourceTreeSet.from_plr_resources([parent_resource]) if rts_with_parent.root_nodes[0].res_content.uuid_parent is None: rts_with_parent.root_nodes[0].res_content.parent_uuid = self.uuid - request.command = json.dumps({ - "action": "add", - "data": { - "data": rts_with_parent.dump(), - "mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent, - "first_add": False, - }, - }) + request.command = json.dumps( + { + "action": "add", + "data": { + "data": rts_with_parent.dump(), + "mount_uuid": rts_with_parent.root_nodes[0].res_content.uuid_parent, + "first_add": False, + }, + } + ) tree_response: SerialCommand.Response = await client.call_async(request) uuid_maps = json.loads(tree_response.response) self.resource_tracker.loop_update_uuid(input_resources, uuid_maps) @@ -626,7 +647,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) # type: ignore raw_nodes = json.loads(response.response) tree_set = ResourceTreeSet.from_raw_dict_list(raw_nodes) - self.lab_logger().debug(f"获取资源结果: {len(tree_set.trees)} 个资源树") + self.lab_logger().trace(f"获取资源结果: {len(tree_set.trees)} 个资源树 {tree_set.root_nodes}") return tree_set async def get_resource_with_dir(self, resource_id: str, with_children: bool = True) -> "ResourcePLR": @@ -813,7 +834,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): } def _handle_update( - plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any] + plr_resources: List[Union[ResourcePLR, ResourceDictInstance]], + tree_set: ResourceTreeSet, + additional_add_params: Dict[str, Any], ) -> Tuple[Dict[str, Any], List[ResourcePLR]]: """ 处理资源更新操作的内部函数 @@ -838,7 +861,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): original_parent_resource = original_instance.parent original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None) target_parent_resource_uuid = tree.root_node.res_content.uuid_parent - not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None + not_same_parent = ( + original_parent_resource_uuid != target_parent_resource_uuid + and original_parent_resource is not None + ) old_name = original_instance.name new_name = plr_resource.name parent_appended = False @@ -874,8 +900,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): else: # 判断是否变更了resource_site,重新登记 target_site = original_instance.unilabos_extra.get("update_resource_site") - sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None - site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else [] + sites = ( + original_instance.parent.sites + if original_instance.parent is not None and hasattr(original_instance.parent, "sites") + else None + ) + site_names = ( + list(original_instance.parent._ordering.keys()) + if original_instance.parent is not None and hasattr(original_instance.parent, "sites") + else [] + ) if target_site is not None and sites is not None and site_names is not None: site_index = sites.index(original_instance) site_name = site_names[site_index] @@ -912,9 +946,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): action = i.get("action") # remove, add, update resources_uuid: List[str] = i.get("data") # 资源数据 additional_add_params = i.get("additional_add_params", {}) # 额外参数 - self.lab_logger().trace( - f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}" - ) + self.lab_logger().trace(f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}") tree_set = None if action in ["add", "update"]: tree_set = await self.get_resource( @@ -941,9 +973,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( - {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 + {"data": {"data": new_tree_set.dump()}, "action": "update"} + ) # 和Update Resource一致 response: SerialCommand_Response = await self._resource_clients[ - "c2s_update_resource_tree"].call_async(r) # type: ignore + "c2s_update_resource_tree" + ].call_async( + r + ) # type: ignore self.lab_logger().info(f"确认资源云端 Add 结果: {response.response}") results.append(result) elif action == "update": @@ -963,9 +999,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): tree.root_node.res_content.parent_uuid = self.uuid r = SerialCommand.Request() r.command = json.dumps( - {"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致 + {"data": {"data": new_tree_set.dump()}, "action": "update"} + ) # 和Update Resource一致 response: SerialCommand_Response = await self._resource_clients[ - "c2s_update_resource_tree"].call_async(r) # type: ignore + "c2s_update_resource_tree" + ].call_async( + r + ) # type: ignore self.lab_logger().info(f"确认资源云端 Update 结果: {response.response}") results.append(result) elif action == "remove": @@ -1335,7 +1375,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): resource_id=resource_data["id"], with_children=True ) if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] + plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"] queried_resources[idx] = plr_resource else: uuid_indices.append((idx, unilabos_uuid, resource_data)) @@ -1348,31 +1388,17 @@ class BaseROS2DeviceNode(Node, Generic[T]): for i, (idx, _, resource_data) in enumerate(uuid_indices): plr_resource = plr_resources[i] if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] + plr_resource.unilabos_extra[EXTRA_SAMPLE_UUID] = resource_data["sample_id"] queried_resources[idx] = plr_resource - # 第二遍:批量查询有uuid的资源 - if uuid_indices: - uuids = [item[1] for item in uuid_indices] - resource_tree = await self.get_resource(uuids) - plr_resources = resource_tree.to_plr_resources() - # 通过uuid查找对应的plr_resource - tracker = self.resource_tracker - for idx, uuid, resource_data in uuid_indices: - try: - plr_resource = tracker.loop_find_with_uuid(plr_resources, uuid) - if "sample_id" in resource_data: - plr_resource.unilabos_extra["sample_uuid"] = resource_data["sample_id"] - queried_resources[idx] = plr_resource - except Exception as e: - self.lab_logger().error(f"资源查询失败: {e}\n{traceback.format_exc()}") - continue self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源") # 通过资源跟踪器获取本地实例 final_resources = queried_resources if is_sequence else queried_resources[0] if not is_sequence: - plr = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) + plr = self.resource_tracker.figure_resource( + {"name": final_resources.name}, try_mode=False + ) # 保留unilabos_extra if hasattr(final_resources, "unilabos_extra") and hasattr(plr, "unilabos_extra"): plr.unilabos_extra = getattr(final_resources, "unilabos_extra", {}).copy() @@ -1411,8 +1437,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = True except Exception as _: execution_error = traceback.format_exc() - error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}") - trace(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + error( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}" + ) + trace( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) future.add_done_callback(_handle_future_exception) @@ -1432,9 +1462,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): except Exception as _: execution_error = traceback.format_exc() error( - f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}") + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{str(action_kwargs)[:1000]}" + ) trace( - f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future.add_done_callback(_handle_future_exception) @@ -1501,13 +1533,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): if isinstance(rs, list): for r in rs: res = self.resource_tracker.parent_resource(r) # 获取 resource 对象 - elif type(rs).__name__ == "ResourceHolder": - pass + if res is None: + res = rs + if id(res) not in seen: + seen.add(id(res)) + unique_resources.append(res) else: res = self.resource_tracker.parent_resource(rs) - if id(res) not in seen: - seen.add(id(res)) - unique_resources.append(res) + if res is None: + res = rs + if id(res) not in seen: + seen.add(id(res)) + unique_resources.append(res) # 使用新的资源树接口 if unique_resources: @@ -1559,20 +1596,39 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: function_name = target["function_name"] function_args = target["function_args"] + # 获取 unilabos 系统参数 + unilabos_param: Dict[str, Any] = target[JSON_UNILABOS_PARAM] + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) assert callable( function ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" - # 处理 ResourceSlot 类型参数 - args_list = default_manager._analyze_method_signature(function)["args"] + # 处理参数(包含 unilabos 系统参数如 sample_uuids) + args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"] for arg in args_list: arg_name = arg["name"] arg_type = arg["type"] # 跳过不在 function_args 中的参数 if arg_name not in function_args: + # 处理 sample_uuids 参数注入 + if arg_name == PARAM_SAMPLE_UUIDS: + raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {}) + # 将 material uuid 转换为 resource 实例 + # key: sample_uuid, value: material_uuid -> resource 实例 + resolved_sample_uuids: Dict[str, Any] = {} + for sample_uuid, material_uuid in raw_sample_uuids.items(): + if material_uuid and self.resource_tracker: + resource = self.resource_tracker.uuid_to_resources.get(material_uuid) + resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid + else: + resolved_sample_uuids[sample_uuid] = material_uuid + function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids + self.lab_logger().debug( + f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}" + ) continue # 处理单个 ResourceSlot @@ -1601,6 +1657,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" ) raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}") + # todo: 默认反报送 return function(**function_args) except KeyError as ex: @@ -1621,14 +1678,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): raise ValueError("至少需要提供一个 UUID") uuids_list = list(uuids) - future = self._resource_clients["c2s_update_resource_tree"].call_async(SerialCommand.Request( - command=json.dumps( - { - "data": {"data": uuids_list, "with_children": True}, - "action": "get", - } + future = self._resource_clients["c2s_update_resource_tree"].call_async( + SerialCommand.Request( + command=json.dumps( + { + "data": {"data": uuids_list, "with_children": True}, + "action": "get", + } + ) ) - )) + ) # 等待结果(使用while循环,每次sleep 0.05秒,最多等待30秒) timeout = 30.0 @@ -1686,6 +1745,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: function_name = target["function_name"] function_args = target["function_args"] + # 获取 unilabos 系统参数 + unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {}) + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) assert callable( @@ -1695,14 +1757,30 @@ class BaseROS2DeviceNode(Node, Generic[T]): function ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" - # 处理 ResourceSlot 类型参数 - args_list = default_manager._analyze_method_signature(function)["args"] + # 处理参数(包含 unilabos 系统参数如 sample_uuids) + args_list = default_manager._analyze_method_signature(function, skip_unilabos_params=False)["args"] for arg in args_list: arg_name = arg["name"] arg_type = arg["type"] # 跳过不在 function_args 中的参数 if arg_name not in function_args: + # 处理 sample_uuids 参数注入 + if arg_name == PARAM_SAMPLE_UUIDS: + raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {}) + # 将 material uuid 转换为 resource 实例 + # key: sample_uuid, value: material_uuid -> resource 实例 + resolved_sample_uuids: Dict[str, Any] = {} + for sample_uuid, material_uuid in raw_sample_uuids.items(): + if material_uuid and self.resource_tracker: + resource = self.resource_tracker.uuid_to_resources.get(material_uuid) + resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid + else: + resolved_sample_uuids[sample_uuid] = material_uuid + function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids + self.lab_logger().debug( + f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}" + ) continue # 处理单个 ResourceSlot @@ -1792,6 +1870,15 @@ class ROS2DeviceNode: 它不继承设备类,而是通过代理模式访问设备类的属性和方法。 """ + # 类变量,用于循环管理 + _asyncio_loop = None + _asyncio_loop_running = False + _asyncio_loop_thread = None + + @classmethod + def get_asyncio_loop(cls): + return cls._asyncio_loop + @staticmethod async def safe_task_wrapper(trace_callback, func, **kwargs): try: @@ -1868,6 +1955,11 @@ class ROS2DeviceNode: print_publish: 是否打印发布信息 driver_is_ros: """ + # 在初始化时检查循环状态 + if ROS2DeviceNode._asyncio_loop_running and ROS2DeviceNode._asyncio_loop_thread is not None: + pass + elif ROS2DeviceNode._asyncio_loop_thread is None: + self._start_loop() # 保存设备类是否支持异步上下文 self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__") @@ -1959,6 +2051,19 @@ class ROS2DeviceNode: except Exception as e: self._ros_node.lab_logger().error(f"设备后初始化失败: {e}") + def _start_loop(self): + def run_event_loop(): + loop = asyncio.new_event_loop() + ROS2DeviceNode._asyncio_loop = loop + asyncio.set_event_loop(loop) + loop.run_forever() + + ROS2DeviceNode._asyncio_loop_thread = threading.Thread( + target=run_event_loop, daemon=True, name="ROS2DeviceNode" + ) + ROS2DeviceNode._asyncio_loop_thread.start() + logger.info(f"循环线程已启动") + class DeviceInfoType(TypedDict): id: str diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 65b03ac..dde756a 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,16 +1,17 @@ import collections -from dataclasses import dataclass, field import json import threading import time import traceback import uuid -from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, TypedDict, Union +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.service import Service +from typing_extensions import TypedDict from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.srv import ( ResourceAdd, @@ -22,10 +23,20 @@ from unilabos_msgs.srv import ( from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unique_identifier_msgs.msg import UUID +from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.registry.registry import lab_registry from unilabos.resources.container import RegularContainer from unilabos.resources.graphio import initialize_resource from unilabos.resources.registry import add_schema +from unilabos.resources.resource_tracker import ( + ResourceDict, + ResourceDictInstance, + ResourceTreeSet, + ResourceTreeInstance, + RETURN_UNILABOS_SAMPLES, + JSON_UNILABOS_PARAM, + PARAM_SAMPLE_UUIDS, +) from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( get_msg_type, @@ -36,17 +47,10 @@ from unilabos.ros.msgs.message_converter import ( ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode -from unilabos.resources.resource_tracker import ( - ResourceDict, - ResourceDictInstance, - ResourceTreeSet, - ResourceTreeInstance, -) from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info -from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot if TYPE_CHECKING: from unilabos.app.ws_client import QueueItem @@ -62,6 +66,18 @@ class TestResourceReturn(TypedDict): devices: List[DeviceSlot] +class TestLatencyReturn(TypedDict): + """test_latency方法的返回值类型""" + + avg_rtt_ms: float + avg_time_diff_ms: float + max_time_error_ms: float + task_delay_ms: float + raw_delay_ms: float + test_count: int + status: str + + class HostNode(BaseROS2DeviceNode): """ 主机节点类,负责管理设备、资源和控制器 @@ -735,13 +751,14 @@ class HostNode(BaseROS2DeviceNode): if bCreate: self.lab_logger().trace(f"Status created: {device_id}.{property_name} = {msg.data}") else: - self.lab_logger().debug(f"Status updated: {device_id}.{property_name} = {msg.data}") + self.lab_logger().trace(f"Status updated: {device_id}.{property_name} = {msg.data}") def send_goal( self, item: "QueueItem", action_type: str, action_kwargs: Dict[str, Any], + sample_material: Dict[str, str], server_info: Optional[Dict[str, Any]] = None, ) -> None: """ @@ -759,14 +776,14 @@ class HostNode(BaseROS2DeviceNode): if action_name.startswith("auto-"): action_name = action_name[5:] action_id = f"/devices/{device_id}/_execute_driver_command" - action_kwargs = { - "string": json.dumps( - { - "function_name": action_name, - "function_args": action_kwargs, - } - ) + json_command: Dict[str, Any] = { + "function_name": action_name, + "function_args": action_kwargs, + JSON_UNILABOS_PARAM: { + PARAM_SAMPLE_UUIDS: sample_material, + }, } + action_kwargs = {"string": json.dumps(json_command)} if action_type.startswith("UniLabJsonCommandAsync"): action_id = f"/devices/{device_id}/_execute_driver_command_async" else: @@ -777,21 +794,6 @@ class HostNode(BaseROS2DeviceNode): raise ValueError(f"ActionClient {action_id} not found.") action_client: ActionClient = self._action_clients[action_id] - - # 遍历action_kwargs下的所有子dict,将"sample_uuid"的值赋给"sample_id" - def assign_sample_id(obj): - if isinstance(obj, dict): - if "sample_uuid" in obj: - obj["sample_id"] = obj["sample_uuid"] - obj.pop("sample_uuid") - for k, v in obj.items(): - if k != "unilabos_extra": - assign_sample_id(v) - elif isinstance(obj, list): - for item in obj: - assign_sample_id(item) - - assign_sample_id(action_kwargs) goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) # self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}") @@ -854,9 +856,14 @@ class HostNode(BaseROS2DeviceNode): # 适配后端的一些额外处理 return_value = return_info.get("return_value") if isinstance(return_value, dict): - unilabos_samples = return_info.get("unilabos_samples") - if isinstance(unilabos_samples, list): - return_info["unilabos_samples"] = unilabos_samples + unilabos_samples = return_value.pop(RETURN_UNILABOS_SAMPLES, None) + if isinstance(unilabos_samples, list) and unilabos_samples: + self.lab_logger().info( + f"[Host Node] Job {job_id[:8]} returned {len(unilabos_samples)} sample(s): " + f"{[s.get('name', s.get('id', 'unknown')) if isinstance(s, dict) else str(s)[:20] for s in unilabos_samples[:5]]}" + f"{'...' if len(unilabos_samples) > 5 else ''}" + ) + return_info["samples"] = unilabos_samples suc = return_info.get("suc", False) if not suc: status = "failed" @@ -882,7 +889,7 @@ class HostNode(BaseROS2DeviceNode): # 清理 _goals 中的记录 if job_id in self._goals: del self._goals[job_id] - self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals") + self.lab_logger().trace(f"[Host Node] Removed goal {job_id[:8]} from _goals") # 存储结果供 HTTP API 查询 try: @@ -1327,10 +1334,20 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}") return response - def test_latency(self): + def test_latency(self) -> TestLatencyReturn: """ 测试网络延迟的action实现 通过5次ping-pong机制校对时间误差并计算实际延迟 + + Returns: + TestLatencyReturn: 包含延迟测试结果的字典,包括: + - avg_rtt_ms: 平均往返时间(毫秒) + - avg_time_diff_ms: 平均时间差(毫秒) + - max_time_error_ms: 最大时间误差(毫秒) + - task_delay_ms: 实际任务延迟(毫秒),-1表示无法计算 + - raw_delay_ms: 原始时间差(毫秒),-1表示无法计算 + - test_count: 有效测试次数 + - status: 测试状态,"success"表示成功,"all_timeout"表示全部超时 """ import uuid as uuid_module @@ -1393,7 +1410,15 @@ class HostNode(BaseROS2DeviceNode): if not ping_results: self.lab_logger().error("❌ 所有ping-pong测试都失败了") - return {"status": "all_timeout"} + return { + "avg_rtt_ms": -1.0, + "avg_time_diff_ms": -1.0, + "max_time_error_ms": -1.0, + "task_delay_ms": -1.0, + "raw_delay_ms": -1.0, + "test_count": 0, + "status": "all_timeout", + } # 统计分析 rtts = [r["rtt_ms"] for r in ping_results] @@ -1401,7 +1426,7 @@ class HostNode(BaseROS2DeviceNode): avg_rtt_ms = sum(rtts) / len(rtts) avg_time_diff_ms = sum(time_diffs) / len(time_diffs) - max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs))) + max_time_diff_error_ms: float = max(abs(min(time_diffs)), abs(max(time_diffs))) self.lab_logger().info("-" * 50) self.lab_logger().info("[测试统计]") @@ -1441,7 +1466,7 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info("=" * 60) - return { + res: TestLatencyReturn = { "avg_rtt_ms": avg_rtt_ms, "avg_time_diff_ms": avg_time_diff_ms, "max_time_error_ms": max_time_diff_error_ms, @@ -1452,9 +1477,14 @@ class HostNode(BaseROS2DeviceNode): "test_count": len(ping_results), "status": "success", } + return res def test_resource( - self, resource: ResourceSlot = None, resources: List[ResourceSlot] = None, device: DeviceSlot = None, devices: List[DeviceSlot] = None + self, + resource: ResourceSlot = None, + resources: List[ResourceSlot] = None, + device: DeviceSlot = None, + devices: List[DeviceSlot] = None, ) -> TestResourceReturn: if resources is None: resources = [] @@ -1515,7 +1545,9 @@ class HostNode(BaseROS2DeviceNode): # 构建服务地址 srv_address = f"/srv{namespace}/s2c_resource_tree" - self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------") + self.lab_logger().trace( + f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------" + ) # 创建服务客户端 sclient = self.create_client(SerialCommand, srv_address) @@ -1550,7 +1582,9 @@ class HostNode(BaseROS2DeviceNode): time.sleep(0.05) response = future.result() - self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------") + self.lab_logger().trace( + f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------" + ) return True except Exception as e: diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index f30e33b..0d62579 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -340,6 +340,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): plr = self.resource_tracker.figure_resource({"name": res_name}, try_mode=False) # 获取父资源 res = self.resource_tracker.parent_resource(plr) + if res is None: + res = plr if id(res) not in seen: seen.add(id(res)) unique_resources.append(res) diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 35aca5e..47e7533 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -52,7 +52,8 @@ class DeviceClassCreator(Generic[T]): if self.device_instance is not None: for c in self.children: if c.res_content.type != "device": - self.resource_tracker.add_resource(c.get_plr_nested_dict()) + res = ResourceTreeSet([ResourceTreeInstance(c)]).to_plr_resources()[0] + self.resource_tracker.add_resource(res) def create_instance(self, data: Dict[str, Any]) -> T: """ @@ -119,7 +120,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): # return resource, source_type def _process_resource_references( - self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None + self, data: Any, processed_child_names: Optional[Dict[str, Any]], to_dict=False, states=None, prefix_path="", name_to_uuid=None ) -> Any: """ 递归处理资源引用,替换_resource_child_name对应的资源 @@ -164,6 +165,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): states[prefix_path] = resource_instance.serialize_all_state() return serialized else: + processed_child_names[child_name] = resource_instance self.resource_tracker.add_resource(resource_instance) # 立即设置UUID,state已经在resource_ulab_to_plr中处理过了 if name_to_uuid: @@ -182,12 +184,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): result = {} for key, value in data.items(): new_prefix = f"{prefix_path}.{key}" if prefix_path else key - result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid) + result[key] = self._process_resource_references(value, processed_child_names, to_dict, states, new_prefix, name_to_uuid) return result elif isinstance(data, list): return [ - self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) + self._process_resource_references(item, processed_child_names, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) for i, item in enumerate(data) ] @@ -234,7 +236,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): # 首先处理资源引用 states = {} processed_data = self._process_resource_references( - data, to_dict=True, states=states, name_to_uuid=name_to_uuid + data, {}, to_dict=True, states=states, name_to_uuid=name_to_uuid ) try: @@ -270,7 +272,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): arg_value = spec_args[param_name].annotation data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") - processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid) + processed_child_names = {} + processed_data = self._process_resource_references(data, processed_child_names, to_dict=False, name_to_uuid=name_to_uuid) + for child_name, resource_instance in processed_data.items(): + for ind, name in enumerate([child.res_content.name for child in self.children]): + if name == child_name: + self.children.pop(ind) self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) # 补全变量后直接调用,调用的自身的attach_resource except Exception as e: logger.error(f"PyLabRobot创建实例失败: {e}") @@ -342,9 +349,10 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children - for child in self.children: - if child.res_content.type != "device": - self.resource_tracker.add_resource(child.get_plr_nested_dict()) + # super(WorkstationNodeCreator, self).create_instance(data)的时候会attach + # for child in self.children: + # if child.res_content.type != "device": + # self.resource_tracker.add_resource(child.get_plr_nested_dict()) deck_dict = data.get("deck") if deck_dict: from pylabrobot.resources import Deck, Resource diff --git a/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json b/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json index 9af64af..74ca47e 100644 --- a/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json +++ b/unilabos/test/experiments/comprehensive_protocol/comprehensive_station.json @@ -339,13 +339,8 @@ "z": 0 }, "config": { - "max_volume": 500.0, "type": "RegularContainer", - "category": "container", - "max_temp": 200.0, - "min_temp": -20.0, - "has_stirrer": true, - "has_heater": true + "category": "container" }, "data": { "liquids": [], @@ -769,9 +764,7 @@ "size_y": 250, "size_z": 0, "type": "RegularContainer", - "category": "container", - "reagent": "sodium_chloride", - "physical_state": "solid" + "category": "container" }, "data": { "current_mass": 500.0, @@ -792,14 +785,11 @@ "z": 0 }, "config": { - "volume": 500.0, "size_x": 600, "size_y": 250, "size_z": 0, "type": "RegularContainer", - "category": "container", - "reagent": "sodium_carbonate", - "physical_state": "solid" + "category": "container" }, "data": { "current_mass": 500.0, @@ -820,14 +810,11 @@ "z": 0 }, "config": { - "volume": 500.0, "size_x": 650, "size_y": 250, "size_z": 0, "type": "RegularContainer", - "category": "container", - "reagent": "magnesium_chloride", - "physical_state": "solid" + "category": "container" }, "data": { "current_mass": 500.0, diff --git a/unilabos/utils/import_manager.py b/unilabos/utils/import_manager.py index 00fcd06..5292d7c 100644 --- a/unilabos/utils/import_manager.py +++ b/unilabos/utils/import_manager.py @@ -27,6 +27,7 @@ __all__ = [ from ast import Constant +from unilabos.resources.resource_tracker import PARAM_SAMPLE_UUIDS from unilabos.utils import logger @@ -334,13 +335,18 @@ class ImportManager: result["action_methods"][method_name] = method_info return result - def _analyze_method_signature(self, method) -> Dict[str, Any]: + def _analyze_method_signature(self, method, skip_unilabos_params: bool = True) -> Dict[str, Any]: """ 分析方法签名,提取具体的命名参数信息 注意:此方法会跳过*args和**kwargs,只提取具体的命名参数 这样可以确保通过**dict方式传参时的准确性 + Args: + method: 要分析的方法 + skip_unilabos_params: 是否跳过 unilabos 系统参数(如 sample_uuids), + registry 补全时为 True,JsonCommand 执行时为 False + 示例用法: method_info = self._analyze_method_signature(some_method) params = {"param1": "value1", "param2": "value2"} @@ -361,6 +367,10 @@ class ImportManager: if param.kind == param.VAR_KEYWORD: # **kwargs continue + # 跳过 sample_uuids 参数(由系统自动注入,registry 补全时跳过) + if skip_unilabos_params and param_name == PARAM_SAMPLE_UUIDS: + continue + is_required = param.default == inspect.Parameter.empty if is_required: num_required += 1 @@ -549,6 +559,9 @@ class ImportManager: for i, arg in enumerate(node.args.args): if arg.arg == "self": continue + # 跳过 sample_uuids 参数(由系统自动注入) + if arg.arg == PARAM_SAMPLE_UUIDS: + continue arg_info = { "name": arg.arg, "type": None, diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index f4c0ac8..381cc66 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -60,7 +60,11 @@ ==================== 连接关系图 ==================== 控制流 (ready 端口串联): - create_resource_1 -> create_resource_2 -> ... -> set_liquid_1 -> set_liquid_2 -> ... -> transfer_liquid_1 -> transfer_liquid_2 -> ... + - create_resource 之间: 无 ready 连接 + - set_liquid_from_plate 之间: 无 ready 连接 + - create_resource 与 set_liquid_from_plate 之间: 无 ready 连接 + - transfer_liquid 之间: 通过 ready 端口串联 + transfer_liquid_1 -> transfer_liquid_2 -> transfer_liquid_3 -> ... 物料流: [create_resource] --labware--> [set_liquid_from_plate] --output_wells--> [transfer_liquid] --sources_out/targets_out--> [下一个 transfer_liquid] @@ -402,7 +406,6 @@ def build_protocol_graph( # 为每个唯一的 slot 创建 create_resource 节点 res_index = 0 - last_create_resource_id = None for slot, info in slots_info.items(): node_id = str(uuid.uuid4()) res_id = info["res_id"] @@ -431,10 +434,7 @@ def build_protocol_graph( ) slot_to_create_resource[slot] = node_id - # create_resource 之间通过 ready 串联 - if last_create_resource_id is not None: - G.add_edge(last_create_resource_id, node_id, source_port="ready", target_port="ready") - last_create_resource_id = node_id + # create_resource 之间不需要 ready 连接 # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== # 创建 Group 节点,包含所有 set_liquid_from_plate 节点 @@ -453,7 +453,6 @@ def build_protocol_graph( ) set_liquid_index = 0 - last_set_liquid_id = last_create_resource_id # set_liquid_from_plate 连接在 create_resource 之后 for labware_id, item in labware_info.items(): # 跳过 Tip/Rack 类型 @@ -494,10 +493,7 @@ def build_protocol_graph( }, ) - # ready 连接:上一个节点 -> set_liquid_from_plate - if last_set_liquid_id is not None: - G.add_edge(last_set_liquid_id, node_id, source_port="ready", target_port="ready") - last_set_liquid_id = node_id + # set_liquid_from_plate 之间不需要 ready 连接 # 物料流:create_resource 的 labware -> set_liquid_from_plate 的 input_plate create_res_node_id = slot_to_create_resource.get(slot) @@ -507,7 +503,8 @@ def build_protocol_graph( # set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid resource_last_writer[labware_id] = f"{node_id}:output_wells" - last_control_node_id = last_set_liquid_id + # transfer_liquid 之间通过 ready 串联,从 None 开始 + last_control_node_id = None # 端口名称映射:JSON 字段名 -> 实际 handle key INPUT_PORT_MAPPING = {