From 93f0e08d7504c95ebbdafea9fad931d3b7a980dd Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sat, 11 Oct 2025 03:04:15 +0800 Subject: [PATCH] fix host_node test_resource error --- unilabos/registry/registry.py | 93 ++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 41 deletions(-) diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 2cc5dc28..97018d4e 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -148,7 +148,7 @@ class Registry: "goal_default": {}, "handles": {}, }, - "test_resource": { + "auto-test_resource": { "type": "UniLabJsonCommand", "goal": {}, "feedback": {}, @@ -162,27 +162,21 @@ class Registry: "resource": ros_message_to_json_schema(Resource, "resource"), "resources": { "items": { - "properties": ros_message_to_json_schema(Resource, "resources"), - "type": "object" + "properties": ros_message_to_json_schema( + Resource, "resources" + ), + "type": "object", }, - "type": "array" - }, - "device": { - "type": "string" - }, - "devices": { - "items": { - "type": "string" - }, - "type": "array" + "type": "array", }, + "device": {"type": "string"}, + "devices": {"items": {"type": "string"}, "type": "array"}, }, - "type": "object" + "type": "object", }, - "result": {} + "result": {}, }, - "required": ["goal"], - "title": "transfer_resource_to_another参数", + "title": "test_resource", "type": "object", }, "placeholder_keys": { @@ -207,6 +201,8 @@ class Registry: } } ) + # 为host_node添加内置的驱动命令动作 + self._add_builtin_actions(self.device_type_registry["host_node"], "host_node") logger.trace(f"[UniLab Registry] ----------Setup----------") self.registry_paths = [Path(path).absolute() for path in self.registry_paths] for i, path in enumerate(self.registry_paths): @@ -501,6 +497,43 @@ class Registry: "required": ["goal"], } + def _add_builtin_actions(self, device_config: Dict[str, Any], device_id: str): + """ + 为设备配置添加内置的执行驱动命令动作 + + Args: + device_config: 设备配置字典 + device_id: 设备ID + """ + from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type + + if "class" not in device_config: + return + + if "action_value_mappings" not in device_config["class"]: + device_config["class"]["action_value_mappings"] = {} + + for additional_action in ["_execute_driver_command", "_execute_driver_command_async"]: + device_config["class"]["action_value_mappings"][additional_action] = { + "type": self._replace_type_with_class("StrSingleInput", device_id, f"动作 {additional_action}"), + "goal": {"string": "string"}, + "feedback": {}, + "result": {}, + "schema": ros_action_to_json_schema( + self._replace_type_with_class("StrSingleInput", device_id, f"动作 {additional_action}") + ), + "goal_default": yaml.safe_load( + io.StringIO( + get_yaml_from_goal_type( + self._replace_type_with_class( + "StrSingleInput", device_id, f"动作 {additional_action}" + ).Goal + ) + ) + ), + "handles": {}, + } + def load_device_types(self, path: os.PathLike, complete_registry: bool): # return abs_path = Path(path).absolute() @@ -676,30 +709,8 @@ class Registry: device_config["class"]["status_types"][status_name] = status_str_type_mapping[status_type] for action_name, action_config in device_config["class"]["action_value_mappings"].items(): action_config["type"] = action_str_type_mapping[action_config["type"]] - for additional_action in ["_execute_driver_command", "_execute_driver_command_async"]: - device_config["class"]["action_value_mappings"][additional_action] = { - "type": self._replace_type_with_class( - "StrSingleInput", device_id, f"动作 {additional_action}" - ), - "goal": {"string": "string"}, - "feedback": {}, - "result": {}, - "schema": ros_action_to_json_schema( - self._replace_type_with_class( - "StrSingleInput", device_id, f"动作 {additional_action}" - ) - ), - "goal_default": yaml.safe_load( - io.StringIO( - get_yaml_from_goal_type( - self._replace_type_with_class( - "StrSingleInput", device_id, f"动作 {additional_action}" - ).Goal - ) - ) - ), - "handles": {}, - } + # 添加内置的驱动命令动作 + self._add_builtin_actions(device_config, device_id) device_config["file_path"] = str(file.absolute()).replace("\\", "/") device_config["registry_type"] = "device" logger.trace( # type: ignore