diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 4495dbf8..6952320f 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -582,7 +582,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): except Exception as e: self.lab_logger().error(f"更新资源uuid失败: {e}") self.lab_logger().error(traceback.format_exc()) - self.lab_logger().debug(f"资源更新结果: {response}") + self.lab_logger().trace(f"资源更新结果: {response}") async def get_resource(self, resources_uuid: List[str], with_children: bool = True) -> ResourceTreeSet: """ @@ -1164,7 +1164,6 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_error = traceback.format_exc() break - ##### self.lab_logger().info(f"准备执行: {action_kwargs}, 函数: {ACTION.__name__}") time_start = time.time() time_overall = 100 future = None @@ -1172,35 +1171,36 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 将阻塞操作放入线程池执行 if asyncio.iscoroutinefunction(ACTION): try: - ##### self.lab_logger().info(f"异步执行动作 {ACTION}") - future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) - - def _handle_future_exception(fut): + self.lab_logger().trace(f"异步执行动作 {ACTION}") + def _handle_future_exception(fut: Future): nonlocal execution_error, execution_success, action_return_value try: action_return_value = fut.result() + if isinstance(action_return_value, BaseException): + raise action_return_value execution_success = True - except Exception as e: + except Exception as _: execution_error = traceback.format_exc() error( f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" ) + future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) future.add_done_callback(_handle_future_exception) except Exception as e: execution_error = traceback.format_exc() execution_success = False self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") else: - ##### self.lab_logger().info(f"同步执行动作 {ACTION}") + self.lab_logger().trace(f"同步执行动作 {ACTION}") future = self._executor.submit(ACTION, **action_kwargs) - def _handle_future_exception(fut): + def _handle_future_exception(fut: Future): nonlocal execution_error, execution_success, action_return_value try: action_return_value = fut.result() execution_success = True - except Exception as e: + except Exception as _: execution_error = traceback.format_exc() error( f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" @@ -1305,7 +1305,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): get_result_info_str(execution_error, execution_success, action_return_value), ) - ##### self.lab_logger().info(f"动作 {action_name} 完成并返回结果") + self.lab_logger().trace(f"动作 {action_name} 完成并返回结果") return result_msg return execute_callback @@ -1540,17 +1540,29 @@ class ROS2DeviceNode: 这个类封装了设备类实例和ROS2节点的功能,提供ROS2接口。 它不继承设备类,而是通过代理模式访问设备类的属性和方法。 """ + @staticmethod + async def safe_task_wrapper(trace_callback, func, **kwargs): + try: + if callable(trace_callback): + trace_callback(await func(**kwargs)) + return await func(**kwargs) + except Exception as e: + if callable(trace_callback): + trace_callback(e) + return e @classmethod - def run_async_func(cls, func, trace_error=True, **kwargs) -> Task: - def _handle_future_exception(fut): + def run_async_func(cls, func, trace_error=True, inner_trace_callback=None, **kwargs) -> Task: + def _handle_future_exception(fut: Future): try: - fut.result() + ret = fut.result() + if isinstance(ret, BaseException): + raise ret except Exception as e: - error(f"异步任务 {func.__name__} 报错了") + error(f"异步任务 {func.__name__} 获取结果失败") error(traceback.format_exc()) - future = rclpy.get_global_executor().create_task(func(**kwargs)) + future = rclpy.get_global_executor().create_task(ROS2DeviceNode.safe_task_wrapper(inner_trace_callback, func, **kwargs)) if trace_error: future.add_done_callback(_handle_future_exception) return future