mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 04:51:10 +00:00
Fix workstation startup
Update registry
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
import importlib
|
||||
import inspect
|
||||
import json
|
||||
import os.path
|
||||
import traceback
|
||||
from typing import Union, Any, Dict, List, Tuple
|
||||
import networkx as nx
|
||||
from pylabrobot.resources import ResourceHolder
|
||||
from unilabos_msgs.msg import Resource
|
||||
|
||||
from unilabos.config.config import BasicConfig
|
||||
from unilabos.resources.container import RegularContainer
|
||||
from unilabos.resources.itemized_carrier import ItemizedCarrier
|
||||
from unilabos.ros.msgs.message_converter import convert_to_ros_msg
|
||||
@@ -45,6 +47,29 @@ def canonicalize_nodes_data(
|
||||
if node.get("label") is not None:
|
||||
node_id = node.pop("label")
|
||||
node["id"] = node["name"] = node_id
|
||||
if not isinstance(node.get("config"), dict):
|
||||
node["config"] = {}
|
||||
if not node.get("type"):
|
||||
node["type"] = "device"
|
||||
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'type', defaulting to 'device'", "warning")
|
||||
if not node.get("name"):
|
||||
node["name"] = node.get("id")
|
||||
print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning")
|
||||
if not isinstance(node.get("position"), dict):
|
||||
node["position"] = {"position": {}}
|
||||
x = node.pop("x", None)
|
||||
if x is not None:
|
||||
node["position"]["position"]["x"] = x
|
||||
y = node.pop("y", None)
|
||||
if y is not None:
|
||||
node["position"]["position"]["y"] = y
|
||||
z = node.pop("z", None)
|
||||
if z is not None:
|
||||
node["position"]["position"]["z"] = z
|
||||
for k in list(node.keys()):
|
||||
if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data"]:
|
||||
v = node.pop(k)
|
||||
node["config"][k] = v
|
||||
|
||||
# 第二步:处理parent_relation
|
||||
id2idx = {node["id"]: idx for idx, node in enumerate(nodes)}
|
||||
@@ -302,6 +327,10 @@ def read_graphml(graphml_file: str) -> tuple[nx.Graph, ResourceTreeSet, List[Dic
|
||||
"nodes": [node.res_content.model_dump(by_alias=True) for node in resource_tree_set.all_nodes],
|
||||
"links": standardized_links,
|
||||
}
|
||||
dump_json_path = os.path.join(BasicConfig.working_dir, os.path.basename(graphml_file).rsplit(".")[0] + ".json")
|
||||
with open(dump_json_path, "w", encoding="utf-8") as f:
|
||||
f.write(json.dumps(graph_data, indent=4, ensure_ascii=False))
|
||||
print_status(f"GraphML converted to JSON and saved to {dump_json_path}", "info")
|
||||
physical_setup_graph = nx.node_link_graph(graph_data, link="links", multigraph=False)
|
||||
handle_communications(physical_setup_graph)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user