--- description: 设备驱动开发规范 globs: ["unilabos/devices/**/*.py"] --- # 设备驱动开发规范 ## 目录结构 ``` unilabos/devices/ ├── virtual/ # 虚拟设备(用于测试) │ ├── virtual_stirrer.py │ └── virtual_centrifuge.py ├── liquid_handling/ # 液体处理设备 ├── balance/ # 天平设备 ├── hplc/ # HPLC设备 ├── pump_and_valve/ # 泵和阀门 ├── temperature/ # 温度控制设备 ├── workstation/ # 工作站(组合设备) └── ... ``` ## 设备类完整模板 ```python import asyncio import logging import time as time_module from typing import Dict, Any, Optional from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode class MyDevice: """ 设备类描述 Attributes: device_id: 设备唯一标识 config: 设备配置字典 data: 设备状态数据 """ _ros_node: BaseROS2DeviceNode def __init__( self, device_id: str = None, config: Dict[str, Any] = None, **kwargs ): """ 初始化设备 Args: device_id: 设备ID config: 配置字典 **kwargs: 其他参数 """ # 兼容不同调用方式 if device_id is None and 'id' in kwargs: device_id = kwargs.pop('id') if config is None and 'config' in kwargs: config = kwargs.pop('config') self.device_id = device_id or "unknown_device" self.config = config or {} self.data = {} # 从config读取参数 self.port = self.config.get('port') or kwargs.get('port', 'COM1') self._max_value = self.config.get('max_value', 1000.0) # 初始化日志 self.logger = logging.getLogger(f"MyDevice.{self.device_id}") self.logger.info(f"设备 {self.device_id} 已创建") def post_init(self, ros_node: BaseROS2DeviceNode): """ ROS节点注入 - 在ROS节点创建后调用 Args: ros_node: ROS2设备节点实例 """ self._ros_node = ros_node async def initialize(self) -> bool: """ 初始化设备 - 连接硬件、设置初始状态 Returns: bool: 初始化是否成功 """ self.logger.info(f"初始化设备 {self.device_id}") try: # 执行硬件初始化 # await self._connect_hardware() # 设置初始状态 self.data.update({ "status": "待机", "is_running": False, "current_value": 0.0, }) self.logger.info(f"设备 {self.device_id} 初始化完成") return True except Exception as e: self.logger.error(f"初始化失败: {e}") self.data["status"] = f"错误: {e}" return False async def cleanup(self) -> bool: """ 清理设备 - 断开连接、释放资源 Returns: bool: 清理是否成功 """ self.logger.info(f"清理设备 {self.device_id}") self.data.update({ "status": "离线", "is_running": False, }) return True # ==================== 设备动作 ==================== async def execute_action( self, param1: float, param2: str = "", **kwargs ) -> bool: """ 执行设备动作 Args: param1: 参数1 param2: 参数2(可选) Returns: bool: 动作是否成功 """ # 类型转换和验证 try: param1 = float(param1) except (ValueError, TypeError) as e: self.logger.error(f"参数类型错误: {e}") return False # 参数验证 if param1 > self._max_value: self.logger.error(f"参数超出范围: {param1} > {self._max_value}") return False self.logger.info(f"执行动作: param1={param1}, param2={param2}") # 更新状态 self.data.update({ "status": "运行中", "is_running": True, }) # 执行动作(带进度反馈) duration = 10.0 # 秒 start_time = time_module.time() while True: elapsed = time_module.time() - start_time remaining = max(0, duration - elapsed) progress = min(100, (elapsed / duration) * 100) self.data.update({ "status": f"运行中: {progress:.0f}%", "remaining_time": remaining, }) if remaining <= 0: break await self._ros_node.sleep(1.0) # 完成 self.data.update({ "status": "完成", "is_running": False, }) self.logger.info("动作执行完成") return True # ==================== 状态属性 ==================== @property def status(self) -> str: """设备状态 - 自动发布为ROS Topic""" return self.data.get("status", "未知") @property def is_running(self) -> bool: """是否正在运行""" return self.data.get("is_running", False) @property def current_value(self) -> float: """当前值""" return self.data.get("current_value", 0.0) # ==================== 辅助方法 ==================== def get_device_info(self) -> Dict[str, Any]: """获取设备信息""" return { "device_id": self.device_id, "status": self.status, "is_running": self.is_running, "current_value": self.current_value, } def __str__(self) -> str: return f"MyDevice({self.device_id}: {self.status})" ``` ## 关键规则 ### 1. 参数处理 所有动作方法的参数都可能以字符串形式传入,必须进行类型转换: ```python async def my_action(self, value: float, **kwargs) -> bool: # 始终进行类型转换 try: value = float(value) except (ValueError, TypeError) as e: self.logger.error(f"参数类型错误: {e}") return False ``` ### 2. vessel 参数处理 vessel 参数可能是字符串ID或字典: ```python def extract_vessel_id(vessel: Union[str, dict]) -> str: if isinstance(vessel, dict): return vessel.get("id", "") return str(vessel) if vessel else "" ``` ### 3. 状态更新 使用 `self.data` 字典存储状态,属性读取状态: ```python # 更新状态 self.data["status"] = "运行中" self.data["current_speed"] = 300.0 # 读取状态(通过属性) @property def status(self) -> str: return self.data.get("status", "待机") ``` ### 4. 异步等待 使用 ROS 节点的 sleep 方法: ```python # 正确 await self._ros_node.sleep(1.0) # 避免(除非在纯 Python 测试环境) await asyncio.sleep(1.0) ``` ### 5. 进度反馈 长时间运行的操作需要提供进度反馈: ```python while remaining > 0: progress = (elapsed / total_time) * 100 self.data["status"] = f"运行中: {progress:.0f}%" self.data["remaining_time"] = remaining await self._ros_node.sleep(1.0) ``` ## 虚拟设备 虚拟设备用于测试和演示,放在 `unilabos/devices/virtual/` 目录: - 类名以 `Virtual` 开头 - 文件名以 `virtual_` 开头 - 模拟真实设备的行为和时序 - 使用表情符号增强日志可读性(可选) ## 工作站设备 工作站是组合多个设备的复杂设备: ```python from unilabos.devices.workstation.workstation_base import WorkstationBase class MyWorkstation(WorkstationBase): """组合工作站""" async def execute_workflow(self, workflow: Dict[str, Any]) -> bool: """执行工作流""" pass ``` ## 设备注册 设备类开发完成后,需要在注册表中注册: 1. 创建/编辑 `unilabos/registry/devices/my_category.yaml` 2. 添加设备配置(参考 `virtual_device.yaml`) 3. 运行 `--complete_registry` 自动生成 schema