Fix workstation startup

Update registry
This commit is contained in:
Xuwznln
2025-10-13 15:00:50 +08:00
parent 8c8359fab3
commit aed39b648d
7 changed files with 171 additions and 1731 deletions

View File

@@ -1,12 +1,14 @@
import importlib
import inspect
import json
import os.path
import traceback
from typing import Union, Any, Dict, List, Tuple
import networkx as nx
from pylabrobot.resources import ResourceHolder
from unilabos_msgs.msg import Resource
from unilabos.config.config import BasicConfig
from unilabos.resources.container import RegularContainer
from unilabos.resources.itemized_carrier import ItemizedCarrier
from unilabos.ros.msgs.message_converter import convert_to_ros_msg
@@ -45,6 +47,29 @@ def canonicalize_nodes_data(
if node.get("label") is not None:
node_id = node.pop("label")
node["id"] = node["name"] = node_id
if not isinstance(node.get("config"), dict):
node["config"] = {}
if not node.get("type"):
node["type"] = "device"
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'type', defaulting to 'device'", "warning")
if not node.get("name"):
node["name"] = node.get("id")
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
if not isinstance(node.get("position"), dict):
node["position"] = {"position": {}}
x = node.pop("x", None)
if x is not None:
node["position"]["position"]["x"] = x
y = node.pop("y", None)
if y is not None:
node["position"]["position"]["y"] = y
z = node.pop("z", None)
if z is not None:
node["position"]["position"]["z"] = z
for k in list(node.keys()):
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data"]:
v = node.pop(k)
node["config"][k] = v
# 第二步处理parent_relation
id2idx = {node["id"]: idx for idx, node in enumerate(nodes)}
@@ -302,6 +327,10 @@ def read_graphml(graphml_file: str) -> tuple[nx.Graph, ResourceTreeSet, List[Dic
"nodes": [node.res_content.model_dump(by_alias=True) for node in resource_tree_set.all_nodes],
"links": standardized_links,
}
dump_json_path = os.path.join(BasicConfig.working_dir, os.path.basename(graphml_file).rsplit(".")[0] + ".json")
with open(dump_json_path, "w", encoding="utf-8") as f:
f.write(json.dumps(graph_data, indent=4, ensure_ascii=False))
print_status(f"GraphML converted to JSON and saved to {dump_json_path}", "info")
physical_setup_graph = nx.node_link_graph(graph_data, link="links", multigraph=False)
handle_communications(physical_setup_graph)