mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 21:11:12 +00:00
pass device config to device class
This commit is contained in:
@@ -7,7 +7,7 @@ zhida_hplc:
|
|||||||
status: String
|
status: String
|
||||||
action_value_mappings:
|
action_value_mappings:
|
||||||
start:
|
start:
|
||||||
type: StringSingleInput
|
type: StrSingleInput
|
||||||
goal:
|
goal:
|
||||||
string: string
|
string: string
|
||||||
feedback: {}
|
feedback: {}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ class ROS2DeviceNodeWrapper(ROS2DeviceNode):
|
|||||||
|
|
||||||
def ros2_device_node(
|
def ros2_device_node(
|
||||||
cls: Type[T],
|
cls: Type[T],
|
||||||
|
device_config: Optional[Dict[str, Any]] = None,
|
||||||
status_types: Optional[Dict[str, Any]] = None,
|
status_types: Optional[Dict[str, Any]] = None,
|
||||||
action_value_mappings: Optional[Dict[str, Any]] = None,
|
action_value_mappings: Optional[Dict[str, Any]] = None,
|
||||||
hardware_interface: Optional[Dict[str, Any]] = None,
|
hardware_interface: Optional[Dict[str, Any]] = None,
|
||||||
@@ -30,6 +31,7 @@ def ros2_device_node(
|
|||||||
cls: 要封装的设备类
|
cls: 要封装的设备类
|
||||||
status_types: 需要发布的状态和传感器信息,每个(PROP: TYPE),PROP应该匹配cls.PROP或cls.get_PROP(),
|
status_types: 需要发布的状态和传感器信息,每个(PROP: TYPE),PROP应该匹配cls.PROP或cls.get_PROP(),
|
||||||
TYPE应该是ROS2消息类型。默认为{}。
|
TYPE应该是ROS2消息类型。默认为{}。
|
||||||
|
device_config: 初始化时的config。
|
||||||
action_value_mappings: 设备动作。默认为{}。
|
action_value_mappings: 设备动作。默认为{}。
|
||||||
每个(ACTION: {'type': CMD_TYPE, 'goal': {FIELD: PROP}, 'feedback': {FIELD: PROP}, 'result': {FIELD: PROP}}),
|
每个(ACTION: {'type': CMD_TYPE, 'goal': {FIELD: PROP}, 'feedback': {FIELD: PROP}, 'result': {FIELD: PROP}}),
|
||||||
hardware_interface: 硬件接口配置。默认为{"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []}。
|
hardware_interface: 硬件接口配置。默认为{"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []}。
|
||||||
@@ -42,6 +44,8 @@ def ros2_device_node(
|
|||||||
# 从属性中自动发现可发布状态
|
# 从属性中自动发现可发布状态
|
||||||
if status_types is None:
|
if status_types is None:
|
||||||
status_types = {}
|
status_types = {}
|
||||||
|
if device_config is None:
|
||||||
|
device_config = {}
|
||||||
if action_value_mappings is None:
|
if action_value_mappings is None:
|
||||||
action_value_mappings = {}
|
action_value_mappings = {}
|
||||||
if hardware_interface is None:
|
if hardware_interface is None:
|
||||||
@@ -73,6 +77,7 @@ def ros2_device_node(
|
|||||||
"__init__": lambda self, *args, **kwargs: init_wrapper(
|
"__init__": lambda self, *args, **kwargs: init_wrapper(
|
||||||
self,
|
self,
|
||||||
driver_class=cls,
|
driver_class=cls,
|
||||||
|
device_config=device_config,
|
||||||
status_types=status_types,
|
status_types=status_types,
|
||||||
action_value_mappings=action_value_mappings,
|
action_value_mappings=action_value_mappings,
|
||||||
hardware_interface=hardware_interface,
|
hardware_interface=hardware_interface,
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import rclpy
|
import copy
|
||||||
from rclpy.node import Node
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from unilabos.registry.registry import lab_registry
|
from unilabos.registry.registry import lab_registry
|
||||||
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError
|
|
||||||
from unilabos.ros.device_node_wrapper import ros2_device_node
|
from unilabos.ros.device_node_wrapper import ros2_device_node
|
||||||
|
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, DeviceInitError
|
||||||
from unilabos.utils import logger
|
from unilabos.utils import logger
|
||||||
from unilabos.utils.import_manager import default_manager
|
from unilabos.utils.import_manager import default_manager
|
||||||
|
|
||||||
@@ -22,17 +22,21 @@ def initialize_device_from_dict(device_id, device_config) -> Optional[ROS2Device
|
|||||||
None
|
None
|
||||||
"""
|
"""
|
||||||
d = None
|
d = None
|
||||||
|
original_device_config = copy.deepcopy(device_config)
|
||||||
device_class_config = device_config["class"]
|
device_class_config = device_config["class"]
|
||||||
if isinstance(device_class_config, str): # 如果是字符串,则直接去lab_registry中查找,获取class
|
if isinstance(device_class_config, str): # 如果是字符串,则直接去lab_registry中查找,获取class
|
||||||
if device_class_config not in lab_registry.device_type_registry:
|
if device_class_config not in lab_registry.device_type_registry:
|
||||||
raise ValueError(f"Device class {device_class_config} not found.")
|
raise ValueError(f"Device class {device_class_config} not found.")
|
||||||
device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"]
|
device_class_config = device_config["class"] = lab_registry.device_type_registry[device_class_config]["class"]
|
||||||
|
else:
|
||||||
|
raise ValueError("不再支持class为字典传入,class必须为注册表中已经提供的设备,您可以新增注册表并通过--registry传入")
|
||||||
if isinstance(device_class_config, dict):
|
if isinstance(device_class_config, dict):
|
||||||
DEVICE = default_manager.get_class(device_class_config["module"])
|
DEVICE = default_manager.get_class(device_class_config["module"])
|
||||||
# 不管是ros2的实例,还是python的,都必须包一次,除了HostNode
|
# 不管是ros2的实例,还是python的,都必须包一次,除了HostNode
|
||||||
DEVICE = ros2_device_node(
|
DEVICE = ros2_device_node(
|
||||||
DEVICE,
|
DEVICE,
|
||||||
status_types=device_class_config.get("status_types", {}),
|
status_types=device_class_config.get("status_types", {}),
|
||||||
|
device_config=original_device_config,
|
||||||
action_value_mappings=device_class_config.get("action_value_mappings", {}),
|
action_value_mappings=device_class_config.get("action_value_mappings", {}),
|
||||||
hardware_interface=device_class_config.get(
|
hardware_interface=device_class_config.get(
|
||||||
"hardware_interface",
|
"hardware_interface",
|
||||||
|
|||||||
@@ -101,6 +101,7 @@ def init_wrapper(
|
|||||||
self,
|
self,
|
||||||
device_id: str,
|
device_id: str,
|
||||||
driver_class: type[T],
|
driver_class: type[T],
|
||||||
|
device_config: Dict[str, Any],
|
||||||
status_types: Dict[str, Any],
|
status_types: Dict[str, Any],
|
||||||
action_value_mappings: Dict[str, Any],
|
action_value_mappings: Dict[str, Any],
|
||||||
hardware_interface: Dict[str, Any],
|
hardware_interface: Dict[str, Any],
|
||||||
@@ -118,6 +119,7 @@ def init_wrapper(
|
|||||||
children = []
|
children = []
|
||||||
kwargs["device_id"] = device_id
|
kwargs["device_id"] = device_id
|
||||||
kwargs["driver_class"] = driver_class
|
kwargs["driver_class"] = driver_class
|
||||||
|
kwargs["device_config"] = device_config
|
||||||
kwargs["driver_params"] = driver_params
|
kwargs["driver_params"] = driver_params
|
||||||
kwargs["status_types"] = status_types
|
kwargs["status_types"] = status_types
|
||||||
kwargs["action_value_mappings"] = action_value_mappings
|
kwargs["action_value_mappings"] = action_value_mappings
|
||||||
@@ -627,6 +629,7 @@ class ROS2DeviceNode:
|
|||||||
self,
|
self,
|
||||||
device_id: str,
|
device_id: str,
|
||||||
driver_class: Type[T],
|
driver_class: Type[T],
|
||||||
|
device_config: Dict[str, Any],
|
||||||
driver_params: Dict[str, Any],
|
driver_params: Dict[str, Any],
|
||||||
status_types: Dict[str, Any],
|
status_types: Dict[str, Any],
|
||||||
action_value_mappings: Dict[str, Any],
|
action_value_mappings: Dict[str, Any],
|
||||||
@@ -641,6 +644,8 @@ class ROS2DeviceNode:
|
|||||||
Args:
|
Args:
|
||||||
device_id: 设备标识符
|
device_id: 设备标识符
|
||||||
driver_class: 设备类
|
driver_class: 设备类
|
||||||
|
device_config: 原始初始化的json
|
||||||
|
driver_params: driver初始化的参数
|
||||||
status_types: 状态类型映射
|
status_types: 状态类型映射
|
||||||
action_value_mappings: 动作值映射
|
action_value_mappings: 动作值映射
|
||||||
hardware_interface: 硬件接口配置
|
hardware_interface: 硬件接口配置
|
||||||
@@ -657,6 +662,7 @@ class ROS2DeviceNode:
|
|||||||
# 保存设备类是否支持异步上下文
|
# 保存设备类是否支持异步上下文
|
||||||
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
|
self._has_async_context = hasattr(driver_class, "__aenter__") and hasattr(driver_class, "__aexit__")
|
||||||
self._driver_class = driver_class
|
self._driver_class = driver_class
|
||||||
|
self.device_config = device_config
|
||||||
self.driver_is_ros = driver_is_ros
|
self.driver_is_ros = driver_is_ros
|
||||||
self.resource_tracker = DeviceNodeResourceTracker()
|
self.resource_tracker = DeviceNodeResourceTracker()
|
||||||
|
|
||||||
|
|||||||
@@ -101,14 +101,7 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
|
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
|
||||||
self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
|
self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
|
||||||
self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例
|
self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例
|
||||||
self._action_value_mappings: Dict[str, Dict] = {
|
self._action_value_mappings: Dict[str, Dict] = {} # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
|
||||||
"add_resrouce": {
|
|
||||||
"type": ResourceCreateFromOuter,
|
|
||||||
"goal": {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
|
|
||||||
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
|
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
|
||||||
self._online_devices: Set[str] = set() # 用于跟踪在线设备
|
self._online_devices: Set[str] = set() # 用于跟踪在线设备
|
||||||
self._last_discovery_time = 0.0 # 上次设备发现的时间
|
self._last_discovery_time = 0.0 # 上次设备发现的时间
|
||||||
@@ -264,16 +257,21 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
|
self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
|
||||||
action_name = action_id[len(namespace) + 1:]
|
action_name = action_id[len(namespace) + 1:]
|
||||||
edge_device_id = namespace[9:]
|
edge_device_id = namespace[9:]
|
||||||
from unilabos.app.mq import mqtt_client
|
# from unilabos.app.mq import mqtt_client
|
||||||
info_with_schema = ros_action_to_json_schema(action_type)
|
# info_with_schema = ros_action_to_json_schema(action_type)
|
||||||
mqtt_client.publish_actions(action_name, {
|
# mqtt_client.publish_actions(action_name, {
|
||||||
"device_id": edge_device_id,
|
# "device_id": edge_device_id,
|
||||||
"action_name": action_name,
|
# "device_type": "",
|
||||||
"schema": info_with_schema,
|
# "action_name": action_name,
|
||||||
})
|
# "schema": info_with_schema,
|
||||||
|
# })
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
|
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
|
||||||
|
|
||||||
|
def add_resource_from_outer(self, resources: list["Resource"], device_ids):
|
||||||
|
print("111")
|
||||||
|
pass
|
||||||
|
|
||||||
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
|
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
|
||||||
"""
|
"""
|
||||||
根据配置初始化设备,
|
根据配置初始化设备,
|
||||||
@@ -302,13 +300,14 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
action_type = action_value_mapping["type"]
|
action_type = action_value_mapping["type"]
|
||||||
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
|
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
|
||||||
self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
|
self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
|
||||||
from unilabos.app.mq import mqtt_client
|
# from unilabos.app.mq import mqtt_client
|
||||||
info_with_schema = ros_action_to_json_schema(action_type)
|
# info_with_schema = ros_action_to_json_schema(action_type)
|
||||||
mqtt_client.publish_actions(action_name, {
|
# mqtt_client.publish_actions(action_name, {
|
||||||
"device_id": device_id,
|
# "device_id": device_id,
|
||||||
"action_name": action_name,
|
# "device_type": device_config["class"],
|
||||||
"schema": info_with_schema,
|
# "action_name": action_name,
|
||||||
})
|
# "schema": info_with_schema,
|
||||||
|
# })
|
||||||
else:
|
else:
|
||||||
self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.")
|
self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.")
|
||||||
device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id
|
device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id
|
||||||
|
|||||||
Reference in New Issue
Block a user