fix ros2 future

This commit is contained in:
Xuwznln
2025-12-04 18:44:50 +08:00
parent 154048107d
commit 04024bc8a3

View File

@@ -582,7 +582,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
except Exception as e: except Exception as e:
self.lab_logger().error(f"更新资源uuid失败: {e}") self.lab_logger().error(f"更新资源uuid失败: {e}")
self.lab_logger().error(traceback.format_exc()) self.lab_logger().error(traceback.format_exc())
self.lab_logger().debug(f"资源更新结果: {response}") self.lab_logger().trace(f"资源更新结果: {response}")
async def get_resource(self, resources_uuid: List[str], with_children: bool = True) -> ResourceTreeSet: async def get_resource(self, resources_uuid: List[str], with_children: bool = True) -> ResourceTreeSet:
""" """
@@ -1164,7 +1164,6 @@ class BaseROS2DeviceNode(Node, Generic[T]):
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
break break
##### self.lab_logger().info(f"准备执行: {action_kwargs}, 函数: {ACTION.__name__}")
time_start = time.time() time_start = time.time()
time_overall = 100 time_overall = 100
future = None future = None
@@ -1172,35 +1171,36 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 将阻塞操作放入线程池执行 # 将阻塞操作放入线程池执行
if asyncio.iscoroutinefunction(ACTION): if asyncio.iscoroutinefunction(ACTION):
try: try:
##### self.lab_logger().info(f"异步执行动作 {ACTION}") self.lab_logger().trace(f"异步执行动作 {ACTION}")
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) def _handle_future_exception(fut: Future):
def _handle_future_exception(fut):
nonlocal execution_error, execution_success, action_return_value nonlocal execution_error, execution_success, action_return_value
try: try:
action_return_value = fut.result() action_return_value = fut.result()
if isinstance(action_return_value, BaseException):
raise action_return_value
execution_success = True execution_success = True
except Exception as e: except Exception as _:
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
error( error(
f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
) )
future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs)
future.add_done_callback(_handle_future_exception) future.add_done_callback(_handle_future_exception)
except Exception as e: except Exception as e:
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
execution_success = False execution_success = False
self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}")
else: else:
##### self.lab_logger().info(f"同步执行动作 {ACTION}") self.lab_logger().trace(f"同步执行动作 {ACTION}")
future = self._executor.submit(ACTION, **action_kwargs) future = self._executor.submit(ACTION, **action_kwargs)
def _handle_future_exception(fut): def _handle_future_exception(fut: Future):
nonlocal execution_error, execution_success, action_return_value nonlocal execution_error, execution_success, action_return_value
try: try:
action_return_value = fut.result() action_return_value = fut.result()
execution_success = True execution_success = True
except Exception as e: except Exception as _:
execution_error = traceback.format_exc() execution_error = traceback.format_exc()
error( error(
f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}"
@@ -1305,7 +1305,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
get_result_info_str(execution_error, execution_success, action_return_value), get_result_info_str(execution_error, execution_success, action_return_value),
) )
##### self.lab_logger().info(f"动作 {action_name} 完成并返回结果") self.lab_logger().trace(f"动作 {action_name} 完成并返回结果")
return result_msg return result_msg
return execute_callback return execute_callback
@@ -1540,17 +1540,29 @@ class ROS2DeviceNode:
这个类封装了设备类实例和ROS2节点的功能提供ROS2接口。 这个类封装了设备类实例和ROS2节点的功能提供ROS2接口。
它不继承设备类,而是通过代理模式访问设备类的属性和方法。 它不继承设备类,而是通过代理模式访问设备类的属性和方法。
""" """
@staticmethod
async def safe_task_wrapper(trace_callback, func, **kwargs):
try:
if callable(trace_callback):
trace_callback(await func(**kwargs))
return await func(**kwargs)
except Exception as e:
if callable(trace_callback):
trace_callback(e)
return e
@classmethod @classmethod
def run_async_func(cls, func, trace_error=True, **kwargs) -> Task: def run_async_func(cls, func, trace_error=True, inner_trace_callback=None, **kwargs) -> Task:
def _handle_future_exception(fut): def _handle_future_exception(fut: Future):
try: try:
fut.result() ret = fut.result()
if isinstance(ret, BaseException):
raise ret
except Exception as e: except Exception as e:
error(f"异步任务 {func.__name__} 报错了") error(f"异步任务 {func.__name__} 获取结果失败")
error(traceback.format_exc()) error(traceback.format_exc())
future = rclpy.get_global_executor().create_task(func(**kwargs)) future = rclpy.get_global_executor().create_task(ROS2DeviceNode.safe_task_wrapper(inner_trace_callback, func, **kwargs))
if trace_error: if trace_error:
future.add_done_callback(_handle_future_exception) future.add_done_callback(_handle_future_exception)
return future return future