更新transfer_resource_to_another参数,支持spot入参

This commit is contained in:
Xuwznln
2025-10-11 02:41:37 +08:00
parent df33e1a214
commit 0c42d60cf2
9 changed files with 598 additions and 519 deletions

View File

@@ -1,4 +1,5 @@
import copy
import inspect
import io
import json
import threading
@@ -332,7 +333,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 创建资源管理客户端
self._resource_clients: Dict[str, Client] = {
"resource_add": self.create_client(ResourceAdd, "/resources/add"),
"resource_get": self.create_client(ResourceGet, "/resources/get"),
"resource_get": self.create_client(SerialCommand, "/resources/get"),
"resource_delete": self.create_client(ResourceDelete, "/resources/delete"),
"resource_update": self.create_client(ResourceUpdate, "/resources/update"),
"resource_list": self.create_client(ResourceList, "/resources/list"),
@@ -578,6 +579,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
for i in data:
action = i.get("action") # remove, add, update
resources_uuid: List[str] = i.get("data") # 资源数据
additional_add_params = i.get("additional_add_params", {}) # 额外参数
self.lab_logger().info(
f"[Resource Tree Update] Processing {action} operation, "
f"resources count: {len(resources_uuid)}"
@@ -609,7 +611,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在")
else:
try:
parent_resource.assign_child_resource(plr_resource, location=None)
# 特殊兼容所有plr的物料的assign方法和create_resource append_resource后期同步
additional_params = {}
site = additional_add_params.get("site", None)
spec = inspect.signature(parent_resource.assign_child_resource)
if "spot" in spec.parameters:
additional_params["spot"] = site
parent_resource.assign_child_resource(plr_resource, location=None, **additional_params)
except Exception as e:
self.lab_logger().warning(
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}")
@@ -666,14 +674,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
return res
async def transfer_resource_to_another(self, plr_resources: List["ResourcePLR"], target_device_id, target_resource_uuid: str):
async def transfer_resource_to_another(self, plr_resources: List["ResourcePLR"], target_device_id: str, target_resources: List["ResourcePLR"], sites: List[str]):
# 准备工作
uids = []
target_uids = []
for plr_resource in plr_resources:
uid = getattr(plr_resource, "unilabos_uuid", None)
if uid is None:
raise ValueError(f"物料{plr_resource}没有unilabos_uuid属性无法转运")
raise ValueError(f"来源物料{plr_resource}没有unilabos_uuid属性无法转运")
uids.append(uid)
for target_resource in target_resources:
uid = getattr(target_resource, "unilabos_uuid", None)
if uid is None:
raise ValueError(f"目标物料{target_resource}没有unilabos_uuid属性无法转运")
target_uids.append(uid)
srv_address = f"/srv{target_device_id}/s2c_resource_tree"
sclient = self.create_client(SerialCommand, srv_address)
# 等待服务可用(设置超时)
@@ -688,31 +702,33 @@ class BaseROS2DeviceNode(Node, Generic[T]):
}], ensure_ascii=False)), SerialCommand_Response())
# 通知云端转运资源
tree_set = ResourceTreeSet.from_plr_resources(plr_resources)
for root_node in tree_set.root_nodes:
root_node.res_content.parent = None
root_node.res_content.parent_uuid = target_resource_uuid
r = SerialCommand.Request()
r.command = json.dumps({"data": {"data": tree_set.dump()}, "action": "update"}) # 和Update Resource一致
response: SerialCommand_Response = await self._resource_clients["c2s_update_resource_tree"].call_async(r) # type: ignore
self.lab_logger().info(f"资源云端转运到{target_device_id}结果: {response.response}")
for plr_resource, target_uid, site in zip(plr_resources, target_uids, sites):
tree_set = ResourceTreeSet.from_plr_resources([plr_resource])
for root_node in tree_set.root_nodes:
root_node.res_content.parent = None
root_node.res_content.parent_uuid = target_uid
r = SerialCommand.Request()
r.command = json.dumps({"data": {"data": tree_set.dump()}, "action": "update"}) # 和Update Resource一致
response: SerialCommand_Response = await self._resource_clients["c2s_update_resource_tree"].call_async(r) # type: ignore
self.lab_logger().info(f"资源云端转运到{target_device_id}结果: {response.response}")
# 创建请求
request = SerialCommand.Request()
request.command = json.dumps([{
"action": "add",
"data": tree_set.all_nodes_uuid # 只添加父节点,子节点会自动添加
}], ensure_ascii=False)
# 创建请求
request = SerialCommand.Request()
request.command = json.dumps([{
"action": "add",
"data": tree_set.all_nodes_uuid, # 只添加父节点,子节点会自动添加
"additional_add_params": {"site": site}
}], ensure_ascii=False)
future = sclient.call_async(request)
timeout = 30.0
start_time = time.time()
while not future.done():
if time.time() - start_time > timeout:
self.lab_logger().error(f"[{self.device_id} Node-Resource] Timeout waiting for response from {target_device_id}")
return False
time.sleep(0.05)
self.lab_logger().info(f"资源本地增加到{target_device_id}结果: {response.response}")
future = sclient.call_async(request)
timeout = 30.0
start_time = time.time()
while not future.done():
if time.time() - start_time > timeout:
self.lab_logger().error(f"[{self.device_id} Node-Resource] Timeout waiting for response from {target_device_id}")
return False
time.sleep(0.05)
self.lab_logger().info(f"资源本地增加到{target_device_id}结果: {response.response}")
return None
def register_device(self):
@@ -872,7 +888,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
for k, v in goal.get_fields_and_field_types().items():
if v in ["unilabos_msgs/Resource", "sequence<unilabos_msgs/Resource>"]:
self.lab_logger().info(f"{action_name} 查询资源状态: Key: {k} Type: {v}")
current_resources: Union[List[Resource], List[List[Resource]]] = []
current_resources: List[List[Dict[str, Any]]] = []
# TODO: resource后面需要分组
only_one_resource = False
try:
@@ -881,8 +897,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
r = ResourceGet.Request()
r.id = i["id"] # splash optional
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
current_resources.append(response.resources)
response: SerialCommand_Response = await self._resource_clients["resource_get"].call_async(r)
current_resources.append(json.loads(response.response))
else:
only_one_resource = True
r = ResourceGet.Request()
@@ -893,23 +909,21 @@ class BaseROS2DeviceNode(Node, Generic[T]):
)
r.with_children = True
response = await self._resource_clients["resource_get"].call_async(r)
current_resources.extend(response.resources)
current_resources.append(json.loads(response.response))
except Exception:
logger.error(f"资源查询失败,默认使用本地资源")
# 删除对response.resources的检查因为它总是存在
type_hint = action_paramtypes[k]
final_type = get_type_class(type_hint)
if only_one_resource:
resources_list: List[Dict[str, Any]] = [convert_from_ros_msg(rs) for rs in current_resources] # type: ignore
self.lab_logger().debug(f"资源查询结果: {len(resources_list)} 个资源")
final_resource = convert_resources_to_type(resources_list, final_type)
tree_set = ResourceTreeSet.from_raw_list(current_resources[0])
self.lab_logger().debug(f"资源查询结果: {len(tree_set.all_nodes)} 个资源")
final_resource: List[ResourcePLR] | ResourcePLR = tree_set.to_plr_resources()[0]
# 判断 ACTION 是否需要特殊的物料类型如 pylabrobot.resources.Resource并做转换
else:
resources_list: List[List[Dict[str, Any]]] = [[convert_from_ros_msg(rs) for rs in sub_res_list] for sub_res_list in current_resources] # type: ignore
final_resource = [
convert_resources_to_type(sub_res_list, final_type)[0]
for sub_res_list in resources_list
]
final_resource: List[ResourcePLR] | ResourcePLR = []
for entry in current_resources:
final_resource.append(ResourceTreeSet.from_raw_list(entry)[0]) # type: ignore
try:
action_kwargs[k] = self.resource_tracker.figure_resource(final_resource, try_mode=False)
except Exception as e: