Files
Uni-Lab-OS/unilabos/ros/initialize_device.py
Junhan Chang c78ac482d8 Initial commit
2025-04-17 15:19:47 +08:00

52 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import rclpy
from rclpy.node import Node
from typing import Optional
from unilabos.registry.registry import lab_registry
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError
from unilabos.ros.device_node_wrapper import ros2_device_node
from unilabos.utils import logger
from unilabos.utils.import_manager import default_manager
def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2DeviceNode]:
"""Initializes a device based on its configuration.
This function dynamically imports the appropriate device class and creates an instance of it using the provided device configuration.
It also sets up action clients for the device based on its action value mappings.
Args:
device_id (str): The unique identifier for the device.
device_config (dict): The configuration dictionary for the device, which includes the class type and other parameters.
Returns:
None
"""
d = None
device_class_config = device_config["class"]
if isinstance(device_class_config, str): # 如果是字符串则直接去lab_registry中查找获取class
if device_class_config not in lab_registry.device_type_registry:
raise ValueError(f"Device class {device_class_config} not found.")
device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"]
if isinstance(device_class_config, dict):
DEVICE = default_manager.get_class(device_class_config["module"])
# 不管是ros2的实例还是python的都必须包一次除了HostNode
DEVICE = ros2_device_node(
DEVICE,
status_types=device_class_config.get("status_types", {}),
action_value_mappings=device_class_config.get("action_value_mappings", {}),
hardware_interface=device_class_config.get(
"hardware_interface",
{"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []},
),
children=device_config.get("children", {})
)
try:
d = DEVICE(
device_id=device_id, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {})
)
except DeviceInitError as ex:
return d
else:
logger.warning(f"initialize device {device_id} failed, provided device_config: {device_config}")
return d