diff --git a/unilabos/resources/graphio.py b/unilabos/resources/graphio.py index b894973c..7dc8fe11 100644 --- a/unilabos/resources/graphio.py +++ b/unilabos/resources/graphio.py @@ -58,22 +58,22 @@ def canonicalize_nodes_data( node["name"] = node.get("id") print_status(f"Warning: Node {node.get('id', 'unknown')} missing 'name', defaulting to {node['name']}", "warning") if not isinstance(node.get("position"), dict): - node["position"] = {"position": {}} + node["pose"] = {"position": {}} x = node.pop("x", None) if x is not None: - node["position"]["position"]["x"] = x + node["pose"]["position"]["x"] = x y = node.pop("y", None) if y is not None: - node["position"]["position"]["y"] = y + node["pose"]["position"]["y"] = y z = node.pop("z", None) if z is not None: - node["position"]["position"]["z"] = z + node["pose"]["position"]["z"] = z if "sample_id" in node: sample_id = node.pop("sample_id") if sample_id: logger.error(f"{node}的sample_id参数已弃用,sample_id: {sample_id}") for k in list(node.keys()): - if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children"]: + if k not in ["id", "uuid", "name", "description", "schema", "model", "icon", "parent_uuid", "parent", "type", "class", "position", "config", "data", "children", "pose"]: v = node.pop(k) node["config"][k] = v diff --git a/unilabos/ros/device_node_wrapper.py b/unilabos/ros/device_node_wrapper.py index 51ff2171..f5e80c58 100644 --- a/unilabos/ros/device_node_wrapper.py +++ b/unilabos/ros/device_node_wrapper.py @@ -5,6 +5,7 @@ from unilabos.ros.msgs.message_converter import ( get_action_type, ) from unilabos.ros.nodes.base_device_node import init_wrapper, ROS2DeviceNode +from unilabos.ros.nodes.resource_tracker import ResourceDictInstance # 定义泛型类型变量 T = TypeVar("T") @@ -18,12 +19,11 @@ class ROS2DeviceNodeWrapper(ROS2DeviceNode): def ros2_device_node( cls: Type[T], - device_config: Optional[Dict[str, Any]] = None, + device_config: Optional[ResourceDictInstance] = None, status_types: Optional[Dict[str, Any]] = None, action_value_mappings: Optional[Dict[str, Any]] = None, hardware_interface: Optional[Dict[str, Any]] = None, print_publish: bool = False, - children: Optional[Dict[str, Any]] = None, ) -> Type[ROS2DeviceNodeWrapper]: """Create a ROS2 Node class for a device class with properties and actions. @@ -45,7 +45,7 @@ def ros2_device_node( if status_types is None: status_types = {} if device_config is None: - device_config = {} + raise ValueError("device_config cannot be None") if action_value_mappings is None: action_value_mappings = {} if hardware_interface is None: @@ -82,7 +82,6 @@ def ros2_device_node( action_value_mappings=action_value_mappings, hardware_interface=hardware_interface, print_publish=print_publish, - children=children, *args, **kwargs, ), diff --git a/unilabos/ros/initialize_device.py b/unilabos/ros/initialize_device.py index a92a9f50..55ac1455 100644 --- a/unilabos/ros/initialize_device.py +++ b/unilabos/ros/initialize_device.py @@ -4,13 +4,14 @@ from typing import Optional from unilabos.registry.registry import lab_registry from unilabos.ros.device_node_wrapper import ros2_device_node from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError +from unilabos.ros.nodes.resource_tracker import ResourceDictInstance from unilabos.utils import logger from unilabos.utils.exception import DeviceClassInvalid from unilabos.utils.import_manager import default_manager -def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2DeviceNode]: +def initialize_device_from_dict(device_id, device_config: ResourceDictInstance) -> Optional[ROS2DeviceNode]: """Initializes a device based on its configuration. This function dynamically imports the appropriate device class and creates an instance of it using the provided device configuration. @@ -24,15 +25,14 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device None """ d = None - original_device_config = copy.deepcopy(device_config) - device_class_config = device_config["class"] - uid = device_config["uuid"] + device_class_config = device_config.res_content.klass + uid = device_config.res_content.uuid if isinstance(device_class_config, str): # 如果是字符串,则直接去lab_registry中查找,获取class if len(device_class_config) == 0: raise DeviceClassInvalid(f"Device [{device_id}] class cannot be an empty string. {device_config}") if device_class_config not in lab_registry.device_type_registry: raise DeviceClassInvalid(f"Device [{device_id}] class {device_class_config} not found. {device_config}") - device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"] + device_class_config = lab_registry.device_type_registry[device_class_config]["class"] elif isinstance(device_class_config, dict): raise DeviceClassInvalid(f"Device [{device_id}] class config should be type 'str' but 'dict' got. {device_config}") if isinstance(device_class_config, dict): @@ -41,17 +41,16 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device DEVICE = ros2_device_node( DEVICE, status_types=device_class_config.get("status_types", {}), - device_config=original_device_config, + device_config=device_config, action_value_mappings=device_class_config.get("action_value_mappings", {}), hardware_interface=device_class_config.get( "hardware_interface", {"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []}, - ), - children=device_config.get("children", {}) + ) ) try: d = DEVICE( - device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {}) + device_id=device_id, device_uuid=uid, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.res_content.config ) except DeviceInitError as ex: return d diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index ecf8697a..4373cea3 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -192,7 +192,7 @@ def slave( for device_config in devices_config.root_nodes: device_id = device_config.res_content.id if device_config.res_content.type == "device": - d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) + d = initialize_device_from_dict(device_id, device_config) if d is not None: devices_instances[device_id] = d logger.info(f"Device {device_id} initialized.") diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index ae6db267..4495dbf8 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -48,7 +48,7 @@ from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import ( DeviceNodeResourceTracker, ResourceTreeSet, - ResourceTreeInstance, + ResourceTreeInstance, ResourceDictInstance, ) from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator @@ -133,12 +133,11 @@ def init_wrapper( device_id: str, device_uuid: str, driver_class: type[T], - device_config: Dict[str, Any], + device_config: ResourceTreeInstance, status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], print_publish: bool, - children: Optional[list] = None, driver_params: Optional[Dict[str, Any]] = None, driver_is_ros: bool = False, *args, @@ -147,8 +146,6 @@ def init_wrapper( """初始化设备节点的包装函数,和ROS2DeviceNode初始化保持一致""" if driver_params is None: driver_params = kwargs.copy() - if children is None: - children = [] kwargs["device_id"] = device_id kwargs["device_uuid"] = device_uuid kwargs["driver_class"] = driver_class @@ -157,7 +154,6 @@ def init_wrapper( kwargs["status_types"] = status_types kwargs["action_value_mappings"] = action_value_mappings kwargs["hardware_interface"] = hardware_interface - kwargs["children"] = children kwargs["print_publish"] = print_publish kwargs["driver_is_ros"] = driver_is_ros super(type(self), self).__init__(*args, **kwargs) @@ -1582,12 +1578,11 @@ class ROS2DeviceNode: device_id: str, device_uuid: str, driver_class: Type[T], - device_config: Dict[str, Any], + device_config: ResourceDictInstance, driver_params: Dict[str, Any], status_types: Dict[str, Any], action_value_mappings: Dict[str, Any], hardware_interface: Dict[str, Any], - children: Dict[str, Any], print_publish: bool = True, driver_is_ros: bool = False, ): @@ -1598,7 +1593,7 @@ class ROS2DeviceNode: device_id: 设备标识符 device_uuid: 设备uuid driver_class: 设备类 - device_config: 原始初始化的json + device_config: 原始初始化的ResourceDictInstance driver_params: driver初始化的参数 status_types: 状态类型映射 action_value_mappings: 动作值映射 @@ -1612,6 +1607,7 @@ class ROS2DeviceNode: self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__") self._driver_class = driver_class self.device_config = device_config + children: List[ResourceDictInstance] = device_config.children self.driver_is_ros = driver_is_ros self.driver_is_workstation = False self.resource_tracker = DeviceNodeResourceTracker() diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 2e7f7a25..2dba3422 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -532,7 +532,7 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info(f"[Host Node] Initializing device: {device_id}") try: - d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) + d = initialize_device_from_dict(device_id, device_config) except DeviceClassInvalid as e: self.lab_logger().error(f"[Host Node] Device class invalid: {e}") d = None diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index af1afab5..7325dda6 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -24,7 +24,7 @@ from unilabos.ros.msgs.message_converter import ( convert_from_ros_msg_with_mapping, ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, DeviceNodeResourceTracker, ROS2DeviceNode -from unilabos.ros.nodes.resource_tracker import ResourceTreeSet +from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDictInstance from unilabos.utils.type_check import get_result_info_str if TYPE_CHECKING: @@ -47,7 +47,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): def __init__( self, protocol_type: List[str], - children: Dict[str, Any], + children: List[ResourceDictInstance], *, driver_instance: "WorkstationBase", device_id: str, @@ -81,10 +81,11 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): # 初始化子设备 self.communication_node_id_to_instance = {} - for device_id, device_config in self.children.items(): - if device_config.get("type", "device") != "device": + for device_config in self.children: + device_id = device_config.res_content.id + if device_config.res_content.type != "device": self.lab_logger().debug( - f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." + f"[Protocol Node] Skipping type {device_config.res_content.type} {device_id} already existed, skipping." ) continue try: @@ -101,8 +102,9 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): self.communication_node_id_to_instance[device_id] = d continue - for device_id, device_config in self.children.items(): - if device_config.get("type", "device") != "device": + for device_config in self.children: + device_id = device_config.res_content.id + if device_config.res_content.type != "device": continue # 设置硬件接口代理 if device_id not in self.sub_devices: diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 2a01b315..f306597e 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -62,7 +62,6 @@ class ResourceDict(BaseModel): parent: Optional["ResourceDict"] = Field(description="Parent resource object", default=None, exclude=True) type: Union[Literal["device"], str] = Field(description="Resource type") klass: str = Field(alias="class", description="Resource class name") - position: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) pose: ResourceDictPosition = Field(description="Resource position", default_factory=ResourceDictPosition) config: Dict[str, Any] = Field(description="Resource configuration") data: Dict[str, Any] = Field(description="Resource data") @@ -146,15 +145,16 @@ class ResourceDictInstance(object): if not content.get("extra"): # MagicCode content["extra"] = {} if "pose" not in content: - content["pose"] = content.get("position", {}) + content["pose"] = content.pop("position", {}) return ResourceDictInstance(ResourceDict.model_validate(content)) - def get_nested_dict(self) -> Dict[str, Any]: + def get_plr_nested_dict(self) -> Dict[str, Any]: """获取资源实例的嵌套字典表示""" res_dict = self.res_content.model_dump(by_alias=True) - res_dict["children"] = {child.res_content.id: child.get_nested_dict() for child in self.children} + res_dict["children"] = {child.res_content.id: child.get_plr_nested_dict() for child in self.children} res_dict["parent"] = self.res_content.parent_instance_name res_dict["position"] = self.res_content.position.position.model_dump() + del res_dict["pose"] return res_dict diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 9481ce31..7a60474a 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -9,10 +9,11 @@ import asyncio import inspect import traceback from abc import abstractmethod -from typing import Type, Any, Dict, Optional, TypeVar, Generic +from typing import Type, Any, Dict, Optional, TypeVar, Generic, List from unilabos.resources.graphio import nested_dict_to_list, resource_ulab_to_plr -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet, ResourceDictInstance, \ + ResourceTreeInstance from unilabos.utils import logger, import_manager from unilabos.utils.cls_creator import create_instance_from_config @@ -33,7 +34,7 @@ class DeviceClassCreator(Generic[T]): 这个类提供了从任意类创建实例的通用方法。 """ - def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): + def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker): """ 初始化设备类创建器 @@ -50,9 +51,9 @@ class DeviceClassCreator(Generic[T]): 附加资源到设备类实例 """ if self.device_instance is not None: - for c in self.children.values(): - if c["type"] != "device": - self.resource_tracker.add_resource(c) + for c in self.children: + if c.res_content.type != "device": + self.resource_tracker.add_resource(c.get_plr_nested_dict()) def create_instance(self, data: Dict[str, Any]) -> T: """ @@ -94,7 +95,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]): 这个类提供了针对PyLabRobot设备类的实例创建方法,特别处理deserialize方法。 """ - def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): + def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker): """ 初始化PyLabRobot设备类创建器 @@ -111,12 +112,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): def attach_resource(self): pass # 只能增加实例化物料,原来默认物料仅为字典查询 - def _process_resource_mapping(self, resource, source_type): - if source_type == dict: - from pylabrobot.resources.resource import Resource - - return nested_dict_to_list(resource), Resource - return resource, source_type + # def _process_resource_mapping(self, resource, source_type): + # if source_type == dict: + # from pylabrobot.resources.resource import Resource + # + # return nested_dict_to_list(resource), Resource + # return resource, source_type def _process_resource_references( self, data: Any, to_dict=False, states=None, prefix_path="", name_to_uuid=None @@ -142,15 +143,21 @@ class PyLabRobotCreator(DeviceClassCreator[T]): if isinstance(data, dict): if "_resource_child_name" in data: child_name = data["_resource_child_name"] - if child_name in self.children: - resource = self.children[child_name] + resource: Optional[ResourceDictInstance] = None + for child in self.children: + if child.res_content.name == child_name: + resource = child + if resource is not None: if "_resource_type" in data: type_path = data["_resource_type"] try: - target_type = import_manager.get_class(type_path) - contain_model = not issubclass(target_type, Deck) - resource, target_type = self._process_resource_mapping(resource, target_type) - resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state + # target_type = import_manager.get_class(type_path) + # contain_model = not issubclass(target_type, Deck) + # resource, target_type = self._process_resource_mapping(resource, target_type) + res_tree = ResourceTreeInstance(resource) + res_tree_set = ResourceTreeSet([res_tree]) + resource_instance: Resource = res_tree_set.to_plr_resources()[0] + # resource_instance: Resource = resource_ulab_to_plr(resource, contain_model) # 带state states[prefix_path] = resource_instance.serialize_all_state() # 使用 prefix_path 作为 key 存储资源状态 if to_dict: @@ -202,12 +209,12 @@ class PyLabRobotCreator(DeviceClassCreator[T]): stack = None # 递归遍历 children 构建 name_to_uuid 映射 - def collect_name_to_uuid(children_dict: Dict[str, Any], result: Dict[str, str]): + def collect_name_to_uuid(children_list: List[ResourceDictInstance], result: Dict[str, str]): """递归遍历嵌套的 children 字典,收集 name 到 uuid 的映射""" - for child in children_dict.values(): - if isinstance(child, dict): - result[child["name"]] = child["uuid"] - collect_name_to_uuid(child["children"], result) + for child in children_list: + if isinstance(child, ResourceDictInstance): + result[child.res_content.name] = child.res_content.uuid + collect_name_to_uuid(child.children, result) name_to_uuid = {} collect_name_to_uuid(self.children, name_to_uuid) @@ -313,7 +320,7 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): 这个类提供了针对WorkstationNode设备类的实例创建方法,处理children参数。 """ - def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): + def __init__(self, cls: Type[T], children: List[ResourceDictInstance], resource_tracker: DeviceNodeResourceTracker): """ 初始化WorkstationNode设备类创建器 @@ -336,9 +343,9 @@ class WorkstationNodeCreator(DeviceClassCreator[T]): try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children - for material_id, child in self.children.items(): - if child["type"] != "device": - self.resource_tracker.add_resource(self.children[material_id]) + for child in self.children: + if child.res_content.type != "device": + self.resource_tracker.add_resource(child.get_plr_nested_dict()) deck_dict = data.get("deck") if deck_dict: from pylabrobot.resources import Deck, Resource