diff --git a/unilabos/app/backend.py b/unilabos/app/backend.py index 5bc4ad3..9f7d427 100644 --- a/unilabos/app/backend.py +++ b/unilabos/app/backend.py @@ -8,6 +8,7 @@ def start_backend( backend: str, devices_config: dict = {}, resources_config: list = [], + resources_edge_config: list = [], graph=None, controllers_config: dict = {}, bridges=[], @@ -31,7 +32,7 @@ def start_backend( backend_thread = threading.Thread( target=main if not without_host else slave, - args=(devices_config, resources_config, graph, controllers_config, bridges, visual, resources_mesh_config), + args=(devices_config, resources_config, resources_edge_config, graph, controllers_config, bridges, visual, resources_mesh_config), name="backend_thread", daemon=True, ) diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 0f6b2f4..918ef59 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -10,7 +10,7 @@ from copy import deepcopy import yaml -from unilabos.resources.graphio import tree_to_list +from unilabos.resources.graphio import tree_to_list, modify_to_backend_format # 首先添加项目根目录到路径 current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -136,15 +136,16 @@ def main(): # 注册表 build_registry(args_dict["registry_path"]) - + resource_edge_info = [] devices_and_resources = None if args_dict["graph"] is not None: import unilabos.resources.graphio as graph_res - graph_res.physical_setup_graph = ( - read_node_link_json(args_dict["graph"]) - if args_dict["graph"].endswith(".json") - else read_graphml(args_dict["graph"]) - ) + if args_dict["graph"].endswith(".json"): + graph, data = read_node_link_json(args_dict["graph"]) + else: + graph, data = read_graphml(args_dict["graph"]) + graph_res.physical_setup_graph = graph + resource_edge_info = modify_to_backend_format(data["links"]) devices_and_resources = dict_from_graph(graph_res.physical_setup_graph) # args_dict["resources_config"] = initialize_resources(list(deepcopy(devices_and_resources).values())) args_dict["resources_config"] = list(devices_and_resources.values()) @@ -185,6 +186,7 @@ def main(): signal.signal(signal.SIGTERM, _exit) mqtt_client.start() args_dict["resources_mesh_config"] = {} + args_dict["resources_edge_config"] = resource_edge_info # web visiualize 2D if args_dict["visual"] != "disable": enable_rviz = args_dict["visual"] == "rviz" diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 6932948..98d3b3f 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -30,7 +30,25 @@ class HTTPClient: self.auth = MQConfig.lab_id info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}") - def resource_add(self, resources: List[Dict[str, Any]], database_process_later:bool) -> requests.Response: + def resource_edge_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response: + """ + 添加资源 + + Args: + resources: 要添加的资源列表 + database_process_later: 后台处理资源 + Returns: + Response: API响应对象 + """ + response = requests.post( + f"{self.remote_addr}/lab/resource/edge/batch_create/?database_process_later={1 if database_process_later else 0}", + json=resources, + headers={"Authorization": f"lab {self.auth}"}, + timeout=5, + ) + return response + + def resource_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response: """ 添加资源 diff --git a/unilabos/app/web/pages.py b/unilabos/app/web/pages.py index a08cebb..51b109b 100644 --- a/unilabos/app/web/pages.py +++ b/unilabos/app/web/pages.py @@ -16,7 +16,6 @@ from jinja2 import Environment, FileSystemLoader from unilabos.config.config import BasicConfig from unilabos.registry.registry import lab_registry -from unilabos.app.mq import mqtt_client from unilabos.ros.msgs.message_converter import msg_converter_manager from unilabos.utils.log import error from unilabos.utils.type_check import TypeEncoder diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 734e509..36137b9 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -1,7 +1,7 @@ import importlib import inspect import json -from typing import Union +from typing import Union, Any import numpy as np import networkx as nx from unilabos_msgs.msg import Resource @@ -84,6 +84,8 @@ def canonicalize_links_ports(data: dict) -> dict: # 第一遍处理:将字符串类型的port转换为字典格式 for link in data.get("links", []): port = link.get("port") + if link["type"] == "physical": + link["type"] = "fluid" if isinstance(port, int): port = str(port) if isinstance(port, str): @@ -157,7 +159,28 @@ def read_node_link_json(json_file): physical_setup_graph = nx.node_link_graph(data, multigraph=False) # edges="links" 3.6 warning handle_communications(physical_setup_graph) - return physical_setup_graph + return physical_setup_graph, data + + +def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]: + for edge in data: + port = edge.pop("port", {}) + source = edge["source"] + target = edge["target"] + if source in port: + edge["sourceHandle"] = port[source] + elif "source_port" in edge: + edge["sourceHandle"] = edge.pop("source_port") + if target in port: + edge["targetHandle"] = port[target] + elif "target_port" in edge: + edge["targetHandle"] = edge.pop("target_port") + if "id" not in edge: + edge["id"] = f"link_generated_{source}_{target}" + for key in ["source_port", "target_port"]: + if key in edge: + edge.pop(key) + return data def read_graphml(graphml_file): @@ -182,7 +205,7 @@ def read_graphml(graphml_file): physical_setup_graph = nx.node_link_graph(data, edges="links", multigraph=False) # edges="links" 3.6 warning handle_communications(physical_setup_graph) - return physical_setup_graph + return physical_setup_graph, data def dict_from_graph(graph: nx.Graph) -> dict: diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index bbd6359..7ee2683 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -45,6 +45,7 @@ def exit() -> None: def main( devices_config: Dict[str, Any] = {}, resources_config: list=[], + resources_edge_config: list=[], graph: Optional[Dict[str, Any]] = None, controllers_config: Dict[str, Any] = {}, bridges: List[Any] = [], @@ -62,6 +63,7 @@ def main( "host_node", devices_config, resources_config, + resources_edge_config, graph, controllers_config, bridges, @@ -97,6 +99,7 @@ def main( def slave( devices_config: Dict[str, Any] = {}, resources_config=[], + resources_edge_config=[], graph: Optional[Dict[str, Any]] = None, controllers_config: Dict[str, Any] = {}, bridges: List[Any] = [], diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index fef9d64..6f52b4c 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -58,6 +58,7 @@ class HostNode(BaseROS2DeviceNode): device_id: str, devices_config: Dict[str, Any], resources_config: list, + resources_edge_config: list[dict], physical_setup_graph: Optional[Dict[str, Any]] = None, controllers_config: Optional[Dict[str, Any]] = None, bridges: Optional[List[Any]] = None, @@ -96,6 +97,7 @@ class HostNode(BaseROS2DeviceNode): self.server_latest_timestamp = 0.0 # self.devices_config = devices_config self.resources_config = resources_config + self.resources_edge_config = resources_edge_config self.physical_setup_graph = physical_setup_graph if controllers_config is None: controllers_config = {} @@ -203,12 +205,19 @@ class HostNode(BaseROS2DeviceNode): try: for bridge in self.bridges: if hasattr(bridge, "resource_add"): + from unilabos.app.web.client import HTTPClient + client: HTTPClient = bridge resource_start_time = time.time() - resource_add_res = bridge.resource_add(add_schema(resource_with_parent_name), True) + resource_add_res = client.resource_add(add_schema(resource_with_parent_name), True) resource_end_time = time.time() self.lab_logger().info( f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" ) + resource_add_res = client.resource_edge_add(self.resources_edge_config, True) + resource_edge_end_time = time.time() + self.lab_logger().info( + f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms" + ) except Exception as ex: self.lab_logger().error("[Host Node-Resource] 添加物料出错!") self.lab_logger().error(traceback.format_exc()) @@ -757,8 +766,10 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info(f"[Host Node-Resource] Add request received: {len(resources)} resources") success = False - if len(self.bridges) > 0: - r = self.bridges[-1].resource_add(add_schema(resources)) + if len(self.bridges) > 0: # 边的提交待定 + from unilabos.app.web.client import HTTPClient + client: HTTPClient = self.bridges[-1] + r = client.resource_add(add_schema(resources), False) success = bool(r) response.success = success