From 1d181743ea56850825d6df1befa93573724975f7 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 5 Feb 2026 00:18:19 +0800 Subject: [PATCH] adapt to new samples sys --- unilabos/app/web/controller.py | 1 + unilabos/app/ws_client.py | 4 +- unilabos/ros/nodes/base_device_node.py | 37 ++++++++++++---- unilabos/ros/nodes/presets/host_node.py | 58 ++++++++----------------- 4 files changed, 51 insertions(+), 49 deletions(-) diff --git a/unilabos/app/web/controller.py b/unilabos/app/web/controller.py index acd1f56..6a01645 100644 --- a/unilabos/app/web/controller.py +++ b/unilabos/app/web/controller.py @@ -327,6 +327,7 @@ def job_add(req: JobAddReq) -> JobData: queue_item, action_type=action_type, action_kwargs=action_args, + sample_material=req.sample_material, server_info=server_info, ) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 448ab6c..7949aaa 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -545,7 +545,7 @@ class MessageProcessor: try: message_str = json.dumps(msg, ensure_ascii=False) await self.websocket.send(message_str) - logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 + # logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 except Exception as e: logger.error(f"[MessageProcessor] Failed to send message: {str(e)}") logger.error(traceback.format_exc()) @@ -1302,7 +1302,7 @@ class WebSocketClient(BaseCommunicationClient): }, } self.message_processor.send_message(message) - logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") + # logger.trace(f"[WebSocketClient] Device status published: {device_id}.{property_name}") def publish_job_status( self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 7cde6a3..952a502 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -231,14 +231,15 @@ class PropertyPublisher: def publish_property(self): try: - self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") + # self.node.lab_logger().trace(f"【.publish_property】开始发布属性: {self.name}") value = self.get_property() if self.print_publish: - self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") + pass + # self.node.lab_logger().trace(f"【.publish_property】发布 {self.msg_type}: {value}") if value is not None: msg = convert_to_ros_msg(self.msg_type, value) self.publisher_.publish(msg) - self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") + # self.node.lab_logger().trace(f"【.publish_property】属性 {self.name} 发布成功") except Exception as e: self.node.lab_logger().error( f"【.publish_property】发布属性 {self.publisher_.topic} 出错: {str(e)}\n{traceback.format_exc()}" @@ -1594,7 +1595,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): function_name = target["function_name"] function_args = target["function_args"] # 获取 unilabos 系统参数 - unilabos_param: Dict[str, Any] = target.get(JSON_UNILABOS_PARAM, {}) + unilabos_param: Dict[str, Any] = target[JSON_UNILABOS_PARAM] assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" function = getattr(self.driver_instance, function_name) @@ -1612,9 +1613,19 @@ class BaseROS2DeviceNode(Node, Generic[T]): if arg_name not in function_args: # 处理 sample_uuids 参数注入 if arg_name == PARAM_SAMPLE_UUIDS: - function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, []) + raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {}) + # 将 material uuid 转换为 resource 实例 + # key: sample_uuid, value: material_uuid -> resource 实例 + resolved_sample_uuids: Dict[str, Any] = {} + for sample_uuid, material_uuid in raw_sample_uuids.items(): + if material_uuid and self.resource_tracker: + resource = self.resource_tracker.uuid_to_resources.get(material_uuid) + resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid + else: + resolved_sample_uuids[sample_uuid] = material_uuid + function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids self.lab_logger().debug( - f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}" + f"[JsonCommand] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}" ) continue @@ -1754,9 +1765,19 @@ class BaseROS2DeviceNode(Node, Generic[T]): if arg_name not in function_args: # 处理 sample_uuids 参数注入 if arg_name == PARAM_SAMPLE_UUIDS: - function_args[PARAM_SAMPLE_UUIDS] = unilabos_param.get(PARAM_SAMPLE_UUIDS, []) + raw_sample_uuids = unilabos_param.get(PARAM_SAMPLE_UUIDS, {}) + # 将 material uuid 转换为 resource 实例 + # key: sample_uuid, value: material_uuid -> resource 实例 + resolved_sample_uuids: Dict[str, Any] = {} + for sample_uuid, material_uuid in raw_sample_uuids.items(): + if material_uuid and self.resource_tracker: + resource = self.resource_tracker.uuid_to_resources.get(material_uuid) + resolved_sample_uuids[sample_uuid] = resource if resource else material_uuid + else: + resolved_sample_uuids[sample_uuid] = material_uuid + function_args[PARAM_SAMPLE_UUIDS] = resolved_sample_uuids self.lab_logger().debug( - f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {function_args[PARAM_SAMPLE_UUIDS]}" + f"[JsonCommandAsync] 注入 {PARAM_SAMPLE_UUIDS}: {resolved_sample_uuids}" ) continue diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 09a754c..0ac1de2 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -1,17 +1,17 @@ import collections -from dataclasses import dataclass, field import json import threading import time import traceback import uuid +from dataclasses import dataclass, field from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union -from typing_extensions import TypedDict from action_msgs.msg import GoalStatus from geometry_msgs.msg import Point from rclpy.action import ActionClient, get_action_server_names_and_types_by_node from rclpy.service import Service +from typing_extensions import TypedDict from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.srv import ( ResourceAdd, @@ -23,10 +23,20 @@ from unilabos_msgs.srv import ( from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response from unique_identifier_msgs.msg import UUID +from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.registry.registry import lab_registry from unilabos.resources.container import RegularContainer from unilabos.resources.graphio import initialize_resource from unilabos.resources.registry import add_schema +from unilabos.resources.resource_tracker import ( + ResourceDict, + ResourceDictInstance, + ResourceTreeSet, + ResourceTreeInstance, + RETURN_UNILABOS_SAMPLES, + JSON_UNILABOS_PARAM, + PARAM_SAMPLE_UUIDS, +) from unilabos.ros.initialize_device import initialize_device_from_dict from unilabos.ros.msgs.message_converter import ( get_msg_type, @@ -37,20 +47,10 @@ from unilabos.ros.msgs.message_converter import ( ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode -from unilabos.resources.resource_tracker import ( - ResourceDict, - ResourceDictInstance, - ResourceTreeSet, - ResourceTreeInstance, - EXTRA_SAMPLE_UUID, - EXTRA_UNILABOS_SAMPLE_UUID, - RETURN_UNILABOS_SAMPLES, -) from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.log import warning from unilabos.utils.type_check import serialize_result_info -from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot if TYPE_CHECKING: from unilabos.app.ws_client import QueueItem @@ -776,14 +776,14 @@ class HostNode(BaseROS2DeviceNode): if action_name.startswith("auto-"): action_name = action_name[5:] action_id = f"/devices/{device_id}/_execute_driver_command" - action_kwargs = { - "string": json.dumps( - { - "function_name": action_name, - "function_args": action_kwargs, - } - ) + json_command: Dict[str, Any] = { + "function_name": action_name, + "function_args": action_kwargs, + JSON_UNILABOS_PARAM: { + PARAM_SAMPLE_UUIDS: sample_material, + }, } + action_kwargs = {"string": json.dumps(json_command)} if action_type.startswith("UniLabJsonCommandAsync"): action_id = f"/devices/{device_id}/_execute_driver_command_async" else: @@ -794,26 +794,6 @@ class HostNode(BaseROS2DeviceNode): raise ValueError(f"ActionClient {action_id} not found.") action_client: ActionClient = self._action_clients[action_id] - - # 遍历action_kwargs下的所有子dict,将sample_uuid的值赋给sample_id - def assign_sample_id(obj): - if isinstance(obj, dict): - # 处理 EXTRA_SAMPLE_UUID ("sample_uuid") - if EXTRA_SAMPLE_UUID in obj: - obj["sample_id"] = obj[EXTRA_SAMPLE_UUID] - obj.pop(EXTRA_SAMPLE_UUID) - # 处理 EXTRA_UNILABOS_SAMPLE_UUID ("unilabos_sample_uuid") - if EXTRA_UNILABOS_SAMPLE_UUID in obj: - obj["sample_id"] = obj[EXTRA_UNILABOS_SAMPLE_UUID] - obj.pop(EXTRA_UNILABOS_SAMPLE_UUID) - for k, v in obj.items(): - if k != "unilabos_extra": - assign_sample_id(v) - elif isinstance(obj, list): - for item in obj: - assign_sample_id(item) - - assign_sample_id(action_kwargs) goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) # self.lab_logger().trace(f"[Host Node] Sending goal for {action_id}: {str(goal_msg)[:1000]}")