--- description: Uni-Lab-OS 实验室自动化平台开发规范 - 核心规则 globs: ["**/*.py", "**/*.yaml", "**/*.json"] --- # Uni-Lab-OS 项目开发规范 ## 项目概述 Uni-Lab-OS 是一个实验室自动化操作系统,用于连接和控制各种实验设备,实现实验工作流的自动化和标准化。 ## 技术栈 - **Python 3.11** - 核心开发语言 - **ROS 2** - 设备通信中间件 (rclpy) - **Conda/Mamba** - 包管理 (robostack-staging, conda-forge) - **FastAPI** - Web API 服务 - **WebSocket** - 实时通信 - **NetworkX** - 拓扑图管理 - **YAML** - 配置和注册表定义 - **PyLabRobot** - 实验室自动化库集成 - **pytest** - 测试框架 - **asyncio** - 异步编程 ## 项目结构 ``` unilabos/ ├── app/ # 应用入口、Web服务、后端 ├── compile/ # 协议编译器 (stir, add, filter 等) ├── config/ # 配置管理 ├── devices/ # 设备驱动 (真实/虚拟) ├── device_comms/ # 设备通信协议 ├── device_mesh/ # 3D网格和可视化 ├── registry/ # 设备和资源类型注册表 (YAML) ├── resources/ # 资源定义 ├── ros/ # ROS 2 集成 ├── utils/ # 工具函数 └── workflow/ # 工作流管理 ``` ## 代码规范 ### Python 风格 1. **类型注解**:所有函数必须使用类型注解 ```python def transfer_liquid( source: str, destination: str, volume: float, **kwargs ) -> List[Dict[str, Any]]: ``` 2. **Docstring**:使用 Google 风格的文档字符串 ```python def initialize(self) -> bool: """ 初始化设备 Returns: bool: 初始化是否成功 """ ``` 3. **导入顺序**: - 标准库 - 第三方库 - ROS 相关 (rclpy, unilabos_msgs) - 项目内部模块 ### 异步编程 1. 设备操作方法使用 `async def` 2. 使用 `await self._ros_node.sleep()` 而非 `asyncio.sleep()` 3. 长时间运行操作需提供进度反馈 ```python async def stir(self, stir_time: float, stir_speed: float, **kwargs) -> bool: """执行搅拌操作""" start_time = time_module.time() while True: elapsed = time_module.time() - start_time remaining = max(0, stir_time - elapsed) self.data.update({ "remaining_time": remaining, "status": f"搅拌中: {stir_speed} RPM" }) if remaining <= 0: break await self._ros_node.sleep(1.0) return True ``` ### 日志规范 使用项目自定义日志系统: ```python from unilabos.utils.log import logger, info, debug, warning, error, trace # 在设备类中使用 self.logger = logging.getLogger(f"DeviceName.{self.device_id}") self.logger.info("设备初始化完成") ``` ## 设备驱动开发 ### 设备类结构 ```python from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode class MyDevice: """设备驱动类""" _ros_node: BaseROS2DeviceNode def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs): self.device_id = device_id or "unknown_device" self.config = config or {} self.data = {} # 设备状态数据 def post_init(self, ros_node: BaseROS2DeviceNode): """ROS节点注入""" self._ros_node = ros_node async def initialize(self) -> bool: """初始化设备""" pass async def cleanup(self) -> bool: """清理设备""" pass # 状态属性 - 自动发布为 ROS Topic @property def status(self) -> str: return self.data.get("status", "待机") ``` ### 状态属性装饰器 ```python from unilabos.utils.decorator import topic_config class MyDevice: @property @topic_config(period=1.0, qos=10) # 每秒发布一次 def temperature(self) -> float: return self._temperature ``` ### 虚拟设备 虚拟设备放置在 `unilabos/devices/virtual/` 目录下,命名为 `virtual_*.py` ## 注册表配置 ### 设备注册表 (YAML) 位置: `unilabos/registry/devices/*.yaml` ```yaml my_device_type: category: - my_category description: "设备描述" version: "1.0.0" class: module: "unilabos.devices.my_device:MyDevice" type: python status_types: status: String temperature: Float64 action_value_mappings: auto-initialize: type: UniLabJsonCommandAsync goal: {} feedback: {} result: {} schema: {...} ``` ### 资源注册表 (YAML) 位置: `unilabos/registry/resources/**/*.yaml` ```yaml my_container: category: - container class: module: "unilabos.resources.my_resource:MyContainer" type: pylabrobot version: "1.0.0" ``` ## 协议编译器 位置: `unilabos/compile/*_protocol.py` ### 协议生成函数模板 ```python from typing import List, Dict, Any, Union import networkx as nx def generate_my_protocol( G: nx.DiGraph, vessel: Union[str, dict], param1: float = 0.0, **kwargs ) -> List[Dict[str, Any]]: """ 生成操作协议序列 Args: G: 物理拓扑图 vessel: 容器ID或字典 param1: 参数1 Returns: List[Dict]: 动作序列 """ # 提取vessel_id vessel_id = vessel if isinstance(vessel, str) else vessel.get("id", "") # 查找设备 device_id = find_connected_device(G, vessel_id) # 生成动作 action_sequence = [{ "device_id": device_id, "action_name": "my_action", "action_kwargs": { "vessel": {"id": vessel_id}, "param1": float(param1) } }] return action_sequence ``` ## 测试规范 ### 测试文件位置 - 单元测试: `tests/` 目录 - 设备测试: `tests/devices/` - 资源测试: `tests/resources/` - ROS消息测试: `tests/ros/msgs/` ### 测试命名 ```python # tests/devices/my_device/test_my_device.py import pytest def test_device_initialization(): """测试设备初始化""" pass def test_device_action(): """测试设备动作""" pass ``` ## 错误处理 ```python from unilabos.utils.exception import UniLabException try: result = await device.execute_action() except ValueError as e: self.logger.error(f"参数错误: {e}") self.data["status"] = "错误: 参数无效" return False except Exception as e: self.logger.error(f"执行失败: {e}") raise ``` ## 配置管理 ```python from unilabos.config.config import BasicConfig, HTTPConfig # 读取配置 port = BasicConfig.port is_host = BasicConfig.is_host_mode # 配置文件: local_config.py ``` ## 常用工具 ### 单例模式 ```python from unilabos.utils.decorator import singleton @singleton class MyManager: pass ``` ### 类型检查 ```python from unilabos.utils.type_check import NoAliasDumper yaml.dump(data, f, Dumper=NoAliasDumper) ``` ### 导入管理 ```python from unilabos.utils.import_manager import get_class device_class = get_class("unilabos.devices.my_device:MyDevice") ``` ## Git 提交规范 提交信息格式: ``` (): ``` 类型: - `feat`: 新功能 - `fix`: 修复bug - `docs`: 文档更新 - `refactor`: 重构 - `test`: 测试相关 - `chore`: 构建/工具相关 示例: ``` feat(devices): 添加虚拟搅拌器设备 - 实现VirtualStirrer类 - 支持定时搅拌和持续搅拌模式 - 添加速度验证逻辑 ```