Files
Uni-Lab-OS/docs/developer_guide/add_device.md
Xuwznln 75f09034ff update docs, test examples
fix liquid_handler init bug
2025-11-18 18:42:27 +08:00

27 KiB
Raw Blame History

添加设备:编写驱动

在 Uni-Lab 中设备Device是实验操作的基础单元。Uni-Lab 使用注册表机制来兼容管理种类繁多的设备驱动程序。抽象的设备对外拥有【话题】【服务】【动作】三种通信机制,因此将设备添加进 Uni-Lab实际上是将设备驱动中的这三种机制映射到 Uni-Lab 标准指令集上。

💡 提示: 本文档介绍如何使用已有的设备驱动SDK。若设备没有现成的驱动程序需要自己开发驱动请参考 {doc}add_old_device

支持的驱动类型

Uni-Lab 支持以下两种驱动程序:

1. Python Class推荐

Python 类设备驱动在完成注册表后可以直接在 Uni-Lab 中使用,无需额外编译。

示例:

class MockGripper:
    def __init__(self):
        self._position: float = 0.0
        self._velocity: float = 2.0
        self._torque: float = 0.0
        self._status = "Idle"

    @property
    def position(self) -> float:
        return self._position

    @property
    def velocity(self) -> float:
        return self._velocity

    @property
    def torque(self) -> float:
        return self._torque

    # 会被自动识别的设备属性,接入 Uni-Lab 时会定时对外广播
    @property
    def status(self) -> str:
        return self._status

    @status.setter
    def status(self, target):
        self._status = target

    # 会被自动识别的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
    def push_to(self, position: float, torque: float, velocity: float = 0.0):
        self._status = "Running"
        current_pos = self.position
        if velocity == 0.0:
            velocity = self.velocity

        move_time = abs(position - current_pos) / velocity
        for i in range(20):
            self._position = current_pos + (position - current_pos) / 20 * (i+1)
            self._torque = torque / (20 - i)
            self._velocity = velocity
            time.sleep(move_time / 20)
        self._torque = torque
        self._status = "Idle"

2. C# Class

C# 驱动设备在完成注册表后,需要调用 Uni-Lab C# 编译后才能使用(仅需一次)。

示例:

using System;
using System.Threading.Tasks;

public class MockGripper
{
    // 会被自动识别的设备属性,接入 Uni-Lab 时会定时对外广播
    public double position { get; private set; } = 0.0;
    public double velocity { get; private set; } = 2.0;
    public double torque { get; private set; } = 0.0;
    public string status { get; private set; } = "Idle";

    // 需要在注册表添加的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
    public async Task PushToAsync(double Position, double Torque, double Velocity = 0.0)
    {
        status = "Running";
        double currentPos = Position;
        if (Velocity == 0.0)
        {
            velocity = Velocity;
        }
        double moveTime = Math.Abs(Position - currentPos) / velocity;
        for (int i = 0; i < 20; i++)
        {
            position = currentPos + (Position - currentPos) / 20 * (i + 1);
            torque = Torque / (20 - i);
            velocity = Velocity;
            await Task.Delay((int)(moveTime * 1000 / 20));
        }
        torque = Torque;
        status = "Idle";
    }
}

快速开始:两种方式添加设备

方式 1使用注册表编辑器推荐

推荐使用 Uni-Lab-OS 自带的可视化编辑器,它能自动分析您的设备驱动并生成大部分配置:

步骤:

  1. 启动 Uni-Lab-OS
  2. 在浏览器中打开"注册表编辑器"页面
  3. 选择您的 Python 设备驱动文件
  4. 点击"分析文件",让系统读取类信息
  5. 填写基本信息(设备描述、图标等)
  6. 点击"生成注册表",复制生成的内容
  7. 保存到 devices/ 目录下

优点:

  • 自动识别设备属性和方法
  • 可视化界面,易于操作
  • 自动生成完整配置
  • 减少手动配置错误

方式 2手动编写注册表简化版

如果需要手动编写,只需要提供两个必需字段,系统会自动补全其余内容:

最小配置示例:

my_device: # 设备唯一标识符
  class:
    module: unilabos.devices.your_module.my_device:MyDevice # Python 类路径
    type: python # 驱动类型

