From 3a11eb90d4287d32694bab8685c6bf7820d6b686 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sat, 11 Oct 2025 03:38:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=85=81=E8=AE=B8=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E9=9D=9E=E6=9C=AC=E8=8A=82=E7=82=B9=E7=89=A9=E6=96=99=EF=BC=8C?= =?UTF-8?q?=E5=90=8E=E9=9D=A2=E5=8F=AF=E4=BB=A5=E9=80=9A=E8=BF=87decoratio?= =?UTF-8?q?n=E8=BF=9B=E8=A1=8C=E5=8C=BA=E5=88=86=EF=BC=8C=E5=B0=B1?= =?UTF-8?q?=E4=B8=8D=E8=BF=9B=E8=A1=8Cwarning=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/ros/nodes/base_device_node.py | 285 +++++++++++++++++++------ 1 file changed, 222 insertions(+), 63 deletions(-) diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index ae5f5597..d0193525 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -6,7 +6,7 @@ import threading import time import traceback import uuid -from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, Union, TYPE_CHECKING +from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING from concurrent.futures import ThreadPoolExecutor import asyncio @@ -25,7 +25,6 @@ from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialComma from unilabos.resources.container import RegularContainer from unilabos.resources.graphio import ( - convert_resources_to_type, resource_ulab_to_plr, initialize_resources, dict_to_tree, @@ -35,7 +34,6 @@ from unilabos.resources.graphio import ( from unilabos.resources.plr_additional_res_reg import register from unilabos.ros.msgs.message_converter import ( convert_to_ros_msg, - convert_from_ros_msg, convert_from_ros_msg_with_mapping, convert_to_ros_msg_with_mapping, ) @@ -49,11 +47,14 @@ from unilabos_msgs.srv import ( ) # type: ignore from unilabos_msgs.msg import Resource # type: ignore -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet, ResourceDict, \ - ResourceDictInstance +from unilabos.ros.nodes.resource_tracker import ( + DeviceNodeResourceTracker, + ResourceTreeSet, +) from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func +from unilabos.utils.import_manager import default_manager from unilabos.utils.log import info, debug, warning, error, critical, logger, trace from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str @@ -346,7 +347,6 @@ class BaseROS2DeviceNode(Node, Generic[T]): res.response = "" return res - async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response): # 物料传输到对应的node节点 rclient = self.create_client(ResourceAdd, "/resources/add") @@ -581,8 +581,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): resources_uuid: List[str] = i.get("data") # 资源数据 additional_add_params = i.get("additional_add_params", {}) # 额外参数 self.lab_logger().info( - f"[Resource Tree Update] Processing {action} operation, " - f"resources count: {len(resources_uuid)}" + f"[Resource Tree Update] Processing {action} operation, " f"resources count: {len(resources_uuid)}" ) tree_set = None if action in ["add", "update"]: @@ -608,7 +607,8 @@ class BaseROS2DeviceNode(Node, Generic[T]): parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid) if parent_resource is None: self.lab_logger().warning( - f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在") + f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在" + ) else: try: # 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步 @@ -617,10 +617,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): spec = inspect.signature(parent_resource.assign_child_resource) if "spot" in spec.parameters: additional_params["spot"] = site - parent_resource.assign_child_resource(plr_resource, location=None, **additional_params) + parent_resource.assign_child_resource( + plr_resource, location=None, **additional_params + ) except Exception as e: self.lab_logger().warning( - f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}") + f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}" + ) func = getattr(self.driver_instance, "resource_tree_add", None) if callable(func): func(plr_resources) @@ -631,10 +634,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): for plr_resource, tree in zip(plr_resources, tree_set.trees): states = plr_resource.serialize_all_state() original_instance: ResourcePLR = self.resource_tracker.figure_resource( - {"uuid": tree.root_node.res_content.uuid}, try_mode=False) + {"uuid": tree.root_node.res_content.uuid}, try_mode=False + ) original_instance.load_all_state(states) self.lab_logger().info( - f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())} 个") + f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())} 个" + ) func = getattr(self.driver_instance, "resource_tree_update", None) if callable(func): @@ -642,8 +647,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): results.append({"success": True, "action": "update"}) elif action == "remove": # 移除资源 - plr_resources: List[ResourcePLR] = [self.resource_tracker.uuid_to_resources[i] for - i in resources_uuid] + plr_resources: List[ResourcePLR] = [ + self.resource_tracker.uuid_to_resources[i] for i in resources_uuid + ] func = getattr(self.driver_instance, "resource_tree_remove", None) if callable(func): func(plr_resources) @@ -674,7 +680,13 @@ class BaseROS2DeviceNode(Node, Generic[T]): return res - async def transfer_resource_to_another(self, plr_resources: List["ResourcePLR"], target_device_id: str, target_resources: List["ResourcePLR"], sites: List[str]): + async def transfer_resource_to_another( + self, + plr_resources: List["ResourcePLR"], + target_device_id: str, + target_resources: List["ResourcePLR"], + sites: List[str], + ): # 准备工作 uids = [] target_uids = [] @@ -696,10 +708,12 @@ class BaseROS2DeviceNode(Node, Generic[T]): raise ValueError(f"[{self.device_id} Node-Resource] Service {srv_address} not available") # 先从当前节点移除资源 - await self.s2c_resource_tree(SerialCommand_Request(command=json.dumps([{ - "action": "remove", - "data": uids # 只移除父节点 - }], ensure_ascii=False)), SerialCommand_Response()) + await self.s2c_resource_tree( + SerialCommand_Request( + command=json.dumps([{"action": "remove", "data": uids}], ensure_ascii=False) # 只移除父节点 + ), + SerialCommand_Response(), + ) # 通知云端转运资源 for plr_resource, target_uid, site in zip(plr_resources, target_uids, sites): @@ -714,18 +728,25 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 创建请求 request = SerialCommand.Request() - request.command = json.dumps([{ - "action": "add", - "data": tree_set.all_nodes_uuid, # 只添加父节点,子节点会自动添加 - "additional_add_params": {"site": site} - }], ensure_ascii=False) + request.command = json.dumps( + [ + { + "action": "add", + "data": tree_set.all_nodes_uuid, # 只添加父节点,子节点会自动添加 + "additional_add_params": {"site": site}, + } + ], + ensure_ascii=False, + ) future = sclient.call_async(request) timeout = 30.0 start_time = time.time() while not future.done(): if time.time() - start_time > timeout: - self.lab_logger().error(f"[{self.device_id} Node-Resource] Timeout waiting for response from {target_device_id}") + self.lab_logger().error( + f"[{self.device_id} Node-Resource] Timeout waiting for response from {target_device_id}" + ) return False time.sleep(0.05) self.lab_logger().info(f"资源本地增加到{target_device_id}结果: {response.response}") @@ -888,44 +909,34 @@ class BaseROS2DeviceNode(Node, Generic[T]): for k, v in goal.get_fields_and_field_types().items(): if v in ["unilabos_msgs/Resource", "sequence"]: self.lab_logger().info(f"{action_name} 查询资源状态: Key: {k} Type: {v}") - current_resources: List[List[Dict[str, Any]]] = [] - # TODO: resource后面需要分组 - only_one_resource = False + try: - if isinstance(action_kwargs[k], list) and len(action_kwargs[k]) > 1: - for i in action_kwargs[k]: - r = ResourceGet.Request() - r.id = i["id"] # splash optional - r.with_children = True - response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(r) - current_resources.append(json.loads(response.response)) - else: - only_one_resource = True - r = ResourceGet.Request() - r.id = ( - action_kwargs[k]["id"] - if v == "unilabos_msgs/Resource" - else action_kwargs[k][0]["id"] - ) - r.with_children = True - response = await self._resource_clients["resource_get"].call_async(r) - current_resources.append(json.loads(response.response)) - except Exception: - logger.error(f"资源查询失败,默认使用本地资源") - # 删除对response.resources的检查,因为它总是存在 - type_hint = action_paramtypes[k] - final_type = get_type_class(type_hint) - if only_one_resource: - tree_set = ResourceTreeSet.from_raw_list(current_resources[0]) - self.lab_logger().debug(f"资源查询结果: {len(tree_set.all_nodes)} 个资源") - final_resource: List[ResourcePLR] | ResourcePLR = tree_set.to_plr_resources()[0] - # 判断 ACTION 是否需要特殊的物料类型如 pylabrobot.resources.Resource,并做转换 - else: - final_resource: List[ResourcePLR] | ResourcePLR = [] - for entry in current_resources: - final_resource.append(ResourceTreeSet.from_raw_list(entry)[0]) # type: ignore - try: - action_kwargs[k] = self.resource_tracker.figure_resource(final_resource, try_mode=False) + # 统一处理单个或多个资源 + is_sequence = v != "unilabos_msgs/Resource" + resource_inputs = action_kwargs[k] if is_sequence else [action_kwargs[k]] + + # 批量查询资源 + queried_resources = [] + for resource_data in resource_inputs: + r = SerialCommand.Request() + r.command = json.dumps({"id": resource_data["id"], "with_children": True}) + # 发送请求并等待响应 + response: SerialCommand_Response = await self._resource_clients[ + "resource_get" + ].call_async(r) + raw_data = json.loads(response.response) + + # 转换为 PLR 资源 + tree_set = ResourceTreeSet.from_raw_list(raw_data) + plr_resource = tree_set.to_plr_resources()[0] + queried_resources.append(plr_resource) + + self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源") + + # 通过资源跟踪器获取本地实例 + final_resources = queried_resources if is_sequence else queried_resources[0] + action_kwargs[k] = self.resource_tracker.figure_resource(final_resources, try_mode=False) + except Exception as e: self.lab_logger().error(f"{action_name} 物料实例获取失败: {e}\n{traceback.format_exc()}") error_skip = True @@ -1096,11 +1107,92 @@ class BaseROS2DeviceNode(Node, Generic[T]): assert callable( function ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" + + # 处理 ResourceSlot 类型参数 + args_list = default_manager._analyze_method_signature(function)["args"] + for arg in args_list: + arg_name = arg["name"] + arg_type = arg["type"] + + # 跳过不在 function_args 中的参数 + if arg_name not in function_args: + continue + + # 处理单个 ResourceSlot + if arg_type == "unilabos.registry.placeholder_type:ResourceSlot": + resource_data = function_args[arg_name] + if isinstance(resource_data, dict) and "id" in resource_data: + try: + converted_resource = self._convert_resource_sync(resource_data) + function_args[arg_name] = converted_resource + except Exception as e: + self.lab_logger().error( + f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" + ) + raise JsonCommandInitError(f"ResourceSlot参数转换失败: {arg_name}") + + # 处理 ResourceSlot 列表 + elif isinstance(arg_type, tuple) and len(arg_type) == 2: + resource_slot_type = "unilabos.registry.placeholder_type:ResourceSlot" + if arg_type[0] == "list" and arg_type[1] == resource_slot_type: + resource_list = function_args[arg_name] + if isinstance(resource_list, list): + try: + converted_resources = [] + for resource_data in resource_list: + if isinstance(resource_data, dict) and "id" in resource_data: + converted_resource = self._convert_resource_sync(resource_data) + converted_resources.append(converted_resource) + function_args[arg_name] = converted_resources + except Exception as e: + self.lab_logger().error( + f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" + ) + raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}") + return function(**function_args) except KeyError as ex: raise JsonCommandInitError( f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" ) + def _convert_resource_sync(self, resource_data: Dict[str, Any]): + """同步转换资源数据为实例""" + # 创建资源查询请求 + r = SerialCommand.Request() + r.command = json.dumps({"id": resource_data["id"], "with_children": True}) + + # 同步调用资源查询服务 + future = self._resource_clients["resource_get"].call_async(r) + + # 等待结果(使用while循环,每次sleep 0.5秒,最多等待5秒) + timeout = 30.0 + elapsed = 0.0 + while not future.done() and elapsed < timeout: + time.sleep(0.05) + elapsed += 0.05 + + if not future.done(): + raise Exception(f"资源查询超时: {resource_data['id']}") + + response = future.result() + if response is None: + raise Exception(f"资源查询返回空结果: {resource_data['id']}") + + current_resources = json.loads(response.response) + + # 转换为 PLR 资源 + tree_set = ResourceTreeSet.from_raw_list(current_resources) + plr_resource = tree_set.to_plr_resources()[0] + + # 通过资源跟踪器获取本地实例 + res = self.resource_tracker.figure_resource(plr_resource, try_mode=True) + if len(res) == 0: + self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data},返回新建实例") + return plr_resource + elif len(res) == 1: + return res[0] + else: + raise ValueError(f"资源转换得到多个实例: {res}") async def _execute_driver_command_async(self, string: str): try: @@ -1123,12 +1215,79 @@ class BaseROS2DeviceNode(Node, Generic[T]): assert asyncio.iscoroutinefunction( function ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" + + # 处理 ResourceSlot 类型参数 + args_list = default_manager._analyze_method_signature(function)["args"] + for arg in args_list: + arg_name = arg["name"] + arg_type = arg["type"] + + # 跳过不在 function_args 中的参数 + if arg_name not in function_args: + continue + + # 处理单个 ResourceSlot + if arg_type == "unilabos.registry.placeholder_type:ResourceSlot": + resource_data = function_args[arg_name] + if isinstance(resource_data, dict) and "id" in resource_data: + try: + converted_resource = await self._convert_resource_async(resource_data) + function_args[arg_name] = converted_resource + except Exception as e: + self.lab_logger().error( + f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" + ) + raise JsonCommandInitError(f"ResourceSlot参数转换失败: {arg_name}") + + # 处理 ResourceSlot 列表 + elif isinstance(arg_type, tuple) and len(arg_type) == 2: + resource_slot_type = "unilabos.registry.placeholder_type:ResourceSlot" + if arg_type[0] == "list" and arg_type[1] == resource_slot_type: + resource_list = function_args[arg_name] + if isinstance(resource_list, list): + try: + converted_resources = [] + for resource_data in resource_list: + if isinstance(resource_data, dict) and "id" in resource_data: + converted_resource = await self._convert_resource_async(resource_data) + converted_resources.append(converted_resource) + function_args[arg_name] = converted_resources + except Exception as e: + self.lab_logger().error( + f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}" + ) + raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}") + return await function(**function_args) except KeyError as ex: raise JsonCommandInitError( f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" ) + async def _convert_resource_async(self, resource_data: Dict[str, Any]): + """异步转换资源数据为实例""" + # 创建资源查询请求 + r = SerialCommand.Request() + r.command = json.dumps({"id": resource_data["id"], "with_children": True}) + + # 异步调用资源查询服务 + response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(r) + current_resources = json.loads(response.response) + + # 转换为 PLR 资源 + tree_set = ResourceTreeSet.from_raw_list(current_resources) + plr_resource = tree_set.to_plr_resources()[0] + + # 通过资源跟踪器获取本地实例 + res = self.resource_tracker.figure_resource(plr_resource, try_mode=True) + if len(res) == 0: + self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data},返回新建实例") + return plr_resource + elif len(res) == 1: + return res[0] + else: + raise ValueError(f"资源转换得到多个实例: {res}") + # 异步上下文管理方法 async def __aenter__(self): """进入异步上下文"""