From 9854ed8c9c6ebcf0d907fdd98f616a3e9ad6296a Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:04:56 +0800 Subject: [PATCH] fix ros2 future print all logs to file fix resource dict dump error --- unilabos/app/communication.py | 2 +- unilabos/app/ws_client.py | 6 ++-- unilabos/resources/graphio.py | 2 +- unilabos/ros/nodes/base_device_node.py | 44 ++++++++++++++++--------- unilabos/ros/nodes/presets/host_node.py | 8 +++-- unilabos/ros/nodes/resource_tracker.py | 2 +- unilabos/utils/log.py | 11 +++---- 7 files changed, 43 insertions(+), 32 deletions(-) diff --git a/unilabos/app/communication.py b/unilabos/app/communication.py index 436fa98a..700065dc 100644 --- a/unilabos/app/communication.py +++ b/unilabos/app/communication.py @@ -141,7 +141,7 @@ class CommunicationClientFactory: """ if cls._client_cache is None: cls._client_cache = cls.create_client(protocol) - logger.info(f"[CommunicationFactory] Created {type(cls._client_cache).__name__} client") + logger.trace(f"[CommunicationFactory] Created {type(cls._client_cache).__name__} client") return cls._client_cache diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 471868bc..50204a2e 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -389,7 +389,7 @@ class MessageProcessor: self.is_running = True self.thread = threading.Thread(target=self._run, daemon=True, name="MessageProcessor") self.thread.start() - logger.info("[MessageProcessor] Started") + logger.trace("[MessageProcessor] Started") def stop(self) -> None: """停止消息处理线程""" @@ -939,7 +939,7 @@ class QueueProcessor: # 事件通知机制 self.queue_update_event = threading.Event() - logger.info("[QueueProcessor] Initialized") + logger.trace("[QueueProcessor] Initialized") def set_websocket_client(self, websocket_client: "WebSocketClient"): """设置WebSocket客户端引用""" @@ -954,7 +954,7 @@ class QueueProcessor: self.is_running = True self.thread = threading.Thread(target=self._run, daemon=True, name="QueueProcessor") self.thread.start() - logger.info("[QueueProcessor] Started") + logger.trace("[QueueProcessor] Started") def stop(self) -> None: """停止队列处理线程""" diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 4915668f..d81e5c34 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -97,7 +97,7 @@ def canonicalize_nodes_data( for node in nodes: try: - print_status(f"DeviceId: {node['id']}, Class: {node['class']}", "info") + # print_status(f"DeviceId: {node['id']}, Class: {node['class']}", "info") # 使用标准化方法 resource_instance = ResourceDictInstance.get_resource_instance_from_dict(node) known_nodes[node["id"]] = resource_instance diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 4495dbf8..6952320f 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -582,7 +582,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): except Exception as e: self.lab_logger().error(f"更新资源uuid失败: {e}") self.lab_logger().error(traceback.format_exc()) - self.lab_logger().debug(f"资源更新结果: {response}") + self.lab_logger().trace(f"资源更新结果: {response}") async def get_resource(self, resources_uuid: List[str], with_children: bool = True) -> ResourceTreeSet: """ @@ -1164,7 +1164,6 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_error = traceback.format_exc() break - ##### self.lab_logger().info(f"准备执行: {action_kwargs}, 函数: {ACTION.__name__}") time_start = time.time() time_overall = 100 future = None @@ -1172,35 +1171,36 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 将阻塞操作放入线程池执行 if asyncio.iscoroutinefunction(ACTION): try: - ##### self.lab_logger().info(f"异步执行动作 {ACTION}") - future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) - - def _handle_future_exception(fut): + self.lab_logger().trace(f"异步执行动作 {ACTION}") + def _handle_future_exception(fut: Future): nonlocal execution_error, execution_success, action_return_value try: action_return_value = fut.result() + if isinstance(action_return_value, BaseException): + raise action_return_value execution_success = True - except Exception as e: + except Exception as _: execution_error = traceback.format_exc() error( f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" ) + future = ROS2DeviceNode.run_async_func(ACTION, trace_error=False, **action_kwargs) future.add_done_callback(_handle_future_exception) except Exception as e: execution_error = traceback.format_exc() execution_success = False self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") else: - ##### self.lab_logger().info(f"同步执行动作 {ACTION}") + self.lab_logger().trace(f"同步执行动作 {ACTION}") future = self._executor.submit(ACTION, **action_kwargs) - def _handle_future_exception(fut): + def _handle_future_exception(fut: Future): nonlocal execution_error, execution_success, action_return_value try: action_return_value = fut.result() execution_success = True - except Exception as e: + except Exception as _: execution_error = traceback.format_exc() error( f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" @@ -1305,7 +1305,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): get_result_info_str(execution_error, execution_success, action_return_value), ) - ##### self.lab_logger().info(f"动作 {action_name} 完成并返回结果") + self.lab_logger().trace(f"动作 {action_name} 完成并返回结果") return result_msg return execute_callback @@ -1540,17 +1540,29 @@ class ROS2DeviceNode: 这个类封装了设备类实例和ROS2节点的功能,提供ROS2接口。 它不继承设备类,而是通过代理模式访问设备类的属性和方法。 """ + @staticmethod + async def safe_task_wrapper(trace_callback, func, **kwargs): + try: + if callable(trace_callback): + trace_callback(await func(**kwargs)) + return await func(**kwargs) + except Exception as e: + if callable(trace_callback): + trace_callback(e) + return e @classmethod - def run_async_func(cls, func, trace_error=True, **kwargs) -> Task: - def _handle_future_exception(fut): + def run_async_func(cls, func, trace_error=True, inner_trace_callback=None, **kwargs) -> Task: + def _handle_future_exception(fut: Future): try: - fut.result() + ret = fut.result() + if isinstance(ret, BaseException): + raise ret except Exception as e: - error(f"异步任务 {func.__name__} 报错了") + error(f"异步任务 {func.__name__} 获取结果失败") error(traceback.format_exc()) - future = rclpy.get_global_executor().create_task(func(**kwargs)) + future = rclpy.get_global_executor().create_task(ROS2DeviceNode.safe_task_wrapper(inner_trace_callback, func, **kwargs)) if trace_error: future.add_done_callback(_handle_future_exception) return future diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 28942fe5..33f01f92 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -718,7 +718,7 @@ class HostNode(BaseROS2DeviceNode): feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg), goal_uuid=goal_uuid_obj, ) - future.add_done_callback(lambda future: self.goal_response_callback(item, action_id, future)) + future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f)) def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: """目标响应回调""" @@ -729,9 +729,11 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted") self._goals[item.job_id] = goal_handle - goal_handle.get_result_async().add_done_callback( - lambda future: self.get_result_callback(item, action_id, future) + goal_future = goal_handle.get_result_async() + goal_future.add_done_callback( + lambda f: self.get_result_callback(item, action_id, f) ) + goal_future.result() def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None: """反馈回调""" diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index c9dc57f9..0eed1172 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -155,7 +155,7 @@ class ResourceDictInstance(object): res_dict = self.res_content.model_dump(by_alias=True) res_dict["children"] = {child.res_content.id: child.get_plr_nested_dict() for child in self.children} res_dict["parent"] = self.res_content.parent_instance_name - res_dict["position"] = self.res_content.position.position.model_dump() + res_dict["position"] = self.res_content.pose.position.model_dump() del res_dict["pose"] return res_dict diff --git a/unilabos/utils/log.py b/unilabos/utils/log.py index af03d946..ffe13c02 100644 --- a/unilabos/utils/log.py +++ b/unilabos/utils/log.py @@ -162,8 +162,9 @@ def configure_logger(loglevel=None, working_dir=None): """ # 获取根日志记录器 root_logger = logging.getLogger() - + root_logger.setLevel(TRACE_LEVEL) # 设置日志级别 + numeric_level = logging.DEBUG if loglevel is not None: if isinstance(loglevel, str): # 将字符串转换为logging级别 @@ -173,12 +174,8 @@ def configure_logger(loglevel=None, working_dir=None): numeric_level = getattr(logging, loglevel.upper(), None) if not isinstance(numeric_level, int): print(f"警告: 无效的日志级别 '{loglevel}',使用默认级别 DEBUG") - numeric_level = logging.DEBUG else: numeric_level = loglevel - root_logger.setLevel(numeric_level) - else: - root_logger.setLevel(logging.DEBUG) # 默认级别 # 移除已存在的处理器 for handler in root_logger.handlers[:]: @@ -186,7 +183,7 @@ def configure_logger(loglevel=None, working_dir=None): # 创建控制台处理器 console_handler = logging.StreamHandler() - console_handler.setLevel(root_logger.level) # 使用与根记录器相同的级别 + console_handler.setLevel(numeric_level) # 使用与根记录器相同的级别 # 使用自定义的颜色格式化器 color_formatter = ColoredFormatter() @@ -206,7 +203,7 @@ def configure_logger(loglevel=None, working_dir=None): # 创建文件处理器 file_handler = logging.FileHandler(log_filepath, encoding="utf-8") - file_handler.setLevel(root_logger.level) + file_handler.setLevel(TRACE_LEVEL) # 使用不带颜色的格式化器 file_formatter = ColoredFormatter(use_colors=False)