diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index d070e690..b8c8bea3 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -73,6 +73,8 @@ class HTTPClient: Returns: Dict[str, str]: 旧UUID到新UUID的映射关系 {old_uuid: new_uuid} """ + with open(os.path.join(BasicConfig.working_dir, "req_resource_tree_add.json"), "w", encoding="utf-8") as f: + f.write(json.dumps({"nodes": [x for xs in resources.dump() for x in xs], "mount_uuid": mount_uuid}, indent=4)) # 从序列化数据中提取所有节点的UUID(保存旧UUID) old_uuids = {n.res_content.uuid: n for n in resources.all_nodes} if not self.initialized or first_add: @@ -92,6 +94,8 @@ class HTTPClient: timeout=100, ) + with open(os.path.join(BasicConfig.working_dir, "res_resource_tree_add.json"), "w", encoding="utf-8") as f: + f.write(f"{response.status_code}" + "\n" + response.text) # 处理响应,构建UUID映射 uuid_mapping = {} if response.status_code == 200: diff --git a/unilabos/ros/initialize_device.py b/unilabos/ros/initialize_device.py index bbc86e04..a92a9f50 100644 --- a/unilabos/ros/initialize_device.py +++ b/unilabos/ros/initialize_device.py @@ -26,6 +26,7 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device d = None original_device_config = copy.deepcopy(device_config) device_class_config = device_config["class"] + uid = device_config["uuid"] if isinstance(device_class_config, str): # 如果是字符串,则直接去lab_registry中查找,获取class if len(device_class_config) == 0: raise DeviceClassInvalid(f"Device [{device_id}] class cannot be an empty string. {device_config}") @@ -50,7 +51,7 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device ) try: d = DEVICE( - device_id=device_id, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {}) + device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {}) ) except DeviceInitError as ex: return d diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index b62ad2d9..d3f224bc 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -132,6 +132,7 @@ class ROSLoggerAdapter: def init_wrapper( self, device_id: str, + device_uuid: str, driver_class: type[T], device_config: Dict[str, Any], status_types: Dict[str, Any], @@ -150,6 +151,7 @@ def init_wrapper( if children is None: children = [] kwargs["device_id"] = device_id + kwargs["device_uuid"] = device_uuid kwargs["driver_class"] = driver_class kwargs["device_config"] = device_config kwargs["driver_params"] = driver_params @@ -266,6 +268,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self, driver_instance: T, device_id: str, + device_uuid: str, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], @@ -278,6 +281,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): Args: driver_instance: 设备实例 device_id: 设备标识符 + device_uuid: 设备标识符 status_types: 需要发布的状态和传感器信息 action_value_mappings: 设备动作 hardware_interface: 硬件接口配置 @@ -285,7 +289,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): """ self.driver_instance = driver_instance self.device_id = device_id - self.uuid = str(uuid.uuid4()) + self.uuid = device_uuid self.publish_high_frequency = False self.callback_group = ReentrantCallbackGroup() self.resource_tracker = resource_tracker @@ -554,6 +558,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): async def update_resource(self, resources: List["ResourcePLR"]): r = SerialCommand.Request() tree_set = ResourceTreeSet.from_plr_resources(resources) + for tree in tree_set.trees: + root_node = tree.root_node + if not root_node.res_content.uuid_parent: + logger.warning(f"更新无父节点物料{root_node},自动以当前设备作为根节点") + root_node.res_content.parent_uuid = self.uuid r.command = json.dumps({"data": {"data": tree_set.dump()}, "action": "update"}) response: SerialCommand_Response = await self._resource_clients["c2s_update_resource_tree"].call_async(r) # type: ignore try: @@ -1347,6 +1356,7 @@ class ROS2DeviceNode: def __init__( self, device_id: str, + device_uuid: str, driver_class: Type[T], device_config: Dict[str, Any], driver_params: Dict[str, Any], @@ -1362,6 +1372,7 @@ class ROS2DeviceNode: Args: device_id: 设备标识符 + device_uuid: 设备uuid driver_class: 设备类 device_config: 原始初始化的json driver_params: driver初始化的参数 @@ -1436,6 +1447,7 @@ class ROS2DeviceNode: children=children, driver_instance=self._driver_instance, # type: ignore device_id=device_id, + device_uuid=device_uuid, status_types=status_types, action_value_mappings=action_value_mappings, hardware_interface=hardware_interface, @@ -1446,6 +1458,7 @@ class ROS2DeviceNode: self._ros_node = BaseROS2DeviceNode( driver_instance=self._driver_instance, device_id=device_id, + device_uuid=device_uuid, status_types=status_types, action_value_mappings=action_value_mappings, hardware_interface=hardware_interface, diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 1b9a84b8..58a20e6f 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -18,7 +18,7 @@ from unilabos_msgs.srv import ( ResourceDelete, ResourceUpdate, ResourceList, - SerialCommand, + SerialCommand, ResourceGet, ) # type: ignore from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unique_identifier_msgs.msg import UUID @@ -41,6 +41,7 @@ from unilabos.ros.nodes.resource_tracker import ( ResourceTreeSet, ResourceTreeInstance, ) +from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.type_check import serialize_result_info from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot @@ -99,17 +100,6 @@ class HostNode(BaseROS2DeviceNode): """ if self._instance is not None: self._instance.lab_logger().critical("[Host Node] HostNode instance already exists.") - # 初始化Node基类,传递空参数覆盖列表 - BaseROS2DeviceNode.__init__( - self, - driver_instance=self, - device_id=device_id, - status_types={}, - action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"], - hardware_interface={}, - print_publish=False, - resource_tracker=self._resource_tracker, # host node并不是通过initialize 包一层传进来的 - ) # 设置单例实例 self.__class__._instance = self @@ -127,6 +117,91 @@ class HostNode(BaseROS2DeviceNode): bridges = [] self.bridges = bridges + # 创建 host_node 作为一个单独的 ResourceTree + host_node_dict = { + "id": "host_node", + "uuid": str(uuid.uuid4()), + "parent_uuid": "", + "name": "host_node", + "type": "device", + "class": "host_node", + "config": {}, + "data": {}, + "children": [], + "description": "", + "schema": {}, + "model": {}, + "icon": "", + } + + # 创建 host_node 的 ResourceTree + host_node_instance = ResourceDictInstance.get_resource_instance_from_dict(host_node_dict) + host_node_tree = ResourceTreeInstance(host_node_instance) + resources_config.trees.insert(0, host_node_tree) + try: + for bridge in self.bridges: + if hasattr(bridge, "resource_tree_add") and resources_config: + from unilabos.app.web.client import HTTPClient + + client: HTTPClient = bridge + resource_start_time = time.time() + # 传递 ResourceTreeSet 对象,在 client 中转换为字典并获取 UUID 映射 + uuid_mapping = client.resource_tree_add(resources_config, "", True) + device_uuid = resources_config.root_nodes[0].res_content.uuid + resource_end_time = time.time() + logger.info( + f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" + ) + for edge in self.resources_edge_config: + edge["source_uuid"] = uuid_mapping.get(edge["source_uuid"], edge["source_uuid"]) + edge["target_uuid"] = uuid_mapping.get(edge["target_uuid"], edge["target_uuid"]) + resource_add_res = client.resource_edge_add(self.resources_edge_config) + resource_edge_end_time = time.time() + logger.info( + f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms" + ) + # resources_config 通过各个设备的 resource_tracker 进行uuid更新,利用uuid_mapping + # resources_config 的 root node 是 + # # 创建反向映射:new_uuid -> old_uuid + # reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()} + # for tree in resources_config.trees: + # node = tree.root_node + # if node.res_content.type == "device": + # if node.res_content.id == "host_node": + # continue + # # slave节点走c2s更新接口,拿到add自行update uuid + # device_tracker = self.devices_instances[node.res_content.id].resource_tracker + # old_uuid = reverse_uuid_mapping.get(node.res_content.uuid) + # if old_uuid: + # # 找到旧UUID,使用UUID查找 + # resource_instance = device_tracker.uuid_to_resources.get(old_uuid) + # else: + # # 未找到旧UUID,使用name查找 + # resource_instance = device_tracker.figure_resource( + # {"name": node.res_content.name} + # ) + # device_tracker.loop_update_uuid(resource_instance, uuid_mapping) + # else: + # try: + # for plr_resource in ResourceTreeSet([tree]).to_plr_resources(): + # self.resource_tracker.add_resource(plr_resource) + # except Exception as ex: + # self.lab_logger().warning("[Host Node-Resource] 根节点物料序列化失败!") + except Exception as ex: + logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}") + # 初始化Node基类,传递空参数覆盖列表 + BaseROS2DeviceNode.__init__( + self, + driver_instance=self, + device_id=device_id, + device_uuid=host_node_dict["uuid"], + status_types={}, + action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"], + hardware_interface={}, + print_publish=False, + resource_tracker=self._resource_tracker, # host node并不是通过initialize 包一层传进来的 + ) + # 创建设备、动作客户端和目标存储 self.devices_names: Dict[str, str] = {device_id: self.namespace} # 存储设备名称和命名空间的映射 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 @@ -207,81 +282,7 @@ class HostNode(BaseROS2DeviceNode): ].items(): controller_config["update_rate"] = update_rate self.initialize_controller(controller_id, controller_config) - # 创建 host_node 作为一个单独的 ResourceTree - host_node_dict = { - "id": "host_node", - "uuid": str(uuid.uuid4()), - "parent_uuid": "", - "name": "host_node", - "type": "device", - "class": "host_node", - "config": {}, - "data": {}, - "children": [], - "description": "", - "schema": {}, - "model": {}, - "icon": "", - } - - # 创建 host_node 的 ResourceTree - host_node_instance = ResourceDictInstance.get_resource_instance_from_dict(host_node_dict) - host_node_tree = ResourceTreeInstance(host_node_instance) - resources_config.trees.insert(0, host_node_tree) - try: - for bridge in self.bridges: - if hasattr(bridge, "resource_tree_add") and resources_config: - from unilabos.app.web.client import HTTPClient - - client: HTTPClient = bridge - resource_start_time = time.time() - # 传递 ResourceTreeSet 对象,在 client 中转换为字典并获取 UUID 映射 - uuid_mapping = client.resource_tree_add(resources_config, "", True) - resource_end_time = time.time() - self.lab_logger().info( - f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" - ) - for edge in self.resources_edge_config: - edge["source_uuid"] = uuid_mapping.get(edge["source_uuid"], edge["source_uuid"]) - edge["target_uuid"] = uuid_mapping.get(edge["target_uuid"], edge["target_uuid"]) - resource_add_res = client.resource_edge_add(self.resources_edge_config) - resource_edge_end_time = time.time() - self.lab_logger().info( - f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms" - ) - # resources_config 通过各个设备的 resource_tracker 进行uuid更新,利用uuid_mapping - # resources_config 的 root node 是 - # 创建反向映射:new_uuid -> old_uuid - reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()} - for tree in resources_config.trees: - node = tree.root_node - if node.res_content.type == "device": - for sub_node in node.children: - # 只有二级子设备 - if sub_node.res_content.type != "device": - # slave节点走c2s更新接口,拿到add自行update uuid - device_tracker = self.devices_instances[node.res_content.id].resource_tracker - # sub_node.res_content.uuid 已经是新UUID,需要用旧UUID去查找 - old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid) - if old_uuid: - # 找到旧UUID,使用UUID查找 - resource_instance = device_tracker.uuid_to_resources.get(old_uuid) - else: - # 未找到旧UUID,使用name查找 - resource_instance = device_tracker.figure_resource( - {"name": sub_node.res_content.name} - ) - device_tracker.loop_update_uuid(resource_instance, uuid_mapping) - else: - try: - for plr_resource in ResourceTreeSet([tree]).to_plr_resources(): - self.resource_tracker.add_resource(plr_resource) - except Exception as ex: - self.lab_logger().warning("[Host Node-Resource] 根节点物料序列化失败!") - except Exception as ex: - self.lab_logger().error("[Host Node-Resource] 添加物料出错!") - self.lab_logger().error(traceback.format_exc()) # 创建定时器,定期发现设备 self._discovery_timer = self.create_timer( discovery_interval, self._discovery_devices_callback, callback_group=ReentrantCallbackGroup() @@ -862,7 +863,7 @@ class HostNode(BaseROS2DeviceNode): ), } - def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK + async def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK resource_tree_set = ResourceTreeSet.load(data["data"]) mount_uuid = data["mount_uuid"] first_add = data["first_add"] @@ -903,7 +904,7 @@ class HostNode(BaseROS2DeviceNode): response.response = json.dumps(uuid_mapping) if success else "FAILED" self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}") - def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK + async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK uuid_list: List[str] = data["data"] with_children: bool = data["with_children"] from unilabos.app.web.client import http_client @@ -911,7 +912,7 @@ class HostNode(BaseROS2DeviceNode): resource_response = http_client.resource_tree_get(uuid_list, with_children) response.response = json.dumps(resource_response) - def _resource_tree_action_remove_callback(self, data: dict, response: SerialCommand_Response): + async def _resource_tree_action_remove_callback(self, data: dict, response: SerialCommand_Response): """ 子节点通知Host物料树删除 """ @@ -919,7 +920,7 @@ class HostNode(BaseROS2DeviceNode): response.response = "OK" self.lab_logger().info(f"[Host Node-Resource] Resource tree remove completed") - def _resource_tree_action_update_callback(self, data: dict, response: SerialCommand_Response): + async def _resource_tree_action_update_callback(self, data: dict, response: SerialCommand_Response): """ 子节点通知Host物料树更新 """ @@ -937,14 +938,16 @@ class HostNode(BaseROS2DeviceNode): uuid_to_trees[tree.root_node.res_content.uuid].append(tree) for uid, trees in uuid_to_trees.items(): - new_tree_set = ResourceTreeSet(trees) resource_start_time = time.time() + self.lab_logger().info( + f"[Host Node-Resource] 物料 {[root_node.res_content.id for root_node in new_tree_set.root_nodes]} {uid} 挂载 {trees[0].root_node.res_content.parent_uuid} 请求更新上传" + ) uuid_mapping = http_client.resource_tree_add(new_tree_set, uid, False) success = bool(uuid_mapping) resource_end_time = time.time() self.lab_logger().info( - f"[Host Node-Resource] 物料 {[root_node.res_content.id for root_node in new_tree_set.root_nodes]} 挂载 {uid} P{trees[0].root_node.res_content.parent} 更新上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" + f"[Host Node-Resource] 物料更新上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" ) if uuid_mapping: self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点") @@ -952,7 +955,7 @@ class HostNode(BaseROS2DeviceNode): response.response = json.dumps(uuid_mapping) self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}") - def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response): + async def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response): """ 子节点通知Host物料树更新 @@ -965,13 +968,13 @@ class HostNode(BaseROS2DeviceNode): action = data["action"] data = data["data"] if action == "add": - self._resource_tree_action_add_callback(data, response) + await self._resource_tree_action_add_callback(data, response) elif action == "get": - self._resource_tree_action_get_callback(data, response) + await self._resource_tree_action_get_callback(data, response) elif action == "update": - self._resource_tree_action_update_callback(data, response) + await self._resource_tree_action_update_callback(data, response) elif action == "remove": - self._resource_tree_action_remove_callback(data, response) + await self._resource_tree_action_remove_callback(data, response) else: self.lab_logger().error(f"[Host Node-Resource] Invalid action: {action}") response.response = "ERROR" diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 07e35ee6..5a1eaa75 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -11,8 +11,7 @@ from unilabos.messages import * # type: ignore # protocol names from rclpy.action import ActionServer, ActionClient from rclpy.action.server import ServerGoalHandle from rclpy.callback_groups import ReentrantCallbackGroup -from unilabos_msgs.msg import Resource # type: ignore -from unilabos_msgs.srv import ResourceGet, ResourceUpdate # type: ignore +from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unilabos.compile import action_protocol_generators from unilabos.resources.graphio import list_to_nested_dict, nested_dict_to_list @@ -20,11 +19,11 @@ from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( get_action_type, convert_to_ros_msg, - convert_from_ros_msg, convert_from_ros_msg_with_mapping, ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode -from unilabos.utils.type_check import serialize_result_info, get_result_info_str +from unilabos.ros.nodes.resource_tracker import ResourceTreeSet +from unilabos.utils.type_check import get_result_info_str if TYPE_CHECKING: from unilabos.devices.workstation.workstation_base import WorkstationBase @@ -50,6 +49,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): *, driver_instance: "WorkstationBase", device_id: str, + device_uuid: str, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], @@ -64,6 +64,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): super().__init__( driver_instance=driver_instance, device_id=device_id, + device_uuid=device_uuid, status_types=status_types, action_value_mappings={**action_value_mappings, **self.protocol_action_mappings}, hardware_interface=hardware_interface, @@ -222,16 +223,28 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): # 向Host查询物料当前状态 for k, v in goal.get_fields_and_field_types().items(): if v in ["unilabos_msgs/Resource", "sequence"]: - r = ResourceGet.Request() - resource_id = ( - protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"] - ) - r.id = resource_id - r.with_children = True - response = await self._resource_clients["resource_get"].call_async(r) - protocol_kwargs[k] = list_to_nested_dict( - [convert_from_ros_msg(rs) for rs in response.resources] - ) + self.lab_logger().info(f"{protocol_name} 查询资源状态: Key: {k} Type: {v}") + + try: + # 统一处理单个或多个资源 + resource_id = ( + protocol_kwargs[k]["id"] if v == "unilabos_msgs/Resource" else protocol_kwargs[k][0]["id"] + ) + r = SerialCommand_Request() + r.command = json.dumps({"id": resource_id, "with_children": True}) + # 发送请求并等待响应 + response: SerialCommand_Response = await self._resource_clients[ + "resource_get" + ].call_async( + r + ) # type: ignore + raw_data = json.loads(response.response) + tree_set = ResourceTreeSet.from_raw_list(raw_data) + target = tree_set.dump() + protocol_kwargs[k] = target[0] + except Exception as ex: + self.lab_logger().error(f"查询资源失败: {k}, 错误: {ex}\n{traceback.format_exc()}") + raise self.lab_logger().info(f"🔍 最终的 vessel: {protocol_kwargs.get('vessel', 'NOT_FOUND')}") diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 15f4b8ca..e6c1beea 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -849,6 +849,7 @@ class DeviceNodeResourceTracker(object): def process(res): current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid") + replaced = 0 if current_uuid and current_uuid in uuid_map: new_uuid = uuid_map[current_uuid] if current_uuid != new_uuid: @@ -858,8 +859,8 @@ class DeviceNodeResourceTracker(object): self.uuid_to_resources.pop(current_uuid) self.uuid_to_resources[new_uuid] = res logger.debug(f"更新uuid: {current_uuid} -> {new_uuid}") - return 1 - return 0 + replaced = 1 + return replaced return self._traverse_and_process(resource, process) @@ -911,9 +912,23 @@ class DeviceNodeResourceTracker(object): Args: resource: 资源对象(可以是dict或实例) """ + root_uuids = {} for r in self.resources: + res_uuid = r.get("uuid") if isinstance(r, dict) else getattr(r, "unilabos_uuid", None) + if res_uuid: + root_uuids[res_uuid] = r if id(r) == id(resource): return + + # 这里只做uuid的根节点比较 + if isinstance(resource, dict): + res_uuid = resource.get("uuid") + else: + res_uuid = getattr(resource, "unilabos_uuid", None) + if res_uuid in root_uuids: + old_res = root_uuids[res_uuid] + # self.remove_resource(old_res) + logger.warning(f"资源{resource}已存在,旧资源: {old_res}") self.resources.append(resource) # 递归收集uuid映射 self._collect_uuid_mapping(resource)