Resource update & asyncio fix

correct bioyond config

prcxi example

fix append_resource

fix regularcontainer

fix cancel error

fix resource_get param

fix json dumps

support name change during materials change

enable slave mode

change uuid logger to trace level

correct remove_resource stats

disable slave connect websocket

adjust with_children param

modify devices to use correct executor (sleep, create_task)

support sleep and create_task in node

fix run async execution error
This commit is contained in:
Xuwznln
2025-10-31 21:43:25 +08:00
parent 8807865649
commit b6dfe2b944
32 changed files with 1456 additions and 943 deletions

View File

@@ -6,11 +6,12 @@ from typing import Optional, Dict, Any, List
import rclpy
from unilabos_msgs.srv._serial_command import SerialCommand_Response
from unilabos.app.register import register_devices_and_resources
from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
from unilabos_msgs.srv import SerialCommand # type: ignore
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.timer import Timer
@@ -108,66 +109,51 @@ def slave(
rclpy_init_args: List[str] = ["--log-level", "debug"],
) -> None:
"""从节点函数"""
# 1. 初始化 ROS2
if not rclpy.ok():
rclpy.init(args=rclpy_init_args)
executor = rclpy.__executor
if not executor:
executor = rclpy.__executor = MultiThreadedExecutor()
devices_instances = {}
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
devices_instances[device_id] = d
# 默认初始化
# if d is not None and isinstance(d, Node):
# executor.add_node(d)
# else:
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
resource_mesh_manager = ResourceMeshManager(
resources_mesh_config,
resources_config, # type: ignore FIXME
resource_tracker=DeviceNodeResourceTracker(),
device_id="resource_mesh_manager",
)
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
executor.add_node(resource_mesh_manager)
executor.add_node(joint_republisher)
# 1.5 启动 executor 线程
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
thread.start()
# 2. 创建 Slave Machine Node
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
# 3. 向 Host 报送节点信息和物料,获取 UUID 映射
uuid_mapping = {}
if not BasicConfig.slave_no_host:
# 3.1 报送节点信息
sclient = n.create_client(SerialCommand, "/node_info_update")
sclient.wait_for_service()
registry_config = {}
devices_to_register, resources_to_register = register_devices_and_resources(lab_registry, True)
registry_config.update(devices_to_register)
registry_config.update(resources_to_register)
request = SerialCommand.Request()
request.command = json.dumps(
{
"machine_name": BasicConfig.machine_name,
"type": "slave",
"devices_config": devices_config.dump(),
"registry_config": lab_registry.obtain_registry_device_info(),
"registry_config": registry_config,
},
ensure_ascii=False,
cls=TypeEncoder,
)
response = sclient.call_async(request).result()
sclient.call_async(request).result()
logger.info(f"Slave node info updated.")
# 使用新的 c2s_update_resource_tree 服务
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
rclient.wait_for_service()
# 序列化 ResourceTreeSet 为 JSON
# 3.2 报送物料树,获取 UUID 映射
if resources_config:
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
rclient.wait_for_service()
request = SerialCommand.Request()
request.command = json.dumps(
{
@@ -180,35 +166,61 @@ def slave(
},
ensure_ascii=False,
)
tree_response: SerialCommand_Response = rclient.call_async(request).result()
tree_response: SerialCommand_Response = rclient.call(request)
uuid_mapping = json.loads(tree_response.response)
# 创建反向映射new_uuid -> old_uuid
reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
for node in resources_config.root_nodes:
if node.res_content.type == "device":
for sub_node in node.children:
# 只有二级子设备
if sub_node.res_content.type != "device":
device_tracker = devices_instances[node.res_content.id].resource_tracker
# sub_node.res_content.uuid 已经是新UUID需要用旧UUID去查找
old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid)
if old_uuid:
# 找到旧UUID使用UUID查找
resource_instance = device_tracker.figure_resource({"uuid": old_uuid})
else:
# 未找到旧UUID使用name查找
resource_instance = device_tracker.figure_resource({"name": sub_node.res_content.name})
device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
logger.info(f"Slave resource tree added. UUID mapping: {len(uuid_mapping)} nodes")
# 3.3 使用 UUID 映射更新 resources_config 的 UUID参考 client.py 逻辑)
old_uuids = {node.res_content.uuid: node for node in resources_config.all_nodes}
for old_uuid, node in old_uuids.items():
if old_uuid in uuid_mapping:
new_uuid = uuid_mapping[old_uuid]
node.res_content.uuid = new_uuid
# 更新所有子节点的 parent_uuid
for child in node.children:
child.res_content.parent_uuid = new_uuid
else:
logger.error("Slave模式不允许新增非设备节点下的物料")
continue
if tree_response:
logger.info(f"Slave resource tree added. Response: {tree_response.response}")
else:
logger.warning("Slave resource tree add response is None")
logger.warning(f"资源UUID未更新: {old_uuid}")
else:
logger.info("No resources to add.")
# 4. 初始化所有设备实例(此时 resources_config 的 UUID 已更新)
devices_instances = {}
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type == "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
if d is not None:
devices_instances[device_id] = d
logger.info(f"Device {device_id} initialized.")
else:
logger.warning(f"Device {device_id} initialization failed.")
# 5. 如果启用可视化,创建可视化相关节点
if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
# 将 ResourceTreeSet 转换为 list 用于 visual 组件
resources_list = (
[node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
if resources_config
else []
)
resource_mesh_manager = ResourceMeshManager(
resources_mesh_config,
resources_list,
resource_tracker=DeviceNodeResourceTracker(),
device_id="resource_mesh_manager",
)
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
lh_joint_pub = LiquidHandlerJointPublisher(
resources_config=resources_list, resource_tracker=DeviceNodeResourceTracker()
)
executor.add_node(resource_mesh_manager)
executor.add_node(joint_republisher)
executor.add_node(lh_joint_pub)
# 7. 保持运行
while True:
time.sleep(1)

View File

@@ -53,7 +53,7 @@ from unilabos.ros.nodes.resource_tracker import (
)
from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func
from rclpy.task import Task, Future
from unilabos.utils.import_manager import default_manager
from unilabos.utils.log import info, debug, warning, error, critical, logger, trace
from unilabos.utils.type_check import get_type_class, TypeEncoder, get_result_info_str
@@ -466,8 +466,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
contain_model = not isinstance(resource, Deck)
if isinstance(resource, ResourcePLR):
# resources.list()
resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources}))
plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model)
plr_instance = ResourceTreeSet.from_raw_list(resources).to_plr_resources()[0]
# resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources}))
# plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model)
if isinstance(plr_instance, Plate):
empty_liquid_info_in = [(None, 0)] * plr_instance.num_items
@@ -555,6 +556,15 @@ class BaseROS2DeviceNode(Node, Generic[T]):
rclpy.get_global_executor().add_node(self)
self.lab_logger().debug(f"ROS节点初始化完成")
async def sleep(self, rel_time: float, callback_group=None):
if callback_group is None:
callback_group = self.callback_group
await ROS2DeviceNode.async_wait_for(self, rel_time, callback_group)
@classmethod
async def create_task(cls, func, trace_error=True, **kwargs) -> Task:
return ROS2DeviceNode.run_async_func(func, trace_error, **kwargs)
async def update_resource(self, resources: List["ResourcePLR"]):
r = SerialCommand.Request()
tree_set = ResourceTreeSet.from_plr_resources(resources)
@@ -629,6 +639,145 @@ class BaseROS2DeviceNode(Node, Generic[T]):
- remove: 从资源树中移除资源
"""
from pylabrobot.resources.resource import Resource as ResourcePLR
def _handle_add(
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
处理资源添加操作的内部函数
Args:
plr_resources: PLR资源列表
tree_set: 资源树集合
additional_add_params: 额外的添加参数
Returns:
操作结果字典
"""
for plr_resource, tree in zip(plr_resources, tree_set.trees):
self.resource_tracker.add_resource(plr_resource)
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
return {"success": True, "action": "add"}
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
"""
处理资源移除操作的内部函数
Args:
resources_uuid: 要移除的资源UUID列表
Returns:
操作结果字典,包含移除的资源列表
"""
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
[{"uuid": uid} for uid in resources_uuid], try_mode=True
)
found_plr_resources = []
other_plr_resources = []
for found_resource in found_resources:
for resource in found_resource:
if issubclass(resource.__class__, ResourcePLR):
found_plr_resources.append(resource)
else:
other_plr_resources.append(resource)
# 调用driver的remove回调
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(found_plr_resources)
# 从parent卸载并从tracker移除
for plr_resource in found_plr_resources:
if plr_resource.parent is not None:
plr_resource.parent.unassign_child_resource(plr_resource)
self.resource_tracker.remove_resource(plr_resource)
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
for other_plr_resource in other_plr_resources:
self.resource_tracker.remove_resource(other_plr_resource)
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
return {
"success": True,
"action": "remove",
# "removed_plr": found_plr_resources,
# "removed_other": other_plr_resources,
}
def _handle_update(
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
处理资源更新操作的内部函数
Args:
plr_resources: PLR资源列表包含新状态
tree_set: 资源树集合
additional_add_params: 额外的参数
Returns:
操作结果字典
"""
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
# Update操作中包含改名需要先remove再add
if original_instance.name != plr_resource.name:
old_name = original_instance.name
new_name = plr_resource.name
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
# 收集所有相关的uuid包括子节点
_handle_remove([original_instance.unilabos_uuid])
original_instance.name = new_name
_handle_add([original_instance], tree_set, additional_add_params)
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
# 常规更新:不涉及改名
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
self.lab_logger().info(
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
f"目标父节点{target_parent_resource_uuid} 更新"
)
# 更新extra
if getattr(plr_resource, "unilabos_extra", None) is not None:
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
# 如果父节点变化,需要重新挂载
if (
original_parent_resource_uuid != target_parent_resource_uuid
and original_parent_resource is not None
):
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
# 加载状态
original_instance.load_all_state(states)
child_count = len(original_instance.get_all_children())
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count}"
)
# 调用driver的update回调
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
func(plr_resources)
return {"success": True, "action": "update"}
try:
data = json.loads(req.command)
results = []
@@ -647,7 +796,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
].call_async(
SerialCommand.Request(
command=json.dumps(
{"data": {"data": resources_uuid, "with_children": False}, "action": "get"}
{"data": {"data": resources_uuid, "with_children": True if action == "add" else False}, "action": "get"}
)
)
) # type: ignore
@@ -655,68 +804,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
tree_set = ResourceTreeSet.from_raw_list(raw_nodes)
try:
if action == "add":
# 添加资源到资源跟踪器
if tree_set is None:
raise ValueError("tree_set不能为None")
plr_resources = tree_set.to_plr_resources()
for plr_resource, tree in zip(plr_resources, tree_set.trees):
self.resource_tracker.add_resource(plr_resource)
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
results.append({"success": True, "action": "add"})
result = _handle_add(plr_resources, tree_set, additional_add_params)
results.append(result)
elif action == "update":
# 更新资源
if tree_set is None:
raise ValueError("tree_set不能为None")
plr_resources = tree_set.to_plr_resources()
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
self.lab_logger().info(
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} 目标父节点{target_parent_resource_uuid} 更新"
)
# todo: 对extra进行update
if getattr(plr_resource, "unilabos_extra", None) is not None:
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra")
if original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None:
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
original_instance.load_all_state(states)
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())}"
)
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
func(plr_resources)
results.append({"success": True, "action": "update"})
result = _handle_update(plr_resources, tree_set, additional_add_params)
results.append(result)
elif action == "remove":
# 移除资源
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
[{"uuid": uid} for uid in resources_uuid], try_mode=True
)
found_plr_resources = []
other_plr_resources = []
for found_resource in found_resources:
for resource in found_resource:
if issubclass(resource.__class__, ResourcePLR):
found_plr_resources.append(resource)
else:
other_plr_resources.append(resource)
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(found_plr_resources)
for plr_resource in found_plr_resources:
if plr_resource.parent is not None:
plr_resource.parent.unassign_child_resource(plr_resource)
self.resource_tracker.remove_resource(plr_resource)
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
for other_plr_resource in other_plr_resources:
self.resource_tracker.remove_resource(other_plr_resource)
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
results.append({"success": True, "action": "remove"})
result = _handle_remove(resources_uuid)
results.append(result)
except Exception as e:
error_msg = f"Error processing {action} operation: {str(e)}"
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
@@ -725,7 +826,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 返回处理结果
result_json = {"results": results, "total": len(data)}
res.response = json.dumps(result_json, ensure_ascii=False)
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
except json.JSONDecodeError as e:
@@ -995,9 +1096,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 通过资源跟踪器获取本地实例
final_resources = queried_resources if is_sequence else queried_resources[0]
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
]
final_resources = (
self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
if not is_sequence
else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False)
for res in queried_resources
]
)
action_kwargs[k] = final_resources
except Exception as e:
@@ -1218,6 +1324,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raise JsonCommandInitError(
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
)
def _convert_resource_sync(self, resource_data: Dict[str, Any]):
"""同步转换资源数据为实例"""
# 创建资源查询请求
@@ -1385,18 +1492,27 @@ class ROS2DeviceNode:
它不继承设备类,而是通过代理模式访问设备类的属性和方法。
"""
# 类变量,用于循环管理
_loop = None
_loop_running = False
_loop_thread = None
@classmethod
def run_async_func(cls, func, trace_error=True, **kwargs) -> Task:
def _handle_future_exception(fut):
try:
fut.result()
except Exception as e:
error(f"异步任务 {func.__name__} 报错了")
error(traceback.format_exc())
future = rclpy.get_global_executor().create_task(func(**kwargs))
if trace_error:
future.add_done_callback(_handle_future_exception)
return future
@classmethod
def get_loop(cls):
return cls._loop
@classmethod
def run_async_func(cls, func, trace_error=True, **kwargs):
return run_async_func(func, loop=cls._loop, trace_error=trace_error, **kwargs)
async def async_wait_for(cls, node: Node, wait_time: float, callback_group=None):
future = Future()
timer = node.create_timer(wait_time, lambda : future.set_result(None), callback_group=callback_group, clock=node.get_clock())
await future
timer.cancel()
node.destroy_timer(timer)
@property
def driver_instance(self):
@@ -1436,11 +1552,6 @@ class ROS2DeviceNode:
print_publish: 是否打印发布信息
driver_is_ros:
"""
# 在初始化时检查循环状态
if ROS2DeviceNode._loop_running and ROS2DeviceNode._loop_thread is not None:
pass
elif ROS2DeviceNode._loop_thread is None:
self._start_loop()
# 保存设备类是否支持异步上下文
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
@@ -1529,17 +1640,6 @@ class ROS2DeviceNode:
except Exception as e:
self._ros_node.lab_logger().error(f"设备后初始化失败: {e}")
def _start_loop(self):
def run_event_loop():
loop = asyncio.new_event_loop()
ROS2DeviceNode._loop = loop
asyncio.set_event_loop(loop)
loop.run_forever()
ROS2DeviceNode._loop_thread = threading.Thread(target=run_event_loop, daemon=True, name="ROS2DeviceNodeLoop")
ROS2DeviceNode._loop_thread.start()
logger.info(f"循环线程已启动")
class DeviceInfoType(TypedDict):
id: str

View File

@@ -18,7 +18,8 @@ from unilabos_msgs.srv import (
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand, ResourceGet,
SerialCommand,
ResourceGet,
) # type: ignore
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
@@ -164,29 +165,16 @@ class HostNode(BaseROS2DeviceNode):
# resources_config 的 root node 是
# # 创建反向映射new_uuid -> old_uuid
# reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
# for tree in resources_config.trees:
# node = tree.root_node
# if node.res_content.type == "device":
# if node.res_content.id == "host_node":
# continue
# # slave节点走c2s更新接口拿到add自行update uuid
# device_tracker = self.devices_instances[node.res_content.id].resource_tracker
# old_uuid = reverse_uuid_mapping.get(node.res_content.uuid)
# if old_uuid:
# # 找到旧UUID使用UUID查找
# resource_instance = device_tracker.uuid_to_resources.get(old_uuid)
# else:
# # 未找到旧UUID使用name查找
# resource_instance = device_tracker.figure_resource(
# {"name": node.res_content.name}
# )
# device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
# else:
# try:
# for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
# self.resource_tracker.add_resource(plr_resource)
# except Exception as ex:
# self.lab_logger().warning("[Host Node-Resource] 根节点物料序列化失败!")
for tree in resources_config.trees:
node = tree.root_node
if node.res_content.type == "device":
continue
else:
try:
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
except Exception as ex:
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
# 初始化Node基类传递空参数覆盖列表
@@ -746,46 +734,116 @@ class HostNode(BaseROS2DeviceNode):
def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""获取结果回调"""
job_id = item.job_id
result_msg = future.result().result
result_data = convert_from_ros_msg(result_msg)
status = "success"
return_info_str = result_data.get("return_info")
if return_info_str is not None:
try:
return_info = json.loads(return_info_str)
suc = return_info.get("suc", False)
if not suc:
status = "failed"
except json.JSONDecodeError:
try:
result = future.result()
result_msg = result.result
goal_status = result.status
# 检查是否是被取消的任务
if goal_status == GoalStatus.STATUS_CANCELED:
self.lab_logger().info(f"[Host Node] Goal {action_id} ({job_id[:8]}) was cancelled")
status = "failed"
return_info = serialize_result_info("", False, result_data)
self.lab_logger().critical("错误的return_info类型请断点修复")
else:
# 无 return_info 字段时,回退到 success 字段(若存在)
suc_field = result_data.get("success")
if isinstance(suc_field, bool):
status = "success" if suc_field else "failed"
return_info = serialize_result_info("", suc_field, result_data)
return_info = serialize_result_info("Job was cancelled", False, {})
else:
# 最保守的回退标记失败并返回空JSON
status = "failed"
return_info = serialize_result_info("缺少return_info", False, result_data)
result_data = convert_from_ros_msg(result_msg)
status = "success"
return_info_str = result_data.get("return_info")
if return_info_str is not None:
try:
return_info = json.loads(return_info_str)
suc = return_info.get("suc", False)
if not suc:
status = "failed"
except json.JSONDecodeError:
status = "failed"
return_info = serialize_result_info("", False, result_data)
self.lab_logger().critical("错误的return_info类型请断点修复")
else:
# 无 return_info 字段时,回退到 success 字段(若存在)
suc_field = result_data.get("success")
if isinstance(suc_field, bool):
status = "success" if suc_field else "failed"
return_info = serialize_result_info("", suc_field, result_data)
else:
# 最保守的回退标记失败并返回空JSON
status = "failed"
return_info = serialize_result_info("缺少return_info", False, result_data)
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id}): {status}")
self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}")
if goal_status != GoalStatus.STATUS_CANCELED:
self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
if job_id:
# 清理 _goals 中的记录
if job_id in self._goals:
del self._goals[job_id]
self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
# 发布状态到桥接器
if job_id:
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
if goal_status == GoalStatus.STATUS_CANCELED:
bridge.publish_job_status({}, item, status, return_info)
else:
bridge.publish_job_status(result_data, item, status, return_info)
except Exception as e:
self.lab_logger().error(
f"[Host Node] Error in get_result_callback for {action_id} ({job_id[:8]}): {str(e)}"
)
import traceback
self.lab_logger().error(traceback.format_exc())
# 清理 _goals 中的记录
if job_id in self._goals:
del self._goals[job_id]
# 发布失败状态
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(result_data, item, status, return_info)
bridge.publish_job_status(
{}, item, "failed", serialize_result_info(f"Callback error: {str(e)}", False, {})
)
def cancel_goal(self, goal_uuid: str) -> None:
"""取消目标"""
def cancel_goal(self, goal_uuid: str) -> bool:
"""
取消目标
Args:
goal_uuid: 目标UUIDjob_id
Returns:
bool: 如果找到目标并发起取消请求返回True否则返回False
"""
if goal_uuid in self._goals:
self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid}")
self._goals[goal_uuid].cancel_goal_async()
self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid[:8]}")
goal_handle = self._goals[goal_uuid]
# 发起异步取消请求
cancel_future = goal_handle.cancel_goal_async()
# 添加取消完成的回调
cancel_future.add_done_callback(lambda future: self._cancel_goal_callback(goal_uuid, future))
return True
else:
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid} not found, cannot cancel")
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} not found in _goals, cannot cancel")
return False
def _cancel_goal_callback(self, goal_uuid: str, future) -> None:
"""取消目标的回调"""
try:
cancel_response = future.result()
if cancel_response.goals_canceling:
self.lab_logger().info(f"[Host Node] Goal {goal_uuid[:8]} cancel request accepted")
else:
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} cancel request rejected")
except Exception as e:
self.lab_logger().error(f"[Host Node] Error cancelling goal {goal_uuid[:8]}: {str(e)}")
import traceback
self.lab_logger().error(traceback.format_exc())
def get_goal_status(self, job_id: str) -> int:
"""获取目标状态"""
@@ -877,11 +935,10 @@ class HostNode(BaseROS2DeviceNode):
success = False
uuid_mapping = {}
if len(self.bridges) > 0:
from unilabos.app.web.client import HTTPClient
from unilabos.app.web.client import HTTPClient, http_client
client: HTTPClient = self.bridges[-1]
resource_start_time = time.time()
uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
success = True
resource_end_time = time.time()
self.lab_logger().info(
@@ -989,9 +1046,10 @@ class HostNode(BaseROS2DeviceNode):
"""
更新节点信息回调
"""
self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
try:
from unilabos.app.communication import get_communication_client
from unilabos.app.web.client import HTTPClient, http_client
info = json.loads(request.command)
if "SYNC_SLAVE_NODE_INFO" in info:
@@ -1000,10 +1058,10 @@ class HostNode(BaseROS2DeviceNode):
edge_device_id = info["edge_device_id"]
self.device_machine_names[edge_device_id] = machine_name
else:
comm_client = get_communication_client()
registry_config = info["registry_config"]
for device_config in registry_config:
comm_client.publish_registry(device_config["id"], device_config)
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1029,10 +1087,9 @@ class HostNode(BaseROS2DeviceNode):
success = False
if len(self.bridges) > 0: # 边的提交待定
from unilabos.app.web.client import HTTPClient
from unilabos.app.web.client import HTTPClient, http_client
client: HTTPClient = self.bridges[-1]
r = client.resource_add(add_schema(resources))
r = http_client.resource_add(add_schema(resources))
success = bool(r)
response.success = success

View File

@@ -2,7 +2,7 @@ import traceback
import uuid
from pydantic import BaseModel, field_serializer, field_validator
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from unilabos.utils.log import logger
@@ -60,7 +60,7 @@ class ResourceDict(BaseModel):
icon: str = Field(description="Resource icon", default="")
parent_uuid: Optional["str"] = Field(description="Parent resource uuid", default=None) # 先设定parent_uuid
parent: Optional["ResourceDict"] = Field(description="Parent resource object", default=None, exclude=True)
type: Literal["device"] | str = Field(description="Resource type")
type: Union[Literal["device"], str] = Field(description="Resource type")
klass: str = Field(alias="class", description="Resource class name")
position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition)
@@ -848,15 +848,13 @@ class DeviceNodeResourceTracker(object):
extra: extra字典值
"""
if isinstance(resource, dict):
# ⭐ 修复合并extra而不是覆盖
current_extra = resource.get("extra", {})
current_extra.update(extra)
resource["extra"] = current_extra
c_extra = resource.get("extra", {})
c_extra.update(extra)
resource["extra"] = c_extra
else:
# ⭐ 修复合并unilabos_extra而不是覆盖
current_extra = getattr(resource, "unilabos_extra", {})
current_extra.update(extra)
setattr(resource, "unilabos_extra", current_extra)
c_extra = getattr(resource, "unilabos_extra", {})
c_extra.update(extra)
setattr(resource, "unilabos_extra", c_extra)
def _traverse_and_process(self, resource, process_func) -> int:
"""
@@ -900,7 +898,7 @@ class DeviceNodeResourceTracker(object):
new_uuid = name_to_uuid_map[resource_name]
self.set_resource_uuid(res, new_uuid)
self.uuid_to_resources[new_uuid] = res
logger.debug(f"设置资源UUID: {resource_name} -> {new_uuid}")
logger.trace(f"设置资源UUID: {resource_name} -> {new_uuid}")
return 1
return 0
@@ -923,7 +921,8 @@ class DeviceNodeResourceTracker(object):
if resource_name and resource_name in name_to_extra_map:
extra = name_to_extra_map[resource_name]
self.set_resource_extra(res, extra)
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
if len(extra):
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
return 1
return 0
@@ -933,7 +932,7 @@ class DeviceNodeResourceTracker(object):
"""
递归遍历资源树更新所有节点的uuid
Args:0
Args:
resource: 资源对象可以是dict或实例
uuid_map: uuid映射字典{old_uuid: new_uuid}
@@ -958,6 +957,27 @@ class DeviceNodeResourceTracker(object):
return self._traverse_and_process(resource, process)
def loop_gather_uuid(self, resource) -> List[str]:
"""
递归遍历资源树收集所有节点的uuid
Args:
resource: 资源对象可以是dict或实例
Returns:
收集到的uuid列表
"""
uuid_list = []
def process(res):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid:
uuid_list.append(current_uuid)
return 0
self._traverse_and_process(resource, process)
return uuid_list
def _collect_uuid_mapping(self, resource):
"""
递归收集资源的 uuid 映射到 uuid_to_resources
@@ -971,14 +991,15 @@ class DeviceNodeResourceTracker(object):
if current_uuid:
old = self.uuid_to_resources.get(current_uuid)
self.uuid_to_resources[current_uuid] = res
logger.debug(
logger.trace(
f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}"
)
return 1
return 0
self._traverse_and_process(resource, process)
def _remove_uuid_mapping(self, resource):
def _remove_uuid_mapping(self, resource) -> int:
"""
递归清除资源的 uuid 映射
@@ -990,10 +1011,11 @@ class DeviceNodeResourceTracker(object):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid and current_uuid in self.uuid_to_resources:
self.uuid_to_resources.pop(current_uuid)
logger.debug(f"移除资源UUID映射: {current_uuid} -> {res}")
logger.trace(f"移除资源UUID映射: {current_uuid} -> {res}")
return 1
return 0
self._traverse_and_process(resource, process)
return self._traverse_and_process(resource, process)
def parent_resource(self, resource):
if id(resource) in self.resource2parent_resource:
@@ -1048,13 +1070,12 @@ class DeviceNodeResourceTracker(object):
removed = True
break
if not removed:
# 递归清除uuid映射
count = self._remove_uuid_mapping(resource)
if not count:
logger.warning(f"尝试移除不存在的资源: {resource}")
return False
# 递归清除uuid映射
self._remove_uuid_mapping(resource)
# 清除 resource2parent_resource 中与该资源相关的映射
# 需要清除1) 该资源作为 key 的映射 2) 该资源作为 value 的映射
keys_to_remove = []
@@ -1077,7 +1098,9 @@ class DeviceNodeResourceTracker(object):
self.uuid_to_resources.clear()
self.resource2parent_resource.clear()
def figure_resource(self, query_resource, try_mode=False):
def figure_resource(
self, query_resource: Union[List[Union[dict, "PLRResource"]], dict, "PLRResource"], try_mode=False
) -> Union[List[Union[dict, "PLRResource", List[Union[dict, "PLRResource"]]]], dict, "PLRResource"]:
if isinstance(query_resource, list):
return [self.figure_resource(r, try_mode) for r in query_resource]
elif (