mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 13:01:12 +00:00
52 lines
2.4 KiB
Python
52 lines
2.4 KiB
Python
import rclpy
|
||
from rclpy.node import Node
|
||
from typing import Optional
|
||
from unilabos.registry.registry import lab_registry
|
||
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError
|
||
from unilabos.ros.device_node_wrapper import ros2_device_node
|
||
from unilabos.utils import logger
|
||
from unilabos.utils.import_manager import default_manager
|
||
|
||
|
||
def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2DeviceNode]:
|
||
"""Initializes a device based on its configuration.
|
||
|
||
This function dynamically imports the appropriate device class and creates an instance of it using the provided device configuration.
|
||
It also sets up action clients for the device based on its action value mappings.
|
||
|
||
Args:
|
||
device_id (str): The unique identifier for the device.
|
||
device_config (dict): The configuration dictionary for the device, which includes the class type and other parameters.
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
d = None
|
||
device_class_config = device_config["class"]
|
||
if isinstance(device_class_config, str): # 如果是字符串,则直接去lab_registry中查找,获取class
|
||
if device_class_config not in lab_registry.device_type_registry:
|
||
raise ValueError(f"Device class {device_class_config} not found.")
|
||
device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"]
|
||
if isinstance(device_class_config, dict):
|
||
DEVICE = default_manager.get_class(device_class_config["module"])
|
||
# 不管是ros2的实例,还是python的,都必须包一次,除了HostNode
|
||
DEVICE = ros2_device_node(
|
||
DEVICE,
|
||
status_types=device_class_config.get("status_types", {}),
|
||
action_value_mappings=device_class_config.get("action_value_mappings", {}),
|
||
hardware_interface=device_class_config.get(
|
||
"hardware_interface",
|
||
{"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []},
|
||
),
|
||
children=device_config.get("children", {})
|
||
)
|
||
try:
|
||
d = DEVICE(
|
||
device_id=device_id, driver_is_ros=device_class_config["type"] == "ros2", driver_params=device_config.get("config", {})
|
||
)
|
||
except DeviceInitError as ex:
|
||
return d
|
||
else:
|
||
logger.warning(f"initialize device {device_id} failed, provided device_config: {device_config}")
|
||
return d
|