注册表文件位置:

  • 默认路径:unilabos/registry/devices
  • 自定义路径:启动时使用 --registry_path 参数指定
  • 可将多个设备写在同一个 YAML 文件中

系统自动生成的内容:

系统会自动分析您的 Python 驱动类并生成:

  • status_types:从 @property 装饰的方法自动识别状态属性
  • action_value_mappings:从类方法自动生成动作映射
  • init_param_schema:从 __init__ 方法分析初始化参数
  • schema:前端显示用的属性类型定义

完整结构概览:

my_device:
  class:
    module: unilabos.devices.your_module.my_device:MyDevice
    type: python
    status_types: {} # 自动生成
    action_value_mappings: {} # 自动生成
  description: '' # 可选:设备描述
  icon: '' # 可选:设备图标
  init_param_schema: {} # 自动生成
  schema: {} # 自动生成

💡 提示: 详细的注册表编写指南和高级配置,请参考 {doc}03_add_device_registry


Python 类结构要求

Uni-Lab 设备驱动是一个 Python 类,需要遵循以下结构:

from typing import Dict, Any

class MyDevice:
    """设备类文档字符串

    说明设备的功能、连接方式等
    """

    def __init__(self, config: Dict[str, Any]):
        """初始化设备

        Args:
            config: 配置字典,来自图文件或注册表
        """
        self.port = config.get('port', '/dev/ttyUSB0')
        self.baudrate = config.get('baudrate', 9600)
        self._status = "idle"
        # 初始化硬件连接

    @property
    def status(self) -> str:
        """设备状态(会自动广播)"""
        return self._status

    def my_action(self, param: float) -> Dict[str, Any]:
        """执行动作

        Args:
            param: 参数说明

        Returns:
            {"success": True, "result": ...}
        """
        # 执行设备操作
        return {"success": True}

状态属性 vs 动作方法

