fix host_node test_resource error

This commit is contained in:
Xuwznln
2025-10-11 03:04:15 +08:00
parent 4b43734b55
commit 93f0e08d75

View File

@@ -148,7 +148,7 @@ class Registry:
"goal_default": {}, "goal_default": {},
"handles": {}, "handles": {},
}, },
"test_resource": { "auto-test_resource": {
"type": "UniLabJsonCommand", "type": "UniLabJsonCommand",
"goal": {}, "goal": {},
"feedback": {}, "feedback": {},
@@ -162,27 +162,21 @@ class Registry:
"resource": ros_message_to_json_schema(Resource, "resource"), "resource": ros_message_to_json_schema(Resource, "resource"),
"resources": { "resources": {
"items": { "items": {
"properties": ros_message_to_json_schema(Resource, "resources"), "properties": ros_message_to_json_schema(
"type": "object" Resource, "resources"
),
"type": "object",
}, },
"type": "array" "type": "array",
}, },
"device": { "device": {"type": "string"},
"type": "string" "devices": {"items": {"type": "string"}, "type": "array"},
}, },
"devices": { "type": "object",
"items": {
"type": "string"
}, },
"type": "array" "result": {},
}, },
}, "title": "test_resource",
"type": "object"
},
"result": {}
},
"required": ["goal"],
"title": "transfer_resource_to_another参数",
"type": "object", "type": "object",
}, },
"placeholder_keys": { "placeholder_keys": {
@@ -207,6 +201,8 @@ class Registry:
} }
} }
) )
# 为host_node添加内置的驱动命令动作
self._add_builtin_actions(self.device_type_registry["host_node"], "host_node")
logger.trace(f"[UniLab Registry] ----------Setup----------") logger.trace(f"[UniLab Registry] ----------Setup----------")
self.registry_paths = [Path(path).absolute() for path in self.registry_paths] self.registry_paths = [Path(path).absolute() for path in self.registry_paths]
for i, path in enumerate(self.registry_paths): for i, path in enumerate(self.registry_paths):
@@ -501,6 +497,43 @@ class Registry:
"required": ["goal"], "required": ["goal"],
} }
def _add_builtin_actions(self, device_config: Dict[str, Any], device_id: str):
"""
为设备配置添加内置的执行驱动命令动作
Args:
device_config: 设备配置字典
device_id: 设备ID
"""
from unilabos.app.web.utils.action_utils import get_yaml_from_goal_type
if "class" not in device_config:
return
if "action_value_mappings" not in device_config["class"]:
device_config["class"]["action_value_mappings"] = {}
for additional_action in ["_execute_driver_command", "_execute_driver_command_async"]:
device_config["class"]["action_value_mappings"][additional_action] = {
"type": self._replace_type_with_class("StrSingleInput", device_id, f"动作 {additional_action}"),
"goal": {"string": "string"},
"feedback": {},
"result": {},
"schema": ros_action_to_json_schema(
self._replace_type_with_class("StrSingleInput", device_id, f"动作 {additional_action}")
),
"goal_default": yaml.safe_load(
io.StringIO(
get_yaml_from_goal_type(
self._replace_type_with_class(
"StrSingleInput", device_id, f"动作 {additional_action}"
).Goal
)
)
),
"handles": {},
}
def load_device_types(self, path: os.PathLike, complete_registry: bool): def load_device_types(self, path: os.PathLike, complete_registry: bool):
# return # return
abs_path = Path(path).absolute() abs_path = Path(path).absolute()
@@ -676,30 +709,8 @@ class Registry:
device_config["class"]["status_types"][status_name] = status_str_type_mapping[status_type] device_config["class"]["status_types"][status_name] = status_str_type_mapping[status_type]
for action_name, action_config in device_config["class"]["action_value_mappings"].items(): for action_name, action_config in device_config["class"]["action_value_mappings"].items():
action_config["type"] = action_str_type_mapping[action_config["type"]] action_config["type"] = action_str_type_mapping[action_config["type"]]
for additional_action in ["_execute_driver_command", "_execute_driver_command_async"]: # 添加内置的驱动命令动作
device_config["class"]["action_value_mappings"][additional_action] = { self._add_builtin_actions(device_config, device_id)
"type": self._replace_type_with_class(
"StrSingleInput", device_id, f"动作 {additional_action}"
),
"goal": {"string": "string"},
"feedback": {},
"result": {},
"schema": ros_action_to_json_schema(
self._replace_type_with_class(
"StrSingleInput", device_id, f"动作 {additional_action}"
)
),
"goal_default": yaml.safe_load(
io.StringIO(
get_yaml_from_goal_type(
self._replace_type_with_class(
"StrSingleInput", device_id, f"动作 {additional_action}"
).Goal
)
)
),
"handles": {},
}
device_config["file_path"] = str(file.absolute()).replace("\\", "/") device_config["file_path"] = str(file.absolute()).replace("\\", "/")
device_config["registry_type"] = "device" device_config["registry_type"] = "device"
logger.trace( # type: ignore logger.trace( # type: ignore