From 301bea639e8c1561b9d52452f376ed6a5c66b1cb Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Fri, 19 Sep 2025 22:54:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dprotocolnode=E7=9A=84?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/compile/add_protocol.py | 4 +- unilabos/compile/adjustph_protocol.py | 2 +- unilabos/compile/centrifuge_protocol.py | 2 +- unilabos/compile/clean_vessel_protocol.py | 4 +- unilabos/compile/dissolve_protocol.py | 12 +- unilabos/compile/dry_protocol.py | 6 +- .../compile/evacuateandrefill_protocol.py | 4 +- unilabos/compile/evaporate_protocol.py | 2 +- unilabos/compile/heatchill_protocol.py | 5 +- unilabos/compile/separate_protocol.py | 2 +- unilabos/compile/stir_protocol.py | 6 +- unilabos/compile/wash_solid_protocol.py | 4 +- .../virtual/virtual_solid_dispenser.py | 2 +- unilabos/ros/nodes/base_device_node.py | 4 +- unilabos/ros/nodes/presets/protocol_node.py | 406 ------------------ unilabos/ros/nodes/presets/workstation.py | 10 +- 16 files changed, 35 insertions(+), 440 deletions(-) delete mode 100644 unilabos/ros/nodes/presets/protocol_node.py diff --git a/unilabos/compile/add_protocol.py b/unilabos/compile/add_protocol.py index fbb264a2..e387ebc7 100644 --- a/unilabos/compile/add_protocol.py +++ b/unilabos/compile/add_protocol.py @@ -155,7 +155,7 @@ def generate_add_protocol( "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "stir_speed": stir_speed, "purpose": f"准备添加固体 {reagent}" } @@ -232,7 +232,7 @@ def generate_add_protocol( "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "stir_speed": stir_speed, "purpose": f"准备添加液体 {reagent}" } diff --git a/unilabos/compile/adjustph_protocol.py b/unilabos/compile/adjustph_protocol.py index 1207cda5..166d6471 100644 --- a/unilabos/compile/adjustph_protocol.py +++ b/unilabos/compile/adjustph_protocol.py @@ -325,7 +325,7 @@ def generate_adjust_ph_protocol( "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - "vessel": vessel_id, + "vessel": {"id": vessel_id}, "stir_speed": stir_speed, "purpose": f"pH调节: 启动搅拌,准备添加 {reagent}" } diff --git a/unilabos/compile/centrifuge_protocol.py b/unilabos/compile/centrifuge_protocol.py index 3f541071..52a2b7eb 100644 --- a/unilabos/compile/centrifuge_protocol.py +++ b/unilabos/compile/centrifuge_protocol.py @@ -156,7 +156,7 @@ def generate_centrifuge_protocol( "device_id": centrifuge_id, "action_name": "centrifuge", "action_kwargs": { - "vessel": centrifuge_vessel, + "vessel": {"id": centrifuge_vessel}, "speed": speed, "time": time, "temp": temp diff --git a/unilabos/compile/clean_vessel_protocol.py b/unilabos/compile/clean_vessel_protocol.py index b9f903c8..63e9022f 100644 --- a/unilabos/compile/clean_vessel_protocol.py +++ b/unilabos/compile/clean_vessel_protocol.py @@ -143,7 +143,7 @@ def generate_clean_vessel_protocol( "device_id": heatchill_id, "action_name": "heat_chill_start", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "temp": temp, "purpose": f"cleaning with {solvent}" } @@ -295,7 +295,7 @@ def generate_clean_vessel_protocol( "device_id": heatchill_id, "action_name": "heat_chill_stop", "action_kwargs": { - "vessel": vessel_id # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id } } action_sequence.append(heatchill_stop_action) diff --git a/unilabos/compile/dissolve_protocol.py b/unilabos/compile/dissolve_protocol.py index d57b7eb5..98dec9a4 100644 --- a/unilabos/compile/dissolve_protocol.py +++ b/unilabos/compile/dissolve_protocol.py @@ -563,7 +563,7 @@ def generate_dissolve_protocol( "device_id": heatchill_id, "action_name": "heat_chill_start", "action_kwargs": { - "vessel": vessel_id, + "vessel": {"id": vessel_id}, "temp": final_temp, "purpose": f"溶解准备 - {event}" if event else "溶解准备" } @@ -587,7 +587,7 @@ def generate_dissolve_protocol( "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - "vessel": vessel_id, + "vessel": {"id": vessel_id}, "stir_speed": stir_speed, "purpose": f"溶解搅拌 - {event}" if event else "溶解搅拌" } @@ -612,7 +612,7 @@ def generate_dissolve_protocol( # 固体加样 add_kwargs = { - "vessel": vessel_id, + "vessel": {"id": vessel_id}, "reagent": reagent or amount or "solid reagent", "purpose": f"溶解固体试剂 - {event}" if event else "溶解固体试剂", "event": event @@ -758,7 +758,7 @@ def generate_dissolve_protocol( "device_id": heatchill_id, "action_name": "heat_chill", "action_kwargs": { - "vessel": vessel_id, + "vessel": {"id": vessel_id}, "temp": final_temp, "time": final_time, "stir": True, @@ -776,7 +776,7 @@ def generate_dissolve_protocol( "device_id": stirrer_id, "action_name": "stir", "action_kwargs": { - "vessel": vessel_id, + "vessel": {"id": vessel_id}, "stir_time": final_time, "stir_speed": stir_speed, "settling_time": 0, @@ -802,7 +802,7 @@ def generate_dissolve_protocol( "device_id": heatchill_id, "action_name": "heat_chill_stop", "action_kwargs": { - "vessel": vessel_id + "vessel": {"id": vessel_id}, } } action_sequence.append(stop_action) diff --git a/unilabos/compile/dry_protocol.py b/unilabos/compile/dry_protocol.py index 469e929a..1e736072 100644 --- a/unilabos/compile/dry_protocol.py +++ b/unilabos/compile/dry_protocol.py @@ -167,7 +167,7 @@ def generate_dry_protocol( "device_id": heater_id, "action_name": "heat_chill_start", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "temp": dry_temp, "purpose": f"干燥 {compound or '化合物'}" } @@ -191,7 +191,7 @@ def generate_dry_protocol( "device_id": heater_id, "action_name": "heat_chill", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "temp": dry_temp, "time": simulation_time, "purpose": f"干燥 {compound or '化合物'},保持温度 {dry_temp}°C" @@ -251,7 +251,7 @@ def generate_dry_protocol( "device_id": heater_id, "action_name": "heat_chill_stop", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "purpose": f"干燥完成,停止加热" } }) diff --git a/unilabos/compile/evacuateandrefill_protocol.py b/unilabos/compile/evacuateandrefill_protocol.py index bef4b923..45693c2a 100644 --- a/unilabos/compile/evacuateandrefill_protocol.py +++ b/unilabos/compile/evacuateandrefill_protocol.py @@ -452,7 +452,7 @@ def generate_evacuateandrefill_protocol( "device_id": stirrer_id, "action_name": "start_stir", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "stir_speed": STIR_SPEED, "purpose": "抽真空充气前预搅拌" } @@ -685,7 +685,7 @@ def generate_evacuateandrefill_protocol( action_sequence.append({ "device_id": stirrer_id, "action_name": "stop_stir", - "action_kwargs": {"vessel": vessel_id} # 🔧 使用 vessel_id + "action_kwargs": {"vessel": {"id": vessel_id},} # 🔧 使用 vessel_id }) else: action_sequence.append(create_action_log("跳过搅拌器停止", "⏭️")) diff --git a/unilabos/compile/evaporate_protocol.py b/unilabos/compile/evaporate_protocol.py index ba396c13..fd0b12ff 100644 --- a/unilabos/compile/evaporate_protocol.py +++ b/unilabos/compile/evaporate_protocol.py @@ -329,7 +329,7 @@ def generate_evaporate_protocol( "device_id": rotavap_device, "action_name": "evaporate", "action_kwargs": { - "vessel": target_vessel, + "vessel": {"id": target_vessel}, "pressure": float(pressure), "temp": float(temp), "time": float(final_time), # 🔧 强制转换为float类型 diff --git a/unilabos/compile/heatchill_protocol.py b/unilabos/compile/heatchill_protocol.py index bc5a15db..9e1f6597 100644 --- a/unilabos/compile/heatchill_protocol.py +++ b/unilabos/compile/heatchill_protocol.py @@ -220,7 +220,7 @@ def generate_heat_chill_protocol( "device_id": heatchill_id, "action_name": "heat_chill", "action_kwargs": { - "vessel": vessel, + "vessel": {"id": vessel}, "temp": float(final_temp), "time": float(final_time), "stir": bool(stir), @@ -287,7 +287,8 @@ def generate_heat_chill_start_protocol( "action_name": "heat_chill_start", "action_kwargs": { "temp": temp, - "purpose": purpose or f"开始加热到 {temp}°C" + "purpose": purpose or f"开始加热到 {temp}°C", + "vessel": {"id": vessel_id}, } }] diff --git a/unilabos/compile/separate_protocol.py b/unilabos/compile/separate_protocol.py index 6b2800d3..307d9787 100644 --- a/unilabos/compile/separate_protocol.py +++ b/unilabos/compile/separate_protocol.py @@ -265,7 +265,7 @@ def generate_separate_protocol( "device_id": stirrer_device, "action_name": "start_stir", "action_kwargs": { - "vessel": final_vessel_id, # 🔧 使用 final_vessel_id + "vessel": {"id": final_vessel_id}, # 🔧 使用 final_vessel_id "stir_speed": stir_speed, "purpose": f"分离混合 - {purpose}" } diff --git a/unilabos/compile/stir_protocol.py b/unilabos/compile/stir_protocol.py index 64a821a8..73d1b878 100644 --- a/unilabos/compile/stir_protocol.py +++ b/unilabos/compile/stir_protocol.py @@ -234,7 +234,7 @@ def generate_stir_protocol( "action_name": "stir", "action_kwargs": { # 🔧 关键修复:传递vessel_id字符串,而不是完整的Resource对象 - "vessel": vessel_id, # 传递字符串ID,不是Resource对象 + "vessel": {"id": vessel_id}, # 传递字符串ID,不是Resource对象 "time": str(time), "event": event, "time_spec": time_spec, @@ -323,7 +323,7 @@ def generate_start_stir_protocol( "action_name": "start_stir", "action_kwargs": { # 🔧 关键修复:传递vessel_id字符串,而不是完整的Resource对象 - "vessel": vessel_id, # 传递字符串ID,不是Resource对象 + "vessel": {"id": vessel_id}, # 传递字符串ID,不是Resource对象 "stir_speed": stir_speed, "purpose": purpose or f"启动搅拌 {stir_speed} RPM" } @@ -383,7 +383,7 @@ def generate_stop_stir_protocol( "action_name": "stop_stir", "action_kwargs": { # 🔧 关键修复:传递vessel_id字符串,而不是完整的Resource对象 - "vessel": vessel_id # 传递字符串ID,不是Resource对象 + "vessel": {"id": vessel_id}, # 传递字符串ID,不是Resource对象 } }] diff --git a/unilabos/compile/wash_solid_protocol.py b/unilabos/compile/wash_solid_protocol.py index a295d6ee..096476c7 100644 --- a/unilabos/compile/wash_solid_protocol.py +++ b/unilabos/compile/wash_solid_protocol.py @@ -361,7 +361,7 @@ def generate_wash_solid_protocol( "device_id": "stirrer_1", "action_name": "stir", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "time": str(time), "stir_time": final_time, "stir_speed": stir_speed, @@ -377,7 +377,7 @@ def generate_wash_solid_protocol( "device_id": "filter_1", "action_name": "filter", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id + "vessel": {"id": vessel_id}, # 🔧 使用 vessel_id "filtrate_vessel": actual_filtrate_vessel, "temp": temp, "volume": final_volume diff --git a/unilabos/devices/virtual/virtual_solid_dispenser.py b/unilabos/devices/virtual/virtual_solid_dispenser.py index 4d914df9..f8c14a75 100644 --- a/unilabos/devices/virtual/virtual_solid_dispenser.py +++ b/unilabos/devices/virtual/virtual_solid_dispenser.py @@ -288,7 +288,7 @@ class VirtualSolidDispenser: "return_info": f"dispensed_{actual_amount:.6f}g", "dispensed_amount": actual_amount, "reagent": reagent, - "vessel": vessel + "vessel": {"id": vessel}, } except Exception as e: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 08d5321e..f46c7d6f 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -683,7 +683,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): if action_name not in ["create_resource_detailed", "create_resource"]: for k, v in goal.get_fields_and_field_types().items(): if v in ["unilabos_msgs/Resource", "sequence"]: - self.lab_logger().info(f"查询资源状态: Key: {k} Type: {v}") + self.lab_logger().info(f"{action_name} 查询资源状态: Key: {k} Type: {v}") current_resources: Union[List[Resource], List[List[Resource]]] = [] # TODO: resource后面需要分组 only_one_resource = False @@ -722,7 +722,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): try: action_kwargs[k] = self.resource_tracker.figure_resource(final_resource, try_mode=False) except Exception as e: - self.lab_logger().error(f"物料实例获取失败: {e}\n{traceback.format_exc()}") + self.lab_logger().error(f"{action_name} 物料实例获取失败: {e}\n{traceback.format_exc()}") error_skip = True execution_error = traceback.format_exc() break diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py deleted file mode 100644 index 02e6674e..00000000 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ /dev/null @@ -1,406 +0,0 @@ -import json -import time -import traceback -from pprint import pprint, saferepr, pformat -from typing import Union - -import rclpy -from rosidl_runtime_py import message_to_ordereddict - -from unilabos.messages import * # type: ignore # protocol names -from rclpy.action import ActionServer, ActionClient -from rclpy.action.server import ServerGoalHandle -from rclpy.callback_groups import ReentrantCallbackGroup -from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceGet, ResourceUpdate # type: ignore - -from unilabos.compile import action_protocol_generators -from unilabos.resources.graphio import list_to_nested_dict, nested_dict_to_list -from unilabos.ros.initialize_device import initialize_device_from_dict -from unilabos.ros.msgs.message_converter import ( - get_action_type, - convert_to_ros_msg, - convert_from_ros_msg, - convert_from_ros_msg_with_mapping, String, -) -from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode -from unilabos.utils.log import error -from unilabos.utils.type_check import serialize_result_info, get_result_info_str - - -class ROS2ProtocolNodeTempError(Exception): - pass - -class ROS2ProtocolNode(BaseROS2DeviceNode): - """ - ROS2ProtocolNode代表管理ROS2环境中设备通信和动作的协议节点。 - 它初始化设备节点,处理动作客户端,并基于指定的协议执行工作流。 - 它还物理上代表一组协同工作的设备,如带夹持器的机械臂,带传送带的CNC机器等。 - """ - - # create_action_server = False # Action Server要自己创建 - - def __init__( - self, - device_id: str, - children: dict, - protocol_type: Union[str, list[str]], - resource_tracker: DeviceNodeResourceTracker, - *args, - **kwargs, - ): - self._setup_protocol_names(protocol_type) - - # 初始化其它属性 - self.children = children - self._busy = False - self.sub_devices = {} - self._goals = {} - self._protocol_servers = {} - self._action_clients = {} - - # 初始化基类,让基类处理常规动作 - super().__init__( - driver_instance=self, - device_id=device_id, - status_types={}, - action_value_mappings=self.protocol_action_mappings, - hardware_interface={}, - print_publish=False, - resource_tracker=resource_tracker, - ) - - # 初始化子设备 - self.communication_node_id_to_instance = {} - - for device_id, device_config in self.children.items(): - if device_config.get("type", "device") != "device": - self.lab_logger().debug( - f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." - ) - continue - try: - d = self.initialize_device(device_id, device_config) - except Exception as ex: - self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}") - d = None - if d is None: - continue - - if "serial_" in device_id or "io_" in device_id: - self.communication_node_id_to_instance[device_id] = d - continue - - for device_id, device_config in self.children.items(): - if device_config.get("type", "device") != "device": - continue - # 设置硬件接口代理 - if device_id not in self.sub_devices: - self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...") - continue - d = self.sub_devices[device_id] - if d: - hardware_interface = d.ros_node_instance._hardware_interface - if ( - hasattr(d.driver_instance, hardware_interface["name"]) - and hasattr(d.driver_instance, hardware_interface["write"]) - and (hardware_interface["read"] is None or hasattr(d.driver_instance, hardware_interface["read"])) - ): - - name = getattr(d.driver_instance, hardware_interface["name"]) - read = hardware_interface.get("read", None) - write = hardware_interface.get("write", None) - - # 如果硬件接口是字符串,通过通信设备提供 - if isinstance(name, str) and name in self.sub_devices: - communicate_device = self.sub_devices[name] - communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface - self._setup_hardware_proxy(d, self.sub_devices[name], read, write) - self.lab_logger().info( - f"\n通信代理:为子设备{device_id}\n " - f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n " - f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})" - ) - - self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") - - def _setup_protocol_names(self, protocol_type): - # 处理协议类型 - if isinstance(protocol_type, str): - if "," not in protocol_type: - self.protocol_names = [protocol_type] - else: - self.protocol_names = [protocol.strip() for protocol in protocol_type.split(",")] - else: - self.protocol_names = protocol_type - # 准备协议相关的动作值映射 - self.protocol_action_mappings = {} - for protocol_name in self.protocol_names: - protocol_type = globals()[protocol_name] - self.protocol_action_mappings[protocol_name] = get_action_type(protocol_type) - - def initialize_device(self, device_id, device_config): - """初始化设备并创建相应的动作客户端""" - # device_id_abs = f"{self.device_id}/{device_id}" - device_id_abs = f"{device_id}" - self.lab_logger().info(f"初始化子设备: {device_id_abs}") - d = self.sub_devices[device_id] = initialize_device_from_dict(device_id_abs, device_config) - - # 为子设备的每个动作创建动作客户端 - if d is not None and hasattr(d, "ros_node_instance"): - node = d.ros_node_instance - node.resource_tracker = self.resource_tracker # 站内应当共享资源跟踪器 - for action_name, action_mapping in node._action_value_mappings.items(): - if action_name.startswith("auto-") or str(action_mapping.get("type", "")).startswith("UniLabJsonCommand"): - continue - action_id = f"/devices/{device_id_abs}/{action_name}" - if action_id not in self._action_clients: - try: - self._action_clients[action_id] = ActionClient( - self, action_mapping["type"], action_id, callback_group=self.callback_group - ) - except Exception as ex: - self.lab_logger().error(f"创建动作客户端失败: {action_id}, 错误: {ex}") - continue - self.lab_logger().trace(f"为子设备 {device_id} 创建动作客户端: {action_name}") - return d - - def create_ros_action_server(self, action_name, action_value_mapping): - """创建ROS动作服务器""" - # 和Base创建的路径是一致的 - protocol_name = action_name - action_type = action_value_mapping["type"] - str_action_type = str(action_type)[8:-2] - protocol_type = globals()[protocol_name] - protocol_steps_generator = action_protocol_generators[protocol_type] - - self._action_servers[action_name] = ActionServer( - self, - action_type, - action_name, - execute_callback=self._create_protocol_execute_callback(action_name, protocol_steps_generator), - callback_group=ReentrantCallbackGroup(), - ) - - self.lab_logger().trace(f"发布动作: {action_name}, 类型: {str_action_type}") - - def _create_protocol_execute_callback(self, protocol_name, protocol_steps_generator): - async def execute_protocol(goal_handle: ServerGoalHandle): - """执行完整的工作流""" - # 初始化结果信息变量 - execution_error = "" - execution_success = False - protocol_return_value = None - self.lab_logger().info(f"Executing {protocol_name} action...") - action_value_mapping = self._action_value_mappings[protocol_name] - step_results = [] - try: - self.lab_logger().warning("+" * 30) - # 从目标消息中提取参数, 并调用Protocol生成器(根据设备连接图)生成action步骤 - goal = goal_handle.request - protocol_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"]) - - # # 🔧 添加调试信息 - # print(f"🔍 转换后的 protocol_kwargs: {protocol_kwargs}") - # print(f"🔍 vessel 在转换后: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") - - # # 🔧 完全禁用Host查询,直接使用转换后的数据 - # print(f"🔧 跳过Host查询,直接使用转换后的数据") - # 向Host查询物料当前状态 - for k, v in goal.get_fields_and_field_types().items(): - if v in ["unilabos_msgs/Resource", "sequence"]: - r = ResourceGet.Request() - resource_id = ( - protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"] - ) - r.id = resource_id - r.with_children = True - response = await self._resource_clients["resource_get"].call_async(r) - protocol_kwargs[k] = list_to_nested_dict( - [convert_from_ros_msg(rs) for rs in response.resources] - ) - - # self.lab_logger().info(f"🔍 最终的 vessel: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") - - from unilabos.resources.graphio import physical_setup_graph - - self.lab_logger().info(f"Working on physical setup: {physical_setup_graph}") - protocol_steps = protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs) - logs = [] - for step in protocol_steps: - if isinstance(step, dict) and "log_message" in step.get("action_kwargs", {}): - logs.append(step) - elif isinstance(step, list): - logs.append(step) - self.lab_logger().info(f"Goal received: {protocol_kwargs}, running steps: " - f"{json.dumps(logs, indent=4, ensure_ascii=False)}") - - time_start = time.time() - time_overall = 100 - self._busy = True - - # 逐步执行工作流 - for i, action in enumerate(protocol_steps): - # self.get_logger().info(f"Running step {i + 1}: {action}") - if isinstance(action, dict): - # 如果是单个动作,直接执行 - if action["action_name"] == "wait": - time.sleep(action["action_kwargs"]["time"]) - step_results.append({"step": i + 1, "action": "wait", "result": "completed"}) - else: - try: - result = await self.execute_single_action(**action) - step_results.append({"step": i + 1, "action": action["action_name"], "result": result}) - ret_info = json.loads(getattr(result, "return_info", "{}")) - if not ret_info.get("suc", False): - raise RuntimeError(f"Step {i + 1} failed.") - except ROS2ProtocolNodeTempError as ex: - step_results.append({"step": i + 1, "action": action["action_name"], "result": ex.args[0]}) - elif isinstance(action, list): - # 如果是并行动作,同时执行 - actions = action - futures = [ - rclpy.get_global_executor().create_task(self.execute_single_action(**a)) for a in actions - ] - results = [await f for f in futures] - step_results.append( - { - "step": i + 1, - "parallel_actions": [a["action_name"] for a in actions], - "results": results, - } - ) - - # 向Host更新物料当前状态 - for k, v in goal.get_fields_and_field_types().items(): - if v in ["unilabos_msgs/Resource", "sequence"]: - r = ResourceUpdate.Request() - r.resources = [ - convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k]) - ] - response = await self._resource_clients["resource_update"].call_async(r) - - # 设置成功状态和返回值 - execution_success = True - protocol_return_value = { - "protocol_name": protocol_name, - "steps_executed": len(protocol_steps), - "step_results": step_results, - "total_time": time.time() - time_start, - } - - goal_handle.succeed() - - except Exception as e: - # 捕获并记录错误信息 - str_step_results = [{k: dict(message_to_ordereddict(v)) if k == "result" and hasattr(v, "SLOT_TYPES") else v for k, v in i.items()} for i in step_results] - execution_error = f"{traceback.format_exc()}\n\nStep Result: {pformat(str_step_results)}" - execution_success = False - self.lab_logger().error(f"协议 {protocol_name} 执行出错: {str(e)} \n{traceback.format_exc()}") - - # 设置动作失败 - goal_handle.abort() - - finally: - self._busy = False - - # 创建结果消息 - result = action_value_mapping["type"].Result() - result.success = execution_success - - # 获取结果消息类型信息,检查是否有return_info字段 - result_msg_types = action_value_mapping["type"].Result.get_fields_and_field_types() - - # 设置return_info字段(如果存在) - for attr_name in result_msg_types.keys(): - if attr_name in ["success", "reached_goal"]: - setattr(result, attr_name, execution_success) - elif attr_name == "return_info": - setattr( - result, - attr_name, - get_result_info_str(execution_error, execution_success, protocol_return_value), - ) - - self.lab_logger().info(f"协议 {protocol_name} 完成并返回结果") - return result - - return execute_protocol - - async def execute_single_action(self, device_id, action_name, action_kwargs): - """执行单个动作""" - # 构建动作ID - if action_name == "log_message": - self.lab_logger().info(f"[Protocol Log] {action_kwargs}") - raise ROS2ProtocolNodeTempError(f"[Protocol Log] {action_kwargs}") - if device_id in ["", None, "self"]: - action_id = f"/devices/{self.device_id}/{action_name}" - else: - action_id = f"/devices/{device_id}/{action_name}" # 执行时取消了主节点信息 /{self.device_id} - - # 检查动作客户端是否存在 - if action_id not in self._action_clients: - self.lab_logger().error(f"找不到动作客户端: {action_id}") - return None - - # 发送动作请求 - action_client = self._action_clients[action_id] - goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) - - ##### self.lab_logger().info(f"发送动作请求到: {action_id}") - action_client.wait_for_server() - - # 等待动作完成 - request_future = action_client.send_goal_async(goal_msg) - handle = await request_future - - if not handle.accepted: - self.lab_logger().error(f"动作请求被拒绝: {action_name}") - return None - - result_future = await handle.get_result_async() - ##### self.lab_logger().info(f"动作完成: {action_name}") - - return result_future.result - - """还没有改过的部分""" - - def _setup_hardware_proxy( - self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method - ): - """为设备设置硬件接口代理""" - # extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])] - write_func = getattr( - communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["write"] - ) - read_func = getattr( - communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["read"] - ) - - def _read(*args, **kwargs): - return read_func(*args, **kwargs) - - def _write(*args, **kwargs): - return write_func(*args, **kwargs) - - if read_method: - # bound_read = MethodType(_read, device.driver_instance) - setattr(device.driver_instance, read_method, _read) - - if write_method: - # bound_write = MethodType(_write, device.driver_instance) - setattr(device.driver_instance, write_method, _write) - - async def _update_resources(self, goal, protocol_kwargs): - """更新资源状态""" - for k, v in goal.get_fields_and_field_types().items(): - if v in ["unilabos_msgs/Resource", "sequence"]: - if protocol_kwargs[k] is not None: - try: - r = ResourceUpdate.Request() - r.resources = [ - convert_to_ros_msg(Resource, rs) for rs in nested_dict_to_list(protocol_kwargs[k]) - ] - await self._resource_clients["resource_update"].call_async(r) - except Exception as e: - self.lab_logger().error(f"更新资源失败: {e}") diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 92d47cc9..fdf27b94 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -24,7 +24,7 @@ from unilabos.ros.msgs.message_converter import ( convert_from_ros_msg_with_mapping, ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode -from unilabos.utils.type_check import serialize_result_info +from unilabos.utils.type_check import serialize_result_info, get_result_info_str if TYPE_CHECKING: from unilabos.devices.workstation.workstation_base import WorkstationBase @@ -203,12 +203,12 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): execution_error = "" execution_success = False protocol_return_value = None - self.get_logger().info(f"Executing {protocol_name} action...") + self.lab_logger().info(f"Executing {protocol_name} action...") action_value_mapping = self._action_value_mappings[protocol_name] step_results = [] try: - print("+" * 30) - print(protocol_steps_generator) + self.lab_logger().warning("+" * 30) + self.lab_logger().info(protocol_steps_generator) # 从目标消息中提取参数, 并调用Protocol生成器(根据设备连接图)生成action步骤 goal = goal_handle.request protocol_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"]) @@ -334,7 +334,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): setattr( result, attr_name, - serialize_result_info(execution_error, execution_success, protocol_return_value), + get_result_info_str(execution_error, execution_success, protocol_return_value), ) self.lab_logger().info(f"协议 {protocol_name} 完成并返回结果")