From cdf0652020c4b7116c26f5f6b0d9d7d34b69d348 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 10 Feb 2026 15:18:41 +0800 Subject: [PATCH] add test mode --- unilabos/app/main.py | 9 ++++ unilabos/config/config.py | 1 + unilabos/ros/nodes/presets/host_node.py | 57 +++++++++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index cc306b7..c652757 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -171,6 +171,12 @@ def parse_args(): action="store_true", help="Disable sending update feedback to server", ) + parser.add_argument( + "--test_mode", + action="store_true", + default=False, + help="Test mode: all actions simulate execution and return mock results without running real hardware", + ) # workflow upload subcommand workflow_parser = subparsers.add_parser( "workflow_upload", @@ -348,6 +354,9 @@ def main(): BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.upload_registry = args_dict.get("upload_registry", False) BasicConfig.no_update_feedback = args_dict.get("no_update_feedback", False) + BasicConfig.test_mode = args_dict.get("test_mode", False) + if BasicConfig.test_mode: + print_status("启用测试模式:所有动作将模拟执行,不调用真实硬件", "warning") BasicConfig.communication_protocol = "websocket" machine_name = os.popen("hostname").read().strip() machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 9677b21..4b7d91a 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -23,6 +23,7 @@ class BasicConfig: disable_browser = False # 禁止浏览器自动打开 port = 8002 # 本地HTTP服务 check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性 + test_mode = False # 测试模式,所有动作不实际执行,返回模拟结果 # 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG" diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index f05bf0c..c671549 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -51,6 +51,7 @@ from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info +from unilabos.config.config import BasicConfig if TYPE_CHECKING: from unilabos.app.ws_client import QueueItem @@ -776,6 +777,17 @@ class HostNode(BaseROS2DeviceNode): u = uuid.UUID(item.job_id) device_id = item.device_id action_name = item.action_name + + if BasicConfig.test_mode: + action_id = f"/devices/{device_id}/{action_name}" + self.lab_logger().info( + f"[TEST MODE] 模拟执行: {action_id} (job={item.job_id[:8]}), 参数: {str(action_kwargs)[:500]}" + ) + # 根据注册表 handles 构建模拟返回值 + mock_return = self._build_test_mode_return(device_id, action_name, action_kwargs) + self._handle_test_mode_result(item, action_id, mock_return) + return + if action_type.startswith("UniLabJsonCommand"): if action_name.startswith("auto-"): action_name = action_name[5:] @@ -813,6 +825,51 @@ class HostNode(BaseROS2DeviceNode): ) future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f)) + def _build_test_mode_return( + self, device_id: str, action_name: str, action_kwargs: Dict[str, Any] + ) -> Dict[str, Any]: + """ + 根据注册表 handles 的 output 定义构建测试模式的模拟返回值 + + 根据 data_key 中 @flatten 的层数决定嵌套数组层数,叶子值为空字典。 + 例如: "vessel" → {}, "plate.@flatten" → [{}], "a.@flatten.@flatten" → [[{}]] + """ + mock_return: Dict[str, Any] = {"test_mode": True, "action_name": action_name} + action_mappings = self._action_value_mappings.get(device_id, {}) + action_mapping = action_mappings.get(action_name, {}) + handles = action_mapping.get("handles", {}) + if isinstance(handles, dict): + for output_handle in handles.get("output", []): + data_key = output_handle.get("data_key", "") + handler_key = output_handle.get("handler_key", "") + # 根据 @flatten 层数构建嵌套数组,叶子为空字典 + flatten_count = data_key.count("@flatten") + value: Any = {} + for _ in range(flatten_count): + value = [value] + mock_return[handler_key] = value + return mock_return + + def _handle_test_mode_result( + self, item: "QueueItem", action_id: str, mock_return: Dict[str, Any] + ) -> None: + """ + 测试模式下直接构建结果并走正常的结果回调流程(跳过 ROS) + """ + job_id = item.job_id + status = "success" + return_info = serialize_result_info("", True, mock_return) + + self.lab_logger().info(f"[TEST MODE] Result for {action_id} ({job_id[:8]}): {status}") + + from unilabos.app.web.controller import store_job_result + store_job_result(job_id, status, return_info, mock_return) + + # 发布状态到桥接器 + for bridge in self.bridges: + if hasattr(bridge, "publish_job_status"): + bridge.publish_job_status(mock_return, item, status, return_info) + def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: """目标响应回调""" goal_handle = future.result()