mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-04 05:15:10 +00:00
物料更新也是用父节点进行报送
This commit is contained in:
@@ -848,7 +848,7 @@ class MessageProcessor:
|
||||
device_action_groups[key_add].append(item["uuid"])
|
||||
|
||||
logger.info(
|
||||
f"[MessageProcessor] Resource migrated: {item['uuid'][:8]} from {device_old_id} to {device_id}"
|
||||
f"[资源同步] 跨站Transfer: {item['uuid'][:8]} from {device_old_id} to {device_id}"
|
||||
)
|
||||
else:
|
||||
# 正常update
|
||||
@@ -863,11 +863,11 @@ class MessageProcessor:
|
||||
device_action_groups[key] = []
|
||||
device_action_groups[key].append(item["uuid"])
|
||||
|
||||
logger.info(f"触发物料更新 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}")
|
||||
logger.trace(f"[资源同步] 动作 {action} 分组数量: {len(device_action_groups)}, 总数量: {len(resource_uuid_list)}")
|
||||
|
||||
# 为每个(device_id, action)创建独立的更新线程
|
||||
for (device_id, actual_action), items in device_action_groups.items():
|
||||
logger.info(f"设备 {device_id} 物料更新 {actual_action} 数量: {len(items)}")
|
||||
logger.trace(f"[资源同步] {device_id} 物料动作 {actual_action} 数量: {len(items)}")
|
||||
|
||||
def _notify_resource_tree(dev_id, act, item_list):
|
||||
try:
|
||||
|
||||
@@ -1143,7 +1143,7 @@ class DeviceNodeResourceTracker(object):
|
||||
for key in keys_to_remove:
|
||||
self.resource2parent_resource.pop(key, None)
|
||||
|
||||
logger.debug(f"成功移除资源: {resource}")
|
||||
logger.trace(f"[ResourceTracker] 成功移除资源: {resource}")
|
||||
return True
|
||||
|
||||
def clear_resource(self):
|
||||
|
||||
@@ -656,61 +656,71 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
def transfer_to_new_resource(
|
||||
self, plr_resource: "ResourcePLR", tree: ResourceTreeInstance, additional_add_params: Dict[str, Any]
|
||||
):
|
||||
) -> Optional["ResourcePLR"]:
|
||||
parent_uuid = tree.root_node.res_content.parent_uuid
|
||||
if parent_uuid:
|
||||
parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid)
|
||||
if parent_resource is None:
|
||||
if not parent_uuid:
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource} parent未知,挂载到当前节点下,额外参数:{additional_add_params}"
|
||||
)
|
||||
return None
|
||||
if parent_uuid == self.uuid:
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource}请求挂载到{self.identifier},额外参数:{additional_add_params}"
|
||||
)
|
||||
return None
|
||||
parent_resource: ResourcePLR = self.resource_tracker.uuid_to_resources.get(parent_uuid)
|
||||
if parent_resource is None:
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在"
|
||||
)
|
||||
else:
|
||||
try:
|
||||
# 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步
|
||||
additional_params = {}
|
||||
extra = getattr(plr_resource, "unilabos_extra", {})
|
||||
if len(extra):
|
||||
self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra))
|
||||
if "update_resource_site" in extra:
|
||||
additional_add_params["site"] = extra["update_resource_site"]
|
||||
site = additional_add_params.get("site", None)
|
||||
spec = inspect.signature(parent_resource.assign_child_resource)
|
||||
if "spot" in spec.parameters:
|
||||
ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering")
|
||||
if ordering_dict:
|
||||
site = list(ordering_dict.keys()).index(site)
|
||||
additional_params["spot"] = site
|
||||
old_parent = plr_resource.parent
|
||||
if old_parent is not None:
|
||||
# plr并不支持同一个deck的加载和卸载
|
||||
self.lab_logger().warning(f"物料{plr_resource}请求从{old_parent}卸载")
|
||||
old_parent.unassign_child_resource(plr_resource)
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_uuid}不存在"
|
||||
f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}"
|
||||
)
|
||||
else:
|
||||
try:
|
||||
# 特殊兼容所有plr的物料的assign方法,和create_resource append_resource后期同步
|
||||
additional_params = {}
|
||||
extra = getattr(plr_resource, "unilabos_extra", {})
|
||||
if len(extra):
|
||||
self.lab_logger().info(f"发现物料{plr_resource}额外参数: " + str(extra))
|
||||
if "update_resource_site" in extra:
|
||||
additional_add_params["site"] = extra["update_resource_site"]
|
||||
site = additional_add_params.get("site", None)
|
||||
spec = inspect.signature(parent_resource.assign_child_resource)
|
||||
if "spot" in spec.parameters:
|
||||
ordering_dict: Dict[str, Any] = getattr(parent_resource, "_ordering")
|
||||
if ordering_dict:
|
||||
site = list(ordering_dict.keys()).index(site)
|
||||
additional_params["spot"] = site
|
||||
old_parent = plr_resource.parent
|
||||
if old_parent is not None:
|
||||
# plr并不支持同一个deck的加载和卸载
|
||||
self.lab_logger().warning(f"物料{plr_resource}请求从{old_parent}卸载")
|
||||
old_parent.unassign_child_resource(plr_resource)
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource}请求挂载到{parent_resource},额外参数:{additional_params}"
|
||||
)
|
||||
|
||||
# ⭐ assign 之前,需要从 resources 列表中移除
|
||||
# 因为资源将不再是顶级资源,而是成为 parent_resource 的子资源
|
||||
# 如果不移除,figure_resource 会找到两次:一次在 resources,一次在 parent 的 children
|
||||
resource_id = id(plr_resource)
|
||||
for i, r in enumerate(self.resource_tracker.resources):
|
||||
if id(r) == resource_id:
|
||||
self.resource_tracker.resources.pop(i)
|
||||
self.lab_logger().debug(
|
||||
f"从顶级资源列表中移除 {plr_resource.name}(即将成为 {parent_resource.name} 的子资源)"
|
||||
)
|
||||
break
|
||||
# ⭐ assign 之前,需要从 resources 列表中移除
|
||||
# 因为资源将不再是顶级资源,而是成为 parent_resource 的子资源
|
||||
# 如果不移除,figure_resource 会找到两次:一次在 resources,一次在 parent 的 children
|
||||
resource_id = id(plr_resource)
|
||||
for i, r in enumerate(self.resource_tracker.resources):
|
||||
if id(r) == resource_id:
|
||||
self.resource_tracker.resources.pop(i)
|
||||
self.lab_logger().debug(
|
||||
f"从顶级资源列表中移除 {plr_resource.name}(即将成为 {parent_resource.name} 的子资源)"
|
||||
)
|
||||
break
|
||||
|
||||
parent_resource.assign_child_resource(plr_resource, location=None, **additional_params)
|
||||
parent_resource.assign_child_resource(plr_resource, location=None, **additional_params)
|
||||
|
||||
func = getattr(self.driver_instance, "resource_tree_transfer", None)
|
||||
if callable(func):
|
||||
# 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了)
|
||||
func(old_parent, plr_resource, parent_resource)
|
||||
except Exception as e:
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}"
|
||||
)
|
||||
func = getattr(self.driver_instance, "resource_tree_transfer", None)
|
||||
if callable(func):
|
||||
# 分别是 物料的原来父节点,当前物料的状态,物料的新父节点(此时物料已经重新assign了)
|
||||
func(old_parent, plr_resource, parent_resource)
|
||||
return parent_resource
|
||||
except Exception as e:
|
||||
self.lab_logger().warning(
|
||||
f"物料{plr_resource}请求挂载{tree.root_node.res_content.name}的父节点{parent_resource}[{parent_uuid}]失败!\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
async def s2c_resource_tree(self, req: SerialCommand_Request, res: SerialCommand_Response):
|
||||
"""
|
||||
@@ -725,7 +735,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
def _handle_add(
|
||||
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
) -> Tuple[Dict[str, Any], List[ResourcePLR]]:
|
||||
"""
|
||||
处理资源添加操作的内部函数
|
||||
|
||||
@@ -737,15 +747,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
Returns:
|
||||
操作结果字典
|
||||
"""
|
||||
parents = [] # 放的是被变更的物料 / 被变更的物料父级
|
||||
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||
self.resource_tracker.add_resource(plr_resource)
|
||||
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
|
||||
parent = self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
|
||||
if parent is not None:
|
||||
parents.append(parent)
|
||||
else:
|
||||
parents.append(plr_resource)
|
||||
|
||||
func = getattr(self.driver_instance, "resource_tree_add", None)
|
||||
if callable(func):
|
||||
func(plr_resources)
|
||||
|
||||
return {"success": True, "action": "add"}
|
||||
return {"success": True, "action": "add"}, parents
|
||||
|
||||
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -780,11 +795,11 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
if plr_resource.parent is not None:
|
||||
plr_resource.parent.unassign_child_resource(plr_resource)
|
||||
self.resource_tracker.remove_resource(plr_resource)
|
||||
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
|
||||
self.lab_logger().info(f"[资源同步] 移除物料 {plr_resource} 及其子节点")
|
||||
|
||||
for other_plr_resource in other_plr_resources:
|
||||
self.resource_tracker.remove_resource(other_plr_resource)
|
||||
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
|
||||
self.lab_logger().info(f"[资源同步] 移除物料 {other_plr_resource} 及其子节点")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
@@ -816,11 +831,16 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
|
||||
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
|
||||
)
|
||||
original_parent_resource = original_instance.parent
|
||||
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
||||
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
||||
not_same_parent = original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None
|
||||
old_name = original_instance.name
|
||||
new_name = plr_resource.name
|
||||
parent_appended = False
|
||||
|
||||
# Update操作中包含改名:需要先remove再add
|
||||
if original_instance.name != plr_resource.name:
|
||||
old_name = original_instance.name
|
||||
new_name = plr_resource.name
|
||||
# Update操作中包含改名:需要先remove再add,这里更新父节点即可
|
||||
if not not_same_parent and old_name != new_name:
|
||||
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
|
||||
|
||||
# 收集所有相关的uuid(包括子节点)
|
||||
@@ -829,12 +849,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
_handle_add([original_instance], tree_set, additional_add_params)
|
||||
|
||||
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
|
||||
original_instances.append(original_parent_resource)
|
||||
parent_appended = True
|
||||
|
||||
# 常规更新:不涉及改名
|
||||
original_parent_resource = original_instance.parent
|
||||
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
||||
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
||||
|
||||
self.lab_logger().info(
|
||||
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
|
||||
f"目标父节点{target_parent_resource_uuid} 更新"
|
||||
@@ -845,13 +863,12 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
|
||||
|
||||
# 如果父节点变化,需要重新挂载
|
||||
if (
|
||||
original_parent_resource_uuid != target_parent_resource_uuid
|
||||
and original_parent_resource is not None
|
||||
):
|
||||
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||
if not_same_parent:
|
||||
parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||
original_instances.append(parent)
|
||||
parent_appended = True
|
||||
else:
|
||||
# 判断是否变更了resource_site
|
||||
# 判断是否变更了resource_site,重新登记
|
||||
target_site = original_instance.unilabos_extra.get("update_resource_site")
|
||||
sites = original_instance.parent.sites if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else None
|
||||
site_names = list(original_instance.parent._ordering.keys()) if original_instance.parent is not None and hasattr(original_instance.parent, "sites") else []
|
||||
@@ -859,7 +876,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
site_index = sites.index(original_instance)
|
||||
site_name = site_names[site_index]
|
||||
if site_name != target_site:
|
||||
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||
parent = self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||
if parent is not None:
|
||||
original_instances.append(parent)
|
||||
parent_appended = True
|
||||
|
||||
# 加载状态
|
||||
original_instance.load_all_state(states)
|
||||
@@ -867,7 +887,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
self.lab_logger().info(
|
||||
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count} 个"
|
||||
)
|
||||
original_instances.append(original_instance)
|
||||
if not parent_appended:
|
||||
original_instances.append(original_instance)
|
||||
|
||||
# 调用driver的update回调
|
||||
func = getattr(self.driver_instance, "resource_tree_update", None)
|
||||
@@ -884,8 +905,8 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
action = i.get("action") # remove, add, update
|
||||
resources_uuid: List[str] = i.get("data") # 资源数据
|
||||
additional_add_params = i.get("additional_add_params", {}) # 额外参数
|
||||
self.lab_logger().info(
|
||||
f"[Resource Tree Update] Processing {action} operation, " f"resources count: {len(resources_uuid)}"
|
||||
self.lab_logger().trace(
|
||||
f"[资源同步] 处理 {action}, " f"resources count: {len(resources_uuid)}"
|
||||
)
|
||||
tree_set = None
|
||||
if action in ["add", "update"]:
|
||||
@@ -897,8 +918,13 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
if tree_set is None:
|
||||
raise ValueError("tree_set不能为None")
|
||||
plr_resources = tree_set.to_plr_resources()
|
||||
result = _handle_add(plr_resources, tree_set, additional_add_params)
|
||||
new_tree_set = ResourceTreeSet.from_plr_resources(plr_resources)
|
||||
result, parents = _handle_add(plr_resources, tree_set, additional_add_params)
|
||||
parents: List[Optional["ResourcePLR"]] = [i for i in parents if i is not None]
|
||||
de_dupe_parents = list(set(parents))
|
||||
new_tree_set = ResourceTreeSet.from_plr_resources(de_dupe_parents) # 去重
|
||||
for tree in new_tree_set.trees:
|
||||
if tree.root_node.res_content.uuid_parent is None and self.node_name != "host_node":
|
||||
tree.root_node.res_content.parent_uuid = self.uuid
|
||||
r = SerialCommand.Request()
|
||||
r.command = json.dumps(
|
||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
||||
@@ -917,7 +943,10 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
plr_resources.append(ResourceTreeSet([tree]).to_plr_resources()[0])
|
||||
result, original_instances = _handle_update(plr_resources, tree_set, additional_add_params)
|
||||
if not BasicConfig.no_update_feedback:
|
||||
new_tree_set = ResourceTreeSet.from_plr_resources(original_instances)
|
||||
new_tree_set = ResourceTreeSet.from_plr_resources(original_instances) # 去重
|
||||
for tree in new_tree_set.trees:
|
||||
if tree.root_node.res_content.uuid_parent is None and self.node_name != "host_node":
|
||||
tree.root_node.res_content.parent_uuid = self.uuid
|
||||
r = SerialCommand.Request()
|
||||
r.command = json.dumps(
|
||||
{"data": {"data": new_tree_set.dump()}, "action": "update"}) # 和Update Resource一致
|
||||
@@ -937,15 +966,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
# 返回处理结果
|
||||
result_json = {"results": results, "total": len(data)}
|
||||
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
|
||||
self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
|
||||
# self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
error_msg = f"Invalid JSON format: {str(e)}"
|
||||
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
|
||||
self.lab_logger().error(f"[资源同步] {error_msg}")
|
||||
res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
error_msg = f"Unexpected error: {str(e)}"
|
||||
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
|
||||
self.lab_logger().error(f"[资源同步] {error_msg}")
|
||||
self.lab_logger().error(traceback.format_exc())
|
||||
res.response = json.dumps({"success": False, "error": error_msg}, ensure_ascii=False)
|
||||
|
||||
|
||||
@@ -362,8 +362,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
request.command = ""
|
||||
future = sclient.call_async(request)
|
||||
# Use timeout for result as well
|
||||
future.result(timeout_sec=5.0)
|
||||
self.lab_logger().debug(f"[Host Node] Re-register completed for {device_namespace}")
|
||||
future.result()
|
||||
except Exception as e:
|
||||
# Gracefully handle destruction during shutdown
|
||||
if "destruction was requested" in str(e) or self._shutting_down:
|
||||
@@ -1515,7 +1514,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
|
||||
# 构建服务地址
|
||||
srv_address = f"/srv{namespace}/s2c_resource_tree"
|
||||
self.lab_logger().info(f"[Host Node-Resource] Notifying {device_id} for resource tree {action} operation")
|
||||
self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation started -------")
|
||||
|
||||
# 创建服务客户端
|
||||
sclient = self.create_client(SerialCommand, srv_address)
|
||||
@@ -1550,9 +1549,7 @@ class HostNode(BaseROS2DeviceNode):
|
||||
time.sleep(0.05)
|
||||
|
||||
response = future.result()
|
||||
self.lab_logger().info(
|
||||
f"[Host Node-Resource] Resource tree {action} notification completed for {device_id}"
|
||||
)
|
||||
self.lab_logger().trace(f"[Host Node-Resource] Host -> {device_id} ResourceTree {action} operation completed -------")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user