From 5610c28b672526a9ee6092e7ed62601340ec7114 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Fri, 10 Oct 2025 07:13:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=89=A9=E6=96=99=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + test/experiments/prcxi_9300.json | 2 +- unilabos/app/backend.py | 7 +- unilabos/app/main.py | 66 +- unilabos/app/register.py | 5 - unilabos/app/web/client.py | 78 ++- unilabos/app/ws_client.py | 112 +++- unilabos/resources/graphio.py | 282 +++++--- unilabos/ros/main_slave_run.py | 125 ++-- unilabos/ros/nodes/base_device_node.py | 287 ++++++-- unilabos/ros/nodes/presets/host_node.py | 327 +++++++-- unilabos/ros/nodes/presets/workstation.py | 2 +- unilabos/ros/nodes/resource_tracker.py | 771 +++++++++++++++++++++- unilabos/ros/utils/driver_creator.py | 61 +- 14 files changed, 1801 insertions(+), 325 deletions(-) diff --git a/.gitignore b/.gitignore index 9d40cbca..71d8d0d5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ configs/ temp/ output/ unilabos_data/ +pyrightconfig.json ## Python # Byte-compiled / optimized / DLL files diff --git a/test/experiments/prcxi_9300.json b/test/experiments/prcxi_9300.json index 3b99a89d..4901c5d8 100644 --- a/test/experiments/prcxi_9300.json +++ b/test/experiments/prcxi_9300.json @@ -21,7 +21,7 @@ "timeout": 10.0, "axis": "Left", "channel_num": 8, - "setup": true, + "setup": false, "debug": true, "simulator": true, "matrix_id": "71593" diff --git a/unilabos/app/backend.py b/unilabos/app/backend.py index 3e7e35f5..d43b9544 100644 --- a/unilabos/app/backend.py +++ b/unilabos/app/backend.py @@ -1,14 +1,15 @@ import threading +from unilabos.ros.nodes.resource_tracker import ResourceTreeSet from unilabos.utils import logger # 根据选择的 backend 启动相应的功能 def start_backend( backend: str, - devices_config: dict = {}, - resources_config: list = [], - resources_edge_config: list = [], + devices_config: ResourceTreeSet, + resources_config: ResourceTreeSet, + resources_edge_config: list[dict] = [], graph=None, controllers_config: dict = {}, bridges=[], diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 316ff59b..1844fe77 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -7,9 +7,12 @@ import sys import threading import time from copy import deepcopy +from typing import Dict, Any, List +import networkx as nx import yaml +from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict # 首先添加项目根目录到路径 current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -225,6 +228,15 @@ def main(): else: HTTPConfig.remote_addr = args_dict.get("addr", "") + # 设置BasicConfig参数 + if args_dict.get("ak", ""): + BasicConfig.ak = args_dict.get("ak", "") + print_status("传入了ak参数,优先采用传入参数!", "info") + if args_dict.get("sk", ""): + BasicConfig.sk = args_dict.get("sk", "") + print_status("传入了sk参数,优先采用传入参数!", "info") + + # 使用远程资源启动 if args_dict["use_remote_resource"]: print_status("使用远程资源启动", "info") from unilabos.app.web import http_client @@ -236,13 +248,6 @@ def main(): else: print_status("远程资源不存在,本地将进行首次上报!", "info") - # 设置BasicConfig参数 - if args_dict.get("ak", ""): - BasicConfig.ak = args_dict.get("ak", "") - print_status("传入了ak参数,优先采用传入参数!", "info") - if args_dict.get("sk", ""): - BasicConfig.sk = args_dict.get("sk", "") - print_status("传入了sk参数,优先采用传入参数!", "info") BasicConfig.working_dir = working_dir BasicConfig.is_host_mode = not args_dict.get("is_slave", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) @@ -278,6 +283,10 @@ def main(): if not BasicConfig.ak or not BasicConfig.sk: print_status("后续运行必须拥有一个实验室,请前往 https://uni-lab.bohrium.com 注册实验室!", "warning") os._exit(1) + graph: nx.Graph + resource_tree_set: ResourceTreeSet + resource_links: List[Dict[str, Any]] + if args_dict["graph"] is None: request_startup_json = http_client.request_startup_json() if not request_startup_json: @@ -287,58 +296,54 @@ def main(): os._exit(1) else: print_status("联网获取设备加载文件成功", "info") - graph, data = read_node_link_json(request_startup_json) + graph, resource_tree_set, resource_links = read_node_link_json(request_startup_json) else: file_path = args_dict["graph"] if file_path.endswith(".json"): - graph, data = read_node_link_json(file_path) + graph, resource_tree_set, resource_links = read_node_link_json(file_path) else: - graph, data = read_graphml(file_path) + graph, resource_tree_set, resource_links = read_graphml(file_path) import unilabos.resources.graphio as graph_res graph_res.physical_setup_graph = graph - resource_edge_info = modify_to_backend_format(data["links"]) + resource_edge_info = modify_to_backend_format(resource_links) materials = lab_registry.obtain_registry_resource_info() materials.extend(lab_registry.obtain_registry_device_info()) materials = {k["id"]: k for k in materials} - nodes = {k["id"]: k for k in data["nodes"]} + # 从 ResourceTreeSet 中获取节点信息 + nodes = {node.res_content.id: node.res_content for node in resource_tree_set.all_nodes} edge_info = len(resource_edge_info) for ind, i in enumerate(resource_edge_info[::-1]): - source_node = nodes[i["source"]] - target_node = nodes[i["target"]] + source_node: ResourceDict = nodes[i["source"]] + target_node: ResourceDict = nodes[i["target"]] source_handle = i["sourceHandle"] target_handle = i["targetHandle"] source_handler_keys = [ - h["handler_key"] for h in materials[source_node["class"]]["handles"] if h["io_type"] == "source" + h["handler_key"] for h in materials[source_node.klass]["handles"] if h["io_type"] == "source" ] target_handler_keys = [ - h["handler_key"] for h in materials[target_node["class"]]["handles"] if h["io_type"] == "target" + h["handler_key"] for h in materials[target_node.klass]["handles"] if h["io_type"] == "target" ] if source_handle not in source_handler_keys: print_status( - f"节点 {source_node['id']} 的source端点 {source_handle} 不存在,请检查,支持的端点 {source_handler_keys}", + f"节点 {source_node.id} 的source端点 {source_handle} 不存在,请检查,支持的端点 {source_handler_keys}", "error", ) resource_edge_info.pop(edge_info - ind - 1) continue if target_handle not in target_handler_keys: print_status( - f"节点 {target_node['id']} 的target端点 {target_handle} 不存在,请检查,支持的端点 {target_handler_keys}", + f"节点 {target_node.id} 的target端点 {target_handle} 不存在,请检查,支持的端点 {target_handler_keys}", "error", ) resource_edge_info.pop(edge_info - ind - 1) continue - devices_and_resources = dict_from_graph(graph_res.physical_setup_graph) - # args_dict["resources_config"] = initialize_resources(list(deepcopy(devices_and_resources).values())) - args_dict["resources_config"] = list(devices_and_resources.values()) - args_dict["devices_config"] = dict_to_nested_dict(deepcopy(devices_and_resources), devices_only=False) + # 使用 ResourceTreeSet 代替 list + args_dict["resources_config"] = resource_tree_set + args_dict["devices_config"] = resource_tree_set args_dict["graph"] = graph_res.physical_setup_graph - print_status(f"{len(args_dict['resources_config'])} Resources loaded:", "info") - for i in args_dict["resources_config"]: - print_status(f"DeviceId: {i['id']}, Class: {i['class']}", "info") - if BasicConfig.upload_registry: # 设备注册到服务端 - 需要 ak 和 sk if args_dict.get("ak") and args_dict.get("sk"): @@ -351,9 +356,7 @@ def main(): else: print_status("未提供 ak 和 sk,跳过设备注册", "info") else: - print_status( - "本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning" - ) + print_status("本次启动注册表不报送云端,如果您需要联网调试,请在启动命令增加--upload_registry", "warning") if args_dict["controllers"] is not None: args_dict["controllers_config"] = yaml.safe_load(open(args_dict["controllers"], encoding="utf-8")) @@ -383,13 +386,16 @@ def main(): # web visiualize 2D if args_dict["visual"] != "disable": enable_rviz = args_dict["visual"] == "rviz" + devices_and_resources = dict_from_graph(graph_res.physical_setup_graph) if devices_and_resources is not None: from unilabos.device_mesh.resource_visalization import ( ResourceVisualization, ) # 此处开启后,logger会变更为INFO,有需要请调整 resource_visualization = ResourceVisualization( - devices_and_resources, args_dict["resources_config"], enable_rviz=enable_rviz + devices_and_resources, + [n.res_content for n in args_dict["resources_config"].all_nodes], # type: ignore # FIXME + enable_rviz=enable_rviz, ) args_dict["resources_mesh_config"] = resource_visualization.resource_model start_backend(**args_dict) diff --git a/unilabos/app/register.py b/unilabos/app/register.py index 204e8175..f456183d 100644 --- a/unilabos/app/register.py +++ b/unilabos/app/register.py @@ -1,11 +1,6 @@ -import argparse import json import time -from unilabos.config.config import BasicConfig -from unilabos.registry.registry import build_registry - -from unilabos.app.main import load_config_from_file from unilabos.utils.log import logger from unilabos.utils.type_check import TypeEncoder diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 1420c448..510647f0 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -9,6 +9,7 @@ import os from typing import List, Dict, Any, Optional import requests +from unilabos.ros.nodes.resource_tracker import ResourceTreeSet from unilabos.utils.log import info from unilabos.config.config import HTTPConfig, BasicConfig from unilabos.utils import logger @@ -46,7 +47,7 @@ class HTTPClient: Response: API响应对象 """ response = requests.post( - f"{self.remote_addr}/lab/material/edge", + f"{self.remote_addr}/edge/material/edge", json={ "edges": resources, }, @@ -61,6 +62,81 @@ class HTTPClient: logger.error(f"添加物料关系失败: {response.status_code}, {response.text}") return response + def resource_tree_add(self, resources: ResourceTreeSet, mount_uuid: str, first_add: bool) -> Dict[str, str]: + """ + 添加资源 + + Args: + resources: 要添加的资源树集合(ResourceTreeSet) + mount_uuid: 要挂载的资源的uuid + first_add: 是否为首次添加资源,可以是host也可以是slave来的 + Returns: + Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} + """ + # 从序列化数据中提取所有节点的UUID(保存旧UUID) + old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} + if not self.initialized or first_add: + self.initialized = True + info(f"首次添加资源,当前远程地址: {self.remote_addr}") + response = requests.post( + f"{self.remote_addr}/edge/material", + json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=100, + ) + else: + response = requests.put( + f"{self.remote_addr}/edge/material", + json={"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=100, + ) + + # 处理响应,构建UUID映射 + uuid_mapping = {} + if response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"添加物料失败: {response.text}") + else: + data = res["data"] + for i in data: + uuid_mapping[i["uuid"]] = i["cloud_uuid"] + else: + logger.error(f"添加物料失败: {response.text}") + for u, n in old_uuids.items(): + if u in uuid_mapping: + n.res_content.uuid = uuid_mapping[u] + else: + logger.warning(f"资源UUID未更新: {u}") + return uuid_mapping + + def resource_tree_get(self, uuid_list: List[str], with_children: bool) -> List[Dict[str, Any]]: + """ + 添加资源 + + Args: + uuid_list: List[str] + Returns: + Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} + """ + response = requests.post( + f"{self.remote_addr}/edge/material/query", + json={"uuids": uuid_list, "with_children": with_children}, + headers={"Authorization": f"Lab {self.auth}"}, + timeout=100, + ) + if response.status_code == 200: + res = response.json() + if "code" in res and res["code"] != 0: + logger.error(f"查询物料失败: {response.text}") + else: + data = res["data"]["nodes"] + return data + else: + logger.error(f"查询物料失败: {response.text}") + return [] + def resource_add(self, resources: List[Dict[str, Any]]) -> requests.Response: """ 添加资源 diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 1a859671..c91ca8ec 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -19,9 +19,12 @@ import websockets import ssl as ssl_module from queue import Queue, Empty from dataclasses import dataclass, field -from typing import Optional, Dict, Any, Callable, List, Set +from typing import Optional, Dict, Any, List from urllib.parse import urlparse from enum import Enum + +from jedi.inference.gradual.typing import TypedDict + from unilabos.app.model import JobAddReq from unilabos.ros.nodes.presets.host_node import HostNode from unilabos.utils.type_check import serialize_result_info @@ -96,6 +99,14 @@ class WebSocketMessage: timestamp: float = field(default_factory=time.time) +class WSResourceChatData(TypedDict): + uuid: str + device_uuid: str + device_id: str + device_old_uuid: str + device_old_id: str + + class DeviceActionManager: """设备动作管理器 - 管理每个device_action_key的任务队列""" @@ -543,7 +554,7 @@ class MessageProcessor: async def _process_message(self, data: Dict[str, Any]): """处理收到的消息""" message_type = data.get("action", "") - message_data = data.get("data", {}) + message_data = data.get("data") logger.debug(f"[MessageProcessor] Processing message: {message_type}") @@ -556,8 +567,12 @@ class MessageProcessor: await self._handle_job_start(message_data) elif message_type == "cancel_action" or message_type == "cancel_task": await self._handle_cancel_action(message_data) - elif message_type == "": - return + elif message_type == "add_material": + await self._handle_resource_tree_update(message_data, "add") + elif message_type == "update_material": + await self._handle_resource_tree_update(message_data, "update") + elif message_type == "remove_material": + await self._handle_resource_tree_update(message_data, "remove") else: logger.debug(f"[MessageProcessor] Unknown message type: {message_type}") @@ -574,6 +589,7 @@ class MessageProcessor: async def _handle_query_action_state(self, data: Dict[str, Any]): """处理query_action_state消息""" device_id = data.get("device_id", "") + device_uuid = data.get("device_uuid", "") action_name = data.get("action_name", "") task_id = data.get("task_id", "") job_id = data.get("job_id", "") @@ -760,6 +776,92 @@ class MessageProcessor: else: logger.warning("[MessageProcessor] Cancel request missing both task_id and job_id") + async def _handle_resource_tree_update(self, resource_uuid_list: List[WSResourceChatData], action: str): + """处理资源树更新消息(add_material/update_material/remove_material)""" + if not resource_uuid_list: + return + + # 按device_id和action分组 + # device_action_groups: {(device_id, action): [uuid_list]} + device_action_groups = {} + + for item in resource_uuid_list: + device_id = item["device_id"] + if not device_id: + device_id = "host_node" + + # 特殊处理update action: 检查是否设备迁移 + if action == "update": + device_old_id = item.get("device_old_id", "") + if not device_old_id: + device_old_id = "host_node" + + # 设备迁移:device_id != device_old_id + if device_id != device_old_id: + # 给旧设备发送remove + key_remove = (device_old_id, "remove") + if key_remove not in device_action_groups: + device_action_groups[key_remove] = [] + device_action_groups[key_remove].append(item["uuid"]) + + # 给新设备发送add + key_add = (device_id, "add") + if key_add not in device_action_groups: + device_action_groups[key_add] = [] + device_action_groups[key_add].append(item["uuid"]) + + logger.info( + f"[MessageProcessor] Resource migrated: {item['uuid'][:8]} from {device_old_id} to {device_id}" + ) + else: + # 正常update + key = (device_id, "update") + if key not in device_action_groups: + device_action_groups[key] = [] + device_action_groups[key].append(item["uuid"]) + else: + # add或remove action,直接分组 + key = (device_id, action) + if key not in device_action_groups: + device_action_groups[key] = [] + device_action_groups[key].append(item["uuid"]) + + logger.info(f"触发物料更新 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}") + + # 为每个(device_id, action)创建独立的更新线程 + for (device_id, actual_action), items in device_action_groups.items(): + logger.info(f"设备 {device_id} 物料更新 {actual_action} 数量: {len(items)}") + + def _notify_resource_tree(dev_id, act, item_list): + try: + host_node = HostNode.get_instance(timeout=5) + if not host_node: + logger.error(f"[MessageProcessor] HostNode instance not available for {act}") + return + + success = host_node.notify_resource_tree_update(dev_id, act, item_list) + + if success: + logger.info( + f"[MessageProcessor] Resource tree {act} completed for device {dev_id}, " + f"items: {len(item_list)}" + ) + else: + logger.warning(f"[MessageProcessor] Resource tree {act} failed for device {dev_id}") + + except Exception as e: + logger.error(f"[MessageProcessor] Error in resource tree {act} for device {dev_id}: {str(e)}") + logger.error(traceback.format_exc()) + + # 在新线程中执行通知 + thread = threading.Thread( + target=_notify_resource_tree, + args=(device_id, actual_action, items), + daemon=True, + name=f"ResourceTreeUpdate-{actual_action}-{device_id}", + ) + thread.start() + async def _send_action_state_response( self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int ): @@ -1008,6 +1110,8 @@ class WebSocketClient(BaseCommunicationClient): # 构建WebSocket URL self.websocket_url = self._build_websocket_url() + if not self.websocket_url: + self.websocket_url = "" # 默认空字符串,避免None # 两个核心线程 self.message_processor = MessageProcessor(self.websocket_url, self.send_queue, self.device_manager) diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index 0c55127a..b994d065 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -1,89 +1,127 @@ import importlib import inspect import json -from typing import Union, Any, Dict -import numpy as np +import traceback +from typing import Union, Any, Dict, List import networkx as nx from pylabrobot.resources import ResourceHolder from unilabos_msgs.msg import Resource from unilabos.resources.container import RegularContainer from unilabos.ros.msgs.message_converter import convert_to_ros_msg +from unilabos.ros.nodes.resource_tracker import ( + ResourceDictInstance, + ResourceTreeSet, +) +from unilabos.utils.banner_print import print_status try: from pylabrobot.resources.resource import Resource as ResourcePLR except ImportError: pass -from typing import Union, get_origin +from typing import get_origin physical_setup_graph: nx.Graph = None -def canonicalize_nodes_data(data: dict, parent_relation: dict = {}) -> dict: - for node in data.get("nodes", []): +def canonicalize_nodes_data( + nodes: List[Dict[str, Any]], parent_relation: Dict[str, List[str]] = {} +) -> ResourceTreeSet: + """ + 标准化节点数据,使用 ResourceInstanceDictFlatten 进行规范化并创建 ResourceTreeSet + + Args: + nodes: 原始节点列表 + parent_relation: 父子关系映射 {parent_id: [child_id1, child_id2, ...]} + + Returns: + ResourceTreeSet: 标准化后的资源树集合 + """ + print_status(f"{len(nodes)} Resources loaded:", "info") + + # 第一步:基本预处理(处理graphml的label字段) + for node in nodes: if node.get("label") is not None: - id = node.pop("label") - node["id"] = node["name"] = id - if "id" not in node: - node["id"] = node.get("name", "NaN") - if "name" not in node: - node["name"] = node["id"] - if node.get("position") is None: - node["position"] = { - "x": node.pop("x", 0.0), - "y": node.pop("y", 0.0), - "z": node.pop("z", 0.0), - } - if node.get("config") is None: - node["config"] = {} - node["data"] = {} - for k in list(node.keys()): - if k not in [ - "id", - "name", - "class", - "type", - "position", - "children", - "parent", - "config", - "data", - ]: - if k in ["chemical", "current_volume"]: - if node["data"].get("liquids") is None: - node["data"]["liquids"] = [{}] - if k == "chemical": - node["data"]["liquids"][0]["liquid_name"] = node.pop(k) - elif k == "current_volume": - node["data"]["liquids"][0]["liquid_volume"] = node.pop(k) - elif k == "max_volume": - node["data"]["max_volume"] = node.pop(k) - elif k == "url": - node.pop(k) - else: - node["config"][k] = node.pop(k) - if "class" not in node: - node["class"] = None - if "type" not in node: - node["type"] = ( - "container" - if node["class"] is None - else "device" if node["class"] not in ["container", "plate"] else node["class"] - ) - if "children" not in node: - node["children"] = [] + node_id = node.pop("label") + node["id"] = node["name"] = node_id - id2idx = {node_data["id"]: idx for idx, node_data in enumerate(data["nodes"])} + # 第二步:处理parent_relation + id2idx = {node["id"]: idx for idx, node in enumerate(nodes)} for parent, children in parent_relation.items(): - data["nodes"][id2idx[parent]]["children"] = children - for child in children: - data["nodes"][id2idx[child]]["parent"] = parent - return data + if parent in id2idx: + nodes[id2idx[parent]]["children"] = children + for child in children: + if child in id2idx: + nodes[id2idx[child]]["parent"] = parent + + # 第三步:使用 ResourceInstanceDictFlatten 标准化每个节点 + standardized_instances = [] + known_nodes: Dict[str, ResourceDictInstance] = {} # {node_id: ResourceDictInstance} + uuid_to_instance: Dict[str, ResourceDictInstance] = {} # {uuid: ResourceDictInstance} + + for node in nodes: + try: + print_status(f"DeviceId: {node['id']}, Class: {node['class']}", "info") + # 使用标准化方法 + resource_instance = ResourceDictInstance.get_resource_instance_from_dict(node) + known_nodes[node["id"]] = resource_instance + uuid_to_instance[resource_instance.res_content.uuid] = resource_instance + standardized_instances.append(resource_instance) + except Exception as e: + print_status(f"Failed to standardize node {node.get('id', 'unknown')}:\n{traceback.format_exc()}", "error") + continue + + # 第四步:建立 parent 和 children 关系 + for node in nodes: + node_id = node["id"] + if node_id not in known_nodes: + continue + + current_instance = known_nodes[node_id] + + # 优先使用 parent_uuid 进行匹配,如果不存在则使用 parent + parent_uuid = node.get("parent_uuid") + parent_id = node.get("parent") + parent_instance = None + + # 优先用 parent_uuid 匹配 + if parent_uuid and parent_uuid in uuid_to_instance: + parent_instance = uuid_to_instance[parent_uuid] + # 否则用 parent_id 匹配 + elif parent_id and parent_id in known_nodes: + parent_instance = known_nodes[parent_id] + + # 设置 parent 引用 + if parent_instance: + current_instance.res_content.parent = parent_instance.res_content + # 将当前节点添加到父节点的 children 列表 + parent_instance.children.append(current_instance) + + # 第五步:创建 ResourceTreeSet + resource_tree_set = ResourceTreeSet.from_nested_list(standardized_instances) + return resource_tree_set -def canonicalize_links_ports(data: dict) -> dict: +def canonicalize_links_ports( + links: List[Dict[str, Any]], resource_tree_set: ResourceTreeSet +) -> List[Dict[str, Any]]: + """ + 标准化边/连接的端口信息 + + Args: + links: 原始连接列表 + resource_tree_set: 资源树集合,用于获取节点的UUID信息 + + Returns: + 标准化后的连接列表 + """ + # 构建 id 到 uuid 的映射 + id_to_uuid: Dict[str, str] = {} + for node in resource_tree_set.all_nodes: + id_to_uuid[node.res_content.id] = node.res_content.uuid + # 第一遍处理:将字符串类型的port转换为字典格式 - for link in data.get("links", []): + for link in links: port = link.get("port") if link.get("type", "physical") == "physical": link["type"] = "fluid" @@ -107,11 +145,11 @@ def canonicalize_links_ports(data: dict) -> dict: link["port"] = {link["source"]: None, link["target"]: None} # 构建边字典,键为(source节点, target节点),值为对应的port信息 - edges = {(link["source"], link["target"]): link["port"] for link in data.get("links", [])} + edges = {(link["source"], link["target"]): link["port"] for link in links} # 第二遍处理:填充反向边的dest信息 delete_reverses = [] - for i, link in enumerate(data.get("links", [])): + for i, link in enumerate(links): s, t = link["source"], link["target"] current_port = link["port"] if current_port.get(t) is None: @@ -127,9 +165,22 @@ def canonicalize_links_ports(data: dict) -> dict: # 若不存在反向边,初始化为空结构 current_port[t] = current_port[s] # 删除已被使用反向端口信息的反向边 - data["links"] = [link for i, link in enumerate(data.get("links", [])) if i not in delete_reverses] + standardized_links = [link for i, link in enumerate(links) if i not in delete_reverses] - return data + # 第三遍处理:为每个 link 添加 source_uuid 和 target_uuid + for link in standardized_links: + source_id = link.get("source") + target_id = link.get("target") + + # 添加 source_uuid + if source_id and source_id in id_to_uuid: + link["source_uuid"] = id_to_uuid[source_id] + + # 添加 target_uuid + if target_id and target_id in id_to_uuid: + link["target_uuid"] = id_to_uuid[target_id] + + return standardized_links def handle_communications(G: nx.Graph): @@ -151,18 +202,43 @@ def handle_communications(G: nx.Graph): G.nodes[device]["config"]["io_device_port"] = int(edata["port"][device_comm]) -def read_node_link_json(json_info: Union[str, Dict[str, Any]]) -> tuple[nx.Graph, dict]: +def read_node_link_json( + json_info: Union[str, Dict[str, Any]], +) -> tuple[nx.Graph, ResourceTreeSet, List[Dict[str, Any]]]: + """ + 读取节点-边的JSON数据并构建图 + + Args: + json_info: JSON文件路径或字典数据 + + Returns: + tuple[nx.Graph, ResourceTreeSet, List[Dict[str, Any]]]: + 返回NetworkX图对象、资源树集合和标准化后的连接列表 + """ global physical_setup_graph if isinstance(json_info, str): data = json.load(open(json_info, encoding="utf-8")) else: data = json_info - data = canonicalize_nodes_data(data) - data = canonicalize_links_ports(data) - physical_setup_graph = nx.node_link_graph(data, multigraph=False) # edges="links" 3.6 warning + # 标准化节点数据并创建 ResourceTreeSet + nodes = data.get("nodes", []) + resource_tree_set = canonicalize_nodes_data(nodes) + + # 标准化边数据 + links = data.get("links", []) + standardized_links = canonicalize_links_ports(links, resource_tree_set) + + # 构建 NetworkX 图(需要转换回 dict 格式) + # 从 ResourceTreeSet 获取所有节点 + graph_data = { + "nodes": [node.res_content.model_dump(by_alias=True) for node in resource_tree_set.all_nodes], + "links": standardized_links, + } + physical_setup_graph = nx.node_link_graph(graph_data, edges="links", multigraph=False) handle_communications(physical_setup_graph) - return physical_setup_graph, data + + return physical_setup_graph, resource_tree_set, standardized_links def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]]: @@ -185,7 +261,17 @@ def modify_to_backend_format(data: list[dict[str, Any]]) -> list[dict[str, Any]] return data -def read_graphml(graphml_file): +def read_graphml(graphml_file: str) -> tuple[nx.Graph, ResourceTreeSet, List[Dict[str, Any]]]: + """ + 读取GraphML文件并构建图 + + Args: + graphml_file: GraphML文件路径 + + Returns: + tuple[nx.Graph, ResourceTreeSet, List[Dict[str, Any]]]: + 返回NetworkX图对象、资源树集合和标准化后的连接列表 + """ global physical_setup_graph G = nx.read_graphml(graphml_file) @@ -202,12 +288,25 @@ def read_graphml(graphml_file): G2 = nx.relabel_nodes(G, mapping) data = nx.node_link_data(G2) - data = canonicalize_nodes_data(data, parent_relation=parent_relation) - data = canonicalize_links_ports(data) - physical_setup_graph = nx.node_link_graph(data, edges="links", multigraph=False) # edges="links" 3.6 warning + # 标准化节点数据并创建 ResourceTreeSet + nodes = data.get("nodes", []) + resource_tree_set = canonicalize_nodes_data(nodes, parent_relation=parent_relation) + + # 标准化边数据 + links = data.get("links", []) + standardized_links = canonicalize_links_ports(links, resource_tree_set) + + # 构建 NetworkX 图(需要转换回 dict 格式) + # 从 ResourceTreeSet 获取所有节点 + graph_data = { + "nodes": [node.res_content.model_dump(by_alias=True) for node in resource_tree_set.all_nodes], + "links": standardized_links, + } + physical_setup_graph = nx.node_link_graph(graph_data, link="links", multigraph=False) handle_communications(physical_setup_graph) - return physical_setup_graph, data + + return physical_setup_graph, resource_tree_set, standardized_links def dict_from_graph(graph: nx.Graph) -> dict: @@ -229,11 +328,7 @@ def dict_to_tree(nodes: dict, devices_only: bool = False) -> list[dict]: is_root[child_id] = False # 找到根节点并返回 - root_nodes = [ - node - for node in nodes_list - if is_root.get(node["id"], False) or len(nodes_list) == 1 - ] + root_nodes = [node for node in nodes_list if is_root.get(node["id"], False) or len(nodes_list) == 1] # 如果存在多个根节点,返回所有根节点 return root_nodes @@ -258,11 +353,7 @@ def dict_to_nested_dict(nodes: dict, devices_only: bool = False) -> dict: node["config"]["children"] = node["children"] # 找到根节点并返回 - root_nodes = { - node["id"]: node - for node in nodes_list - if is_root.get(node["id"], False) or len(nodes_list) == 1 - } + root_nodes = {node["id"]: node for node in nodes_list if is_root.get(node["id"], False) or len(nodes_list) == 1} # 如果存在多个根节点,返回所有根节点 return root_nodes @@ -337,6 +428,7 @@ def nested_dict_to_list(nested_dict: dict) -> list[dict]: # FIXME 是tree? return result + def convert_resources_to_type( resources_list: list[dict], resource_type: Union[type, list[type]], *, plr_model: bool = False ) -> Union[list[dict], dict, None, "ResourcePLR"]: @@ -369,7 +461,9 @@ def convert_resources_to_type( return None -def convert_resources_from_type(resources_list, resource_type: Union[type, list[type]], *, is_plr: bool = False) -> Union[list[dict], dict, None, "ResourcePLR"]: +def convert_resources_from_type( + resources_list, resource_type: Union[type, list[type]], *, is_plr: bool = False +) -> Union[list[dict], dict, None, "ResourcePLR"]: """ Convert resources from a given type (PyLabRobot or NestedDict) to flattened list of dictionaries. @@ -432,6 +526,7 @@ def resource_ulab_to_plr(resource: dict, plr_model=False) -> "ResourcePLR": d = resource_ulab_to_plr_inner(resource) """无法通过Resource进行反序列化,例如TipSpot必须内部序列化好,直接用TipSpot序列化会多参数,导致出错""" from pylabrobot.utils.object_parsing import find_subclass + sub_cls = find_subclass(d["type"], ResourcePLR) spect = inspect.signature(sub_cls) if "category" not in spect.parameters: @@ -456,6 +551,7 @@ def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None, w else: print("转换pylabrobot的时候,出现未知类型", source) return "container" + def resource_plr_to_ulab_inner(d: dict, all_states: dict, child=True) -> dict: r = { "id": d["name"], @@ -474,6 +570,7 @@ def resource_plr_to_ulab(resource_plr: "ResourcePLR", parent_name: str = None, w "data": all_states[d["name"]], } return r + d = resource_plr.serialize() all_states = resource_plr.serialize_all_state() r = resource_plr_to_ulab_inner(d, all_states, with_children) @@ -510,8 +607,10 @@ def resource_bioyond_to_plr(bioyond_materials: list[dict], type_mapping: dict = (detail.get("y", 0) - 1) bottle = plr_material[number] bottle.code = detail.get("code", "") - bottle.tracker.liquids = [(detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0)] - + bottle.tracker.liquids = [ + (detail["name"], float(detail.get("quantity", 0)) if detail.get("quantity") else 0) + ] + plr_materials.append(plr_material) if deck and hasattr(deck, "warehouses"): @@ -541,6 +640,7 @@ def initialize_resource(resource_config: dict, resource_type: Any = None) -> Uni None """ from unilabos.registry.registry import lab_registry + resource_class_config = resource_config.get("class", None) if resource_class_config is None: return [resource_config] @@ -570,7 +670,9 @@ def initialize_resource(resource_config: dict, resource_type: Any = None) -> Uni r = resource_plr elif resource_class_config["type"] == "unilabos": res_instance: RegularContainer = RESOURCE(id=resource_config["name"]) - res_instance.ulr_resource = convert_to_ros_msg(Resource, {k:v for k,v in resource_config.items() if k != "class"}) + res_instance.ulr_resource = convert_to_ros_msg( + Resource, {k: v for k, v in resource_config.items() if k != "class"} + ) r = [res_instance.get_ulr_resource_as_dict()] elif isinstance(RESOURCE, dict): r = [RESOURCE.copy()] diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index e7c6e5d6..aca3beba 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -1,25 +1,22 @@ import copy import json -import os import threading import time from typing import Optional, Dict, Any, List import rclpy +from unilabos_msgs.srv._serial_command import SerialCommand_Response + from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher -from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceAdd, SerialCommand # type: ignore +from unilabos_msgs.srv import SerialCommand # type: ignore from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node from rclpy.timer import Timer from unilabos.registry.registry import lab_registry from unilabos.ros.initialize_device import initialize_device_from_dict -from unilabos.ros.msgs.message_converter import ( - convert_to_ros_msg, -) from unilabos.ros.nodes.presets.host_node import HostNode from unilabos.utils import logger from unilabos.config.config import BasicConfig @@ -43,9 +40,9 @@ def exit() -> None: def main( - devices_config: Dict[str, Any] = {}, - resources_config: list=[], - resources_edge_config: list=[], + devices_config: ResourceTreeSet, + resources_config: ResourceTreeSet, + resources_edge_config: list[dict] = [], graph: Optional[Dict[str, Any]] = None, controllers_config: Dict[str, Any] = {}, bridges: List[Any] = [], @@ -73,18 +70,22 @@ def main( if visual != "disable": from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher + # 将 ResourceTreeSet 转换为 list 用于 visual 组件 + resources_list = ( + [node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes] + if resources_config + else [] + ) resource_mesh_manager = ResourceMeshManager( resources_mesh_config, - resources_config, - resource_tracker = host_node.resource_tracker, - device_id = 'resource_mesh_manager', + resources_list, + resource_tracker=host_node.resource_tracker, + device_id="resource_mesh_manager", ) - joint_republisher = JointRepublisher( - 'joint_republisher', - host_node.resource_tracker + joint_republisher = JointRepublisher("joint_republisher", host_node.resource_tracker) + lh_joint_pub = LiquidHandlerJointPublisher( + resources_config=resources_list, resource_tracker=host_node.resource_tracker ) - lh_joint_pub = LiquidHandlerJointPublisher(resources_config=resources_config, - resource_tracker=host_node.resource_tracker) executor.add_node(resource_mesh_manager) executor.add_node(joint_republisher) executor.add_node(lh_joint_pub) @@ -97,9 +98,9 @@ def main( def slave( - devices_config: Dict[str, Any] = {}, - resources_config=[], - resources_edge_config=[], + devices_config: ResourceTreeSet, + resources_config: ResourceTreeSet, + resources_edge_config: list = [], graph: Optional[Dict[str, Any]] = None, controllers_config: Dict[str, Any] = {}, bridges: List[Any] = [], @@ -113,11 +114,12 @@ def slave( executor = rclpy.__executor if not executor: executor = rclpy.__executor = MultiThreadedExecutor() - devices_config_copy = copy.deepcopy(devices_config) - for device_id, device_config in devices_config.items(): - d = initialize_device_from_dict(device_id, device_config) - if d is None: - continue + devices_instances = {} + for device_config in devices_config.root_nodes: + device_id = device_config.res_content.id + if device_config.res_content.type != "device": + d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) + devices_instances[device_id] = d # 默认初始化 # if d is not None and isinstance(d, Node): # executor.add_node(d) @@ -129,20 +131,17 @@ def slave( if visual != "disable": from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher + resource_mesh_manager = ResourceMeshManager( resources_mesh_config, - resources_config, - resource_tracker= DeviceNodeResourceTracker(), - device_id = 'resource_mesh_manager', - ) - joint_republisher = JointRepublisher( - 'joint_republisher', - DeviceNodeResourceTracker() + resources_config, # type: ignore FIXME + resource_tracker=DeviceNodeResourceTracker(), + device_id="resource_mesh_manager", ) + joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker()) executor.add_node(resource_mesh_manager) executor.add_node(joint_republisher) - thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread") thread.start() @@ -151,25 +150,61 @@ def slave( sclient.wait_for_service() request = SerialCommand.Request() - request.command = json.dumps({ - "machine_name": BasicConfig.machine_name, - "type": "slave", - "devices_config": devices_config_copy, - "registry_config": lab_registry.obtain_registry_device_info() - }, ensure_ascii=False, cls=TypeEncoder) + request.command = json.dumps( + { + "machine_name": BasicConfig.machine_name, + "type": "slave", + "devices_config": devices_config.dump(), + "registry_config": lab_registry.obtain_registry_device_info(), + }, + ensure_ascii=False, + cls=TypeEncoder, + ) response = sclient.call_async(request).result() logger.info(f"Slave node info updated.") - rclient = n.create_client(ResourceAdd, "/resources/add") + # 使用新的 c2s_update_resource_tree 服务 + rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree") rclient.wait_for_service() - request = ResourceAdd.Request() - request.resources = [convert_to_ros_msg(Resource, resource) for resource in resources_config] - response = rclient.call_async(request).result() - logger.info(f"Slave resource added.") + # 序列化 ResourceTreeSet 为 JSON + if resources_config: + request = SerialCommand.Request() + request.command = json.dumps( + { + "data": { + "data": resources_config.dump(), + "mount_uuid": "", + "first_add": True, + }, + "action": "add", + }, + ensure_ascii=False, + ) + tree_response: SerialCommand_Response = rclient.call_async(request).result() + uuid_mapping = json.loads(tree_response.response) + for node in resources_config.root_nodes: + if node.res_content.type == "device": + for sub_node in node.children: + # 只有二级子设备 + if sub_node.res_content.type != "device": + device_tracker = devices_instances[node.res_content.id].resource_tracker + resource_instance = device_tracker.figure_resource( # todo: 要换成uuid进行figure + {"name": sub_node.res_content.name}) + device_tracker.loop_update_uuid(resource_instance, uuid_mapping) + else: + logger.error("Slave模式不允许新增非设备节点下的物料") + continue + if tree_response: + logger.info(f"Slave resource tree added. Response: {tree_response.response}") + else: + logger.warning("Slave resource tree add response is None") + else: + logger.info("No resources to add.") while True: time.sleep(1) + if __name__ == "__main__": main() diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index f46c7d6f..9a8b4776 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -5,7 +5,7 @@ import threading import time import traceback import uuid -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, Union +from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, Union, TYPE_CHECKING from concurrent.futures import ThreadPoolExecutor import asyncio @@ -25,7 +25,6 @@ from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialComma from unilabos.resources.container import RegularContainer from unilabos.resources.graphio import ( convert_resources_to_type, - convert_resources_from_type, resource_ulab_to_plr, initialize_resources, dict_to_tree, @@ -49,12 +48,16 @@ from unilabos_msgs.srv import ( ) # type: ignore from unilabos_msgs.msg import Resource # type: ignore -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet, ResourceDict, \ + ResourceDictInstance from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func from unilabos.utils.log import info, debug, warning, error, critical, logger, trace -from unilabos.utils.type_check import get_type_class, TypeEncoder, serialize_result_info, get_result_info_str +from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str + +if TYPE_CHECKING: + from pylabrobot.resources import Resource as ResourcePLR T = TypeVar("T") @@ -178,7 +181,9 @@ class PropertyPublisher: try: self.publisher_ = node.create_publisher(msg_type, f"{name}", 10) except AttributeError as ex: - self.node.lab_logger().error(f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}") + self.node.lab_logger().error( + f"创建发布者 {name} 失败,可能由于注册表有误,类型: {msg_type},错误: {ex}\n{traceback.format_exc()}" + ) self.timer = node.create_timer(self.timer_period, self.publish_property) self.__loop = get_event_loop() str_msg_type = str(msg_type)[8:-2] @@ -187,48 +192,48 @@ class PropertyPublisher: def get_property(self): if asyncio.iscoroutinefunction(self.get_method): # 如果是异步函数,运行事件循环并等待结果 - self.node.lab_logger().trace(f"【PropertyPublisher.get_property】获取异步属性: {self.name}") + self.node.lab_logger().trace(f"【.get_property】获取异步属性: {self.name}") loop = self.__loop if loop: future = asyncio.run_coroutine_threadsafe(self.get_method(), loop) self._value = future.result() return self._value else: - self.node.lab_logger().error(f"【PropertyPublisher.get_property】事件循环未初始化") + self.node.lab_logger().error(f"【.get_property】事件循环未初始化") return None else: # 如果是同步函数,直接调用并返回结果 - self.node.lab_logger().trace(f"【PropertyPublisher.get_property】获取同步属性: {self.name}") + self.node.lab_logger().trace(f"【.get_property】获取同步属性: {self.name}") self._value = self.get_method() return self._value async def get_property_async(self): try: # 获取异步属性值 - self.node.lab_logger().trace(f"【PropertyPublisher.get_property_async】异步获取属性: {self.name}") + self.node.lab_logger().trace(f"【.get_property_async】异步获取属性: {self.name}") self._value = await self.get_method() except Exception as e: - self.node.lab_logger().error(f"【PropertyPublisher.get_property_async】获取异步属性出错: {str(e)}") + self.node.lab_logger().error(f"【.get_property_async】获取异步属性出错: {str(e)}") def publish_property(self): try: - self.node.lab_logger().trace(f"【PropertyPublisher.publish_property】开始发布属性: {self.name}") + self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") value = self.get_property() if self.print_publish: - self.node.lab_logger().trace(f"【PropertyPublisher.publish_property】发布 {self.msg_type}: {value}") + self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") if value is not None: msg = convert_to_ros_msg(self.msg_type, value) self.publisher_.publish(msg) - self.node.lab_logger().trace(f"【PropertyPublisher.publish_property】属性 {self.name} 发布成功") + self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") except Exception as e: - self.node.lab_logger().error(f"【PropertyPublisher.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}") + self.node.lab_logger().error( + f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}" + ) def change_frequency(self, period): # 动态改变定时器频率 self.timer_period = period - self.node.get_logger().info( - f"【PropertyPublisher.change_frequency】修改 {self.name} 定时器周期为: {self.timer_period} 秒" - ) + self.node.get_logger().info(f"【.change_frequency】修改 {self.name} 定时器周期为: {self.timer_period} 秒") # 重置定时器 self.timer.cancel() @@ -262,7 +267,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], print_publish=True, - resource_tracker: Optional["DeviceNodeResourceTracker"] = None, + resource_tracker: "DeviceNodeResourceTracker" = None, # type: ignore ): """ 初始化ROS2设备节点 @@ -313,7 +318,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 创建动作服务 if self.create_action_server: for action_name, action_value_mapping in self._action_value_mappings.items(): - if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith("UniLabJsonCommand"): + if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith( + "UniLabJsonCommand" + ): continue self.create_ros_action_server(action_name, action_value_mapping) @@ -329,14 +336,99 @@ class BaseROS2DeviceNode(Node, Generic[T]): "resource_delete": self.create_client(ResourceDelete, "/resources/delete"), "resource_update": self.create_client(ResourceUpdate, "/resources/update"), "resource_list": self.create_client(ResourceList, "/resources/list"), + "c2s_update_resource_tree": self.create_client(SerialCommand, "/c2s_update_resource_tree"), } - def query_host_name_cb(req, res): + def re_register_device(req, res): self.register_device() self.lab_logger().info("Host要求重新注册当前节点") res.response = "" return res + async def s2c_resource_tree(req: SerialCommand_Request, res: SerialCommand_Response): + """ + 处理资源树更新请求 + + 支持三种操作: + - add: 添加新资源到资源树 + - update: 更新现有资源 + - remove: 从资源树中移除资源 + """ + try: + data = json.loads(req.command) + results = [] + + for i in data: + action = i.get("action") # remove, add, update + resources_uuid: List[str] = i.get("data") # 资源数据 + self.lab_logger().info( + f"[Resource Tree Update] Processing {action} operation, " + f"resources count: {len(resources_uuid)}" + ) + tree_set = None + if action in ["add", "update"]: + response: SerialCommand.Response = await self._resource_clients[ + "c2s_update_resource_tree" + ].call_async( + SerialCommand.Request( + command=json.dumps( + {"data": {"data": resources_uuid, "with_children": False}, "action": "get"} + ) + ) + ) # type: ignore + raw_nodes = json.loads(response.response) + nodes = [ResourceDictInstance.get_resource_instance_from_dict(n) for n in raw_nodes] + uuids_to_nodes = {u["uuid"]: n for u, n in zip(raw_nodes, nodes)} + for u, n in zip(raw_nodes, nodes): + n.res_content.parent = uuids_to_nodes.get(u["parent_uuid"]).res_content if u["parent_uuid"] in uuids_to_nodes else None + print(n.res_content.parent) + tree_set = ResourceTreeSet.from_nested_list(nodes) + try: + if action == "add": + # 添加资源到资源跟踪器 + plr_resource = tree_set.to_plr_resources() # FIXME: 转成plr的实例 + func = getattr(self.driver_instance, "resource_tree_add", None) + if callable(func): + func(tree_set) + results.append({"success": True, "action": "add"}) + elif action == "update": + # 更新资源 + func = getattr(self.driver_instance, "resource_tree_update", None) + if callable(func): + func(tree_set) + results.append({"success": True, "action": "update"}) + elif action == "remove": + # 移除资源 + func = getattr(self.driver_instance, "resource_tree_remove", None) + if callable(func): + resources_instance: List[ResourcePLR] = [self.resource_tracker.uuid_to_resources[i] for + i in resources_uuid] + func(resources_instance) + [r.parent.unassign_child_resource(r) for r in resources_instance if r is not None] + results.append({"success": True, "action": "remove"}) + except Exception as e: + error_msg = f"Error processing {action} operation: {str(e)}" + self.lab_logger().error(f"[Resource Tree Update] {error_msg}") + self.lab_logger().error(traceback.format_exc()) + results.append({"success": False, "action": action, "error": error_msg}) + + # 返回处理结果 + result_json = {"results": results, "total": len(data)} + res.response = json.dumps(result_json, ensure_ascii=False) + self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations") + + except json.JSONDecodeError as e: + error_msg = f"Invalid JSON format: {str(e)}" + self.lab_logger().error(f"[Resource Tree Update] {error_msg}") + res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False) + except Exception as e: + error_msg = f"Unexpected error: {str(e)}" + self.lab_logger().error(f"[Resource Tree Update] {error_msg}") + self.lab_logger().error(traceback.format_exc()) + res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False) + + return res + async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response): # 物料传输到对应的node节点 rclient = self.create_client(ResourceAdd, "/resources/add") @@ -380,12 +472,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): if len(LIQUID_INPUT_SLOT) and LIQUID_INPUT_SLOT[0] == -1: container_instance = request.resources[0] container_query_dict: dict = resources - found_resources = self.resource_tracker.figure_resource({"id": container_query_dict["name"]}, try_mode=True) + found_resources = self.resource_tracker.figure_resource( + {"id": container_query_dict["name"]}, try_mode=True + ) if not len(found_resources): self.resource_tracker.add_resource(container_instance) logger.info(f"添加物料{container_query_dict['name']}到资源跟踪器") else: - assert len(found_resources) == 1, f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统" + assert ( + len(found_resources) == 1 + ), f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统" resource = found_resources[0] if isinstance(resource, Resource): regular_container = RegularContainer(resource.id) @@ -399,12 +495,14 @@ class BaseROS2DeviceNode(Node, Generic[T]): request.resources[0].name = resource["name"] logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict") else: - logger.info(f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}") + logger.info( + f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}" + ) response: ResourceAdd.Response = await rclient.call_async(request) # 应该先add_resource了 final_response = { "created_resources": [ROS2MessageInstance(i).get_python_dict() for i in request.resources], - "liquid_input_resources": [] + "liquid_input_resources": [], } res.response = json.dumps(final_response) # 如果driver自己就有assign的方法,那就使用driver自己的assign方法 @@ -423,12 +521,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) res.response = get_result_info_str("", True, ret) except Exception as e: - self.lab_logger().error(f"运行设备的create_resource出错:{create_resource_func}\n{traceback.format_exc()}") + self.lab_logger().error( + f"运行设备的create_resource出错:{create_resource_func}\n{traceback.format_exc()}" + ) res.response = get_result_info_str(traceback.format_exc(), False, {}) return res # 接下来该根据bind_parent_id进行assign了,目前只有plr可以进行assign,不然没有办法输入到物料系统中 if bind_parent_id != self.node_name: - resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) # 拿到父节点,进行具体assign等操作 + resource = self.resource_tracker.figure_resource( + {"name": bind_parent_id} + ) # 拿到父节点,进行具体assign等操作 # request.resources = [convert_to_ros_msg(Resource, resources)] try: @@ -452,9 +554,15 @@ class BaseROS2DeviceNode(Node, Generic[T]): empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume) plr_instance.set_well_liquids(empty_liquid_info_in) input_wells_ulr = [ - convert_to_ros_msg(Resource, resource_plr_to_ulab(plr_instance.get_well(LIQUID_INPUT_SLOT), with_children=False)) for r in LIQUID_INPUT_SLOT + convert_to_ros_msg( + Resource, + resource_plr_to_ulab(plr_instance.get_well(LIQUID_INPUT_SLOT), with_children=False), + ) + for r in LIQUID_INPUT_SLOT + ] + final_response["liquid_input_resources"] = [ + ROS2MessageInstance(i).get_python_dict() for i in input_wells_ulr ] - final_response["liquid_input_resources"] = [ROS2MessageInstance(i).get_python_dict() for i in input_wells_ulr] res.response = json.dumps(final_response) if isinstance(resource, OTDeck) and "slot" in other_calling_param: other_calling_param["slot"] = int(other_calling_param["slot"]) @@ -499,16 +607,22 @@ class BaseROS2DeviceNode(Node, Generic[T]): # noinspection PyTypeChecker self._service_server: Dict[str, Service] = { - "query_host_name": self.create_service( + "re_register_device": self.create_service( SerialCommand, - f"/srv{self.namespace}/query_host_name", - query_host_name_cb, + f"/srv{self.namespace}/re_register_device", + re_register_device, callback_group=self.callback_group, ), "append_resource": self.create_service( SerialCommand, f"/srv{self.namespace}/append_resource", - append_resource, + append_resource, # type: ignore + callback_group=self.callback_group, + ), + "s2c_resource_tree": self.create_service( + SerialCommand, + f"/srv{self.namespace}/s2c_resource_tree", + s2c_resource_tree, # type: ignore callback_group=self.callback_group, ), } @@ -518,15 +632,17 @@ class BaseROS2DeviceNode(Node, Generic[T]): rclpy.get_global_executor().add_node(self) self.lab_logger().debug(f"ROS节点初始化完成") - async def update_resource(self, resources: List[Any]): - r = ResourceUpdate.Request() - unique_resources = [] - for resource in resources: # resource是list[ResourcePLR] - # 目前更新资源只支持传入plr的对象,后面要更新convert_resources_from_type函数 - converted_list = convert_resources_from_type([resource], resource_type=[object], is_plr=True) - unique_resources.extend([convert_to_ros_msg(Resource, converted) for converted in converted_list]) - r.resources = unique_resources - response = await self._resource_clients["resource_update"].call_async(r) + async def update_resource(self, resources: List["ResourcePLR"]): + r = SerialCommand.Request() + tree_set = ResourceTreeSet.from_plr_resources(resources) + r.command = json.dumps({"data": {"data": tree_set.dump()}, "action": "update"}) + response: SerialCommand_Response = await self._resource_clients["c2s_update_resource_tree"].call_async(r) # type: ignore + try: + uuid_maps = json.loads(response.response) + self.resource_tracker.loop_update_uuid(resources, uuid_maps) + except Exception as e: + self.lab_logger().error(f"更新资源uuid失败: {e}") + self.lab_logger().error(traceback.format_exc()) self.lab_logger().debug(f"资源更新结果: {response}") def register_device(self): @@ -657,7 +773,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = False action_return_value = None - ##### self.lab_logger().info(f"执行动作: {action_name}") + ##### self.lab_logger().info(f"执行动作: {action_name}") goal = goal_handle.request # 从目标消息中提取参数, 并调用对应的方法 @@ -672,7 +788,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().info(f"执行序列动作后续步骤: {action}") self.get_real_function(self.driver_instance, action)[0]() - action_paramtypes = self.get_real_function(self.driver_instance, action_value_mapping["sequence"][0])[1] + action_paramtypes = self.get_real_function(self.driver_instance, action_value_mapping["sequence"][0])[ + 1 + ] else: ACTION, action_paramtypes = self.get_real_function(self.driver_instance, action_name) @@ -718,7 +836,10 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 判断 ACTION 是否需要特殊的物料类型如 pylabrobot.resources.Resource,并做转换 else: resources_list: List[List[Dict[str, Any]]] = [[convert_from_ros_msg(rs) for rs in sub_res_list] for sub_res_list in current_resources] # type: ignore - final_resource = [convert_resources_to_type(sub_res_list, final_type)[0] for sub_res_list in resources_list] + final_resource = [ + convert_resources_to_type(sub_res_list, final_type)[0] + for sub_res_list in resources_list + ] try: action_kwargs[k] = self.resource_tracker.figure_resource(final_resource, try_mode=False) except Exception as e: @@ -745,7 +866,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = True except Exception as e: execution_error = traceback.format_exc() - error(f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + error( + f"异步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) error(traceback.format_exc()) future.add_done_callback(_handle_future_exception) @@ -754,7 +877,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): execution_success = False self.lab_logger().error(f"创建异步任务失败: {traceback.format_exc()}") else: - ##### self.lab_logger().info(f"同步执行动作 {ACTION}") + ##### self.lab_logger().info(f"同步执行动作 {ACTION}") future = self._executor.submit(ACTION, **action_kwargs) def _handle_future_exception(fut): @@ -763,7 +886,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): action_return_value = fut.result() execution_success = True except Exception as e: - error(f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}") + error( + f"同步任务 {ACTION.__name__} 报错了\n{traceback.format_exc()}\n原始输入:{action_kwargs}" + ) future.add_done_callback(_handle_future_exception) @@ -807,7 +932,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().info(f"动作 {action_name} 已取消") return action_type.Result() - ##### self.lab_logger().info(f"动作执行完成: {action_name}") + # self.lab_logger().info(f"动作执行完成: {action_name}") del future # 向Host更新物料当前状态 @@ -816,27 +941,25 @@ class BaseROS2DeviceNode(Node, Generic[T]): if v not in ["unilabos_msgs/Resource", "sequence"]: continue self.lab_logger().info(f"更新资源状态: {k}") - r = ResourceUpdate.Request() # 仅当action_kwargs[k]不为None时尝试转换 - akv = action_kwargs[k] # 已经是完成转换的物料了,只需要转换成ros msg Resource了 + akv = action_kwargs[k] # 已经是完成转换的物料了 apv = action_paramtypes[k] final_type = get_type_class(apv) if final_type is None: continue try: + # 去重:使用 seen 集合获取唯一的资源对象 seen = set() unique_resources = [] - for rs in akv: + for rs in akv: # todo: 这里目前只支持plr的类型 res = self.resource_tracker.parent_resource(rs) # 获取 resource 对象 if id(res) not in seen: seen.add(id(res)) - converted_list = convert_resources_from_type([res], final_type) - unique_resources.extend([convert_to_ros_msg(Resource, converted) for converted in converted_list]) + unique_resources.append(res) - r.resources = unique_resources - - response = await self._resource_clients["resource_update"].call_async(r) - self.lab_logger().debug(f"资源更新结果: {response}") + # 使用新的资源树接口 + if unique_resources: + await self.update_resource(unique_resources) except Exception as e: self.lab_logger().error(f"资源更新失败: {e}") self.lab_logger().error(traceback.format_exc()) @@ -860,7 +983,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): if attr_name in ["success", "reached_goal"]: setattr(result_msg, attr_name, True) elif attr_name == "return_info": - setattr(result_msg, attr_name, get_result_info_str(execution_error, execution_success, action_return_value)) + setattr( + result_msg, + attr_name, + get_result_info_str(execution_error, execution_success, action_return_value), + ) ##### self.lab_logger().info(f"动作 {action_name} 完成并返回结果") return result_msg @@ -887,9 +1014,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): class DeviceInitError(Exception): pass + class JsonCommandInitError(Exception): pass + class ROS2DeviceNode: """ ROS2设备节点类 @@ -980,11 +1109,18 @@ class ROS2DeviceNode: ) else: from unilabos.devices.workstation.workstation_base import WorkstationBase - if issubclass(self._driver_class, WorkstationBase): # 是WorkstationNode的子节点,就要调用WorkstationNodeCreator + + if issubclass( + self._driver_class, WorkstationBase + ): # 是WorkstationNode的子节点,就要调用WorkstationNodeCreator self.driver_is_workstation = True - self._driver_creator = WorkstationNodeCreator(driver_class, children=children, resource_tracker=self.resource_tracker) + self._driver_creator = WorkstationNodeCreator( + driver_class, children=children, resource_tracker=self.resource_tracker + ) else: - self._driver_creator = DeviceClassCreator(driver_class, children=children, resource_tracker=self.resource_tracker) + self._driver_creator = DeviceClassCreator( + driver_class, children=children, resource_tracker=self.resource_tracker + ) if driver_is_ros: driver_params["device_id"] = device_id @@ -999,6 +1135,7 @@ class ROS2DeviceNode: self._ros_node = self._driver_instance # type: ignore elif self.driver_is_workstation: from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode + self._ros_node = ROS2WorkstationNode( protocol_type=driver_params["protocol_type"], children=children, @@ -1038,16 +1175,22 @@ class ROS2DeviceNode: try: target = yaml.safe_load(io.StringIO(string)) except Exception as ex2: - raise JsonCommandInitError(f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}") + raise JsonCommandInitError( + f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}" + ) try: function_name = target["function_name"] function_args = target["function_args"] assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) - assert callable(function), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" + assert callable( + function + ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" return function(**function_args) except KeyError as ex: - raise JsonCommandInitError(f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}") + raise JsonCommandInitError( + f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" + ) async def _execute_driver_command_async(self, string: str): try: @@ -1056,17 +1199,25 @@ class ROS2DeviceNode: try: target = yaml.safe_load(io.StringIO(string)) except Exception as ex2: - raise JsonCommandInitError(f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}") + raise JsonCommandInitError( + f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}" + ) try: function_name = target["function_name"] function_args = target["function_args"] assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) - assert callable(function), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" - assert asyncio.iscoroutinefunction(function), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" + assert callable( + function + ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" + assert asyncio.iscoroutinefunction( + function + ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" return await function(**function_args) except KeyError as ex: - raise JsonCommandInitError(f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}") + raise JsonCommandInitError( + f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" + ) def _start_loop(self): def run_event_loop(): diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 27f53cba..af92de8a 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,5 +1,4 @@ import collections -import copy from dataclasses import dataclass, field import json import threading @@ -13,7 +12,6 @@ from geometry_msgs.msg import Point from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.service import Service -from rosidl_runtime_py import set_message_fields from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.srv import ( ResourceAdd, @@ -23,6 +21,7 @@ from unilabos_msgs.srv import ( ResourceList, SerialCommand, ) # type: ignore +from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unique_identifier_msgs.msg import UUID from unilabos.registry.registry import lab_registry @@ -38,11 +37,16 @@ from unilabos.ros.msgs.message_converter import ( ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode +from unilabos.ros.nodes.resource_tracker import ( + ResourceDictInstance, + ResourceTreeSet, + ResourceTreeInstance, +) from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.type_check import serialize_result_info if TYPE_CHECKING: - from unilabos.app.ws_client import QueueItem + from unilabos.app.ws_client import QueueItem, WSResourceChatData @dataclass @@ -62,6 +66,7 @@ class HostNode(BaseROS2DeviceNode): _device_action_status: ClassVar[collections.defaultdict[str, DeviceActionStatus]] = collections.defaultdict( DeviceActionStatus ) + _resource_tracker: ClassVar[DeviceNodeResourceTracker] = DeviceNodeResourceTracker() # 资源管理器实例 @classmethod def get_instance(cls, timeout=None) -> Optional["HostNode"]: @@ -72,8 +77,8 @@ class HostNode(BaseROS2DeviceNode): def __init__( self, device_id: str, - devices_config: Dict[str, Any], - resources_config: list, + devices_config: ResourceTreeSet, + resources_config: ResourceTreeSet, resources_edge_config: list[dict], physical_setup_graph: Optional[Dict[str, Any]] = None, controllers_config: Optional[Dict[str, Any]] = None, @@ -103,7 +108,7 @@ class HostNode(BaseROS2DeviceNode): action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"], hardware_interface={}, print_publish=False, - resource_tracker=DeviceNodeResourceTracker(), # host node并不是通过initialize 包一层传进来的 + resource_tracker=self._resource_tracker, # host node并不是通过initialize 包一层传进来的 ) # 设置单例实例 @@ -112,7 +117,7 @@ class HostNode(BaseROS2DeviceNode): # 初始化配置 self.server_latest_timestamp = 0.0 # self.devices_config = devices_config - self.resources_config = resources_config + self.resources_config = resources_config # 直接保存 ResourceTreeSet self.resources_edge_config = resources_edge_config self.physical_setup_graph = physical_setup_graph if controllers_config is None: @@ -167,10 +172,11 @@ class HostNode(BaseROS2DeviceNode): self._discover_devices() # 初始化所有本机设备节点,多一次过滤,防止重复初始化 - for device_id, device_config in devices_config.items(): - if device_config.get("type", "device") != "device": + for device_config in devices_config.root_nodes: + device_id = device_config.res_content.id + if device_config.res_content.type != "device": self.lab_logger().debug( - f"[Host Node] Skipping type {device_config['type']} {device_id} already existed, skipping." + f"[Host Node] Skipping type {device_config.res_content.type} {device_id} already existed, skipping." ) continue if device_id not in self.devices_names: @@ -186,58 +192,68 @@ class HostNode(BaseROS2DeviceNode): ].items(): controller_config["update_rate"] = update_rate self.initialize_controller(controller_id, controller_config) - resources_config.insert( - 0, - { - "id": "host_node", - "name": "host_node", - "parent": None, - "type": "device", - "class": "host_node", - "position": {"x": 0, "y": 0, "z": 0}, - "config": {}, - "data": {}, - "children": [], - }, - ) - resource_with_dirs_name = [] - resource_ids_to_instance = {i["id"]: i for i in resources_config} - for res in resources_config: - temp_res = res - res_paths = [res] - while temp_res.get("parent"): - temp_res = resource_ids_to_instance[temp_res.get("parent")] - res_paths.append(temp_res) - dirs = "/" + "/".join([res["id"] for res in res_paths[::-1]]) - new_res = copy.deepcopy(res) - new_res["data"]["unilabos_dirs"] = dirs - resource_with_dirs_name.append(new_res) + # 创建 host_node 作为一个单独的 ResourceTree + + host_node_dict = { + "id": "host_node", + "uuid": str(uuid.uuid4()), + "parent_uuid": "", + "name": "host_node", + "type": "device", + "class": "host_node", + "config": {}, + "data": {}, + "children": [], + "description": "", + "schema": {}, + "model": {}, + "icon": "", + } + + # 创建 host_node 的 ResourceTree + host_node_instance = ResourceDictInstance.get_resource_instance_from_dict(host_node_dict) + host_node_tree = ResourceTreeInstance(host_node_instance) + resources_config.trees.insert(0, host_node_tree) try: for bridge in self.bridges: - if hasattr(bridge, "resource_add"): + if hasattr(bridge, "resource_tree_add") and resources_config: from unilabos.app.web.client import HTTPClient client: HTTPClient = bridge resource_start_time = time.time() - resource_add_res = client.resource_add(add_schema(resources_config)) - # DEBUG ONLY - # for i in resource_with_dirs_name: - # http_req = self.bridges[-1].resource_get(i["data"]["unilabos_dirs"], True) - # res = self._resource_get_process(http_req) - # print(res) + # 传递 ResourceTreeSet 对象,在 client 中转换为字典并获取 UUID 映射 + uuid_mapping = client.resource_tree_add(resources_config, "", True) resource_end_time = time.time() self.lab_logger().info( f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" ) + for edge in self.resources_edge_config: + edge["source_uuid"] = uuid_mapping.get(edge["source_uuid"], edge["source_uuid"]) + edge["target_uuid"] = uuid_mapping.get(edge["target_uuid"], edge["target_uuid"]) resource_add_res = client.resource_edge_add(self.resources_edge_config) resource_edge_end_time = time.time() self.lab_logger().info( f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms" ) + # resources_config 通过各个设备的 resource_tracker 进行uuid更新,利用uuid_mapping + # resources_config 的 root node 是 + for node in resources_config.root_nodes: + if node.res_content.type == "device": + for sub_node in node.children: + # 只有二级子设备 + if sub_node.res_content.type != "device": + # slave节点走c2s更新接口,拿到add自行update uuid + device_tracker = self.devices_instances[node.res_content.id].resource_tracker + resource_instance = device_tracker.figure_resource( # todo: 要换成uuid进行figure + {"name": sub_node.res_content.name}) + device_tracker.loop_update_uuid(resource_instance, uuid_mapping) + else: + resource_instance = self.resource_tracker.figure_resource({"name": node.res_content.name}) + self._resource_tracker.loop_update_uuid(resource_instance, uuid_mapping) + except Exception as ex: self.lab_logger().error("[Host Node-Resource] 添加物料出错!") self.lab_logger().error(traceback.format_exc()) - # 创建定时器,定期发现设备 self._discovery_timer = self.create_timer( discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup() @@ -286,23 +302,23 @@ class HostNode(BaseROS2DeviceNode): self.devices_names[edge_device_id] = namespace self._create_action_clients_for_device(device_id, namespace) self._online_devices.add(device_key) - sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name") + sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device") threading.Thread( target=self._send_re_register, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_query_host_name_{namespace}", + name=f"ROSDevice{self.device_id}_re_register_device_{namespace}", ).start() elif device_key not in self._online_devices: # 设备重新上线 self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}") self._online_devices.add(device_key) - sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name") + sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device") threading.Thread( target=self._send_re_register, args=(sclient,), daemon=True, - name=f"ROSDevice{self.device_id}_query_host_name_{namespace}", + name=f"ROSDevice{self.device_id}_re_register_device_{namespace}", ).start() # 检测离线设备 @@ -473,16 +489,13 @@ class HostNode(BaseROS2DeviceNode): for i in response: res = json.loads(i) new_li.append(res) - return { - "resources": new_li, - "liquid_input_resources": new_li - } + return {"resources": new_li, "liquid_input_resources": new_li} except Exception as ex: pass _n = "\n" raise ValueError(f"创建资源时失败!\n{_n.join(response)}") - def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: + def initialize_device(self, device_id: str, device_config: ResourceDictInstance) -> None: """ 根据配置初始化设备, @@ -495,9 +508,8 @@ class HostNode(BaseROS2DeviceNode): """ self.lab_logger().info(f"[Host Node] Initializing device: {device_id}") - device_config_copy = copy.deepcopy(device_config) try: - d = initialize_device_from_dict(device_id, device_config_copy) + d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) except DeviceClassInvalid as e: self.lab_logger().error(f"[Host Node] Device class invalid: {e}") d = None @@ -677,9 +689,7 @@ class HostNode(BaseROS2DeviceNode): feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg), goal_uuid=goal_uuid_obj, ) - future.add_done_callback( - lambda future: self.goal_response_callback(item, action_id, future) - ) + future.add_done_callback(lambda future: self.goal_response_callback(item, action_id, future)) def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None: """目标响应回调""" @@ -816,8 +826,125 @@ class HostNode(BaseROS2DeviceNode): self._node_info_update_callback, callback_group=ReentrantCallbackGroup(), ), + "c2s_update_resource_tree": self.create_service( + SerialCommand, + "/c2s_update_resource_tree", + self._resource_tree_update_callback, + callback_group=ReentrantCallbackGroup(), + ), } + def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK + resource_tree_set = ResourceTreeSet.load(data["data"]) + mount_uuid = data["mount_uuid"] + first_add = data["first_add"] + + self.lab_logger().info( + f"[Host Node-Resource] Loaded ResourceTreeSet with {len(resource_tree_set.trees)} trees, " + f"{len(resource_tree_set.all_nodes)} total nodes" + ) + + # 处理资源添加逻辑 + success = False + uuid_mapping = {} + if len(self.bridges) > 0: + from unilabos.app.web.client import HTTPClient + + client: HTTPClient = self.bridges[-1] + resource_start_time = time.time() + uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add) + success = True + resource_end_time = time.time() + self.lab_logger().info( + f"[Host Node-Resource] 物料创建上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" + ) + if uuid_mapping: + self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点") + + if success: + from unilabos.resources.graphio import physical_setup_graph + + # 将资源添加到本地图中 + for node in resource_tree_set.all_nodes: + resource_dict = node.res_content.model_dump(by_alias=True) + if resource_dict.get("id") not in physical_setup_graph.nodes: + physical_setup_graph.add_node(resource_dict["id"], **resource_dict) + else: + physical_setup_graph.nodes[resource_dict["id"]]["data"].update(resource_dict.get("data", {})) + + response.response = json.dumps(uuid_mapping) if success else "FAILED" + self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}") + + def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK + uuid_list: List[str] = data["data"] + with_children: bool = data["with_children"] + from unilabos.app.web.client import http_client + resource_response = http_client.resource_tree_get(uuid_list, with_children) + response.response = json.dumps(resource_response) + + def _resource_tree_action_remove_callback(self, data: dict, response: SerialCommand_Response): + """ + 子节点通知Host物料树删除 + """ + self.lab_logger().info(f"[Host Node-Resource] Resource tree remove request received") + response.response = "OK" + self.lab_logger().info(f"[Host Node-Resource] Resource tree remove completed") + + def _resource_tree_action_update_callback(self, data: dict, response: SerialCommand_Response): + """ + 子节点通知Host物料树更新 + """ + resource_tree_set = ResourceTreeSet.load(data["data"]) + + self.lab_logger().info( + f"[Host Node-Resource] Loaded ResourceTreeSet with {len(resource_tree_set.trees)} trees, " + f"{len(resource_tree_set.all_nodes)} total nodes" + ) + + from unilabos.app.web.client import http_client + resource_start_time = time.time() + uuid_mapping = http_client.resource_tree_update(resource_tree_set, "", False) + success = bool(uuid_mapping) + resource_end_time = time.time() + self.lab_logger().info( + f"[Host Node-Resource] 物料更新上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" + ) + if uuid_mapping: + self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点") + # 还需要加入到资源图中,暂不实现,考虑资源图新的获取方式 + response.response = json.dumps(uuid_mapping) + self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}") + + def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response): + """ + 子节点通知Host物料树更新 + + 接收序列化的 ResourceTreeSet 数据并进行处理 + """ + self.lab_logger().info(f"[Host Node-Resource] Resource tree add request received") + try: + # 解析请求数据 + data = json.loads(request.command) + action = data["action"] + data = data["data"] + if action == "add": + self._resource_tree_action_add_callback(data, response) + elif action == "get": + self._resource_tree_action_get_callback(data, response) + elif action == "update": + self._resource_tree_action_update_callback(data, response) + elif action == "remove": + self._resource_tree_action_remove_callback(data, response) + else: + self.lab_logger().error(f"[Host Node-Resource] Invalid action: {action}") + response.response = "ERROR" + except Exception as e: + self.lab_logger().error(f"[Host Node-Resource] Error adding resource tree: {e}") + self.lab_logger().error(traceback.format_exc()) + response.response = f"ERROR: {str(e)}" + + return response + def _node_info_update_callback(self, request, response): """ 更新节点信息回调 @@ -907,7 +1034,13 @@ class HostNode(BaseROS2DeviceNode): return response except Exception as e: self.lab_logger().error(f"[Host Node-Resource] Error retrieving from bridge: {str(e)}") - r = [resource for resource in self.resources_config if resource.get("id") == request.id] + # 从 ResourceTreeSet 中查找资源 + resources_list = ( + [node.res_content.model_dump(by_alias=True) for node in self.resources_config.all_nodes] + if self.resources_config + else [] + ) + r = [resource for resource in resources_list if resource.get("id") == request.id] self.lab_logger().debug(f"[Host Node-Resource] Retrieved from local: {len(r)} resources") response.resources = [convert_to_ros_msg(Resource, resource) for resource in r] return response @@ -1094,6 +1227,7 @@ class HostNode(BaseROS2DeviceNode): else: self.lab_logger().warning("⚠️ 无法获取服务端任务下发时间,跳过任务延迟分析") + raw_delay_ms = -1 corrected_delay_ms = -1 self.lab_logger().info("=" * 60) @@ -1129,3 +1263,78 @@ class HostNode(BaseROS2DeviceNode): ) else: self.lab_logger().warning("⚠️ 收到无效的Pong响应(缺少ping_id)") + + def notify_resource_tree_update( + self, device_id: str, action: str, resource_uuid_list: List[str] + ) -> bool: + """ + 通知设备节点更新资源树 + + Args: + device_id: 目标设备ID + action: 操作类型 "add", "update", "remove" + resource_uuid_list: 资源UUIDs + + Returns: + bool: 操作是否成功 + """ + try: + # 检查设备是否存在 + if device_id not in self.devices_names: + self.lab_logger().error(f"[Host Node-Resource] Device {device_id} not found in devices_names") + return False + + namespace = self.devices_names[device_id] + device_key = f"{namespace}/{device_id}" + + # 检查设备是否在线 + if device_key not in self._online_devices: + self.lab_logger().error(f"[Host Node-Resource] Device {device_key} is offline") + return False + + # 构建服务地址 + srv_address = f"/srv{namespace}/s2c_resource_tree" + self.lab_logger().info(f"[Host Node-Resource] Notifying {device_id} for resource tree {action} operation") + + # 创建服务客户端 + sclient = self.create_client(SerialCommand, srv_address) + + # 等待服务可用(设置超时) + if not sclient.wait_for_service(timeout_sec=5.0): + self.lab_logger().error(f"[Host Node-Resource] Service {srv_address} not available") + return False + + # 构建请求数据 + request_data = [ + { + "action": action, + "data": resource_uuid_list, + } + ] + + # 创建请求 + request = SerialCommand.Request() + request.command = json.dumps(request_data, ensure_ascii=False) + + # 发送异步请求 + future = sclient.call_async(request) + + # 等待响应 + timeout = 30.0 + start_time = time.time() + while not future.done(): + if time.time() - start_time > timeout: + self.lab_logger().error(f"[Host Node-Resource] Timeout waiting for response from {device_id}") + return False + time.sleep(0.01) + + response = future.result() + self.lab_logger().info( + f"[Host Node-Resource] Resource tree {action} notification completed for {device_id}" + ) + return True + + except Exception as e: + self.lab_logger().error(f"[Host Node-Resource] Error notifying resource tree update: {str(e)}") + self.lab_logger().error(traceback.format_exc()) + return False diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index fdf27b94..cb801222 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -130,7 +130,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})" ) - self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") + self.lab_logger().info(f"ROS2WorkstationNode {device_id} initialized with protocols: {self.protocol_names}") def _setup_protocol_names(self, protocol_type): # 处理协议类型 diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 06fc1c27..82925851 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -1,11 +1,528 @@ -from typing import List, Tuple, Any, Dict, TYPE_CHECKING -from abc import ABC, abstractmethod +import uuid +from pydantic import BaseModel, field_serializer, field_validator +from pydantic import Field +from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING from unilabos.utils.log import logger if TYPE_CHECKING: - from unilabos.devices.workstation.workstation_base import WorkstationBase - from pylabrobot.resources import Resource as PLRResource + # from unilabos.devices.workstation.workstation_base import WorkstationBase + from pylabrobot.resources import Resource as PLRResource, corning_6_wellplate_16point8ml_flat + + +class ResourceDictPositionSize(BaseModel): + depth: float = Field(description="Depth", default=0.0) + width: float = Field(description="Width", default=0.0) + height: float = Field(description="Height", default=0.0) + + +class ResourceDictPositionScale(BaseModel): + x: float = Field(description="x scale", default=0.0) + y: float = Field(description="y scale", default=0.0) + z: float = Field(description="z scale", default=0.0) + + +class ResourceDictPositionObject(BaseModel): + x: float = Field(description="X coordinate", default=0.0) + y: float = Field(description="Y coordinate", default=0.0) + z: float = Field(description="Z coordinate", default=0.0) + + +class ResourceDictPosition(BaseModel): + size: ResourceDictPositionSize = Field(description="Resource size", default_factory=ResourceDictPositionSize) + scale: ResourceDictPositionScale = Field(description="Resource scale", default_factory=ResourceDictPositionScale) + layout: Literal["2d"] = Field(description="Resource layout", default="2d") + position: ResourceDictPositionObject = Field( + description="Resource position", default_factory=ResourceDictPositionObject + ) + position3d: ResourceDictPositionObject = Field( + description="Resource position in 3D space", default_factory=ResourceDictPositionObject + ) + rotation: ResourceDictPositionObject = Field( + description="Resource rotation", default_factory=ResourceDictPositionObject + ) + + +# 统一的资源字典模型,parent 自动序列化为 parent_uuid,children 不序列化 +class ResourceDict(BaseModel): + id: str = Field(description="Resource ID") + uuid: str = Field(description="Resource UUID") + name: str = Field(description="Resource name") + description: str = Field(description="Resource description", default="") + schema: Dict[str, Any] = Field(description="Resource schema", default_factory=dict) + model: Dict[str, Any] = Field(description="Resource model", default_factory=dict) + icon: str = Field(description="Resource icon", default="") + parent: Optional["ResourceDict"] = Field( + description="Parent resource object", default=None, serialization_alias="parent_uuid" + ) + type: Literal["device"] | str = Field(description="Resource type") + klass: str = Field(alias="class", description="Resource class name") + position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) + config: Dict[str, Any] = Field(description="Resource configuration") + data: Dict[str, Any] = Field(description="Resource data") + + @field_serializer("parent") + def _serialize_parent(self, parent: Optional["ResourceDict"]): + return self.parent_uuid + + @field_validator("parent", mode="before") + @classmethod + def _deserialize_parent(cls, parent: Optional["ResourceDict"]): + if isinstance(parent, ResourceDict): + return parent + else: + return None + + @property + def parent_uuid(self) -> str: + """获取父节点的UUID""" + return self.parent.uuid if self.parent is not None else "" + + @property + def parent_name(self) -> Optional[str]: + """获取父节点的UUID""" + return self.parent.name if self.parent is not None else None + + @property + def is_root_node(self) -> bool: + """判断资源是否为根节点""" + return self.parent is None + + +class GraphData(BaseModel): + """图数据结构,包含节点和边""" + + nodes: List["ResourceTreeInstance"] = Field(description="Resource nodes list", default_factory=list) + links: List[Dict[str, Any]] = Field(description="Resource links/edges list", default_factory=list) + + +class ResourceDictInstance(object): + """ResourceDict的实例,同时提供一些方法""" + + def __init__(self, res_content: "ResourceDict"): + self.res_content = res_content + self.children = [] + self.typ = "dict" + + @classmethod + def get_resource_instance_from_dict(cls, content: Dict[str, Any]) -> "ResourceDictInstance": + """从字典创建资源实例""" + if "id" not in content: + content["id"] = content["name"] + if "uuid" not in content: + content["uuid"] = str(uuid.uuid4()) + if "description" in content and content["description"] is None: + del content["description"] + if "model" in content and content["model"] is None: + del content["model"] + if "schema" in content and content["schema"] is None: + del content["schema"] + if "x" in content.get("position", {}): + # 说明是老版本的position格式,转换成新的 + content["position"] = {"position": content["position"]} + if not content.get("class"): + content["class"] = "" + if not content.get("config"): # todo: 后续从后端保证字段非空 + content["config"] = {} + if not content.get("data"): + content["data"] = {} + return ResourceDictInstance(ResourceDict.model_validate(content)) + + def get_nested_dict(self) -> Dict[str, Any]: + """获取资源实例的嵌套字典表示""" + res_dict = self.res_content.model_dump(by_alias=True) + res_dict["children"] = {child.res_content.name: child.get_nested_dict() for child in self.children} + res_dict["parent"] = self.res_content.parent_name + res_dict["position"] = self.res_content.position.position.model_dump() + return res_dict + + +class ResourceTreeInstance(object): + """ + 资源树,表示一个根节点及其所有子节点的层次结构,继承ResourceDictInstance表示自己是根节点 + """ + + @staticmethod + def _build_uuid_map(resource_list: List[ResourceDictInstance]) -> Dict[str, ResourceDictInstance]: + """构建uuid到资源对象的映射,并检查重复""" + uuid_map: Dict[str, ResourceDictInstance] = {} + for res_instance in resource_list: + res = res_instance.res_content + if res.uuid in uuid_map: + raise ValueError(f"发现重复的uuid: {res.uuid}") + uuid_map[res.uuid] = res_instance + return uuid_map + + @staticmethod + def _build_uuid_instance_map( + resource_list: List[ResourceDictInstance], + ) -> Dict[str, ResourceDictInstance]: + """构建uuid到资源实例的映射""" + return {res_instance.res_content.uuid: res_instance for res_instance in resource_list} + + @staticmethod + def _collect_tree_nodes( + root_instance: ResourceDictInstance, uuid_map: Dict[str, ResourceDict] + ) -> List[ResourceDictInstance]: + """使用BFS收集属于某个根节点的所有节点""" + # BFS遍历,根据parent_uuid字段找到所有属于这棵树的节点 + tree_nodes = [root_instance] + visited = {root_instance.res_content.uuid} + queue = [root_instance.res_content.uuid] + + while queue: + current_uuid = queue.pop(0) + # 查找所有parent_uuid指向当前节点的子节点 + for uuid_str, res in uuid_map.items(): + if res.parent_uuid == current_uuid and uuid_str not in visited: + child_instance = ResourceDictInstance(res) + tree_nodes.append(child_instance) + visited.add(uuid_str) + queue.append(uuid_str) + + return tree_nodes + + def __init__(self, resource: ResourceDictInstance): + self.root_node = resource + self._validate_tree() + + def _validate_tree(self): + """ + 验证树结构的一致性 + - 验证uuid唯一性 + - 验证parent-children关系一致性 + + Raises: + ValueError: 当发现不一致时 + """ + known_uuids: set = set() + + def validate_node(node: ResourceDictInstance): + # 检查uuid唯一性 + if node.res_content.uuid in known_uuids: + raise ValueError(f"发现重复的uuid: {node.res_content.uuid}") + if node.res_content.uuid: + known_uuids.add(node.res_content.uuid) + else: + print(f"警告: 资源 {node.res_content.id} 没有uuid") + + # 验证并递归处理子节点 + for child in node.children: + if child.res_content.parent != node.res_content: + parent_id = child.res_content.parent.id if child.res_content.parent else None + raise ValueError( + f"节点 {child.res_content.id} 的parent引用不正确,应该指向 {node.res_content.id},但实际指向 {parent_id}" + ) + validate_node(child) + + validate_node(self.root_node) + + def get_all_nodes(self) -> List[ResourceDictInstance]: + """ + 获取树中的所有节点(深度优先遍历) + + Returns: + 所有节点的资源实例列表 + """ + nodes = [] + + def collect_nodes(node: ResourceDictInstance): + nodes.append(node) + for child in node.children: + collect_nodes(child) + + collect_nodes(self.root_node) + return nodes + + def find_by_uuid(self, target_uuid: str) -> Optional[ResourceDictInstance]: + """ + 通过uuid查找节点 + + Args: + target_uuid: 目标uuid + + Returns: + 找到的节点资源实例,如果没找到返回None + """ + + def search(node: ResourceDictInstance) -> Optional[ResourceDictInstance]: + if node.res_content.uuid == target_uuid: + return node + for child in node.children: + res = search(child) + if res: + return res + return None + + result = search(self.root_node) + return result + + +class ResourceTreeSet(object): + """ + 多个根节点的resource集合,包含多个ResourceTree + """ + + def __init__(self, resource_list: List[List[ResourceDictInstance]] | List[ResourceTreeInstance]): + """ + 初始化资源树集合 + + Args: + resource_list: 可以是以下两种类型之一: + - List[ResourceTree]: 已经构建好的树列表 + - List[List[ResourceInstanceDict]]: 嵌套列表,每个内部列表代表一棵树 + + Raises: + TypeError: 当传入不支持的类型时 + """ + if not resource_list: + self.trees: List[ResourceTreeInstance] = [] + elif isinstance(resource_list[0], ResourceTreeInstance): + # 已经是ResourceTree列表 + self.trees = cast(List[ResourceTreeInstance], resource_list) + elif isinstance(resource_list[0], list): + pass + else: + raise TypeError( + f"不支持的类型: {type(resource_list[0])}。" + f"ResourceTreeSet 只接受 List[ResourceTree] 或 List[List[ResourceInstanceDict]]" + ) + + @classmethod + def from_plr_resources(cls, resources: List["PLRResource"]) -> "ResourceTreeSet": + """ + 从plr资源创建ResourceTreeSet + """ + + def replace_plr_type(source: str): + replace_info = { + "plate": "plate", + "well": "well", + "tip_spot": "container", + "trash": "container", + "deck": "deck", + "tip_rack": "container", + } + if source in replace_info: + return replace_info[source] + else: + print("转换pylabrobot的时候,出现未知类型", source) + return "container" + + def build_uuid_mapping(res: "PLRResource", uuid_list: list): + """递归构建uuid映射字典""" + uuid_list.append(getattr(res, "unilabos_uuid", "")) + for child in res.children: + build_uuid_mapping(child, uuid_list) + + def resource_plr_inner( + d: dict, parent_resource: Optional[ResourceDict], states: dict, uuids: list + ) -> ResourceDictInstance: + current_uuid = uuids.pop(0) + + # 先构建当前节点的字典(不包含children) + r_dict = { + "id": d["name"], + "uuid": current_uuid, + "name": d["name"], + "parent": parent_resource, # 直接传入 ResourceDict 对象 + "type": replace_plr_type(d.get("category", "")), + "class": d.get("class", ""), + "position": ( + {"x": d["location"]["x"], "y": d["location"]["y"], "z": d["location"]["z"]} + if d["location"] + else {"x": 0, "y": 0, "z": 0} + ), + "config": {k: v for k, v in d.items() if k not in ["name", "children", "parent_name", "location"]}, + "data": states[d["name"]], + } + + # 先转换为 ResourceDictInstance,获取其中的 ResourceDict + current_instance = ResourceDictInstance.get_resource_instance_from_dict(r_dict) + current_resource = current_instance.res_content + + # 递归处理子节点,传入当前节点的 ResourceDict 作为 parent + current_instance.children = [ + resource_plr_inner(child, current_resource, states, uuids) for child in d["children"] + ] + + return current_instance + + trees = [] + for resource in resources: + # 构建uuid列表 + uuid_list = [] + build_uuid_mapping(resource, uuid_list) + + serialized_data = resource.serialize() + all_states = resource.serialize_all_state() + + # 根节点没有父节点,传入 None + root_instance = resource_plr_inner(serialized_data, None, all_states, uuid_list) + tree_instance = ResourceTreeInstance(root_instance) + trees.append(tree_instance) + return cls(trees) + + def to_plr_resources(self) -> Tuple[List["PLRResource"], List[Dict[str, str]]]: + """ + 将 ResourceTreeSet 转换为 PLR 资源列表 + + Returns: + Tuple[List[PLRResource], List[Dict[str, str]]]: + - PLR 资源实例列表 + - 每个资源对应的 name_to_uuid 映射字典列表 + """ + from unilabos.resources.graphio import resource_ulab_to_plr + + plr_resources = [] + name_to_uuid_maps = [] + + def build_name_to_uuid_map(node: ResourceDictInstance, result: Dict[str, str]): + """递归构建 name 到 uuid 的映射""" + result[node.res_content.name] = node.res_content.uuid + for child in node.children: + build_name_to_uuid_map(child, result) + + for tree in self.trees: + # 构建 name_to_uuid 映射 + name_to_uuid = {} + build_name_to_uuid_map(tree.root_node, name_to_uuid) + + # 使用 get_nested_dict 获取字典表示 + resource_dict = tree.root_node.get_nested_dict() + + # 判断是否包含 model(Deck 下没有 model) + plr_model = tree.root_node.res_content.type != "deck" + + try: + # 使用 resource_ulab_to_plr 创建 PLR 资源实例 + plr_resource = resource_ulab_to_plr(resource_dict, plr_model=plr_model) + + # 设置 unilabos_uuid 属性到资源及其所有子节点 + def set_uuid_recursive(plr_res: "PLRResource", node: ResourceDictInstance): + """递归设置 PLR 资源的 unilabos_uuid 属性""" + setattr(plr_res, "unilabos_uuid", node.res_content.uuid) + # 匹配子节点(通过 name) + for plr_child in plr_res.children: + matching_node = next( + (child for child in node.children if child.res_content.name == plr_child.name), + None, + ) + if matching_node: + set_uuid_recursive(plr_child, matching_node) + + set_uuid_recursive(plr_resource, tree.root_node) + + plr_resources.append(plr_resource) + name_to_uuid_maps.append(name_to_uuid) + except Exception as e: + logger.error(f"转换 PLR 资源失败: {e}") + import traceback + + logger.error(f"堆栈: {traceback.format_exc()}") + raise + + return plr_resources, name_to_uuid_maps + + @classmethod + def from_nested_list(cls, nested_list: List[ResourceDictInstance]) -> "ResourceTreeSet": + """ + 从扁平化的资源列表创建ResourceTreeSet,自动按根节点分组 + + Args: + nested_list: 扁平化的资源实例列表,可能包含多个根节点 + + Returns: + ResourceTreeSet实例 + + Raises: + ValueError: 当没有找到任何根节点时 + """ + # 找到所有根节点 + known_uuids = {res_instance.res_content.uuid for res_instance in nested_list} + root_instances = [ + ResourceTreeInstance(res_instance) + for res_instance in nested_list + if res_instance.res_content.is_root_node or res_instance.res_content.parent_uuid not in known_uuids + ] + return cls(root_instances) + + @property + def root_nodes(self) -> List[ResourceDictInstance]: + """ + 获取所有树的根节点 + + Returns: + 所有根节点的资源实例列表 + """ + return [tree.root_node for tree in self.trees] + + @property + def all_nodes(self) -> List[ResourceDictInstance]: + """ + 获取所有树中的所有节点 + + Returns: + 所有节点的资源实例列表 + """ + return [node for tree in self.trees for node in tree.get_all_nodes()] + + def find_by_uuid(self, target_uuid: str) -> Optional[ResourceDictInstance]: + """ + 在所有树中通过uuid查找节点 + + Args: + target_uuid: 目标uuid + + Returns: + 找到的节点资源实例,如果没找到返回None + """ + for tree in self.trees: + result = tree.find_by_uuid(target_uuid) + if result: + return result + return None + + def dump(self) -> List[List[Dict[str, Any]]]: + """ + 将 ResourceTreeSet 序列化为嵌套列表格式 + + 序列化时: + - parent 自动转换为 parent_uuid(在 ResourceDict.model_dump 中处理) + - children 不会被序列化(exclude=True) + + Returns: + List[List[Dict]]: 每个内层列表代表一棵树的扁平化资源字典列表 + """ + result = [] + for tree in self.trees: + # 获取树的所有节点并序列化 + tree_nodes = [node.res_content.model_dump(by_alias=True) for node in tree.get_all_nodes()] + result.append(tree_nodes) + return result + + @classmethod + def load(cls, data: List[List[Dict[str, Any]]]) -> "ResourceTreeSet": + """ + 从序列化的嵌套列表格式反序列化为 ResourceTreeSet + + Args: + data: List[List[Dict]]: 序列化的数据,每个内层列表代表一棵树 + + Returns: + ResourceTreeSet: 反序列化后的资源树集合 + """ + # 将每个字典转换为 ResourceInstanceDict + # FIXME: 需要重新确定parent关系 + nested_lists = [] + for tree_data in data: + flatten_instances = [ + ResourceDictInstance.get_resource_instance_from_dict(node_dict) for node_dict in tree_data + ] + nested_lists.append(flatten_instances) + + # 使用现有的构造函数创建 ResourceTreeSet + return cls(nested_lists) class DeviceNodeResourceTracker(object): @@ -13,6 +530,7 @@ class DeviceNodeResourceTracker(object): def __init__(self): self.resources = [] self.resource2parent_resource = {} + self.uuid_to_resources = {} pass def prefix_path(self, resource): @@ -24,6 +542,109 @@ class DeviceNodeResourceTracker(object): return resource_prefix_path + def map_uuid_to_resource(self, resource, uuid_map: Dict[str, str]): + for old_uuid, new_uuid in uuid_map.items(): + if old_uuid != new_uuid: + if old_uuid in self.uuid_to_resources: + instance = self.uuid_to_resources.pop(old_uuid) + if isinstance(resource, dict): + resource["uuid"] = new_uuid + else: # 实例的 + setattr(instance, "unilabos_uuid", new_uuid) + self.uuid_to_resources[new_uuid] = instance + print(f"更新uuid映射: {old_uuid} -> {new_uuid} | {instance}") + + def loop_set_uuid(self, resource, name_to_uuid_map: Dict[str, str]) -> int: + """ + 递归遍历资源树,根据 name 设置所有节点的 uuid + + Args: + resource: 资源对象(可以是dict或实例) + name_to_uuid_map: name到uuid的映射字典,{name: uuid} + + Returns: + 更新的资源数量 + """ + if isinstance(resource, list): + return sum(self.loop_set_uuid(r, name_to_uuid_map) for r in resource) + + update_count = 0 + + # 先递归处理所有子节点 + children = getattr(resource, "children", []) + for child in children: + update_count += self.loop_set_uuid(child, name_to_uuid_map) + + # 获取当前资源的name + if isinstance(resource, dict): + resource_name = resource.get("name") + else: + resource_name = getattr(resource, "name", None) + + # 如果name在映射中,则设置uuid + if resource_name and resource_name in name_to_uuid_map: + new_uuid = name_to_uuid_map[resource_name] + # 更新资源的uuid + if isinstance(resource, dict): + resource["uuid"] = new_uuid + else: + # 对于PLR资源,设置unilabos_uuid + setattr(resource, "unilabos_uuid", new_uuid) + self.uuid_to_resources[new_uuid] = resource + update_count += 1 + logger.debug(f"设置资源UUID: {resource_name} -> {new_uuid}") + + return update_count + + def loop_update_uuid(self, resource, uuid_map: Dict[str, str]) -> int: + """ + 递归遍历资源树,更新所有节点的uuid + + Args: + resource: 资源对象(可以是dict或实例) + uuid_map: uuid映射字典,{old_uuid: new_uuid} + + Returns: + 更新的资源数量 + """ + if isinstance(resource, list): + return sum(self.loop_update_uuid(r, uuid_map) for r in resource) + + update_count = 0 + + # 先递归处理所有子节点 + children = getattr(resource, "children", []) + for child in children: + update_count += self.loop_update_uuid(child, uuid_map) + + # 获取当前资源的uuid + if isinstance(resource, dict): + current_uuid = resource.get("uuid") + else: + current_uuid = getattr(resource, "unilabos_uuid", None) + + # 如果当前uuid在映射中,则更新 + if current_uuid and current_uuid in uuid_map: + new_uuid = uuid_map[current_uuid] + if current_uuid != new_uuid: + # 更新资源的uuid + if isinstance(resource, dict): + resource["uuid"] = new_uuid + else: + # 对于PLR资源,更新unilabos_uuid + if hasattr(resource, "unilabos_uuid"): + setattr(resource, "unilabos_uuid", new_uuid) + + # 更新uuid_to_resources映射 + if current_uuid in self.uuid_to_resources: + instance = self.uuid_to_resources.pop(current_uuid) + self.uuid_to_resources[new_uuid] = instance + + update_count += 1 + logger.debug(f"更新uuid: {current_uuid} -> {new_uuid} | {resource}") + + return update_count + def parent_resource(self, resource): if id(resource) in self.resource2parent_resource: return self.resource2parent_resource[id(resource)] @@ -47,12 +668,12 @@ class DeviceNodeResourceTracker(object): ): # 临时处理,要删除的,driver有太多类型错误标注 return [self.figure_resource(r, try_mode) for r in query_resource.values()] res_id = ( - query_resource.id + query_resource.id # type: ignore if hasattr(query_resource, "id") else (query_resource.get("id") if isinstance(query_resource, dict) else None) ) res_name = ( - query_resource.name + query_resource.name # type: ignore if hasattr(query_resource, "name") else (query_resource.get("name") if isinstance(query_resource, dict) else None) ) @@ -65,7 +686,7 @@ class DeviceNodeResourceTracker(object): for r in self.resources: if isinstance(query_resource, dict): res_list.extend( - self.loop_find_resource(r, resource_cls_type, identifier_key, query_resource[identifier_key]) + self.loop_find_resource(r, object, identifier_key, query_resource[identifier_key]) ) else: res_list.extend( @@ -93,7 +714,7 @@ class DeviceNodeResourceTracker(object): res_list.extend( self.loop_find_resource(child, target_resource_cls_type, identifier_key, compare_value, resource) ) - if target_resource_cls_type == type(resource): + if issubclass(type(resource), target_resource_cls_type): if target_resource_cls_type == dict: if identifier_key in resource: if resource[identifier_key] == compare_value: @@ -111,3 +732,137 @@ class DeviceNodeResourceTracker(object): if getattr(res, k) == v: new_list.append(res) return new_list + + +if __name__ == "__main__": + import sys + import os + + a = corning_6_wellplate_16point8ml_flat("a").serialize() + # 尝试导入 pylabrobot,如果失败则尝试从本地 pylabrobot_repo 导入 + try: + from pylabrobot.resources import Resource, Coordinate + except ImportError: + # 尝试添加本地 pylabrobot_repo 路径 + # __file__ is unilabos/ros/nodes/resource_tracker.py + # We need to go up 4 levels to get to project root + current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + pylabrobot_path = os.path.join(current_dir, "pylabrobot_repo") + if os.path.exists(pylabrobot_path): + sys.path.insert(0, pylabrobot_path) + try: + from pylabrobot.resources import Resource, Coordinate + except ImportError: + print("pylabrobot 未安装,且无法从本地 pylabrobot_repo 导入") + print("如需运行测试,请先安装: pip install pylabrobot") + exit(0) + else: + print("pylabrobot 未安装,跳过测试") + print("如需运行测试,请先安装: pip install pylabrobot") + exit(0) + + # 创建一个简单的测试资源 + def create_test_resource(name: str): + """创建一个简单的测试用资源""" + # 创建父资源 + parent = Resource(name=name, size_x=100.0, size_y=100.0, size_z=50.0, category="container") + + # 添加一些子资源 + for i in range(3): + child = Resource(name=f"{name}_child_{i}", size_x=20.0, size_y=20.0, size_z=10.0, category="container") + child.location = Coordinate(x=i * 30, y=0, z=0) + parent.assign_child_resource(child, location=child.location) + + return parent + + print("=" * 80) + print("测试 1: 基本序列化和反序列化") + print("=" * 80) + + # 创建原始 PLR 资源 + original_resource = create_test_resource("test_resource") + print(f"\n1. 创建原始 PLR 资源: {original_resource.name}") + print(f" 子节点数量: {len(original_resource.children)}") + + # 手动设置 unilabos_uuid(模拟实际使用场景) + def set_test_uuid(res: "PLRResource", prefix="uuid"): + """递归设置测试用的 uuid""" + import uuid as uuid_module + + setattr(res, "unilabos_uuid", f"{prefix}-{uuid_module.uuid4()}") + for i, child in enumerate(res.children): + set_test_uuid(child, f"{prefix}-{i}") + + set_test_uuid(original_resource, "root") + print(f" 根节点 UUID: {getattr(original_resource, 'unilabos_uuid', 'None')}") + + # 转换为 ResourceTreeSet (from_plr_resources) + print("\n2. 使用 from_plr_resources 转换为 ResourceTreeSet") + resource_tree_set = ResourceTreeSet.from_plr_resources([original_resource]) + print(f" 树的数量: {len(resource_tree_set.trees)}") + print(f" 根节点名称: {resource_tree_set.root_nodes[0].res_content.name}") + print(f" 根节点 UUID: {resource_tree_set.root_nodes[0].res_content.uuid}") + print(f" 总节点数: {len(resource_tree_set.all_nodes)}") + + # 转换回 PLR 资源 (to_plr_resources) + print("\n3. 使用 to_plr_resources 转换回 PLR 资源") + try: + plr_resources, name_to_uuid_maps = resource_tree_set.to_plr_resources() + except ModuleNotFoundError as e: + print(f" ❌ 缺少依赖模块: {e}") + print(" 提示: to_plr_resources 方法实现完成,但需要安装额外的依赖(如 networkx)") + print("\n测试部分完成!from_plr_resources 已验证正常工作。") + exit(0) + print(f" PLR 资源数量: {len(plr_resources)}") + print(f" name_to_uuid 映射数量: {len(name_to_uuid_maps)}") + + restored_resource = plr_resources[0] + name_to_uuid = name_to_uuid_maps[0] + + print(f" 恢复的资源名称: {restored_resource.name}") + print(f" 恢复的资源子节点数: {len(restored_resource.children)}") + print(f" 恢复的资源 UUID: {getattr(restored_resource, 'unilabos_uuid', 'None')}") + print(f" name_to_uuid 映射条目数: {len(name_to_uuid)}") + + # 验证 UUID 映射 + print("\n4. 验证 UUID 映射") + original_uuid = getattr(original_resource, "unilabos_uuid", None) + restored_uuid = getattr(restored_resource, "unilabos_uuid", None) + print(f" 原始根节点 UUID: {original_uuid}") + print(f" 恢复后根节点 UUID: {restored_uuid}") + print(f" UUID 匹配: {original_uuid == restored_uuid}") + + # 验证 name_to_uuid 映射完整性 + def count_all_nodes(res: "PLRResource") -> int: + """递归统计节点总数""" + return 1 + sum(count_all_nodes(child) for child in res.children) + + original_node_count = count_all_nodes(original_resource) + restored_node_count = count_all_nodes(restored_resource) + mapping_count = len(name_to_uuid) + + print(f"\n 原始资源节点总数: {original_node_count}") + print(f" 恢复资源节点总数: {restored_node_count}") + print(f" 映射字典条目数: {mapping_count}") + print(f" 节点数量匹配: {original_node_count == restored_node_count == mapping_count}") + + # 验证子节点的 UUID + print("\n5. 验证子节点 UUID (前3个)") + for i, (original_child, restored_child) in enumerate( + zip(original_resource.children[:3], restored_resource.children[:3]) + ): + orig_uuid = getattr(original_child, "unilabos_uuid", None) + rest_uuid = getattr(restored_child, "unilabos_uuid", None) + print(f" 子节点 {i}: {original_child.name}") + print(f" 原始 UUID: {orig_uuid}") + print(f" 恢复 UUID: {rest_uuid}") + print(f" 匹配: {orig_uuid == rest_uuid}") + + # 测试 name_to_uuid 映射的正确性 + print("\n6. 验证 name_to_uuid 映射内容 (前5个)") + for i, (name, uuid_val) in enumerate(list(name_to_uuid.items())[:5]): + print(f" {name} -> {uuid_val}") + + print("\n" + "=" * 80) + print("测试完成!") + print("=" * 80) diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index f0fdddd0..ccc96cb4 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -4,6 +4,7 @@ 这个模块包含用于创建设备类实例的工厂类。 基础工厂类提供通用的实例创建方法,而特定工厂类提供针对特定设备类的创建方法。 """ + import asyncio import inspect import traceback @@ -53,7 +54,6 @@ class DeviceClassCreator(Generic[T]): if c["type"] != "device": self.resource_tracker.add_resource(c) - def create_instance(self, data: Dict[str, Any]) -> T: """ 创建设备类实例 @@ -118,7 +118,9 @@ class PyLabRobotCreator(DeviceClassCreator[T]): return nested_dict_to_list(resource), Resource return resource, source_type - def _process_resource_references(self, data: Any, to_dict=False, states=None, prefix_path="") -> Any: + def _process_resource_references( + self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None + ) -> Any: """ 递归处理资源引用,替换_resource_child_name对应的资源 @@ -127,11 +129,13 @@ class PyLabRobotCreator(DeviceClassCreator[T]): to_dict: 是否返回字典形式的资源 states: 用于保存所有资源状态 prefix_path: 当前递归路径 + name_to_uuid: name到uuid的映射字典 Returns: 处理后的数据 """ from pylabrobot.resources import Deck, Resource + if states is None: states = {} @@ -155,6 +159,9 @@ class PyLabRobotCreator(DeviceClassCreator[T]): return serialized else: self.resource_tracker.add_resource(resource_instance) + # 立即设置UUID + if name_to_uuid: + self.resource_tracker.loop_set_uuid(resource_instance, name_to_uuid) return resource_instance except Exception as e: logger.warning(f"无法导入资源类型 {type_path}: {e}") @@ -169,12 +176,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): result = {} for key, value in data.items(): new_prefix = f"{prefix_path}.{key}" if prefix_path else key - result[key] = self._process_resource_references(value, to_dict, states, new_prefix) + result[key] = self._process_resource_references(value, to_dict, states, new_prefix, name_to_uuid) return result elif isinstance(data, list): return [ - self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]") + self._process_resource_references(item, to_dict, states, f"{prefix_path}[{i}]", name_to_uuid) for i, item in enumerate(data) ] @@ -193,22 +200,42 @@ class PyLabRobotCreator(DeviceClassCreator[T]): """ deserialize_error = None stack = None + + # 递归遍历 children 构建 name_to_uuid 映射 + def collect_name_to_uuid(children_dict: Dict[str, Any], result: Dict[str, str]): + """递归遍历嵌套的 children 字典,收集 name 到 uuid 的映射""" + for child in children_dict.values(): + if isinstance(child, dict): + result[child["name"]] = child["uuid"] + collect_name_to_uuid(child["children"], result) + + name_to_uuid = {} + collect_name_to_uuid(self.children, name_to_uuid) if self.has_deserialize: deserialize_method = getattr(self.device_cls, "deserialize") spect = inspect.signature(deserialize_method) spec_args = spect.parameters for param_name, param_value in data.copy().items(): - if isinstance(param_value, dict) and "_resource_child_name" in param_value and "_resource_type" not in param_value: + if ( + isinstance(param_value, dict) + and "_resource_child_name" in param_value + and "_resource_type" not in param_value + ): arg_value = spec_args[param_name].annotation data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") # 首先处理资源引用 states = {} - processed_data = self._process_resource_references(data, to_dict=True, states=states) + processed_data = self._process_resource_references( + data, to_dict=True, states=states, name_to_uuid=name_to_uuid + ) try: - self.device_instance = deserialize_method(**processed_data) + from pylabrobot.resources import Resource + + self.device_instance: Resource = deserialize_method(**processed_data) + self.resource_tracker.loop_set_uuid(self.device_instance, name_to_uuid) all_states = self.device_instance.serialize_all_state() for k, v in states.items(): logger.debug(f"PyLabRobot反序列化设置状态:{k}") @@ -229,11 +256,15 @@ class PyLabRobotCreator(DeviceClassCreator[T]): spect = inspect.signature(self.device_cls.__init__) spec_args = spect.parameters for param_name, param_value in data.copy().items(): - if isinstance(param_value, dict) and "_resource_child_name" in param_value and "_resource_type" not in param_value: + if ( + isinstance(param_value, dict) + and "_resource_child_name" in param_value + and "_resource_type" not in param_value + ): arg_value = spec_args[param_name].annotation data[param_name]["_resource_type"] = self.device_cls.__module__ + ":" + arg_value logger.debug(f"自动补充 _resource_type: {data[param_name]['_resource_type']}") - processed_data = self._process_resource_references(data, to_dict=False) + processed_data = self._process_resource_references(data, to_dict=False, name_to_uuid=name_to_uuid) self.device_instance = super(PyLabRobotCreator, self).create_instance(processed_data) except Exception as e: logger.error(f"PyLabRobot创建实例失败: {e}") @@ -247,22 +278,31 @@ class PyLabRobotCreator(DeviceClassCreator[T]): return self.device_instance def post_create(self): - if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction(getattr(self.device_instance, "setup")): + if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction( + getattr(self.device_instance, "setup") + ): from unilabos.ros.nodes.base_device_node import ROS2DeviceNode + def done_cb(*args): from pylabrobot.resources import set_volume_tracking + # from pylabrobot.resources import set_tip_tracking set_volume_tracking(enabled=True) # set_tip_tracking(enabled=True) # 序列化tip_spot has为False logger.debug(f"PyLabRobot设备实例 {self.device_instance} 设置完成") from unilabos.config.config import BasicConfig + if BasicConfig.vis_2d_enable: from pylabrobot.visualizer.visualizer import Visualizer + vis = Visualizer(resource=self.device_instance, open_browser=True) + def vis_done_cb(*args): logger.info(f"PyLabRobot设备实例开启了Visualizer {self.device_instance}") + ROS2DeviceNode.run_async_func(vis.setup).add_done_callback(vis_done_cb) logger.debug(f"PyLabRobot设备实例提交开启Visualizer {self.device_instance}") + ROS2DeviceNode.run_async_func(getattr(self.device_instance, "setup")).add_done_callback(done_cb) @@ -299,6 +339,7 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): deck_dict = data.get("deck") if deck_dict: from pylabrobot.resources import Deck, Resource + plrc = PyLabRobotCreator(Deck, self.children, self.resource_tracker) deck = plrc.create_instance(deck_dict) data["deck"] = deck