mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-04 13:25:13 +00:00
329 lines
8.0 KiB
Plaintext
329 lines
8.0 KiB
Plaintext
---
|
||
description: 设备驱动开发规范
|
||
globs: ["unilabos/devices/**/*.py"]
|
||
---
|
||
|
||
# 设备驱动开发规范
|
||
|
||
## 目录结构
|
||
|
||
```
|
||
unilabos/devices/
|
||
├── virtual/ # 虚拟设备(用于测试)
|
||
│ ├── virtual_stirrer.py
|
||
│ └── virtual_centrifuge.py
|
||
├── liquid_handling/ # 液体处理设备
|
||
├── balance/ # 天平设备
|
||
├── hplc/ # HPLC设备
|
||
├── pump_and_valve/ # 泵和阀门
|
||
├── temperature/ # 温度控制设备
|
||
├── workstation/ # 工作站(组合设备)
|
||
└── ...
|
||
```
|
||
|
||
## 设备类完整模板
|
||
|
||
```python
|
||
import asyncio
|
||
import logging
|
||
import time as time_module
|
||
from typing import Dict, Any, Optional
|
||
|
||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
|
||
|
||
|
||
class MyDevice:
|
||
"""
|
||
设备类描述
|
||
|
||
Attributes:
|
||
device_id: 设备唯一标识
|
||
config: 设备配置字典
|
||
data: 设备状态数据
|
||
"""
|
||
|
||
_ros_node: BaseROS2DeviceNode
|
||
|
||
def __init__(
|
||
self,
|
||
device_id: str = None,
|
||
config: Dict[str, Any] = None,
|
||
**kwargs
|
||
):
|
||
"""
|
||
初始化设备
|
||
|
||
Args:
|
||
device_id: 设备ID
|
||
config: 配置字典
|
||
**kwargs: 其他参数
|
||
"""
|
||
# 兼容不同调用方式
|
||
if device_id is None and 'id' in kwargs:
|
||
device_id = kwargs.pop('id')
|
||
if config is None and 'config' in kwargs:
|
||
config = kwargs.pop('config')
|
||
|
||
self.device_id = device_id or "unknown_device"
|
||
self.config = config or {}
|
||
self.data = {}
|
||
|
||
# 从config读取参数
|
||
self.port = self.config.get('port') or kwargs.get('port', 'COM1')
|
||
self._max_value = self.config.get('max_value', 1000.0)
|
||
|
||
# 初始化日志
|
||
self.logger = logging.getLogger(f"MyDevice.{self.device_id}")
|
||
|
||
self.logger.info(f"设备 {self.device_id} 已创建")
|
||
|
||
def post_init(self, ros_node: BaseROS2DeviceNode):
|
||
"""
|
||
ROS节点注入 - 在ROS节点创建后调用
|
||
|
||
Args:
|
||
ros_node: ROS2设备节点实例
|
||
"""
|
||
self._ros_node = ros_node
|
||
|
||
async def initialize(self) -> bool:
|
||
"""
|
||
初始化设备 - 连接硬件、设置初始状态
|
||
|
||
Returns:
|
||
bool: 初始化是否成功
|
||
"""
|
||
self.logger.info(f"初始化设备 {self.device_id}")
|
||
|
||
try:
|
||
# 执行硬件初始化
|
||
# await self._connect_hardware()
|
||
|
||
# 设置初始状态
|
||
self.data.update({
|
||
"status": "待机",
|
||
"is_running": False,
|
||
"current_value": 0.0,
|
||
})
|
||
|
||
self.logger.info(f"设备 {self.device_id} 初始化完成")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"初始化失败: {e}")
|
||
self.data["status"] = f"错误: {e}"
|
||
return False
|
||
|
||
async def cleanup(self) -> bool:
|
||
"""
|
||
清理设备 - 断开连接、释放资源
|
||
|
||
Returns:
|
||
bool: 清理是否成功
|
||
"""
|
||
self.logger.info(f"清理设备 {self.device_id}")
|
||
|
||
self.data.update({
|
||
"status": "离线",
|
||
"is_running": False,
|
||
})
|
||
|
||
return True
|
||
|
||
# ==================== 设备动作 ====================
|
||
|
||
async def execute_action(
|
||
self,
|
||
param1: float,
|
||
param2: str = "",
|
||
**kwargs
|
||
) -> bool:
|
||
"""
|
||
执行设备动作
|
||
|
||
Args:
|
||
param1: 参数1
|
||
param2: 参数2(可选)
|
||
|
||
Returns:
|
||
bool: 动作是否成功
|
||
"""
|
||
# 类型转换和验证
|
||
try:
|
||
param1 = float(param1)
|
||
except (ValueError, TypeError) as e:
|
||
self.logger.error(f"参数类型错误: {e}")
|
||
return False
|
||
|
||
# 参数验证
|
||
if param1 > self._max_value:
|
||
self.logger.error(f"参数超出范围: {param1} > {self._max_value}")
|
||
return False
|
||
|
||
self.logger.info(f"执行动作: param1={param1}, param2={param2}")
|
||
|
||
# 更新状态
|
||
self.data.update({
|
||
"status": "运行中",
|
||
"is_running": True,
|
||
})
|
||
|
||
# 执行动作(带进度反馈)
|
||
duration = 10.0 # 秒
|
||
start_time = time_module.time()
|
||
|
||
while True:
|
||
elapsed = time_module.time() - start_time
|
||
remaining = max(0, duration - elapsed)
|
||
progress = min(100, (elapsed / duration) * 100)
|
||
|
||
self.data.update({
|
||
"status": f"运行中: {progress:.0f}%",
|
||
"remaining_time": remaining,
|
||
})
|
||
|
||
if remaining <= 0:
|
||
break
|
||
|
||
await self._ros_node.sleep(1.0)
|
||
|
||
# 完成
|
||
self.data.update({
|
||
"status": "完成",
|
||
"is_running": False,
|
||
})
|
||
|
||
self.logger.info("动作执行完成")
|
||
return True
|
||
|
||
# ==================== 状态属性 ====================
|
||
|
||
@property
|
||
def status(self) -> str:
|
||
"""设备状态 - 自动发布为ROS Topic"""
|
||
return self.data.get("status", "未知")
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
"""是否正在运行"""
|
||
return self.data.get("is_running", False)
|
||
|
||
@property
|
||
def current_value(self) -> float:
|
||
"""当前值"""
|
||
return self.data.get("current_value", 0.0)
|
||
|
||
# ==================== 辅助方法 ====================
|
||
|
||
def get_device_info(self) -> Dict[str, Any]:
|
||
"""获取设备信息"""
|
||
return {
|
||
"device_id": self.device_id,
|
||
"status": self.status,
|
||
"is_running": self.is_running,
|
||
"current_value": self.current_value,
|
||
}
|
||
|
||
def __str__(self) -> str:
|
||
return f"MyDevice({self.device_id}: {self.status})"
|
||
```
|
||
|
||
## 关键规则
|
||
|
||
### 1. 参数处理
|
||
|
||
所有动作方法的参数都可能以字符串形式传入,必须进行类型转换:
|
||
|
||
```python
|
||
async def my_action(self, value: float, **kwargs) -> bool:
|
||
# 始终进行类型转换
|
||
try:
|
||
value = float(value)
|
||
except (ValueError, TypeError) as e:
|
||
self.logger.error(f"参数类型错误: {e}")
|
||
return False
|
||
```
|
||
|
||
### 2. vessel 参数处理
|
||
|
||
vessel 参数可能是字符串ID或字典:
|
||
|
||
```python
|
||
def extract_vessel_id(vessel: Union[str, dict]) -> str:
|
||
if isinstance(vessel, dict):
|
||
return vessel.get("id", "")
|
||
return str(vessel) if vessel else ""
|
||
```
|
||
|
||
### 3. 状态更新
|
||
|
||
使用 `self.data` 字典存储状态,属性读取状态:
|
||
|
||
```python
|
||
# 更新状态
|
||
self.data["status"] = "运行中"
|
||
self.data["current_speed"] = 300.0
|
||
|
||
# 读取状态(通过属性)
|
||
@property
|
||
def status(self) -> str:
|
||
return self.data.get("status", "待机")
|
||
```
|
||
|
||
### 4. 异步等待
|
||
|
||
使用 ROS 节点的 sleep 方法:
|
||
|
||
```python
|
||
# 正确
|
||
await self._ros_node.sleep(1.0)
|
||
|
||
# 避免(除非在纯 Python 测试环境)
|
||
await asyncio.sleep(1.0)
|
||
```
|
||
|
||
### 5. 进度反馈
|
||
|
||
长时间运行的操作需要提供进度反馈:
|
||
|
||
```python
|
||
while remaining > 0:
|
||
progress = (elapsed / total_time) * 100
|
||
self.data["status"] = f"运行中: {progress:.0f}%"
|
||
self.data["remaining_time"] = remaining
|
||
|
||
await self._ros_node.sleep(1.0)
|
||
```
|
||
|
||
## 虚拟设备
|
||
|
||
虚拟设备用于测试和演示,放在 `unilabos/devices/virtual/` 目录:
|
||
|
||
- 类名以 `Virtual` 开头
|
||
- 文件名以 `virtual_` 开头
|
||
- 模拟真实设备的行为和时序
|
||
- 使用表情符号增强日志可读性(可选)
|
||
|
||
## 工作站设备
|
||
|
||
工作站是组合多个设备的复杂设备:
|
||
|
||
```python
|
||
from unilabos.devices.workstation.workstation_base import WorkstationBase
|
||
|
||
class MyWorkstation(WorkstationBase):
|
||
"""组合工作站"""
|
||
|
||
async def execute_workflow(self, workflow: Dict[str, Any]) -> bool:
|
||
"""执行工作流"""
|
||
pass
|
||
```
|
||
|
||
## 设备注册
|
||
|
||
设备类开发完成后,需要在注册表中注册:
|
||
|
||
1. 创建/编辑 `unilabos/registry/devices/my_category.yaml`
|
||
2. 添加设备配置(参考 `virtual_device.yaml`)
|
||
3. 运行 `--complete_registry` 自动生成 schema
|