mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-05 14:05:12 +00:00
add resource edge upload
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import importlib
|
||||
import inspect
|
||||
import json
|
||||
from typing import Union
|
||||
from typing import Union, Any
|
||||
import numpy as np
|
||||
import networkx as nx
|
||||
from unilabos_msgs.msg import Resource
|
||||
@@ -84,6 +84,8 @@ def canonicalize_links_ports(data: dict) -> dict:
|
||||
# 第一遍处理:将字符串类型的port转换为字典格式
|
||||
for link in data.get("links", []):
|
||||
port = link.get("port")
|
||||
if link["type"] == "physical":
|
||||
link["type"] = "fluid"
|
||||
if isinstance(port, int):
|
||||
port = str(port)
|
||||
if isinstance(port, str):
|
||||
@@ -157,7 +159,28 @@ def read_node_link_json(json_file):
|
||||
|
||||
physical_setup_graph = nx.node_link_graph(data, multigraph=False) # edges="links" 3.6 warning
|
||||
handle_communications(physical_setup_graph)
|
||||
return physical_setup_graph
|
||||
return physical_setup_graph, data
|
||||
|
||||
|
||||
def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
for edge in data:
|
||||
port = edge.pop("port", {})
|
||||
source = edge["source"]
|
||||
target = edge["target"]
|
||||
if source in port:
|
||||
edge["sourceHandle"] = port[source]
|
||||
elif "source_port" in edge:
|
||||
edge["sourceHandle"] = edge.pop("source_port")
|
||||
if target in port:
|
||||
edge["targetHandle"] = port[target]
|
||||
elif "target_port" in edge:
|
||||
edge["targetHandle"] = edge.pop("target_port")
|
||||
if "id" not in edge:
|
||||
edge["id"] = f"link_generated_{source}_{target}"
|
||||
for key in ["source_port", "target_port"]:
|
||||
if key in edge:
|
||||
edge.pop(key)
|
||||
return data
|
||||
|
||||
|
||||
def read_graphml(graphml_file):
|
||||
@@ -182,7 +205,7 @@ def read_graphml(graphml_file):
|
||||
|
||||
physical_setup_graph = nx.node_link_graph(data, edges="links", multigraph=False) # edges="links" 3.6 warning
|
||||
handle_communications(physical_setup_graph)
|
||||
return physical_setup_graph
|
||||
return physical_setup_graph, data
|
||||
|
||||
|
||||
def dict_from_graph(graph: nx.Graph) -> dict:
|
||||
|
||||
Reference in New Issue
Block a user