From fe1a029a9b79bd875ceecf3d5a1ba6e38182513f Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sun, 10 Aug 2025 17:31:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96protocol=20node?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E8=BF=90=E8=A1=8C=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/ros/nodes/base_device_node.py | 17 ++++--- unilabos/ros/nodes/presets/host_node.py | 50 ++++++++++++++------- unilabos/ros/nodes/presets/protocol_node.py | 18 +++++--- 3 files changed, 54 insertions(+), 31 deletions(-) diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 9f3d703d..e7b4da21 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -654,7 +654,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): ACTION, action_paramtypes = self.get_real_function(self.driver_instance, action_name) action_kwargs = convert_from_ros_msg_with_mapping(goal, action_value_mapping["goal"]) - self.lab_logger().debug(f"接收到原始目标: {action_kwargs}") + self.lab_logger().debug(f"任务 {ACTION.__name__} 接收到原始目标: {action_kwargs}") # 向Host查询物料当前状态,如果是host本身的增加物料的请求,则直接跳过 if action_name not in ["create_resource_detailed", "create_resource"]: for k, v in goal.get_fields_and_field_types().items(): @@ -699,7 +699,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): ##### self.lab_logger().info(f"准备执行: {action_kwargs}, 函数: {ACTION.__name__}") time_start = time.time() time_overall = 100 - + future = None # 将阻塞操作放入线程池执行 if asyncio.iscoroutinefunction(ACTION): try: @@ -713,13 +713,14 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = True except Exception as e: execution_error = traceback.format_exc() - ##### error(f"异步任务 {ACTION.__name__} 报错了") + error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") error(traceback.format_exc()) future.add_done_callback(_handle_future_exception) except Exception as e: + execution_error = traceback.format_exc() + execution_success = False self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") - raise e else: ##### self.lab_logger().info(f"同步执行动作 {ACTION}") future = self._executor.submit(ACTION, **action_kwargs) @@ -730,9 +731,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): action_return_value = fut.result() execution_success = True except Exception as e: - execution_error = traceback.format_exc() - error(f"同步任务 {ACTION.__name__} 报错了") - error(traceback.format_exc()) + error(f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") future.add_done_callback(_handle_future_exception) @@ -740,7 +739,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): feedback_msg_types = action_type.Feedback.get_fields_and_field_types() result_msg_types = action_type.Result.get_fields_and_field_types() - while not future.done(): + while future is not None and not future.done(): if goal_handle.is_cancel_requested: self.lab_logger().info(f"取消动作: {action_name}") future.cancel() # 尝试取消线程池中的任务 @@ -772,7 +771,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): goal_handle.publish_feedback(feedback_msg) time.sleep(0.5) - if future.cancelled(): + if future is not None and future.cancelled(): self.lab_logger().info(f"动作 {action_name} 已取消") return action_type.Result() diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 27c1e423..eeb32ce1 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -39,6 +39,7 @@ from unilabos.ros.msgs.message_converter import ( from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode from unilabos.utils.exception import DeviceClassInvalid +from unilabos.utils.type_check import serialize_result_info class HostNode(BaseROS2DeviceNode): @@ -408,10 +409,11 @@ class HostNode(BaseROS2DeviceNode): liquid_volume: list[int], slot_on_deck: str, ): + # 暂不支持多对同名父子同时存在 res_creation_input = { - "name": res_id, + "name": res_id.split("/")[-1], "class": class_name, - "parent": parent, + "parent": parent.split("/")[-1], "position": { "x": bind_locations.x, "y": bind_locations.y, @@ -419,18 +421,20 @@ class HostNode(BaseROS2DeviceNode): }, } if len(liquid_input_slot) and liquid_input_slot[0] == -1: # 目前container只逐个创建 - res_creation_input.update({ - "data": { - "liquid_type": liquid_type[0] if liquid_type else None, - "liquid_volume": liquid_volume[0] if liquid_volume else None, + res_creation_input.update( + { + "data": { + "liquid_type": liquid_type[0] if liquid_type else None, + "liquid_volume": liquid_volume[0] if liquid_volume else None, + } } - }) + ) init_new_res = initialize_resource(res_creation_input) # flatten的格式 if len(init_new_res) > 1: # 一个物料,多个子节点 init_new_res = [init_new_res] resources: List[Resource] | List[List[Resource]] = init_new_res # initialize_resource已经返回list[dict] device_ids = [device_id] - bind_parent_id = [parent] + bind_parent_id = [res_creation_input["parent"]] bind_location = [bind_locations] other_calling_param = [ json.dumps( @@ -680,20 +684,34 @@ class HostNode(BaseROS2DeviceNode): result_msg = future.result().result result_data = convert_from_ros_msg(result_msg) status = "success" - try: - ret = json.loads(result_data.get("return_info", "{}")) # 确保返回信息是有效的JSON - suc = ret.get("suc", False) - if not suc: + return_info_str = result_data.get("return_info") + + if return_info_str is not None: + try: + ret = json.loads(return_info_str) + suc = ret.get("suc", False) + if not suc: + status = "failed" + except json.JSONDecodeError: status = "failed" - except json.JSONDecodeError: - status = "failed" - self.lab_logger().info(f"[Host Node] Result for {action_id} ({uuid_str}): success") + else: + # 无 return_info 字段时,回退到 success 字段(若存在) + suc_field = result_data.get("success") + if isinstance(suc_field, bool): + status = "success" if suc_field else "failed" + return_info_str = serialize_result_info("", suc_field, result_data) + else: + # 最保守的回退:标记失败并返回空JSON + status = "failed" + return_info_str = serialize_result_info("缺少return_info", False, result_data) + + self.lab_logger().info(f"[Host Node] Result for {action_id} ({uuid_str}): {status}") self.lab_logger().debug(f"[Host Node] Result data: {result_data}") if uuid_str: for bridge in self.bridges: if hasattr(bridge, "publish_job_status"): - bridge.publish_job_status(result_data, uuid_str, status, result_data.get("return_info", "{}")) + bridge.publish_job_status(result_data, uuid_str, status, return_info_str) def cancel_goal(self, goal_uuid: str) -> None: """取消目标""" diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index 76129a18..58734a3c 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -1,8 +1,12 @@ +import json import time import traceback +from pprint import pprint, saferepr, pformat from typing import Union import rclpy +from rosidl_runtime_py import message_to_ordereddict + from unilabos.messages import * # type: ignore # protocol names from rclpy.action import ActionServer, ActionClient from rclpy.action.server import ServerGoalHandle @@ -185,6 +189,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): protocol_return_value = None self.get_logger().info(f"Executing {protocol_name} action...") action_value_mapping = self._action_value_mappings[protocol_name] + step_results = [] try: print("+" * 30) print(protocol_steps_generator) @@ -227,7 +232,6 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): self._busy = True # 逐步执行工作流 - step_results = [] for i, action in enumerate(protocol_steps): # self.get_logger().info(f"Running step {i + 1}: {action}") if isinstance(action, dict): @@ -238,6 +242,9 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): else: result = await self.execute_single_action(**action) step_results.append({"step": i + 1, "action": action["action_name"], "result": result}) + ret_info = json.loads(getattr(result, "return_info", "{}")) + if not ret_info.get("suc", False): + raise RuntimeError(f"Step {i + 1} failed.") elif isinstance(action, list): # 如果是并行动作,同时执行 actions = action @@ -275,11 +282,10 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): except Exception as e: # 捕获并记录错误信息 - execution_error = traceback.format_exc() + str_step_results = [{k: dict(message_to_ordereddict(v)) if k == "result" and hasattr(v, "SLOT_TYPES") else v for k, v in i.items()} for i in step_results] + execution_error = f"{traceback.format_exc()}\n\nStep Result: {pformat(str_step_results)}" execution_success = False - error(f"协议 {protocol_name} 执行失败") - error(traceback.format_exc()) - self.lab_logger().error(f"协议执行出错: {str(e)}") + self.lab_logger().error(f"协议 {protocol_name} 执行出错: {str(e)} \n{traceback.format_exc()}") # 设置动作失败 goal_handle.abort() @@ -305,7 +311,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): serialize_result_info(execution_error, execution_success, protocol_return_value), ) - self.lab_logger().info(f"🤩🤩🤩🤩🤩🤩协议 {protocol_name} 完成并返回结果😎😎😎😎😎😎") + self.lab_logger().info(f"协议 {protocol_name} 完成并返回结果") return result return execute_protocol