diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index f1063123..edf41fbd 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -53,7 +53,7 @@ from unilabos.ros.nodes.resource_tracker import ( ) from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator -from unilabos.utils.async_util import run_async_func +from rclpy.task import Task from unilabos.utils.import_manager import default_manager from unilabos.utils.log import info, debug, warning, error, critical, logger, trace from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str @@ -1385,18 +1385,19 @@ class ROS2DeviceNode: 它不继承设备类,而是通过代理模式访问设备类的属性和方法。 """ - # 类变量,用于循环管理 - _loop = None - _loop_running = False - _loop_thread = None - @classmethod - def get_loop(cls): - return cls._loop + def run_async_func(cls, func, trace_error=True, **kwargs) -> Task: + def _handle_future_exception(fut): + try: + fut.result() + except Exception as e: + error(f"异步任务 {func.__name__} 报错了") + error(traceback.format_exc()) - @classmethod - def run_async_func(cls, func, trace_error=True, **kwargs): - return run_async_func(func, loop=cls._loop, trace_error=trace_error, **kwargs) + future = rclpy.get_global_executor().create_task(func(**kwargs)) + if trace_error: + future.add_done_callback(_handle_future_exception) + return future @property def driver_instance(self): @@ -1436,11 +1437,6 @@ class ROS2DeviceNode: print_publish: 是否打印发布信息 driver_is_ros: """ - # 在初始化时检查循环状态 - if ROS2DeviceNode._loop_running and ROS2DeviceNode._loop_thread is not None: - pass - elif ROS2DeviceNode._loop_thread is None: - self._start_loop() # 保存设备类是否支持异步上下文 self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__") @@ -1529,17 +1525,6 @@ class ROS2DeviceNode: except Exception as e: self._ros_node.lab_logger().error(f"设备后初始化失败: {e}") - def _start_loop(self): - def run_event_loop(): - loop = asyncio.new_event_loop() - ROS2DeviceNode._loop = loop - asyncio.set_event_loop(loop) - loop.run_forever() - - ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNodeLoop") - ROS2DeviceNode._loop_thread.start() - logger.info(f"循环线程已启动") - class DeviceInfoType(TypedDict): id: str diff --git a/unilabos/utils/async_util.py b/unilabos/utils/async_util.py deleted file mode 100644 index 0f50a730..00000000 --- a/unilabos/utils/async_util.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio -import traceback -from asyncio import get_event_loop - -from unilabos.utils.log import error - - -def run_async_func(func, *, loop=None, trace_error=True, **kwargs): - if loop is None: - loop = get_event_loop() - - def _handle_future_exception(fut): - try: - fut.result() - except Exception as e: - error(f"异步任务 {func.__name__} 报错了") - error(traceback.format_exc()) - - future = asyncio.run_coroutine_threadsafe(func(**kwargs), loop) - if trace_error: - future.add_done_callback(_handle_future_exception) - return future