增加注册表版本参数,支持将auto-指令人工检查后非auto,不生成人工已检查的指令,取消不必要的description生成

This commit is contained in:
Xuwznln
2025-07-16 01:05:16 +08:00
parent 10cb645191
commit f9aae44174
24 changed files with 247 additions and 4286 deletions

View File

@@ -2,6 +2,8 @@ import copy
import io
import os
import sys
import inspect
import importlib
from pathlib import Path
from typing import Any, Dict, List
@@ -99,6 +101,12 @@ class Registry:
}
]
},
# todo: support nested keys, switch to non ros message schema
"placeholder_keys": {
"res_id": "unilabos_resources", # 将当前实验室的全部物料id作为下拉框可选择
"device_id": "unilabos_devices", # 将当前实验室的全部设备id作为下拉框可选择
"parent": "unilabos_devices", # 将当前实验室的全部设备id作为下拉框可选择
},
},
"test_latency": {
"type": self.EmptyIn,
@@ -161,6 +169,54 @@ class Registry:
else:
logger.debug(f"[UniLab Registry] Res File-{i+1}/{len(files)} Not Valid YAML File: {file.absolute()}")
def _extract_class_docstrings(self, module_string: str) -> Dict[str, str]:
"""
从模块字符串中提取类和方法的docstring信息
Args:
module_string: 模块字符串,格式为 "module.path:ClassName"
Returns:
包含类和方法docstring信息的字典
"""
docstrings = {"class_docstring": "", "methods": {}}
if not module_string or ":" not in module_string:
return docstrings
try:
module_path, class_name = module_string.split(":", 1)
# 动态导入模块
module = importlib.import_module(module_path)
# 获取类
if hasattr(module, class_name):
cls = getattr(module, class_name)
# 获取类的docstring
class_doc = inspect.getdoc(cls)
if class_doc:
docstrings["class_docstring"] = class_doc.strip()
# 获取所有方法的docstring
for method_name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
method_doc = inspect.getdoc(method)
if method_doc:
docstrings["methods"][method_name] = method_doc.strip()
# 也获取属性方法的docstring
for method_name, method in inspect.getmembers(cls, predicate=lambda x: isinstance(x, property)):
if hasattr(method, "fget") and method.fget:
method_doc = inspect.getdoc(method.fget)
if method_doc:
docstrings["methods"][method_name] = method_doc.strip()
except Exception as e:
logger.warning(f"[UniLab Registry] 无法提取docstring信息模块: {module_string}, 错误: {str(e)}")
return docstrings
def _replace_type_with_class(self, type_name: str, device_id: str, field_name: str) -> Any:
"""
将类型名称替换为实际的类对象
@@ -274,15 +330,13 @@ class Registry:
param_type = arg_info.get("type", "")
param_default = arg_info.get("default")
param_required = arg_info.get("required", True)
schema["properties"][param_name] = self._generate_schema_from_info(
param_name, param_type, param_default
)
schema["properties"][param_name] = self._generate_schema_from_info(param_name, param_type, param_default)
if param_required:
schema["required"].append(param_name)
return {
"title": f"{method_name}参数",
"description": f"{method_name}的参数schema",
"description": f"",
"type": "object",
"properties": {"goal": schema, "feedback": {}, "result": {}},
"required": ["goal"],
@@ -348,6 +402,14 @@ class Registry:
sorted(device_config["class"]["status_types"].items())
)
if complete_registry:
# 保存原有的description信息
old_descriptions = {}
for action_name, action_config in device_config["class"]["action_value_mappings"].items():
if "description" in action_config.get("schema", {}):
description = action_config["schema"]["description"]
if len(description):
old_descriptions[action_name] = action_config["schema"]["description"]
device_config["class"]["action_value_mappings"] = {
k: v
for k, v in device_config["class"]["action_value_mappings"].items()
@@ -366,9 +428,14 @@ class Registry:
"handles": [],
}
# 不生成已配置action的动作
for k, v in enhanced_info["action_methods"].items() if k not in device_config["class"]["action_value_mappings"]
for k, v in enhanced_info["action_methods"].items()
if k not in device_config["class"]["action_value_mappings"]
}
)
# 恢复原有的description信息auto开头的不修改
for action_name, description in old_descriptions.items():
device_config["class"]["action_value_mappings"][action_name]["schema"]["description"] = description
device_config["init_param_schema"] = {}
device_config["init_param_schema"]["config"] = self._generate_unilab_json_command_schema(
enhanced_info["init_params"], "__init__"
@@ -432,6 +499,8 @@ class Registry:
}
if "registry_type" not in device_config:
device_config["registry_type"] = "device"
if "version" not in device_config:
device_config["version"] = "0.0.1"
device_config["file_path"] = str(file.absolute()).replace("\\", "/")
device_config["registry_type"] = "device"
logger.debug(
@@ -472,6 +541,13 @@ class Registry:
},
**schema["properties"]["goal"]["properties"],
}
# 将 placeholder_keys 信息添加到 schema 中
if "placeholder_keys" in action_config and action_config.get("schema", {}).get(
"properties", {}
).get("goal", {}):
action_config["schema"]["properties"]["goal"]["_unilabos_placeholder_info"] = action_config[
"placeholder_keys"
]
msg = {"id": device_id, **device_info_copy}
devices.append(msg)