注册表编辑器

This commit is contained in:
Xuwznln
2025-09-07 20:57:48 +08:00
parent c25283ae04
commit 361eae2f6d
5 changed files with 571 additions and 103 deletions

View File

@@ -7,6 +7,8 @@ API模块
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import asyncio
import yaml
from unilabos.app.controler import devices, job_add, job_info
from unilabos.app.model import (
Resp,
@@ -19,6 +21,8 @@ from unilabos.app.model import (
JobFinishReq,
)
from unilabos.app.web.utils.host_utils import get_host_node_info
from unilabos.registry.registry import lab_registry
from unilabos.utils.type_check import NoAliasDumper
# 创建API路由器
api = APIRouter()
@@ -603,6 +607,7 @@ async def handle_file_content_import(websocket: WebSocket, request_data: dict):
file_size = request_data.get("file_size", 0)
registry_type = request_data.get("registry_type", "device")
class_name = request_data.get("class_name")
module_prefix = request_data.get("module_prefix", "")
async def send_log(message: str, level: str = "info"):
"""发送日志消息到客户端"""
@@ -656,7 +661,12 @@ async def handle_file_content_import(websocket: WebSocket, request_data: dict):
# 确定模块名
module_name = file_name.replace(".py", "").replace("-", "_").replace(" ", "_")
# 如果有 module_prefix则使用完整的模块路径
full_module_name = f"{module_prefix}.{module_name}" if module_prefix else module_name
await send_log(f"使用模块名: {module_name}")
if module_prefix:
await send_log(f"完整模块路径: {full_module_name}")
# 导入模块
try:
@@ -697,7 +707,7 @@ async def handle_file_content_import(websocket: WebSocket, request_data: dict):
from unilabos.utils.import_manager import get_enhanced_class_info
# 分析类信息
enhanced_info = get_enhanced_class_info(f"{module_name}:{class_name}", use_dynamic=True)
enhanced_info = get_enhanced_class_info(f"{full_module_name}:{class_name}", use_dynamic=True)
if not enhanced_info.get("dynamic_import_success", False):
await send_error("动态导入类信息失败")
@@ -705,47 +715,102 @@ async def handle_file_content_import(websocket: WebSocket, request_data: dict):
await send_log("成功分析类信息")
# 生成注册表schema
registry_schema = {
"class_name": class_name,
"module": f"{module_name}:{class_name}",
"type": "python",
"description": enhanced_info.get("class_docstring", ""),
"version": "1.0.0",
"category": [registry_type],
"status_types": {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()},
"action_value_mappings": {},
"init_param_schema": {},
"registry_type": registry_type,
"file_path": f"uploaded_file://{file_name}",
}
# 处理动作方法
for method_name, method_info in enhanced_info["action_methods"].items():
registry_schema["action_value_mappings"][f"auto-{method_name}"] = {
"type": "UniLabJsonCommandAsync" if method_info["is_async"] else "UniLabJsonCommand",
"goal": {},
"feedback": {},
"result": {},
"args": method_info["args"],
"description": method_info.get("docstring", ""),
# 根据注册表类型生成不同的schema
if registry_type == "resource":
# 资源类型的简单结构
category_name = file_name.replace(".py", "") if file_name else "unknown"
registry_schema = {
"description": enhanced_info.get("class_docstring", ""),
"category": [category_name],
"class": {
"module": f"{full_module_name}:{class_name}",
"type": "python",
},
"handles": [],
"icon": "",
"init_param_schema": {},
"registry_type": "resource",
"version": "1.0.0",
"file_path": f"uploaded_file://{file_name}",
}
else:
# 设备类型的复杂结构
registry_schema = {
"description": enhanced_info.get("class_docstring", ""),
"class": {
"module": f"{full_module_name}:{class_name}",
"type": "python",
"status_types": {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()},
"action_value_mappings": {},
},
"version": "1.0.0",
"handles": [],
"init_param_schema": {},
"registry_type": "device",
"file_path": f"uploaded_file://{file_name}",
}
# 处理动作方法(仅对设备类型)
for method_name, method_info in enhanced_info["action_methods"].items():
registry_schema["class"]["action_value_mappings"][f"auto-{method_name}"] = {
"type": "UniLabJsonCommandAsync" if method_info["is_async"] else "UniLabJsonCommand",
"goal": {},
"feedback": {},
"result": {},
"args": method_info["args"],
"description": method_info.get("docstring", ""),
}
await send_log("成功生成注册表schema")
# 格式化状态方法信息
status_info = {}
for status_name, status_data in enhanced_info.get("status_methods", {}).items():
status_info[status_name] = {
"return_type": status_data.get("return_type", "未知类型"),
"docstring": status_data.get("docstring", "无描述"),
"is_property": status_data.get("is_property", False),
}
# 格式化动作方法信息
action_info = {}
for action_name, action_data in enhanced_info.get("action_methods", {}).items():
args = action_data.get("args", [])
action_info[action_name] = {
"param_count": len(args),
"params": [
{"name": arg.get("name", ""), "type": arg.get("type", ""), "default": arg.get("default")}
for arg in args
],
"is_async": action_data.get("is_async", False),
"docstring": action_data.get("docstring", "无描述"),
"return_suggestion": "建议返回字典类型 (dict) 以便更好地结构化结果数据",
}
# 准备结果数据
result = {
"class_info": {
"class_name": class_name,
"module_name": module_name,
"module_prefix": module_prefix,
"full_module_name": full_module_name,
"file_name": file_name,
"file_size": file_size,
"docstring": enhanced_info.get("class_docstring", ""),
"dynamic_import_success": enhanced_info.get("dynamic_import_success", False),
"registry_type": registry_type,
},
"registry_schema": registry_schema,
"action_methods": enhanced_info["action_methods"],
"status_methods": enhanced_info["status_methods"],
"class_analysis": {
"status_methods": status_info,
"action_methods": action_info,
"init_params": enhanced_info.get("init_params", []),
"status_methods_count": len(status_info),
"action_methods_count": len(action_info),
},
# 保持向后兼容
"action_methods": enhanced_info.get("action_methods", {}),
"status_methods": enhanced_info.get("status_methods", {}),
}
# 发送结果
@@ -794,6 +859,11 @@ async def handle_file_import(websocket: WebSocket, request_data: dict):
registry_type = request_data.get("registry_type", "device")
class_name = request_data.get("class_name")
module_name = request_data.get("module_name")
description = request_data.get("description", "")
safe_class_name = request_data.get("safe_class_name", "")
icon = request_data.get("icon", "")
module_prefix = request_data.get("module_prefix", "")
handles = request_data.get("handles", [])
async def send_log(message: str, level: str = "info"):
"""发送日志消息到客户端"""
@@ -862,7 +932,12 @@ async def handle_file_import(websocket: WebSocket, request_data: dict):
# 确定模块名
if not module_name:
module_name = full_file_path.stem
# 如果有 module_prefix则使用完整的模块路径
full_module_name = f"{module_prefix}.{module_name}" if module_prefix else module_name
await send_log(f"使用模块名: {module_name}")
if module_prefix:
await send_log(f"完整模块路径: {full_module_name}")
# 导入模块
try:
@@ -930,7 +1005,7 @@ async def handle_file_import(websocket: WebSocket, request_data: dict):
from unilabos.utils.import_manager import get_enhanced_class_info
# 分析类信息
enhanced_info = get_enhanced_class_info(f"{module_name}:{target_class_name}", use_dynamic=True)
enhanced_info = get_enhanced_class_info(f"{full_module_name}:{target_class_name}", use_dynamic=True)
if not enhanced_info.get("dynamic_import_success", False):
await send_error("动态导入类信息失败")
@@ -938,55 +1013,124 @@ async def handle_file_import(websocket: WebSocket, request_data: dict):
await send_log("成功分析类信息")
# 生成注册表schema
registry_schema = {
"class_name": target_class_name,
"module": f"{module_name}:{target_class_name}",
"type": "python",
"description": enhanced_info.get("class_docstring", ""),
"version": "1.0.0",
"category": [registry_type],
"status_types": {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()},
"action_value_mappings": {},
"init_param_schema": {},
"registry_type": registry_type,
"file_path": str(full_file_path),
}
# 处理动作方法
for method_name, method_info in enhanced_info["action_methods"].items():
registry_schema["action_value_mappings"][f"auto-{method_name}"] = {
"type": "UniLabJsonCommandAsync" if method_info["is_async"] else "UniLabJsonCommand",
"goal": {},
"feedback": {},
"result": {},
"args": method_info["args"],
"description": method_info.get("docstring", ""),
# 根据注册表类型生成不同的schema
if registry_type == "resource":
# 资源类型的简单结构
category_name = Path(file_path).stem if file_path else "unknown"
registry_schema = {
"description": description or enhanced_info.get("class_docstring", ""),
"category": [category_name],
"class": {
"module": f"{full_module_name}:{target_class_name}",
"type": "python",
},
"handles": handles,
"icon": icon,
"init_param_schema": {},
"registry_type": "resource",
"version": "1.0.0",
}
else:
# 设备类型的复杂结构
registry_schema = {
"description": description or enhanced_info.get("class_docstring", ""),
"class": {
"module": f"{full_module_name}:{target_class_name}",
"type": "python",
"status_types": {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()},
"action_value_mappings": {
f"auto-{k}": {
"type": "UniLabJsonCommandAsync" if v["is_async"] else "UniLabJsonCommand",
"goal": {},
"feedback": {},
"result": {},
"schema": lab_registry._generate_unilab_json_command_schema(v["args"], k),
"goal_default": {i["name"]: i["default"] for i in v["args"]},
"handles": [],
}
# 不生成已配置action的动作
for k, v in enhanced_info["action_methods"].items()
},
},
"version": "1.0.0",
"handles": handles,
"icon": icon,
"init_param_schema": {
"config": lab_registry._generate_unilab_json_command_schema(
enhanced_info["init_params"], "__init__"
)["properties"]["goal"],
"data": lab_registry._generate_status_types_schema(enhanced_info["status_methods"]),
},
"registry_type": "device",
}
await send_log("成功生成注册表schema")
# 转换为YAML格式
import yaml
from unilabos.utils.type_check import NoAliasDumper
# 创建最终的YAML配置使用设备ID作为根键
class_name_safe = class_name or "unknown"
suffix = "_device" if registry_type == "device" else "_resource"
device_id = f"{class_name_safe.lower()}{suffix}"
final_config = {device_id: registry_schema}
# 创建最终的YAML配置使用ID作为根键
if safe_class_name:
item_id = safe_class_name
else:
class_name_safe = (target_class_name or "unknown").lower()
if registry_type == "resource":
# 资源ID通常直接使用类名不加后缀
item_id = class_name_safe
else:
# 设备ID使用类名加_device后缀
item_id = f"{class_name_safe}_device"
final_config = {item_id: registry_schema}
yaml_content = yaml.dump(
final_config, allow_unicode=True, default_flow_style=False, Dumper=NoAliasDumper, sort_keys=True
)
# 准备结果数据只保留YAML结果
# 格式化状态方法信息
status_info = {}
for status_name, status_data in enhanced_info.get("status_methods", {}).items():
status_info[status_name] = {
"return_type": status_data.get("return_type", "未知类型"),
"docstring": status_data.get("docstring", "无描述"),
"is_property": status_data.get("is_property", False),
}
# 格式化动作方法信息
action_info = {}
for action_name, action_data in enhanced_info.get("action_methods", {}).items():
args = action_data.get("args", [])
action_info[action_name] = {
"param_count": len(args),
"params": [
{"name": arg.get("name", ""), "type": arg.get("type", ""), "default": arg.get("default")}
for arg in args
],
"is_async": action_data.get("is_async", False),
"docstring": action_data.get("docstring", "无描述"),
"return_suggestion": "建议返回字典类型 (dict) 以便更好地结构化结果数据",
}
# 准备结果数据(包含详细的类分析信息)
result = {
"registry_schema": yaml_content,
"device_id": device_id,
"class_name": class_name,
"item_id": item_id,
"registry_type": registry_type,
"class_name": target_class_name,
"module_name": module_name,
"file_path": file_path,
"config_params": {
"safe_class_name": safe_class_name or item_id,
"description": description,
"icon": icon,
"module_prefix": module_prefix,
"full_module_name": full_module_name,
"handles_count": len(handles),
"handles": handles,
},
"class_analysis": {
"class_docstring": enhanced_info.get("class_docstring", ""),
"status_methods": status_info,
"action_methods": action_info,
"init_params": enhanced_info.get("init_params", []),
"dynamic_import_success": enhanced_info.get("dynamic_import_success", False),
},
}
# 发送结果
@@ -1034,12 +1178,11 @@ def get_file_browser_data(path: str = ""):
items = []
parent_path = target_path.parent
relative_parent = parent_path.relative_to(working_dir)
items.append(
{
"name": "..",
"type": "directory",
"path": str(relative_parent) if relative_parent != Path(".") else "",
"path": str(parent_path),
"size": 0,
"is_parent": True,
}
@@ -1048,16 +1191,12 @@ def get_file_browser_data(path: str = ""):
# 获取子目录和文件
try:
for item in sorted(target_path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())):
if item.name.startswith("."): # 跳过隐藏文件
continue
item_type = "directory" if item.is_dir() else "file"
relative_path = item.relative_to(working_dir)
item_info = {
"name": item.name,
"type": item_type,
"path": str(relative_path),
"path": str(item),
"size": item.stat().st_size if item.is_file() else 0,
"is_python": item.suffix == ".py" if item.is_file() else False,
"is_parent": False,
@@ -1068,7 +1207,7 @@ def get_file_browser_data(path: str = ""):
return Resp(
data={
"current_path": str(target_path.relative_to(working_dir)) if target_path != working_dir else "",
"current_path": str(target_path),
"working_dir": str(working_dir),
"items": items,
}