支持通过导入方式补全注册表,新增工作流unilabos_device_id字段

This commit is contained in:
Xuwznln
2025-06-28 01:19:54 +08:00
parent bbc49e9aab
commit 15f3f8518b
17 changed files with 4457 additions and 52 deletions

View File

@@ -190,7 +190,9 @@ class Registry:
logger.error(f"[UniLab Registry] 无法找到类型 '{type_name}' 用于设备 {device_id}{field_name}")
sys.exit(1)
def _generate_unilab_json_command_schema(self, method_args: List[Dict[str, Any]], method_name: str) -> Dict[str, Any]:
def _generate_unilab_json_command_schema(
self, method_args: List[Dict[str, Any]], method_name: str
) -> Dict[str, Any]:
"""
根据UniLabJsonCommand方法信息生成JSON Schema暂不支持嵌套类型
@@ -302,6 +304,7 @@ class Registry:
for status_name, status_type in device_config["class"]["status_types"].items():
if status_type in ["Any", "None"]:
status_type = "String" # 替换成ROS的String便于显示
device_config["class"]["status_types"][status_name] = status_type
target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}")
status_str_type_mapping[status_type] = target_type
device_config["class"]["status_types"] = dict(
@@ -322,7 +325,9 @@ class Registry:
for k, v in enhanced_info["action_methods"].items()
}
)
device_config["init_param_schema"] = self._generate_unilab_json_command_schema(enhanced_info["init_params"], "__init__")
device_config["init_param_schema"] = self._generate_unilab_json_command_schema(
enhanced_info["init_params"], "__init__"
)
device_config.pop("schema", None)
device_config["class"]["action_value_mappings"] = dict(
sorted(device_config["class"]["action_value_mappings"].items())
@@ -398,7 +403,28 @@ class Registry:
def obtain_registry_device_info(self):
devices = []
for device_id, device_info in self.device_type_registry.items():
msg = {"id": device_id, **device_info}
device_info_copy = copy.deepcopy(device_info)
if "class" in device_info_copy and "action_value_mappings" in device_info_copy["class"]:
action_mappings = device_info_copy["class"]["action_value_mappings"]
for action_name, action_config in action_mappings.items():
if "schema" in action_config and action_config["schema"]:
schema = action_config["schema"]
# 确保schema结构存在
if (
"properties" in schema
and "goal" in schema["properties"]
and "properties" in schema["properties"]["goal"]
):
schema["properties"]["goal"]["properties"] = {
"unilabos_device_id": {
"type": "string",
"default": "",
"description": "UniLabOS设备ID用于指定执行动作的具体设备实例",
},
**schema["properties"]["goal"]["properties"],
}
msg = {"id": device_id, **device_info_copy}
devices.append(msg)
return devices