匹配init param schema格式

This commit is contained in:
Xuwznln
2025-06-30 12:29:30 +08:00
parent f2753fc69a
commit e7521972e4
22 changed files with 2261 additions and 2609 deletions

View File

@@ -197,6 +197,60 @@ class Registry:
logger.error(f"[UniLab Registry] 无法找到类型 '{type_name}' 用于设备 {device_id}{field_name}")
sys.exit(1)
def _generate_schema_from_info(
self,
param_name: str,
param_type: str,
param_default: Any,
) -> Dict[str, Any]:
"""
根据参数信息生成JSON Schema
"""
prop_schema = {}
# 根据类型设置schema FIXME 不完整
if param_type:
param_type_lower = param_type.lower()
if param_type_lower in ["str", "string"]:
prop_schema["type"] = "string"
elif param_type_lower in ["int", "integer"]:
prop_schema["type"] = "integer"
elif param_type_lower in ["float", "number"]:
prop_schema["type"] = "number"
elif param_type_lower in ["bool", "boolean"]:
prop_schema["type"] = "boolean"
elif param_type_lower in ["list", "array"]:
prop_schema["type"] = "array"
elif param_type_lower in ["dict", "object"]:
prop_schema["type"] = "object"
else:
# 默认为字符串类型
prop_schema["type"] = "string"
else:
# 如果没有类型信息,默认为字符串
prop_schema["type"] = "string"
# 设置默认值
if param_default is not None:
prop_schema["default"] = param_default
return prop_schema
def _generate_status_types_schema(self, status_types: Dict[str, Any]) -> Dict[str, Any]:
"""
根据状态类型生成JSON Schema
"""
status_schema = {
"type": "object",
"properties": {},
"required": [],
}
for status_name, status_type in status_types.items():
status_schema["properties"][status_name] = self._generate_schema_from_info(
status_name, status_type["return_type"], None
)
status_schema["required"].append(status_name)
return status_schema
def _generate_unilab_json_command_schema(
self, method_args: List[Dict[str, Any]], method_name: str
) -> Dict[str, Any]:
@@ -211,54 +265,24 @@ class Registry:
JSON Schema格式的参数schema
"""
schema = {
"description": f"UniLabJsonCommand {method_name} 的参数schema",
"type": "object",
"properties": {},
"required": [],
}
for arg_info in method_args:
param_name = arg_info.get("name", "")
param_type = arg_info.get("type")
param_type = arg_info.get("type", "")
param_default = arg_info.get("default")
param_required = arg_info.get("required", True)
prop_schema = {"description": f"参数: {param_name}"}
# 根据类型设置schema FIXME 不完整
if param_type:
param_type_lower = param_type.lower()
if param_type_lower in ["str", "string"]:
prop_schema["type"] = "string"
elif param_type_lower in ["int", "integer"]:
prop_schema["type"] = "integer"
elif param_type_lower in ["float", "number"]:
prop_schema["type"] = "number"
elif param_type_lower in ["bool", "boolean"]:
prop_schema["type"] = "boolean"
elif param_type_lower in ["list", "array"]:
prop_schema["type"] = "array"
elif param_type_lower in ["dict", "object"]:
prop_schema["type"] = "object"
else:
# 默认为字符串类型
prop_schema["type"] = "string"
else:
# 如果没有类型信息,默认为字符串
prop_schema["type"] = "string"
# 设置默认值
if param_default is not None:
prop_schema["default"] = param_default
schema["properties"][param_name] = prop_schema
# 如果是必需参数添加到required列表
schema["properties"][param_name] = self._generate_schema_from_info(
param_name, param_type, param_default
)
if param_required:
schema["required"].append(param_name)
return {
"title": f"{method_name} 命令参数",
"description": f"UniLabJsonCommand {method_name} 的参数schema",
"title": f"{method_name}参数",
"description": f"{method_name}的参数schema",
"type": "object",
"properties": {"goal": schema, "feedback": {}, "result": {}},
"required": ["goal"],
@@ -314,14 +338,21 @@ class Registry:
status_type = "String" # 替换成ROS的String便于显示
device_config["class"]["status_types"][status_name] = status_type
target_type = self._replace_type_with_class(status_type, device_id, f"状态 {status_name}")
if target_type in [dict, list]: # 对于嵌套类型返回的对象,暂时处理成字符串,无法直接进行转换
if target_type in [
dict,
list,
]: # 对于嵌套类型返回的对象,暂时处理成字符串,无法直接进行转换
target_type = String
status_str_type_mapping[status_type] = target_type
device_config["class"]["status_types"] = dict(
sorted(device_config["class"]["status_types"].items())
)
if complete_registry:
device_config["class"]["action_value_mappings"] = {k:v for k, v in device_config["class"]["action_value_mappings"].items() if not k.startswith("auto-")}
device_config["class"]["action_value_mappings"] = {
k: v
for k, v in device_config["class"]["action_value_mappings"].items()
if not k.startswith("auto-")
}
# 处理动作值映射
device_config["class"]["action_value_mappings"].update(
{
@@ -337,9 +368,14 @@ class Registry:
for k, v in enhanced_info["action_methods"].items()
}
)
device_config["init_param_schema"] = self._generate_unilab_json_command_schema(
device_config["init_param_schema"] = {}
device_config["init_param_schema"]["config"] = self._generate_unilab_json_command_schema(
enhanced_info["init_params"], "__init__"
)["properties"]["goal"]
device_config["init_param_schema"]["data"] = self._generate_status_types_schema(
enhanced_info["status_methods"]
)
device_config.pop("schema", None)
device_config["class"]["action_value_mappings"] = dict(
sorted(device_config["class"]["action_value_mappings"].items())