状态属性(@property

状态属性会被自动识别并定期广播:

@property
def temperature(self) -> float:
    """当前温度"""
    return self._read_temperature()

@property
def status(self) -> str:
    """设备状态: idle, running, error"""
    return self._status

@property
def is_ready(self) -> bool:
    """设备是否就绪"""
    return self._status == "idle"

特点:

  • 使用@property装饰器
  • 只读,不能有参数
  • 自动添加到注册表的status_types
  • 定期发布到 ROS2 topic

动作方法

动作方法是设备可以执行的操作:

def start_heating(self, target_temp: float, rate: float = 1.0) -> Dict[str, Any]:
    """开始加热

    Args:
        target_temp: 目标温度(°C)
        rate: 升温速率(°C/min)

    Returns:
        {"success": bool, "message": str}
    """
    self._status = "heating"
    self._target_temp = target_temp
    # 发送命令到硬件
    return {"success": True, "message": f"Heating to {target_temp}°C"}

async def async_operation(self, duration: float) -> Dict[str, Any]:
    """异步操作(长时间运行)

    Args:
        duration: 持续时间(秒)
    """
    # 使用 self.sleep 而不是 asyncio.sleepROS2 异步机制)
    await self.sleep(duration)
    return {"success": True}

特点:

  • 普通方法或 async 方法
  • 返回 Dict 类型的结果
  • 自动注册为 ROS2 Action
  • 支持参数和返回值

返回值设计指南

⚠️ 重要:返回值会自动显示在前端

动作方法的返回值(字典)会自动显示在 Web 界面的工作流执行结果中。因此,强烈建议设计结构化、可读的返回值字典。

推荐的返回值结构:

def my_action(self, param: float) -> Dict[str, Any]:
    """执行操作"""
    try:
        # 执行操作...
        result = self._do_something(param)

        return {
            "success": True,              # 必需:操作是否成功
            "message": "操作完成",          # 推荐:用户友好的消息
            "result": result,             # 可选:具体结果数据
            "param_used": param,          # 可选:记录使用的参数
            # 其他有用的信息...
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": "操作失败"
        }

最佳实践示例(参考 host_node.test_latency

def test_latency(self) -> Dict[str, Any]:
    """测试网络延迟

    返回值会在前端显示,包含详细的测试结果
    """
    # 执行测试...
    avg_rtt_ms = 25.5
    avg_time_diff_ms = 10.2
    test_count = 5

    # 返回结构化的测试结果
    return {
        "status": "success",                    # 状态标识
        "avg_rtt_ms": avg_rtt_ms,              # 平均往返时间
        "avg_time_diff_ms": avg_time_diff_ms,  # 平均时间差
        "max_time_error_ms": 5.3,              # 最大误差
        "task_delay_ms": 15.7,                 # 任务延迟
        "test_count": test_count,              # 测试次数
    }

前端显示效果:

当用户在 Web 界面执行工作流时,返回的字典会以 JSON 格式显示在结果面板中:

{
  "status": "success",
  "avg_rtt_ms": 25.5,
  "avg_time_diff_ms": 10.2,
  "max_time_error_ms": 5.3,
  "task_delay_ms": 15.7,
  "test_count": 5
}

返回值设计建议:

  1. 始终包含 success 字段:布尔值,表示操作是否成功
  2. 包含 message 字段:字符串,提供用户友好的描述
  3. 使用有意义的键名:使用描述性的键名(如 avg_rtt_ms 而不是 v1
  4. 包含单位:在键名中包含单位(如 _ms_ml_celsius
  5. 记录重要参数:返回使用的关键参数值,便于追溯
  6. 错误信息详细:失败时包含 error 字段和详细的错误描述
  7. 避免返回大数据:不要返回大型数组或二进制数据,这会影响前端性能

错误处理示例:

def risky_operation(self, param: float) -> Dict[str, Any]:
    """可能失败的操作"""
    if param < 0:
        return {
            "success": False,
            "error": "参数不能为负数",
            "message": f"无效参数: {param}",
            "param": param
        }

    try:
        result = self._execute(param)
        return {
            "success": True,
            "message": "操作成功",
            "result": result,
            "param": param
        }
    except IOError as e:
        return {
            "success": False,
            "error": "通信错误",
            "message": str(e),
            "device_status": self._status
        }

特殊参数类型ResourceSlot 和 DeviceSlot

Uni-Lab 提供特殊的参数类型,用于在方法中声明需要选择资源或设备。

导入类型

from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from typing import List

ResourceSlot - 资源选择

用于需要选择物料资源的场景:

def pipette_liquid(
    self,
    source: ResourceSlot,              # 单个源容器
    target: ResourceSlot,              # 单个目标容器
    volume: float
) -> Dict[str, Any]:
    """从源容器吸取液体到目标容器

    Args:
        source: 源容器(前端会显示资源选择下拉框)
        target: 目标容器(前端会显示资源选择下拉框)
        volume: 体积(μL)
    """
    print(f"Pipetting {volume}μL from {source.id} to {target.id}")
    return {"success": True}

多选示例:

def mix_multiple(
    self,
    containers: List[ResourceSlot],    # 多个容器选择
    speed: float
) -> Dict[str, Any]:
    """混合多个容器

    Args:
        containers: 容器列表(前端会显示多选下拉框)
        speed: 混合速度
    """
    for container in containers:
        print(f"Mixing {container.name}")
    return {"success": True}

DeviceSlot - 设备选择

用于需要选择其他设备的场景:

def coordinate_with_device(
    self,
    other_device: DeviceSlot,          # 单个设备选择
    command: str
) -> Dict[str, Any]:
    """与另一个设备协同工作

    Args:
        other_device: 协同设备(前端会显示设备选择下拉框)
        command: 命令
    """
    print(f"Coordinating with {other_device.name}")
    return {"success": True}

多设备示例:

def sync_devices(
    self,
    devices: List[DeviceSlot],         # 多个设备选择
    sync_signal: str
) -> Dict[str, Any]:
    """同步多个设备

    Args:
        devices: 设备列表(前端会显示多选下拉框)
        sync_signal: 同步信号
    """
    for dev in devices:
        print(f"Syncing {dev.name}")
    return {"success": True}

完整示例:液体处理工作站

from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from typing import List, Dict, Any

class LiquidHandler:
    """液体处理工作站"""

    def __init__(self, config: Dict[str, Any]):
        self.simulation = config.get('simulation', False)
        self._status = "idle"

    @property
    def status(self) -> str:
        return self._status

    def transfer_liquid(
        self,
        source: ResourceSlot,               # 源容器选择
        target: ResourceSlot,               # 目标容器选择
        volume: float,
        tip: ResourceSlot = None            # 可选的枪头选择
    ) -> Dict[str, Any]:
        """转移液体

        前端效果:
        - source: 下拉框,列出所有可用容器
        - target: 下拉框,列出所有可用容器
        - volume: 数字输入框
        - tip: 下拉框(可选),列出所有枪头
        """
        self._status = "transferring"

        # source和target会被解析为实际的资源对象
        print(f"Transferring {volume}μL")
        print(f"  From: {source.id} ({source.name})")
        print(f"  To: {target.id} ({target.name})")

        if tip:
            print(f"  Using tip: {tip.id}")

        # 执行实际的液体转移
        # ...

        self._status = "idle"
        return {
            "success": True,
            "volume_transferred": volume,
            "source_id": source.id,
            "target_id": target.id
        }

    def multi_dispense(
        self,
        source: ResourceSlot,               # 单个源
        targets: List[ResourceSlot],        # 多个目标
        volumes: List[float]
    ) -> Dict[str, Any]:
        """从一个源分配到多个目标

        前端效果:
        - source: 单选下拉框
        - targets: 多选下拉框(可选择多个容器)
        - volumes: 数组输入(每个目标对应一个体积)
        """
        results = []
        for target, vol in zip(targets, volumes):
            print(f"Dispensing {vol}μL to {target.name}")
            results.append({
                "target": target.id,
                "volume": vol
            })

        return {
            "success": True,
            "dispense_results": results
        }

    def test_with_balance(
        self,
        target: ResourceSlot,               # 容器
        balance: DeviceSlot                 # 天平设备
    ) -> Dict[str, Any]:
        """使用天平测量容器

        前端效果:
        - target: 容器选择下拉框
        - balance: 设备选择下拉框(仅显示天平类型)
        """
        print(f"Weighing {target.name} on {balance.name}")

        # 可以调用balance的方法
        # weight = balance.get_weight()

        return {
            "success": True,
            "container": target.id,
            "balance_used": balance.id
        }

工作原理

1. 类型识别

注册表扫描方法签名时:

def my_method(self, resource: ResourceSlot, device: DeviceSlot):
    pass

系统识别到ResourceSlotDeviceSlot类型。

2. 自动添加 placeholder_keys

在注册表中自动生成:

my_device:
  class:
    action_value_mappings:
      my_method:
        goal:
          resource: resource
          device: device
        placeholder_keys:
          resource: unilabos_resources # 自动添加!
          device: unilabos_devices # 自动添加!

3. 前端 UI 生成

  • unilabos_resources: 渲染为资源选择下拉框
  • unilabos_devices: 渲染为设备选择下拉框

4. 运行时解析

用户选择资源/设备后,实际调用时会传入完整的资源/设备对象:

# 用户在前端选择了 plate_1
# 运行时source参数会收到完整的Resource对象
source.id        # "plate_1"
source.name      # "96孔板"
source.type      # "resource"
source.class_    # "corning_96_wellplate_360ul_flat"

支持的通信方式

1. 串口Serial

import serial

class SerialDevice:
    def __init__(self, config: Dict[str, Any]):
        self.port = config['port']
        self.baudrate = config.get('baudrate', 9600)
        self.ser = serial.Serial(
            port=self.port,
            baudrate=self.baudrate,
            timeout=1
        )

    def send_command(self, cmd: str) -> str:
        """发送命令并读取响应"""
        self.ser.write(f"{cmd}\r\n".encode())
        response = self.ser.readline().decode().strip()
        return response

    def __del__(self):
        if hasattr(self, 'ser') and self.ser.is_open:
            self.ser.close()

2. TCP/IP Socket

import socket

class TCPDevice:
    def __init__(self, config: Dict[str, Any]):
        self.host = config['host']
        self.port = config['port']
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))

    def send_command(self, cmd: str) -> str:
        self.sock.sendall(cmd.encode())
        response = self.sock.recv(1024).decode()
        return response

3. Modbus

from pymodbus.client import ModbusTcpClient

class ModbusDevice:
    def __init__(self, config: Dict[str, Any]):
        self.host = config['host']
        self.port = config.get('port', 502)
        self.client = ModbusTcpClient(self.host, port=self.port)
        self.client.connect()

    def read_register(self, address: int) -> int:
        result = self.client.read_holding_registers(address, 1)
        return result.registers[0]

    def write_register(self, address: int, value: int):
        self.client.write_register(address, value)

4. OPC UA

from opcua import Client

class OPCUADevice:
    def __init__(self, config: Dict[str, Any]):
        self.url = config['url']
        self.client = Client(self.url)
        self.client.connect()

    def read_node(self, node_id: str):
        node = self.client.get_node(node_id)
        return node.get_value()

    def write_node(self, node_id: str, value):
        node = self.client.get_node(node_id)
        node.set_value(value)

5. HTTP/RPC

import requests

class HTTPDevice:
    def __init__(self, config: Dict[str, Any]):
        self.base_url = config['url']
        self.auth_token = config.get('token')

    def send_command(self, endpoint: str, data: Dict) -> Dict:
        url = f"{self.base_url}/{endpoint}"
        headers = {'Authorization': f'Bearer {self.auth_token}'}
        response = requests.post(url, json=data, headers=headers)
        return response.json()

异步 vs 同步方法

同步方法(适合快速操作)

def quick_operation(self, param: float) -> Dict[str, Any]:
    """快速操作,立即返回"""
    result = self._do_something(param)
    return {"success": True, "result": result}

异步方法(适合耗时操作)

async def long_operation(self, duration: float) -> Dict[str, Any]:
    """长时间运行的操作"""
    self._status = "running"

    # 使用 ROS2 提供的 sleep 方法(而不是 asyncio.sleep
    await self.sleep(duration)

    # 可以在过程中发送feedback
    # 需要配合ROS2 Action的feedback机制

    self._status = "idle"
    return {"success": True, "duration": duration}

⚠️ 重要提示ROS2 异步机制 vs Python asyncio

Uni-Lab 的设备驱动虽然使用 async def 语法,但底层是 ROS2 的异步机制,而不是 Python 的 asyncio

不能使用的 asyncio 功能:

  • asyncio.sleep() - 会导致 ROS2 事件循环阻塞
  • asyncio.create_task() - 任务不会被 ROS2 正确调度
  • asyncio.gather() - 无法与 ROS2 集成
  • 其他 asyncio 标准库函数

应该使用的方法(继承自 BaseROS2DeviceNode

  • await self.sleep(seconds) - ROS2 兼容的睡眠
  • await self.create_task(func, **kwargs) - ROS2 兼容的任务创建
  • ROS2 的 Action/Service 回调机制

示例:

async def complex_operation(self, duration: float) -> Dict[str, Any]:
    """正确使用 ROS2 异步方法"""
    self._status = "processing"

    # ✅ 正确:使用 self.sleep
    await self.sleep(duration)

    # ✅ 正确:创建并发任务
    task = await self.create_task(self._background_work)

    # ❌ 错误:不要使用 asyncio
    # await asyncio.sleep(duration)  # 这会导致问题!
    # task = asyncio.create_task(...)  # 这也不行!

    self._status = "idle"
    return {"success": True}

async def _background_work(self):
    """后台任务"""
    await self.sleep(1.0)
    self.lab_logger().info("Background work completed")

为什么不能混用?

ROS2 使用 rclpy 的事件循环来管理所有异步操作。如果使用 asyncio 的函数,这些操作会在不同的事件循环中运行,导致:

  • ROS2 回调无法正确执行
  • 任务可能永远不会完成
  • 程序可能死锁或崩溃

参考实现:

BaseROS2DeviceNode 提供的方法定义(base_device_node.py:563-572

async def sleep(self, rel_time: float, callback_group=None):
    """ROS2 兼容的异步睡眠"""
    if callback_group is None:
        callback_group = self.callback_group
    await ROS2DeviceNode.async_wait_for(self, rel_time, callback_group)

@classmethod
async def create_task(cls, func, trace_error=True, **kwargs) -> Task:
    """ROS2 兼容的任务创建"""
    return ROS2DeviceNode.run_async_func(func, trace_error, **kwargs)

错误处理

基本错误处理

def operation_with_error_handling(self, param: float) -> Dict[str, Any]:
    """带错误处理的操作"""
    try:
        result = self._risky_operation(param)
        return {
            "success": True,
            "result": result
        }
    except ValueError as e:
        return {
            "success": False,
            "error": "Invalid parameter",
            "message": str(e)
        }
    except IOError as e:
        self._status = "error"
        return {
            "success": False,
            "error": "Communication error",
            "message": str(e)
        }

自定义异常

class DeviceError(Exception):
    """设备错误基类"""
    pass

class DeviceNotReadyError(DeviceError):
    """设备未就绪"""
    pass

class DeviceTimeoutError(DeviceError):
    """设备超时"""
    pass

class MyDevice:
    def operation(self) -> Dict[str, Any]:
        if self._status != "idle":
            raise DeviceNotReadyError(f"Device is {self._status}")

        # 执行操作
        return {"success": True}

最佳实践

1. 类型注解

from typing import Dict, Any, Optional, List

def method(
    self,
    param1: float,
    param2: str,
    optional_param: Optional[int] = None
) -> Dict[str, Any]:
    """完整的类型注解有助于自动生成注册表"""
    pass

2. 文档字符串

def method(self, param: float) -> Dict[str, Any]:
    """方法简短描述

    更详细的说明...

    Args:
        param: 参数说明,包括单位和范围

    Returns:
        Dict包含:
        - success (bool): 是否成功
        - result (Any): 结果数据

    Raises:
        DeviceError: 错误情况说明
    """
    pass

3. 配置验证

def __init__(self, config: Dict[str, Any]):
    # 验证必需参数
    required = ['port', 'baudrate']
    for key in required:
        if key not in config:
            raise ValueError(f"Missing required config: {key}")

    self.port = config['port']
    self.baudrate = config['baudrate']

4. 资源清理

def __del__(self):
    """析构函数,清理资源"""
    if hasattr(self, 'connection') and self.connection:
        self.connection.close()

5. 设计前端友好的返回值

记住:返回值会直接显示在 Web 界面

import time

def measure_temperature(self) -> Dict[str, Any]:
    """测量温度

    ✅ 好的返回值设计:
    - 包含 success 状态
    - 使用描述性键名
    - 在键名中包含单位
    - 记录测量时间
    """
    temp = self._read_temperature()

    return {
        "success": True,
        "temperature_celsius": temp,      # 键名包含单位
        "timestamp": time.time(),          # 记录时间
        "sensor_status": "normal",         # 额外状态信息
        "message": f"温度测量完成: {temp}°C"  # 用户友好的消息
    }

def bad_example(self) -> Dict[str, Any]:
    """❌ 不好的返回值设计"""
    return {
        "s": True,          # ❌ 键名不明确
        "v": 25.5,          # ❌ 没有说明单位
        "t": 1234567890,    # ❌ 不清楚是什么时间戳
    }

参考 host_node.test_latency 方法(第 1216-1340 行),它返回详细的测试结果,在前端清晰显示:

return {
    "status": "success",
    "avg_rtt_ms": 25.5,            # 有意义的键名 + 单位
    "avg_time_diff_ms": 10.2,
    "max_time_error_ms": 5.3,
    "task_delay_ms": 15.7,
    "test_count": 5,               # 记录重要信息
}

下一步

看完本文档后,建议继续阅读:

  • {doc}add_action - 了解如何添加新的动作指令
  • {doc}add_yaml - 学习如何编写和完善 YAML 注册表

进阶主题:

  • {doc}03_add_device_registry - 了解如何配置注册表
  • {doc}04_add_device_testing - 学习如何测试设备
  • {doc}add_old_device - 没有 SDK 时如何开发设备驱动

参考

注意: 虽然设备驱动使用 async def 语法,但请不要参考 Python 标准的 asyncio 文档。Uni-Lab 使用的是 ROS2 的异步机制,两者不兼容。请使用 self.sleep()self.create_task() 等 BaseROS2DeviceNode 提供的方法。