Update docs

This commit is contained in:
Xuwznln
2025-11-18 17:17:43 +08:00
parent 653e6e1ac3
commit 6a681e1d73
121 changed files with 7700 additions and 985 deletions

View File

@@ -1,10 +1,18 @@
# 添加设备
# 添加设备:编写驱动
在 Uni-Lab 中设备Device是实验操作的基础单元。Uni-Lab 使用**注册表机制**来兼容管理种类繁多的设备驱动程序。回顾 {ref}`instructions` 中的概念,抽象的设备对外拥有【话题】【服务】【动作】三种通信机制,因此将设备添加进 Uni-Lab实际上是将设备驱动中的三种机制映射到 Uni-Lab 标准指令集上。
在 Uni-Lab 中设备Device是实验操作的基础单元。Uni-Lab 使用**注册表机制**来兼容管理种类繁多的设备驱动程序。抽象的设备对外拥有【话题】【服务】【动作】三种通信机制,因此将设备添加进 Uni-Lab实际上是将设备驱动中的三种机制映射到 Uni-Lab 标准指令集上。
能被 Uni-Lab 添加的驱动程序类型有以下种类:
> **💡 提示:** 本文档介绍如何使用已有的设备驱动SDK。若设备没有现成的驱动程序需要自己开发驱动请参考 {doc}`add_old_device`。
1. Python Class
## 支持的驱动类型
Uni-Lab 支持以下两种驱动程序:
### 1. Python Class推荐
Python 类设备驱动在完成注册表后可以直接在 Uni-Lab 中使用,无需额外编译。
**示例:**
```python
class MockGripper:
@@ -31,12 +39,11 @@ class MockGripper:
def status(self) -> str:
return self._status
# 会被自动识别的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
@status.setter
def status(self, target):
self._status = target
# 需要在注册表添加的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
# 会被自动识别的设备动作,接入 Uni-Lab 时会作为 ActionServer 接受任意控制者的指令
def push_to(self, position: float, torque: float, velocity: float = 0.0):
self._status = "Running"
current_pos = self.position
@@ -53,9 +60,11 @@ class MockGripper:
self._status = "Idle"
```
Python 类设备驱动在完成注册表后可以直接在 Uni-Lab 使用。
### 2. C# Class
2. C# Class
C# 驱动设备在完成注册表后,需要调用 Uni-Lab C# 编译后才能使用(仅需一次)。
**示例:**
```csharp
using System;
@@ -84,7 +93,7 @@ public class MockGripper
position = currentPos + (Position - currentPos) / 20 * (i + 1);
torque = Torque / (20 - i);
velocity = Velocity;
await Task.Delay((int)(moveTime * 1000 / 20)); // Convert seconds to milliseconds
await Task.Delay((int)(moveTime * 1000 / 20));
}
torque = Torque;
status = "Idle";
@@ -92,12 +101,16 @@ public class MockGripper
}
```
C# 驱动设备在完成注册表后,需要调用 Uni-Lab C# 编译后才能使用,但只需一次。
---
## 快速开始:使用注册表编辑器(推荐)
## 快速开始:两种方式添加设备
### 方式 1使用注册表编辑器推荐
推荐使用 Uni-Lab-OS 自带的可视化编辑器,它能自动分析您的设备驱动并生成大部分配置:
**步骤:**
1. 启动 Uni-Lab-OS
2. 在浏览器中打开"注册表编辑器"页面
3. 选择您的 Python 设备驱动文件
@@ -106,13 +119,18 @@ C# 驱动设备在完成注册表后,需要调用 Uni-Lab C# 编译后才能
6. 点击"生成注册表",复制生成的内容
7. 保存到 `devices/` 目录下
---
**优点:**
## 手动编写注册表(简化版)
- 自动识别设备属性和方法
- 可视化界面,易于操作
- 自动生成完整配置
- 减少手动配置错误
### 方式 2手动编写注册表简化版
如果需要手动编写,只需要提供两个必需字段,系统会自动补全其余内容:
### 最小配置示例
**最小配置示例**
```yaml
my_device: # 设备唯一标识符
@@ -121,22 +139,22 @@ my_device: # 设备唯一标识符
type: python # 驱动类型
```
### 注册表文件位置
**注册表文件位置**
- 默认路径:`unilabos/registry/devices`
- 自定义路径:启动时使用 `--registry` 参数指定
- 可将多个设备写在同一个 yaml 文件中
- 自定义路径:启动时使用 `--registry_path` 参数指定
- 可将多个设备写在同一个 YAML 文件中
### 系统自动生成的内容
**系统自动生成的内容**
系统会自动分析您的 Python 驱动类并生成:
- `status_types`:从 `get_*` 方法自动识别状态属性
- `status_types`:从 `@property` 装饰的方法自动识别状态属性
- `action_value_mappings`:从类方法自动生成动作映射
- `init_param_schema`:从 `__init__` 方法分析初始化参数
- `schema`:前端显示用的属性类型定义
### 完整结构概览
**完整结构概览**
```yaml
my_device:
@@ -151,4 +169,848 @@ my_device:
schema: {} # 自动生成
```
详细的注册表编写指南和高级配置,请参考{doc}`yaml 注册表编写指南 <add_yaml>`
> 💡 **提示:** 详细的注册表编写指南和高级配置,请参考 {doc}`03_add_device_registry`
---
## Python 类结构要求
Uni-Lab 设备驱动是一个 Python 类,需要遵循以下结构:
```python
from typing import Dict, Any
class MyDevice:
"""设备类文档字符串
说明设备的功能、连接方式等
"""
def __init__(self, config: Dict[str, Any]):
"""初始化设备
Args:
config: 配置字典,来自图文件或注册表
"""
self.port = config.get('port', '/dev/ttyUSB0')
self.baudrate = config.get('baudrate', 9600)
self._status = "idle"
# 初始化硬件连接
@property
def status(self) -> str:
"""设备状态(会自动广播)"""
return self._status
def my_action(self, param: float) -> Dict[str, Any]:
"""执行动作
Args:
param: 参数说明
Returns:
{"success": True, "result": ...}
"""
# 执行设备操作
return {"success": True}
```
## 状态属性 vs 动作方法
### 状态属性(@property
状态属性会被自动识别并定期广播:
```python
@property
def temperature(self) -> float:
"""当前温度"""
return self._read_temperature()
@property
def status(self) -> str:
"""设备状态: idle, running, error"""
return self._status
@property
def is_ready(self) -> bool:
"""设备是否就绪"""
return self._status == "idle"
```
**特点**:
- 使用`@property`装饰器
- 只读,不能有参数
- 自动添加到注册表的`status_types`
- 定期发布到 ROS2 topic
### 动作方法
动作方法是设备可以执行的操作:
```python
def start_heating(self, target_temp: float, rate: float = 1.0) -> Dict[str, Any]:
"""开始加热
Args:
target_temp: 目标温度(°C)
rate: 升温速率(°C/min)
Returns:
{"success": bool, "message": str}
"""
self._status = "heating"
self._target_temp = target_temp
# 发送命令到硬件
return {"success": True, "message": f"Heating to {target_temp}°C"}
async def async_operation(self, duration: float) -> Dict[str, Any]:
"""异步操作(长时间运行)
Args:
duration: 持续时间(秒)
"""
# 使用 self.sleep 而不是 asyncio.sleepROS2 异步机制)
await self.sleep(duration)
return {"success": True}
```
**特点**:
- 普通方法或 async 方法
- 返回 Dict 类型的结果
- 自动注册为 ROS2 Action
- 支持参数和返回值
### 返回值设计指南
> **⚠️ 重要:返回值会自动显示在前端**
>
> 动作方法的返回值(字典)会自动显示在 Web 界面的工作流执行结果中。因此,**强烈建议**设计结构化、可读的返回值字典。
**推荐的返回值结构:**
```python
def my_action(self, param: float) -> Dict[str, Any]:
"""执行操作"""
try:
# 执行操作...
result = self._do_something(param)
return {
"success": True, # 必需:操作是否成功
"message": "操作完成", # 推荐:用户友好的消息
"result": result, # 可选:具体结果数据
"param_used": param, # 可选:记录使用的参数
# 其他有用的信息...
}
except Exception as e:
return {
"success": False,
"error": str(e),
"message": "操作失败"
}
```
**最佳实践示例(参考 `host_node.test_latency`**
```python
def test_latency(self) -> Dict[str, Any]:
"""测试网络延迟
返回值会在前端显示,包含详细的测试结果
"""
# 执行测试...
avg_rtt_ms = 25.5
avg_time_diff_ms = 10.2
test_count = 5
# 返回结构化的测试结果
return {
"status": "success", # 状态标识
"avg_rtt_ms": avg_rtt_ms, # 平均往返时间
"avg_time_diff_ms": avg_time_diff_ms, # 平均时间差
"max_time_error_ms": 5.3, # 最大误差
"task_delay_ms": 15.7, # 任务延迟
"test_count": test_count, # 测试次数
}
```
**前端显示效果:**
当用户在 Web 界面执行工作流时,返回的字典会以 JSON 格式显示在结果面板中:
```json
{
"status": "success",
"avg_rtt_ms": 25.5,
"avg_time_diff_ms": 10.2,
"max_time_error_ms": 5.3,
"task_delay_ms": 15.7,
"test_count": 5
}
```
**返回值设计建议:**
1. **始终包含 `success` 字段**:布尔值,表示操作是否成功
2. **包含 `message` 字段**:字符串,提供用户友好的描述
3. **使用有意义的键名**:使用描述性的键名(如 `avg_rtt_ms` 而不是 `v1`
4. **包含单位**:在键名中包含单位(如 `_ms``_ml``_celsius`
5. **记录重要参数**:返回使用的关键参数值,便于追溯
6. **错误信息详细**:失败时包含 `error` 字段和详细的错误描述
7. **避免返回大数据**:不要返回大型数组或二进制数据,这会影响前端性能
**错误处理示例:**
```python
def risky_operation(self, param: float) -> Dict[str, Any]:
"""可能失败的操作"""
if param < 0:
return {
"success": False,
"error": "参数不能为负数",
"message": f"无效参数: {param}",
"param": param
}
try:
result = self._execute(param)
return {
"success": True,
"message": "操作成功",
"result": result,
"param": param
}
except IOError as e:
return {
"success": False,
"error": "通信错误",
"message": str(e),
"device_status": self._status
}
```
## 特殊参数类型ResourceSlot 和 DeviceSlot
Uni-Lab 提供特殊的参数类型,用于在方法中声明需要选择资源或设备。
### 导入类型
```python
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from typing import List
```
### ResourceSlot - 资源选择
用于需要选择物料资源的场景:
```python
def pipette_liquid(
self,
source: ResourceSlot, # 单个源容器
target: ResourceSlot, # 单个目标容器
volume: float
) -> Dict[str, Any]:
"""从源容器吸取液体到目标容器
Args:
source: 源容器(前端会显示资源选择下拉框)
target: 目标容器(前端会显示资源选择下拉框)
volume: 体积(μL)
"""
print(f"Pipetting {volume}μL from {source.id} to {target.id}")
return {"success": True}
```
**多选示例**:
```python
def mix_multiple(
self,
containers: List[ResourceSlot], # 多个容器选择
speed: float
) -> Dict[str, Any]:
"""混合多个容器
Args:
containers: 容器列表(前端会显示多选下拉框)
speed: 混合速度
"""
for container in containers:
print(f"Mixing {container.name}")
return {"success": True}
```
### DeviceSlot - 设备选择
用于需要选择其他设备的场景:
```python
def coordinate_with_device(
self,
other_device: DeviceSlot, # 单个设备选择
command: str
) -> Dict[str, Any]:
"""与另一个设备协同工作
Args:
other_device: 协同设备(前端会显示设备选择下拉框)
command: 命令
"""
print(f"Coordinating with {other_device.name}")
return {"success": True}
```
**多设备示例**:
```python
def sync_devices(
self,
devices: List[DeviceSlot], # 多个设备选择
sync_signal: str
) -> Dict[str, Any]:
"""同步多个设备
Args:
devices: 设备列表(前端会显示多选下拉框)
sync_signal: 同步信号
"""
for dev in devices:
print(f"Syncing {dev.name}")
return {"success": True}
```
### 完整示例:液体处理工作站
```python
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
from typing import List, Dict, Any
class LiquidHandler:
"""液体处理工作站"""
def __init__(self, config: Dict[str, Any]):
self.simulation = config.get('simulation', False)
self._status = "idle"
@property
def status(self) -> str:
return self._status
def transfer_liquid(
self,
source: ResourceSlot, # 源容器选择
target: ResourceSlot, # 目标容器选择
volume: float,
tip: ResourceSlot = None # 可选的枪头选择
) -> Dict[str, Any]:
"""转移液体
前端效果:
- source: 下拉框,列出所有可用容器
- target: 下拉框,列出所有可用容器
- volume: 数字输入框
- tip: 下拉框(可选),列出所有枪头
"""
self._status = "transferring"
# source和target会被解析为实际的资源对象
print(f"Transferring {volume}μL")
print(f" From: {source.id} ({source.name})")
print(f" To: {target.id} ({target.name})")
if tip:
print(f" Using tip: {tip.id}")
# 执行实际的液体转移
# ...
self._status = "idle"
return {
"success": True,
"volume_transferred": volume,
"source_id": source.id,
"target_id": target.id
}
def multi_dispense(
self,
source: ResourceSlot, # 单个源
targets: List[ResourceSlot], # 多个目标
volumes: List[float]
) -> Dict[str, Any]:
"""从一个源分配到多个目标
前端效果:
- source: 单选下拉框
- targets: 多选下拉框(可选择多个容器)
- volumes: 数组输入(每个目标对应一个体积)
"""
results = []
for target, vol in zip(targets, volumes):
print(f"Dispensing {vol}μL to {target.name}")
results.append({
"target": target.id,
"volume": vol
})
return {
"success": True,
"dispense_results": results
}
def test_with_balance(
self,
target: ResourceSlot, # 容器
balance: DeviceSlot # 天平设备
) -> Dict[str, Any]:
"""使用天平测量容器
前端效果:
- target: 容器选择下拉框
- balance: 设备选择下拉框(仅显示天平类型)
"""
print(f"Weighing {target.name} on {balance.name}")
# 可以调用balance的方法
# weight = balance.get_weight()
return {
"success": True,
"container": target.id,
"balance_used": balance.id
}
```
### 工作原理
#### 1. 类型识别
注册表扫描方法签名时:
```python
def my_method(self, resource: ResourceSlot, device: DeviceSlot):
pass
```
系统识别到`ResourceSlot``DeviceSlot`类型。
#### 2. 自动添加 placeholder_keys
在注册表中自动生成:
```yaml
my_device:
class:
action_value_mappings:
my_method:
goal:
resource: resource
device: device
placeholder_keys:
resource: unilabos_resources # 自动添加!
device: unilabos_devices # 自动添加!
```
#### 3. 前端 UI 生成
- `unilabos_resources`: 渲染为资源选择下拉框
- `unilabos_devices`: 渲染为设备选择下拉框
#### 4. 运行时解析
用户选择资源/设备后,实际调用时会传入完整的资源/设备对象:
```python
# 用户在前端选择了 plate_1
# 运行时source参数会收到完整的Resource对象
source.id # "plate_1"
source.name # "96孔板"
source.type # "resource"
source.class_ # "corning_96_wellplate_360ul_flat"
```
## 支持的通信方式
### 1. 串口Serial
```python
import serial
class SerialDevice:
def __init__(self, config: Dict[str, Any]):
self.port = config['port']
self.baudrate = config.get('baudrate', 9600)
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
def send_command(self, cmd: str) -> str:
"""发送命令并读取响应"""
self.ser.write(f"{cmd}\r\n".encode())
response = self.ser.readline().decode().strip()
return response
def __del__(self):
if hasattr(self, 'ser') and self.ser.is_open:
self.ser.close()
```
### 2. TCP/IP Socket
```python
import socket
class TCPDevice:
def __init__(self, config: Dict[str, Any]):
self.host = config['host']
self.port = config['port']
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
def send_command(self, cmd: str) -> str:
self.sock.sendall(cmd.encode())
response = self.sock.recv(1024).decode()
return response
```
### 3. Modbus
```python
from pymodbus.client import ModbusTcpClient
class ModbusDevice:
def __init__(self, config: Dict[str, Any]):
self.host = config['host']
self.port = config.get('port', 502)
self.client = ModbusTcpClient(self.host, port=self.port)
self.client.connect()
def read_register(self, address: int) -> int:
result = self.client.read_holding_registers(address, 1)
return result.registers[0]
def write_register(self, address: int, value: int):
self.client.write_register(address, value)
```
### 4. OPC UA
```python
from opcua import Client
class OPCUADevice:
def __init__(self, config: Dict[str, Any]):
self.url = config['url']
self.client = Client(self.url)
self.client.connect()
def read_node(self, node_id: str):
node = self.client.get_node(node_id)
return node.get_value()
def write_node(self, node_id: str, value):
node = self.client.get_node(node_id)
node.set_value(value)
```
### 5. HTTP/RPC
```python
import requests
class HTTPDevice:
def __init__(self, config: Dict[str, Any]):
self.base_url = config['url']
self.auth_token = config.get('token')
def send_command(self, endpoint: str, data: Dict) -> Dict:
url = f"{self.base_url}/{endpoint}"
headers = {'Authorization': f'Bearer {self.auth_token}'}
response = requests.post(url, json=data, headers=headers)
return response.json()
```
## 异步 vs 同步方法
### 同步方法(适合快速操作)
```python
def quick_operation(self, param: float) -> Dict[str, Any]:
"""快速操作,立即返回"""
result = self._do_something(param)
return {"success": True, "result": result}
```
### 异步方法(适合耗时操作)
```python
async def long_operation(self, duration: float) -> Dict[str, Any]:
"""长时间运行的操作"""
self._status = "running"
# 使用 ROS2 提供的 sleep 方法(而不是 asyncio.sleep
await self.sleep(duration)
# 可以在过程中发送feedback
# 需要配合ROS2 Action的feedback机制
self._status = "idle"
return {"success": True, "duration": duration}
```
> **⚠️ 重要提示ROS2 异步机制 vs Python asyncio**
>
> Uni-Lab 的设备驱动虽然使用 `async def` 语法,但**底层是 ROS2 的异步机制,而不是 Python 的 asyncio**。
>
> **不能使用的 asyncio 功能:**
>
> - ❌ `asyncio.sleep()` - 会导致 ROS2 事件循环阻塞
> - ❌ `asyncio.create_task()` - 任务不会被 ROS2 正确调度
> - ❌ `asyncio.gather()` - 无法与 ROS2 集成
> - ❌ 其他 asyncio 标准库函数
>
> **应该使用的方法(继承自 BaseROS2DeviceNode**
>
> - ✅ `await self.sleep(seconds)` - ROS2 兼容的睡眠
> - ✅ `await self.create_task(func, **kwargs)` - ROS2 兼容的任务创建
> - ✅ ROS2 的 Action/Service 回调机制
>
> **示例:**
>
> ```python
> async def complex_operation(self, duration: float) -> Dict[str, Any]:
> """正确使用 ROS2 异步方法"""
> self._status = "processing"
>
> # ✅ 正确:使用 self.sleep
> await self.sleep(duration)
>
> # ✅ 正确:创建并发任务
> task = await self.create_task(self._background_work)
>
> # ❌ 错误:不要使用 asyncio
> # await asyncio.sleep(duration) # 这会导致问题!
> # task = asyncio.create_task(...) # 这也不行!
>
> self._status = "idle"
> return {"success": True}
>
> async def _background_work(self):
> """后台任务"""
> await self.sleep(1.0)
> self.lab_logger().info("Background work completed")
> ```
>
> **为什么不能混用?**
>
> ROS2 使用 `rclpy` 的事件循环来管理所有异步操作。如果使用 `asyncio` 的函数,这些操作会在不同的事件循环中运行,导致:
>
> - ROS2 回调无法正确执行
> - 任务可能永远不会完成
> - 程序可能死锁或崩溃
>
> **参考实现:**
>
> `BaseROS2DeviceNode` 提供的方法定义(`base_device_node.py:563-572`
>
> ```python
> async def sleep(self, rel_time: float, callback_group=None):
> """ROS2 兼容的异步睡眠"""
> if callback_group is None:
> callback_group = self.callback_group
> await ROS2DeviceNode.async_wait_for(self, rel_time, callback_group)
>
> @classmethod
> async def create_task(cls, func, trace_error=True, **kwargs) -> Task:
> """ROS2 兼容的任务创建"""
> return ROS2DeviceNode.run_async_func(func, trace_error, **kwargs)
> ```
## 错误处理
### 基本错误处理
```python
def operation_with_error_handling(self, param: float) -> Dict[str, Any]:
"""带错误处理的操作"""
try:
result = self._risky_operation(param)
return {
"success": True,
"result": result
}
except ValueError as e:
return {
"success": False,
"error": "Invalid parameter",
"message": str(e)
}
except IOError as e:
self._status = "error"
return {
"success": False,
"error": "Communication error",
"message": str(e)
}
```
### 自定义异常
```python
class DeviceError(Exception):
"""设备错误基类"""
pass
class DeviceNotReadyError(DeviceError):
"""设备未就绪"""
pass
class DeviceTimeoutError(DeviceError):
"""设备超时"""
pass
class MyDevice:
def operation(self) -> Dict[str, Any]:
if self._status != "idle":
raise DeviceNotReadyError(f"Device is {self._status}")
# 执行操作
return {"success": True}
```
## 最佳实践
### 1. 类型注解
```python
from typing import Dict, Any, Optional, List
def method(
self,
param1: float,
param2: str,
optional_param: Optional[int] = None
) -> Dict[str, Any]:
"""完整的类型注解有助于自动生成注册表"""
pass
```
### 2. 文档字符串
```python
def method(self, param: float) -> Dict[str, Any]:
"""方法简短描述
更详细的说明...
Args:
param: 参数说明,包括单位和范围
Returns:
Dict包含:
- success (bool): 是否成功
- result (Any): 结果数据
Raises:
DeviceError: 错误情况说明
"""
pass
```
### 3. 配置验证
```python
def __init__(self, config: Dict[str, Any]):
# 验证必需参数
required = ['port', 'baudrate']
for key in required:
if key not in config:
raise ValueError(f"Missing required config: {key}")
self.port = config['port']
self.baudrate = config['baudrate']
```
### 4. 资源清理
```python
def __del__(self):
"""析构函数,清理资源"""
if hasattr(self, 'connection') and self.connection:
self.connection.close()
```
### 5. 设计前端友好的返回值
**记住:返回值会直接显示在 Web 界面**
```python
import time
def measure_temperature(self) -> Dict[str, Any]:
"""测量温度
✅ 好的返回值设计:
- 包含 success 状态
- 使用描述性键名
- 在键名中包含单位
- 记录测量时间
"""
temp = self._read_temperature()
return {
"success": True,
"temperature_celsius": temp, # 键名包含单位
"timestamp": time.time(), # 记录时间
"sensor_status": "normal", # 额外状态信息
"message": f"温度测量完成: {temp}°C" # 用户友好的消息
}
def bad_example(self) -> Dict[str, Any]:
"""❌ 不好的返回值设计"""
return {
"s": True, # ❌ 键名不明确
"v": 25.5, # ❌ 没有说明单位
"t": 1234567890, # ❌ 不清楚是什么时间戳
}
```
**参考 `host_node.test_latency` 方法**(第 1216-1340 行),它返回详细的测试结果,在前端清晰显示:
```python
return {
"status": "success",
"avg_rtt_ms": 25.5, # 有意义的键名 + 单位
"avg_time_diff_ms": 10.2,
"max_time_error_ms": 5.3,
"task_delay_ms": 15.7,
"test_count": 5, # 记录重要信息
}
```
## 下一步
看完本文档后,建议继续阅读:
- {doc}`add_action` - 了解如何添加新的动作指令
- {doc}`add_yaml` - 学习如何编写和完善 YAML 注册表
进阶主题:
- {doc}`03_add_device_registry` - 了解如何配置注册表
- {doc}`04_add_device_testing` - 学习如何测试设备
- {doc}`add_old_device` - 没有 SDK 时如何开发设备驱动
## 参考
- [Python 类型注解](https://docs.python.org/3/library/typing.html)
- [ROS2 rclpy 异步编程](https://docs.ros.org/en/humble/Tutorials/Intermediate/Writing-an-Action-Server-Client/Py.html) - Uni-Lab 使用 ROS2 的异步机制
- [串口通信](https://pyserial.readthedocs.io/)
> **注意:** 虽然设备驱动使用 `async def` 语法,但请**不要参考** Python 标准的 [asyncio 文档](https://docs.python.org/3/library/asyncio.html)。Uni-Lab 使用的是 ROS2 的异步机制,两者不兼容。请使用 `self.sleep()` 和 `self.create_task()` 等 BaseROS2DeviceNode 提供的方法。