feat: 允许返回非本节点物料,后面可以通过decoration进行区分,就不进行warning了

This commit is contained in:
Xuwznln
2025-10-11 03:38:14 +08:00
parent 387866b9c9
commit 3a11eb90d4

View File

@@ -6,7 +6,7 @@ import threading
import time
import traceback
import uuid
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, Union, TYPE_CHECKING
from typing import get_type_hints, TypeVar, Generic, Dict, Any, Type, TypedDict, Optional, List, TYPE_CHECKING
from concurrent.futures import ThreadPoolExecutor
import asyncio
@@ -25,7 +25,6 @@ from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialComma
from unilabos.resources.container import RegularContainer
from unilabos.resources.graphio import (
convert_resources_to_type,
resource_ulab_to_plr,
initialize_resources,
dict_to_tree,
@@ -35,7 +34,6 @@ from unilabos.resources.graphio import (
from unilabos.resources.plr_additional_res_reg import register
from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg,
convert_from_ros_msg,
convert_from_ros_msg_with_mapping,
convert_to_ros_msg_with_mapping,
)
@@ -49,11 +47,14 @@ from unilabos_msgs.srv import (
) # type: ignore
from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet, ResourceDict, \
ResourceDictInstance
from unilabos.ros.nodes.resource_tracker import (
DeviceNodeResourceTracker,
ResourceTreeSet,
)
from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func
from unilabos.utils.import_manager import default_manager
from unilabos.utils.log import info, debug, warning, error, critical, logger, trace
from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str
@@ -346,7 +347,6 @@ class BaseROS2DeviceNode(Node, Generic[T]):
res.response = ""
return res
async def append_resource(req: SerialCommand_Request, res: SerialCommand_Response):
# 物料传输到对应的node节点
rclient = self.create_client(ResourceAdd, "/resources/add")
@@ -581,8 +581,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
resources_uuid: List[str] = i.get("data") # 资源数据
additional_add_params = i.get("additional_add_params", {}) # 额外参数
self.lab_logger().info(
f"[Resource Tree Update] Processing {action} operation, "
f"resources count: {len(resources_uuid)}"
f"[Resource Tree Update] Processing {action} operation, " f"resources count: {len(resources_uuid)}"
)
tree_set = None
if action in ["add", "update"]:
@@ -608,7 +607,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid)
if parent_resource is None:
self.lab_logger().warning(
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在")
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在"
)
else:
try:
# 特殊兼容所有plr的物料的assign方法和create_resource append_resource后期同步
@@ -617,10 +617,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
spec = inspect.signature(parent_resource.assign_child_resource)
if "spot" in spec.parameters:
additional_params["spot"] = site
parent_resource.assign_child_resource(plr_resource, location=None, **additional_params)
parent_resource.assign_child_resource(
plr_resource, location=None, **additional_params
)
except Exception as e:
self.lab_logger().warning(
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}")
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}"
)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
@@ -631,10 +634,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False)
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
original_instance.load_all_state(states)
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())}")
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())}"
)
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
@@ -642,8 +647,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
results.append({"success": True, "action": "update"})
elif action == "remove":
# 移除资源
plr_resources: List[ResourcePLR] = [self.resource_tracker.uuid_to_resources[i] for
i in resources_uuid]
plr_resources: List[ResourcePLR] = [
self.resource_tracker.uuid_to_resources[i] for i in resources_uuid
]
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(plr_resources)
@@ -674,7 +680,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
return res
async def transfer_resource_to_another(self, plr_resources: List["ResourcePLR"], target_device_id: str, target_resources: List["ResourcePLR"], sites: List[str]):
async def transfer_resource_to_another(
self,
plr_resources: List["ResourcePLR"],
target_device_id: str,
target_resources: List["ResourcePLR"],
sites: List[str],
):
# 准备工作
uids = []
target_uids = []
@@ -696,10 +708,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raise ValueError(f"[{self.device_id} Node-Resource] Service {srv_address} not available")
# 先从当前节点移除资源
await self.s2c_resource_tree(SerialCommand_Request(command=json.dumps([{
"action": "remove",
"data": uids # 只移除父节点
}], ensure_ascii=False)), SerialCommand_Response())
await self.s2c_resource_tree(
SerialCommand_Request(
command=json.dumps([{"action": "remove", "data": uids}], ensure_ascii=False) # 只移除父节点
),
SerialCommand_Response(),
)
# 通知云端转运资源
for plr_resource, target_uid, site in zip(plr_resources, target_uids, sites):
@@ -714,18 +728,25 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 创建请求
request = SerialCommand.Request()
request.command = json.dumps([{
request.command = json.dumps(
[
{
"action": "add",
"data": tree_set.all_nodes_uuid, # 只添加父节点,子节点会自动添加
"additional_add_params": {"site": site}
}], ensure_ascii=False)
"additional_add_params": {"site": site},
}
],
ensure_ascii=False,
)
future = sclient.call_async(request)
timeout = 30.0
start_time = time.time()
while not future.done():
if time.time() - start_time > timeout:
self.lab_logger().error(f"[{self.device_id} Node-Resource] Timeout waiting for response from {target_device_id}")
self.lab_logger().error(
f"[{self.device_id} Node-Resource] Timeout waiting for response from {target_device_id}"
)
return False
time.sleep(0.05)
self.lab_logger().info(f"资源本地增加到{target_device_id}结果: {response.response}")
@@ -888,44 +909,34 @@ class BaseROS2DeviceNode(Node, Generic[T]):
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
self.lab_logger().info(f"{action_name} 查询资源状态: Key: {k} Type: {v}")
current_resources: List[List[Dict[str, Any]]] = []
# TODO: resource后面需要分组
only_one_resource = False
try:
if isinstance(action_kwargs[k], list) and len(action_kwargs[k]) > 1:
for i in action_kwargs[k]:
r = ResourceGet.Request()
r.id = i["id"] # splash optional
r.with_children = True
response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(r)
current_resources.append(json.loads(response.response))
else:
only_one_resource = True
r = ResourceGet.Request()
r.id = (
action_kwargs[k]["id"]
if v == "unilabos_msgs/Resource"
else action_kwargs[k][0]["id"]
)
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
current_resources.append(json.loads(response.response))
except Exception:
logger.error(f"资源查询失败,默认使用本地资源")
# 删除对response.resources的检查因为它总是存在
type_hint = action_paramtypes[k]
final_type = get_type_class(type_hint)
if only_one_resource:
tree_set = ResourceTreeSet.from_raw_list(current_resources[0])
self.lab_logger().debug(f"资源查询结果: {len(tree_set.all_nodes)} 个资源")
final_resource: List[ResourcePLR] | ResourcePLR = tree_set.to_plr_resources()[0]
# 判断 ACTION 是否需要特殊的物料类型如 pylabrobot.resources.Resource并做转换
else:
final_resource: List[ResourcePLR] | ResourcePLR = []
for entry in current_resources:
final_resource.append(ResourceTreeSet.from_raw_list(entry)[0]) # type: ignore
try:
action_kwargs[k] = self.resource_tracker.figure_resource(final_resource, try_mode=False)
# 统一处理单个或多个资源
is_sequence = v != "unilabos_msgs/Resource"
resource_inputs = action_kwargs[k] if is_sequence else [action_kwargs[k]]
# 批量查询资源
queried_resources = []
for resource_data in resource_inputs:
r = SerialCommand.Request()
r.command = json.dumps({"id": resource_data["id"], "with_children": True})
# 发送请求并等待响应
response: SerialCommand_Response = await self._resource_clients[
"resource_get"
].call_async(r)
raw_data = json.loads(response.response)
# 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(raw_data)
plr_resource = tree_set.to_plr_resources()[0]
queried_resources.append(plr_resource)
self.lab_logger().debug(f"资源查询结果: 共 {len(queried_resources)} 个资源")
# 通过资源跟踪器获取本地实例
final_resources = queried_resources if is_sequence else queried_resources[0]
action_kwargs[k] = self.resource_tracker.figure_resource(final_resources, try_mode=False)
except Exception as e:
self.lab_logger().error(f"{action_name} 物料实例获取失败: {e}\n{traceback.format_exc()}")
error_skip = True
@@ -1096,11 +1107,92 @@ class BaseROS2DeviceNode(Node, Generic[T]):
assert callable(
function
), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}"
# 处理 ResourceSlot 类型参数
args_list = default_manager._analyze_method_signature(function)["args"]
for arg in args_list:
arg_name = arg["name"]
arg_type = arg["type"]
# 跳过不在 function_args 中的参数
if arg_name not in function_args:
continue
# 处理单个 ResourceSlot
if arg_type == "unilabos.registry.placeholder_type:ResourceSlot":
resource_data = function_args[arg_name]
if isinstance(resource_data, dict) and "id" in resource_data:
try:
converted_resource = self._convert_resource_sync(resource_data)
function_args[arg_name] = converted_resource
except Exception as e:
self.lab_logger().error(
f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot参数转换失败: {arg_name}")
# 处理 ResourceSlot 列表
elif isinstance(arg_type, tuple) and len(arg_type) == 2:
resource_slot_type = "unilabos.registry.placeholder_type:ResourceSlot"
if arg_type[0] == "list" and arg_type[1] == resource_slot_type:
resource_list = function_args[arg_name]
if isinstance(resource_list, list):
try:
converted_resources = []
for resource_data in resource_list:
if isinstance(resource_data, dict) and "id" in resource_data:
converted_resource = self._convert_resource_sync(resource_data)
converted_resources.append(converted_resource)
function_args[arg_name] = converted_resources
except Exception as e:
self.lab_logger().error(
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}")
return function(**function_args)
except KeyError as ex:
raise JsonCommandInitError(
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
)
def _convert_resource_sync(self, resource_data: Dict[str, Any]):
"""同步转换资源数据为实例"""
# 创建资源查询请求
r = SerialCommand.Request()
r.command = json.dumps({"id": resource_data["id"], "with_children": True})
# 同步调用资源查询服务
future = self._resource_clients["resource_get"].call_async(r)
# 等待结果使用while循环每次sleep 0.5秒最多等待5秒
timeout = 30.0
elapsed = 0.0
while not future.done() and elapsed < timeout:
time.sleep(0.05)
elapsed += 0.05
if not future.done():
raise Exception(f"资源查询超时: {resource_data['id']}")
response = future.result()
if response is None:
raise Exception(f"资源查询返回空结果: {resource_data['id']}")
current_resources = json.loads(response.response)
# 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(current_resources)
plr_resource = tree_set.to_plr_resources()[0]
# 通过资源跟踪器获取本地实例
res = self.resource_tracker.figure_resource(plr_resource, try_mode=True)
if len(res) == 0:
self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data},返回新建实例")
return plr_resource
elif len(res) == 1:
return res[0]
else:
raise ValueError(f"资源转换得到多个实例: {res}")
async def _execute_driver_command_async(self, string: str):
try:
@@ -1123,12 +1215,79 @@ class BaseROS2DeviceNode(Node, Generic[T]):
assert asyncio.iscoroutinefunction(
function
), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}"
# 处理 ResourceSlot 类型参数
args_list = default_manager._analyze_method_signature(function)["args"]
for arg in args_list:
arg_name = arg["name"]
arg_type = arg["type"]
# 跳过不在 function_args 中的参数
if arg_name not in function_args:
continue
# 处理单个 ResourceSlot
if arg_type == "unilabos.registry.placeholder_type:ResourceSlot":
resource_data = function_args[arg_name]
if isinstance(resource_data, dict) and "id" in resource_data:
try:
converted_resource = await self._convert_resource_async(resource_data)
function_args[arg_name] = converted_resource
except Exception as e:
self.lab_logger().error(
f"转换ResourceSlot参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot参数转换失败: {arg_name}")
# 处理 ResourceSlot 列表
elif isinstance(arg_type, tuple) and len(arg_type) == 2:
resource_slot_type = "unilabos.registry.placeholder_type:ResourceSlot"
if arg_type[0] == "list" and arg_type[1] == resource_slot_type:
resource_list = function_args[arg_name]
if isinstance(resource_list, list):
try:
converted_resources = []
for resource_data in resource_list:
if isinstance(resource_data, dict) and "id" in resource_data:
converted_resource = await self._convert_resource_async(resource_data)
converted_resources.append(converted_resource)
function_args[arg_name] = converted_resources
except Exception as e:
self.lab_logger().error(
f"转换ResourceSlot列表参数 {arg_name} 失败: {e}\n{traceback.format_exc()}"
)
raise JsonCommandInitError(f"ResourceSlot列表参数转换失败: {arg_name}")
return await function(**function_args)
except KeyError as ex:
raise JsonCommandInitError(
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
)
async def _convert_resource_async(self, resource_data: Dict[str, Any]):
"""异步转换资源数据为实例"""
# 创建资源查询请求
r = SerialCommand.Request()
r.command = json.dumps({"id": resource_data["id"], "with_children": True})
# 异步调用资源查询服务
response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(r)
current_resources = json.loads(response.response)
# 转换为 PLR 资源
tree_set = ResourceTreeSet.from_raw_list(current_resources)
plr_resource = tree_set.to_plr_resources()[0]
# 通过资源跟踪器获取本地实例
res = self.resource_tracker.figure_resource(plr_resource, try_mode=True)
if len(res) == 0:
self.lab_logger().warning(f"资源转换未能索引到实例: {resource_data},返回新建实例")
return plr_resource
elif len(res) == 1:
return res[0]
else:
raise ValueError(f"资源转换得到多个实例: {res}")
# 异步上下文管理方法
async def __aenter__(self):
"""进入异步上下文"""