update registry with nested obj

This commit is contained in:
Xuwznln
2025-09-19 03:44:18 +08:00
parent e5006285df
commit 01f8816597
14 changed files with 144 additions and 81 deletions

View File

@@ -5,7 +5,7 @@ import sys
import inspect
import importlib
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Union, Tuple
import yaml
@@ -294,7 +294,7 @@ class Registry:
logger.warning(f"[UniLab Registry] 设备 {device_id}{field_name} 类型为空,跳过替换")
return type_name
convert_manager = { # 将python基本对象转为ros2基本对象
"str": "String",
"builtins:str": "String",
"bool": "Bool",
"int": "Int64",
"float": "Float64",
@@ -310,37 +310,73 @@ class Registry:
logger.error(f"[UniLab Registry] 无法找到类型 '{type_name}' 用于设备 {device_id}{field_name}")
sys.exit(1)
def _get_json_schema_type(self, type_str: str) -> str:
"""
根据类型字符串返回对应的JSON Schema类型
Args:
type_str: 类型字符串
Returns:
JSON Schema类型字符串
"""
type_lower = type_str.lower()
type_mapping = {
("str", "string"): "string",
("int", "integer"): "integer",
("float", "number"): "number",
("bool", "boolean"): "boolean",
("list", "array"): "array",
("dict", "object"): "object",
}
# 遍历映射找到匹配的类型
for type_variants, json_type in type_mapping.items():
if type_lower in type_variants:
return json_type
# 特殊处理包含冒号的类型如ROS消息类型
if ":" in type_lower:
return "object"
# 默认返回字符串类型
return "string"
def _generate_schema_from_info(
self,
param_name: str,
param_type: str,
param_type: Union[str, Tuple[str]],
param_default: Any,
) -> Dict[str, Any]:
"""
根据参数信息生成JSON Schema
"""
prop_schema = {}
# 根据类型设置schema FIXME 不完整
if param_type:
param_type_lower = param_type.lower()
if param_type_lower in ["str", "string"]:
prop_schema["type"] = "string"
elif param_type_lower in ["int", "integer"]:
prop_schema["type"] = "integer"
elif param_type_lower in ["float", "number"]:
prop_schema["type"] = "number"
elif param_type_lower in ["bool", "boolean"]:
prop_schema["type"] = "boolean"
elif param_type_lower in ["list", "array"]:
prop_schema["type"] = "array"
elif param_type_lower in ["dict", "object"]:
prop_schema["type"] = "object"
# 处理嵌套类型Tuple[str]
if isinstance(param_type, tuple):
if len(param_type) == 2:
outer_type, inner_type = param_type
outer_json_type = self._get_json_schema_type(outer_type)
inner_json_type = self._get_json_schema_type(inner_type)
prop_schema["type"] = outer_json_type
# 根据外层类型设置内层类型信息
if outer_json_type == "array":
prop_schema["items"] = {"type": inner_json_type}
elif outer_json_type == "object":
prop_schema["additionalProperties"] = {"type": inner_json_type}
else:
# 默认为字符串类型
# 不是标准的嵌套类型,默认为字符串
prop_schema["type"] = "string"
else:
# 如果没有类型信息,默认为字符串
prop_schema["type"] = "string"
# 处理非嵌套类型
if param_type:
prop_schema["type"] = self._get_json_schema_type(param_type)
else:
# 如果没有类型信息,默认为字符串
prop_schema["type"] = "string"
# 设置默认值
if param_default is not None:
@@ -456,7 +492,7 @@ class Registry:
{k: v["return_type"] for k, v in enhanced_info["status_methods"].items()}
)
for status_name, status_type in device_config["class"]["status_types"].items():
if status_type in ["Any", "None", "Unknown"]:
if isinstance(status_type, tuple) or status_type in ["Any", "None", "Unknown"]:
status_type = "String" # 替换成ROS的String便于显示
device_config["class"]["status_types"][status_name] = status_type
target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}")