diff --git a/.cursor/rules/device-drivers.mdc b/.cursor/rules/device-drivers.mdc new file mode 100644 index 0000000..8adfb33 --- /dev/null +++ b/.cursor/rules/device-drivers.mdc @@ -0,0 +1,328 @@ +--- +description: 设备驱动开发规范 +globs: ["unilabos/devices/**/*.py"] +--- + +# 设备驱动开发规范 + +## 目录结构 + +``` +unilabos/devices/ +├── virtual/ # 虚拟设备(用于测试) +│ ├── virtual_stirrer.py +│ └── virtual_centrifuge.py +├── liquid_handling/ # 液体处理设备 +├── balance/ # 天平设备 +├── hplc/ # HPLC设备 +├── pump_and_valve/ # 泵和阀门 +├── temperature/ # 温度控制设备 +├── workstation/ # 工作站(组合设备) +└── ... +``` + +## 设备类完整模板 + +```python +import asyncio +import logging +import time as time_module +from typing import Dict, Any, Optional + +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode + + +class MyDevice: + """ + 设备类描述 + + Attributes: + device_id: 设备唯一标识 + config: 设备配置字典 + data: 设备状态数据 + """ + + _ros_node: BaseROS2DeviceNode + + def __init__( + self, + device_id: str = None, + config: Dict[str, Any] = None, + **kwargs + ): + """ + 初始化设备 + + Args: + device_id: 设备ID + config: 配置字典 + **kwargs: 其他参数 + """ + # 兼容不同调用方式 + if device_id is None and 'id' in kwargs: + device_id = kwargs.pop('id') + if config is None and 'config' in kwargs: + config = kwargs.pop('config') + + self.device_id = device_id or "unknown_device" + self.config = config or {} + self.data = {} + + # 从config读取参数 + self.port = self.config.get('port') or kwargs.get('port', 'COM1') + self._max_value = self.config.get('max_value', 1000.0) + + # 初始化日志 + self.logger = logging.getLogger(f"MyDevice.{self.device_id}") + + self.logger.info(f"设备 {self.device_id} 已创建") + + def post_init(self, ros_node: BaseROS2DeviceNode): + """ + ROS节点注入 - 在ROS节点创建后调用 + + Args: + ros_node: ROS2设备节点实例 + """ + self._ros_node = ros_node + + async def initialize(self) -> bool: + """ + 初始化设备 - 连接硬件、设置初始状态 + + Returns: + bool: 初始化是否成功 + """ + self.logger.info(f"初始化设备 {self.device_id}") + + try: + # 执行硬件初始化 + # await self._connect_hardware() + + # 设置初始状态 + self.data.update({ + "status": "待机", + "is_running": False, + "current_value": 0.0, + }) + + self.logger.info(f"设备 {self.device_id} 初始化完成") + return True + + except Exception as e: + self.logger.error(f"初始化失败: {e}") + self.data["status"] = f"错误: {e}" + return False + + async def cleanup(self) -> bool: + """ + 清理设备 - 断开连接、释放资源 + + Returns: + bool: 清理是否成功 + """ + self.logger.info(f"清理设备 {self.device_id}") + + self.data.update({ + "status": "离线", + "is_running": False, + }) + + return True + + # ==================== 设备动作 ==================== + + async def execute_action( + self, + param1: float, + param2: str = "", + **kwargs + ) -> bool: + """ + 执行设备动作 + + Args: + param1: 参数1 + param2: 参数2(可选) + + Returns: + bool: 动作是否成功 + """ + # 类型转换和验证 + try: + param1 = float(param1) + except (ValueError, TypeError) as e: + self.logger.error(f"参数类型错误: {e}") + return False + + # 参数验证 + if param1 > self._max_value: + self.logger.error(f"参数超出范围: {param1} > {self._max_value}") + return False + + self.logger.info(f"执行动作: param1={param1}, param2={param2}") + + # 更新状态 + self.data.update({ + "status": "运行中", + "is_running": True, + }) + + # 执行动作(带进度反馈) + duration = 10.0 # 秒 + start_time = time_module.time() + + while True: + elapsed = time_module.time() - start_time + remaining = max(0, duration - elapsed) + progress = min(100, (elapsed / duration) * 100) + + self.data.update({ + "status": f"运行中: {progress:.0f}%", + "remaining_time": remaining, + }) + + if remaining <= 0: + break + + await self._ros_node.sleep(1.0) + + # 完成 + self.data.update({ + "status": "完成", + "is_running": False, + }) + + self.logger.info("动作执行完成") + return True + + # ==================== 状态属性 ==================== + + @property + def status(self) -> str: + """设备状态 - 自动发布为ROS Topic""" + return self.data.get("status", "未知") + + @property + def is_running(self) -> bool: + """是否正在运行""" + return self.data.get("is_running", False) + + @property + def current_value(self) -> float: + """当前值""" + return self.data.get("current_value", 0.0) + + # ==================== 辅助方法 ==================== + + def get_device_info(self) -> Dict[str, Any]: + """获取设备信息""" + return { + "device_id": self.device_id, + "status": self.status, + "is_running": self.is_running, + "current_value": self.current_value, + } + + def __str__(self) -> str: + return f"MyDevice({self.device_id}: {self.status})" +``` + +## 关键规则 + +### 1. 参数处理 + +所有动作方法的参数都可能以字符串形式传入,必须进行类型转换: + +```python +async def my_action(self, value: float, **kwargs) -> bool: + # 始终进行类型转换 + try: + value = float(value) + except (ValueError, TypeError) as e: + self.logger.error(f"参数类型错误: {e}") + return False +``` + +### 2. vessel 参数处理 + +vessel 参数可能是字符串ID或字典: + +```python +def extract_vessel_id(vessel: Union[str, dict]) -> str: + if isinstance(vessel, dict): + return vessel.get("id", "") + return str(vessel) if vessel else "" +``` + +### 3. 状态更新 + +使用 `self.data` 字典存储状态,属性读取状态: + +```python +# 更新状态 +self.data["status"] = "运行中" +self.data["current_speed"] = 300.0 + +# 读取状态(通过属性) +@property +def status(self) -> str: + return self.data.get("status", "待机") +``` + +### 4. 异步等待 + +使用 ROS 节点的 sleep 方法: + +```python +# 正确 +await self._ros_node.sleep(1.0) + +# 避免(除非在纯 Python 测试环境) +await asyncio.sleep(1.0) +``` + +### 5. 进度反馈 + +长时间运行的操作需要提供进度反馈: + +```python +while remaining > 0: + progress = (elapsed / total_time) * 100 + self.data["status"] = f"运行中: {progress:.0f}%" + self.data["remaining_time"] = remaining + + await self._ros_node.sleep(1.0) +``` + +## 虚拟设备 + +虚拟设备用于测试和演示,放在 `unilabos/devices/virtual/` 目录: + +- 类名以 `Virtual` 开头 +- 文件名以 `virtual_` 开头 +- 模拟真实设备的行为和时序 +- 使用表情符号增强日志可读性(可选) + +## 工作站设备 + +工作站是组合多个设备的复杂设备: + +```python +from unilabos.devices.workstation.workstation_base import WorkstationBase + +class MyWorkstation(WorkstationBase): + """组合工作站""" + + async def execute_workflow(self, workflow: Dict[str, Any]) -> bool: + """执行工作流""" + pass +``` + +## 设备注册 + +设备类开发完成后,需要在注册表中注册: + +1. 创建/编辑 `unilabos/registry/devices/my_category.yaml` +2. 添加设备配置(参考 `virtual_device.yaml`) +3. 运行 `--complete_registry` 自动生成 schema diff --git a/.cursor/rules/protocol-development.mdc b/.cursor/rules/protocol-development.mdc new file mode 100644 index 0000000..a94f947 --- /dev/null +++ b/.cursor/rules/protocol-development.mdc @@ -0,0 +1,240 @@ +--- +description: 协议编译器开发规范 +globs: ["unilabos/compile/**/*.py"] +--- + +# 协议编译器开发规范 + +## 概述 + +协议编译器负责将高级实验操作(如 Stir、Add、Filter)编译为设备可执行的动作序列。 + +## 文件命名 + +- 位置: `unilabos/compile/` +- 命名: `{operation}_protocol.py` +- 示例: `stir_protocol.py`, `add_protocol.py`, `filter_protocol.py` + +## 协议函数模板 + +```python +from typing import List, Dict, Any, Union +import networkx as nx +import logging + +from .utils.unit_parser import parse_time_input +from .utils.vessel_parser import extract_vessel_id + +logger = logging.getLogger(__name__) + + +def generate_{operation}_protocol( + G: nx.DiGraph, + vessel: Union[str, dict], + param1: Union[str, float] = "0", + param2: float = 0.0, + **kwargs +) -> List[Dict[str, Any]]: + """ + 生成{操作}协议序列 + + Args: + G: 物理拓扑图 (NetworkX DiGraph) + vessel: 容器ID或Resource字典 + param1: 参数1(支持字符串单位,如 "5 min") + param2: 参数2 + **kwargs: 其他参数 + + Returns: + List[Dict]: 动作序列 + + Raises: + ValueError: 参数无效时 + """ + # 1. 提取 vessel_id + vessel_id = extract_vessel_id(vessel) + + # 2. 验证参数 + if not vessel_id: + raise ValueError("vessel 参数不能为空") + + if vessel_id not in G.nodes(): + raise ValueError(f"容器 '{vessel_id}' 不存在于系统中") + + # 3. 解析参数(支持单位) + parsed_param1 = parse_time_input(param1) # "5 min" -> 300.0 + + # 4. 查找设备 + device_id = find_connected_device(G, vessel_id, device_type="my_device") + + # 5. 生成动作序列 + action_sequence = [] + + action = { + "device_id": device_id, + "action_name": "my_action", + "action_kwargs": { + "vessel": {"id": vessel_id}, # 始终使用字典格式 + "param1": float(parsed_param1), + "param2": float(param2), + } + } + action_sequence.append(action) + + logger.info(f"生成协议: {len(action_sequence)} 个动作") + return action_sequence + + +def find_connected_device( + G: nx.DiGraph, + vessel_id: str, + device_type: str = "" +) -> str: + """ + 查找与容器相连的设备 + + Args: + G: 拓扑图 + vessel_id: 容器ID + device_type: 设备类型关键字 + + Returns: + str: 设备ID + """ + # 查找所有匹配类型的设备 + device_nodes = [] + for node in G.nodes(): + node_class = G.nodes[node].get('class', '') or '' + if device_type.lower() in node_class.lower(): + device_nodes.append(node) + + # 检查连接 + if vessel_id and device_nodes: + for device in device_nodes: + if G.has_edge(device, vessel_id) or G.has_edge(vessel_id, device): + return device + + # 返回第一个可用设备 + if device_nodes: + return device_nodes[0] + + # 默认设备 + return f"{device_type}_1" +``` + +## 关键规则 + +### 1. vessel 参数处理 + +vessel 参数可能是字符串或字典,需要统一处理: + +```python +def extract_vessel_id(vessel: Union[str, dict]) -> str: + """提取vessel_id""" + if isinstance(vessel, dict): + # 可能是 {"id": "xxx"} 或完整 Resource 对象 + return vessel.get("id", list(vessel.values())[0].get("id", "")) + return str(vessel) if vessel else "" +``` + +### 2. action_kwargs 中的 vessel + +始终使用 `{"id": vessel_id}` 格式传递 vessel: + +```python +# 正确 +"action_kwargs": { + "vessel": {"id": vessel_id}, # 字符串ID包装为字典 +} + +# 避免 +"action_kwargs": { + "vessel": vessel_resource, # 不要传递完整 Resource 对象 +} +``` + +### 3. 单位解析 + +使用 `parse_time_input` 解析时间参数: + +```python +from .utils.unit_parser import parse_time_input + +# 支持格式: "5 min", "1 h", "300", "1.5 hours" +time_seconds = parse_time_input("5 min") # -> 300.0 +time_seconds = parse_time_input(120) # -> 120.0 +time_seconds = parse_time_input("1 h") # -> 3600.0 +``` + +### 4. 参数验证 + +所有参数必须进行验证和类型转换: + +```python +# 验证范围 +if speed < 10.0 or speed > 1500.0: + logger.warning(f"速度 {speed} 超出范围,修正为 300") + speed = 300.0 + +# 类型转换 +param = float(param) if not isinstance(param, (int, float)) else param +``` + +### 5. 日志记录 + +使用项目日志记录器: + +```python +logger = logging.getLogger(__name__) + +def generate_protocol(...): + logger.info(f"开始生成协议...") + logger.debug(f"参数: vessel={vessel_id}, time={time}") + logger.warning(f"参数修正: {old_value} -> {new_value}") +``` + +## 便捷函数 + +为常用操作提供便捷函数: + +```python +def stir_briefly(G: nx.DiGraph, vessel: Union[str, dict], + speed: float = 300.0) -> List[Dict[str, Any]]: + """短时间搅拌(30秒)""" + return generate_stir_protocol(G, vessel, time="30", stir_speed=speed) + +def stir_vigorously(G: nx.DiGraph, vessel: Union[str, dict], + time: str = "5 min") -> List[Dict[str, Any]]: + """剧烈搅拌""" + return generate_stir_protocol(G, vessel, time=time, stir_speed=800.0) +``` + +## 测试函数 + +每个协议文件应包含测试函数: + +```python +def test_{operation}_protocol(): + """测试协议生成""" + # 测试参数处理 + vessel_dict = {"id": "flask_1", "name": "反应瓶1"} + vessel_id = extract_vessel_id(vessel_dict) + assert vessel_id == "flask_1" + + # 测试单位解析 + time_s = parse_time_input("5 min") + assert time_s == 300.0 + + +if __name__ == "__main__": + test_{operation}_protocol() +``` + +## 现有协议参考 + +- `stir_protocol.py` - 搅拌操作 +- `add_protocol.py` - 添加物料 +- `filter_protocol.py` - 过滤操作 +- `heatchill_protocol.py` - 加热/冷却 +- `separate_protocol.py` - 分离操作 +- `evaporate_protocol.py` - 蒸发操作 diff --git a/.cursor/rules/registry-config.mdc b/.cursor/rules/registry-config.mdc new file mode 100644 index 0000000..bba2f22 --- /dev/null +++ b/.cursor/rules/registry-config.mdc @@ -0,0 +1,319 @@ +--- +description: 注册表配置规范 (YAML) +globs: ["unilabos/registry/**/*.yaml"] +--- + +# 注册表配置规范 + +## 概述 + +注册表使用 YAML 格式定义设备和资源类型,是 Uni-Lab-OS 的核心配置系统。 + +## 目录结构 + +``` +unilabos/registry/ +├── devices/ # 设备类型注册 +│ ├── virtual_device.yaml +│ ├── liquid_handler.yaml +│ └── ... +├── device_comms/ # 通信设备配置 +│ ├── communication_devices.yaml +│ └── modbus_ioboard.yaml +└── resources/ # 资源类型注册 + ├── bioyond/ + ├── organic/ + ├── opentrons/ + └── ... +``` + +## 设备注册表格式 + +### 基本结构 + +```yaml +device_type_id: + # 基本信息 + description: "设备描述" + version: "1.0.0" + category: + - category_name + icon: "icon_device.webp" + + # 类配置 + class: + module: "unilabos.devices.my_module:MyClass" + type: python + + # 状态类型(属性 -> ROS消息类型) + status_types: + status: String + temperature: Float64 + is_running: Bool + + # 动作映射 + action_value_mappings: + action_name: + type: UniLabJsonCommand # 或 UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} + handles: {} +``` + +### action_value_mappings 详细格式 + +```yaml +action_value_mappings: + # 同步动作 + my_sync_action: + type: UniLabJsonCommand + goal: + param1: param1 + param2: param2 + feedback: {} + result: + success: success + message: message + goal_default: + param1: 0.0 + param2: "" + handles: {} + placeholder_keys: + device_param: unilabos_devices # 设备选择器 + resource_param: unilabos_resources # 资源选择器 + schema: + title: "动作名称参数" + description: "动作描述" + type: object + properties: + goal: + type: object + properties: + param1: + type: number + param2: + type: string + required: + - param1 + feedback: {} + result: + type: object + properties: + success: + type: boolean + message: + type: string + required: + - goal + + # 异步动作 + my_async_action: + type: UniLabJsonCommandAsync + goal: {} + feedback: + progress: progress + current_status: status + result: + success: success + schema: {...} +``` + +### 自动生成的动作 + +以 `auto-` 开头的动作由系统自动生成: + +```yaml +action_value_mappings: + auto-initialize: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} + + auto-cleanup: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} +``` + +### handles 配置 + +用于工作流编辑器中的数据流连接: + +```yaml +handles: + input: + - handler_key: "input_resource" + data_type: "resource" + label: "输入资源" + data_source: "handle" + data_key: "resources" + output: + - handler_key: "output_labware" + data_type: "resource" + label: "输出器皿" + data_source: "executor" + data_key: "created_resource.@flatten" +``` + +## 资源注册表格式 + +```yaml +resource_type_id: + description: "资源描述" + version: "1.0.0" + category: + - category_name + icon: "" + handles: [] + init_param_schema: {} + + class: + module: "unilabos.resources.my_module:MyResource" + type: pylabrobot # 或 python +``` + +### PyLabRobot 资源示例 + +```yaml +BIOYOND_Electrolyte_6VialCarrier: + category: + - bottle_carriers + - bioyond + class: + module: "unilabos.resources.bioyond.bottle_carriers:BIOYOND_Electrolyte_6VialCarrier" + type: pylabrobot + version: "1.0.0" +``` + +## 状态类型映射 + +Python 类型到 ROS 消息类型的映射: + +| Python 类型 | ROS 消息类型 | +|------------|-------------| +| `str` | `String` | +| `bool` | `Bool` | +| `int` | `Int64` | +| `float` | `Float64` | +| `list` | `String` (序列化) | +| `dict` | `String` (序列化) | + +## 自动完善注册表 + +使用 `--complete_registry` 参数自动生成 schema: + +```bash +python -m unilabos.app.main --complete_registry +``` + +这会: +1. 扫描设备类的方法签名 +2. 自动生成 `auto-` 前缀的动作 +3. 生成 JSON Schema +4. 更新 YAML 文件 + +## 验证规则 + +1. **device_type_id** 必须唯一 +2. **module** 路径必须正确可导入 +3. **status_types** 的类型必须是有效的 ROS 消息类型 +4. **schema** 必须是有效的 JSON Schema + +## 示例:完整设备配置 + +```yaml +virtual_stirrer: + category: + - virtual_device + description: "虚拟搅拌器设备" + version: "1.0.0" + icon: "icon_stirrer.webp" + handles: [] + init_param_schema: {} + + class: + module: "unilabos.devices.virtual.virtual_stirrer:VirtualStirrer" + type: python + + status_types: + status: String + operation_mode: String + current_speed: Float64 + is_stirring: Bool + remaining_time: Float64 + + action_value_mappings: + auto-initialize: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: + title: "initialize参数" + type: object + properties: + goal: + type: object + properties: {} + feedback: {} + result: {} + required: + - goal + + stir: + type: UniLabJsonCommandAsync + goal: + stir_time: stir_time + stir_speed: stir_speed + settling_time: settling_time + feedback: + current_speed: current_speed + remaining_time: remaining_time + result: + success: success + goal_default: + stir_time: 60.0 + stir_speed: 300.0 + settling_time: 30.0 + handles: {} + schema: + title: "stir参数" + description: "搅拌操作" + type: object + properties: + goal: + type: object + properties: + stir_time: + type: number + description: "搅拌时间(秒)" + stir_speed: + type: number + description: "搅拌速度(RPM)" + settling_time: + type: number + description: "沉降时间(秒)" + required: + - stir_time + - stir_speed + feedback: + type: object + properties: + current_speed: + type: number + remaining_time: + type: number + result: + type: object + properties: + success: + type: boolean + required: + - goal +``` diff --git a/.cursor/rules/ros-integration.mdc b/.cursor/rules/ros-integration.mdc new file mode 100644 index 0000000..4057b48 --- /dev/null +++ b/.cursor/rules/ros-integration.mdc @@ -0,0 +1,233 @@ +--- +description: ROS 2 集成开发规范 +globs: ["unilabos/ros/**/*.py", "**/*_node.py"] +--- + +# ROS 2 集成开发规范 + +## 概述 + +Uni-Lab-OS 使用 ROS 2 作为设备通信中间件,基于 rclpy 实现。 + +## 核心组件 + +### BaseROS2DeviceNode + +设备节点基类,提供: +- ROS Topic 自动发布(状态属性) +- Action Server 自动创建(设备动作) +- 资源管理服务 +- 异步任务调度 + +```python +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode +``` + +### 消息转换器 + +```python +from unilabos.ros.msgs.message_converter import ( + convert_to_ros_msg, + convert_from_ros_msg_with_mapping, + msg_converter_manager, + ros_action_to_json_schema, + ros_message_to_json_schema, +) +``` + +## 设备与 ROS 集成 + +### post_init 方法 + +设备类必须实现 `post_init` 方法接收 ROS 节点: + +```python +class MyDevice: + _ros_node: BaseROS2DeviceNode + + def post_init(self, ros_node: BaseROS2DeviceNode): + """ROS节点注入""" + self._ros_node = ros_node +``` + +### 状态属性发布 + +设备的 `@property` 属性会自动发布为 ROS Topic: + +```python +class MyDevice: + @property + def temperature(self) -> float: + return self._temperature + + # 自动发布到 /{namespace}/temperature Topic +``` + +### Topic 配置装饰器 + +```python +from unilabos.utils.decorator import topic_config + +class MyDevice: + @property + @topic_config(period=1.0, print_publish=False, qos=10) + def fast_data(self) -> float: + """高频数据 - 每秒发布一次""" + return self._fast_data + + @property + @topic_config(period=5.0) + def slow_data(self) -> str: + """低频数据 - 每5秒发布一次""" + return self._slow_data +``` + +### 订阅装饰器 + +```python +from unilabos.utils.decorator import subscribe + +class MyDevice: + @subscribe(topic="/external/sensor_data", qos=10) + def on_sensor_data(self, msg): + """订阅外部Topic""" + self._sensor_value = msg.data +``` + +## 异步操作 + +### 使用 ROS 节点睡眠 + +```python +# 推荐:使用ROS节点的睡眠方法 +await self._ros_node.sleep(1.0) + +# 不推荐:直接使用asyncio(可能导致回调阻塞) +await asyncio.sleep(1.0) +``` + +### 获取事件循环 + +```python +from unilabos.ros.x.rclpyx import get_event_loop + +loop = get_event_loop() +``` + +## 消息类型 + +### unilabos_msgs 包 + +```python +from unilabos_msgs.msg import Resource +from unilabos_msgs.srv import ( + ResourceAdd, + ResourceDelete, + ResourceUpdate, + ResourceList, + SerialCommand, +) +from unilabos_msgs.action import SendCmd +``` + +### Resource 消息结构 + +```python +Resource: + id: str + name: str + category: str + type: str + parent: str + children: List[str] + config: str # JSON字符串 + data: str # JSON字符串 + sample_id: str + pose: Pose +``` + +## 日志适配器 + +```python +from unilabos.utils.log import info, debug, warning, error, trace + +class MyDevice: + def __init__(self): + # 创建设备专属日志器 + self.logger = logging.getLogger(f"MyDevice.{self.device_id}") +``` + +ROSLoggerAdapter 同时向自定义日志和 ROS 日志发送消息。 + +## Action Server + +设备动作自动创建为 ROS Action Server: + +```yaml +# 在注册表中配置 +action_value_mappings: + my_action: + type: UniLabJsonCommandAsync # 异步Action + goal: {...} + feedback: {...} + result: {...} +``` + +### Action 类型 + +- **UniLabJsonCommand**: 同步动作 +- **UniLabJsonCommandAsync**: 异步动作(支持feedback) + +## 服务客户端 + +```python +from rclpy.client import Client + +# 调用其他节点的服务 +response = await self._ros_node.call_service( + service_name="/other_node/service", + request=MyServiceRequest(...) +) +``` + +## 命名空间 + +设备节点使用命名空间隔离: + +``` +/{device_id}/ # 设备命名空间 +/{device_id}/status # 状态Topic +/{device_id}/temperature # 温度Topic +/{device_id}/my_action # 动作Server +``` + +## 调试 + +### 查看 Topic + +```bash +ros2 topic list +ros2 topic echo /{device_id}/status +``` + +### 查看 Action + +```bash +ros2 action list +ros2 action info /{device_id}/my_action +``` + +### 查看 Service + +```bash +ros2 service list +ros2 service call /{device_id}/resource_list unilabos_msgs/srv/ResourceList +``` + +## 最佳实践 + +1. **状态属性命名**: 使用蛇形命名法(snake_case) +2. **Topic 频率**: 根据数据变化频率调整,避免过高频率 +3. **Action 反馈**: 长时间操作提供进度反馈 +4. **错误处理**: 使用 try-except 捕获并记录错误 +5. **资源清理**: 在 cleanup 方法中正确清理资源 diff --git a/.cursor/rules/testing-patterns.mdc b/.cursor/rules/testing-patterns.mdc new file mode 100644 index 0000000..73df7b0 --- /dev/null +++ b/.cursor/rules/testing-patterns.mdc @@ -0,0 +1,357 @@ +--- +description: 测试开发规范 +globs: ["tests/**/*.py", "**/test_*.py"] +--- + +# 测试开发规范 + +## 目录结构 + +``` +tests/ +├── __init__.py +├── devices/ # 设备测试 +│ └── liquid_handling/ +│ └── test_transfer_liquid.py +├── resources/ # 资源测试 +│ ├── test_bottle_carrier.py +│ └── test_resourcetreeset.py +├── ros/ # ROS消息测试 +│ └── msgs/ +│ ├── test_basic.py +│ ├── test_conversion.py +│ └── test_mapping.py +└── workflow/ # 工作流测试 + └── merge_workflow.py +``` + +## 测试框架 + +使用 pytest 作为测试框架: + +```bash +# 运行所有测试 +pytest tests/ + +# 运行特定测试文件 +pytest tests/resources/test_bottle_carrier.py + +# 运行特定测试函数 +pytest tests/resources/test_bottle_carrier.py::test_bottle_carrier + +# 显示详细输出 +pytest -v tests/ + +# 显示打印输出 +pytest -s tests/ +``` + +## 测试文件模板 + +```python +import pytest +from typing import List, Dict, Any + +# 导入被测试的模块 +from unilabos.resources.bioyond.bottle_carriers import ( + BIOYOND_Electrolyte_6VialCarrier, +) +from unilabos.resources.bioyond.bottles import ( + BIOYOND_PolymerStation_Solid_Vial, +) + + +class TestBottleCarrier: + """BottleCarrier 测试类""" + + def setup_method(self): + """每个测试方法前执行""" + self.carrier = BIOYOND_Electrolyte_6VialCarrier("test_carrier") + + def teardown_method(self): + """每个测试方法后执行""" + pass + + def test_carrier_creation(self): + """测试载架创建""" + assert self.carrier.name == "test_carrier" + assert len(self.carrier.sites) == 6 + + def test_bottle_placement(self): + """测试瓶子放置""" + bottle = BIOYOND_PolymerStation_Solid_Vial("test_bottle") + # 测试逻辑... + assert bottle.name == "test_bottle" + + +def test_standalone_function(): + """独立测试函数""" + result = some_function() + assert result is True + + +# 参数化测试 +@pytest.mark.parametrize("input,expected", [ + ("5 min", 300.0), + ("1 h", 3600.0), + ("120", 120.0), + (60, 60.0), +]) +def test_time_parsing(input, expected): + """测试时间解析""" + from unilabos.compile.utils.unit_parser import parse_time_input + assert parse_time_input(input) == expected + + +# 异常测试 +def test_invalid_input_raises_error(): + """测试无效输入抛出异常""" + with pytest.raises(ValueError) as exc_info: + invalid_function("bad_input") + assert "invalid" in str(exc_info.value).lower() + + +# 跳过条件测试 +@pytest.mark.skipif( + not os.environ.get("ROS_DISTRO"), + reason="需要ROS环境" +) +def test_ros_feature(): + """需要ROS环境的测试""" + pass +``` + +## 设备测试 + +### 虚拟设备测试 + +```python +import pytest +import asyncio +from unittest.mock import MagicMock, AsyncMock + +from unilabos.devices.virtual.virtual_stirrer import VirtualStirrer + + +class TestVirtualStirrer: + """VirtualStirrer 测试""" + + @pytest.fixture + def stirrer(self): + """创建测试用搅拌器""" + device = VirtualStirrer( + device_id="test_stirrer", + config={"max_speed": 1500.0, "min_speed": 50.0} + ) + + # Mock ROS节点 + mock_node = MagicMock() + mock_node.sleep = AsyncMock(return_value=None) + device.post_init(mock_node) + + return device + + @pytest.mark.asyncio + async def test_initialize(self, stirrer): + """测试初始化""" + result = await stirrer.initialize() + assert result is True + assert stirrer.status == "待机中" + + @pytest.mark.asyncio + async def test_stir_action(self, stirrer): + """测试搅拌动作""" + await stirrer.initialize() + + result = await stirrer.stir( + stir_time=5.0, + stir_speed=300.0, + settling_time=2.0 + ) + + assert result is True + assert stirrer.operation_mode == "Completed" + + @pytest.mark.asyncio + async def test_stir_invalid_speed(self, stirrer): + """测试无效速度""" + await stirrer.initialize() + + # 速度超出范围 + result = await stirrer.stir( + stir_time=5.0, + stir_speed=2000.0, # 超过max_speed + settling_time=0.0 + ) + + assert result is False + assert "错误" in stirrer.status +``` + +### 异步测试配置 + +```python +# conftest.py +import pytest +import asyncio + + +@pytest.fixture(scope="session") +def event_loop(): + """创建事件循环""" + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() +``` + +## 资源测试 + +```python +import pytest +from unilabos.resources.resource_tracker import ( + ResourceTreeSet, + ResourceTreeInstance, +) + + +def test_resource_tree_creation(): + """测试资源树创建""" + tree_set = ResourceTreeSet() + + # 添加资源 + resource = {"id": "res_1", "name": "Resource 1"} + tree_set.add_resource(resource) + + # 验证 + assert len(tree_set.all_nodes) == 1 + assert tree_set.get_resource("res_1") is not None + + +def test_resource_tree_merge(): + """测试资源树合并""" + local_set = ResourceTreeSet() + remote_set = ResourceTreeSet() + + # 设置数据... + + local_set.merge_remote_resources(remote_set) + + # 验证合并结果... +``` + +## ROS 消息测试 + +```python +import pytest +from unilabos.ros.msgs.message_converter import ( + convert_to_ros_msg, + convert_from_ros_msg_with_mapping, + msg_converter_manager, +) + + +def test_message_conversion(): + """测试消息转换""" + # Python -> ROS + python_data = {"id": "test", "value": 42} + ros_msg = convert_to_ros_msg(python_data, MyMsgType) + + assert ros_msg.id == "test" + assert ros_msg.value == 42 + + # ROS -> Python + result = convert_from_ros_msg_with_mapping(ros_msg, mapping) + assert result["id"] == "test" +``` + +## 协议测试 + +```python +import pytest +import networkx as nx +from unilabos.compile.stir_protocol import ( + generate_stir_protocol, + extract_vessel_id, +) + + +@pytest.fixture +def topology_graph(): + """创建测试拓扑图""" + G = nx.DiGraph() + G.add_node("flask_1", **{"class": "flask"}) + G.add_node("stirrer_1", **{"class": "virtual_stirrer"}) + G.add_edge("stirrer_1", "flask_1") + return G + + +def test_generate_stir_protocol(topology_graph): + """测试搅拌协议生成""" + actions = generate_stir_protocol( + G=topology_graph, + vessel="flask_1", + time="5 min", + stir_speed=300.0 + ) + + assert len(actions) == 1 + assert actions[0]["device_id"] == "stirrer_1" + assert actions[0]["action_name"] == "stir" + + +def test_extract_vessel_id(): + """测试vessel_id提取""" + # 字典格式 + assert extract_vessel_id({"id": "flask_1"}) == "flask_1" + + # 字符串格式 + assert extract_vessel_id("flask_2") == "flask_2" + + # 空值 + assert extract_vessel_id("") == "" +``` + +## 测试标记 + +```python +# 慢速测试 +@pytest.mark.slow +def test_long_running(): + pass + +# 需要网络 +@pytest.mark.network +def test_network_call(): + pass + +# 需要ROS +@pytest.mark.ros +def test_ros_feature(): + pass +``` + +运行特定标记的测试: + +```bash +pytest -m "not slow" # 排除慢速测试 +pytest -m ros # 仅ROS测试 +``` + +## 覆盖率 + +```bash +# 生成覆盖率报告 +pytest --cov=unilabos tests/ + +# HTML报告 +pytest --cov=unilabos --cov-report=html tests/ +``` + +## 最佳实践 + +1. **测试命名**: `test_{功能}_{场景}_{预期结果}` +2. **独立性**: 每个测试独立运行,不依赖其他测试 +3. **Mock外部依赖**: 使用 unittest.mock 模拟外部服务 +4. **参数化**: 使用 `@pytest.mark.parametrize` 减少重复代码 +5. **fixtures**: 使用 fixtures 共享测试设置 +6. **断言清晰**: 每个断言只验证一件事 diff --git a/.cursor/rules/unilabos-project.mdc b/.cursor/rules/unilabos-project.mdc new file mode 100644 index 0000000..1b6a24e --- /dev/null +++ b/.cursor/rules/unilabos-project.mdc @@ -0,0 +1,353 @@ +--- +description: Uni-Lab-OS 实验室自动化平台开发规范 - 核心规则 +globs: ["**/*.py", "**/*.yaml", "**/*.json"] +--- + +# Uni-Lab-OS 项目开发规范 + +## 项目概述 + +Uni-Lab-OS 是一个实验室自动化操作系统,用于连接和控制各种实验设备,实现实验工作流的自动化和标准化。 + +## 技术栈 + +- **Python 3.11** - 核心开发语言 +- **ROS 2** - 设备通信中间件 (rclpy) +- **Conda/Mamba** - 包管理 (robostack-staging, conda-forge) +- **FastAPI** - Web API 服务 +- **WebSocket** - 实时通信 +- **NetworkX** - 拓扑图管理 +- **YAML** - 配置和注册表定义 +- **PyLabRobot** - 实验室自动化库集成 +- **pytest** - 测试框架 +- **asyncio** - 异步编程 + +## 项目结构 + +``` +unilabos/ +├── app/ # 应用入口、Web服务、后端 +├── compile/ # 协议编译器 (stir, add, filter 等) +├── config/ # 配置管理 +├── devices/ # 设备驱动 (真实/虚拟) +├── device_comms/ # 设备通信协议 +├── device_mesh/ # 3D网格和可视化 +├── registry/ # 设备和资源类型注册表 (YAML) +├── resources/ # 资源定义 +├── ros/ # ROS 2 集成 +├── utils/ # 工具函数 +└── workflow/ # 工作流管理 +``` + +## 代码规范 + +### Python 风格 + +1. **类型注解**:所有函数必须使用类型注解 + ```python + def transfer_liquid( + source: str, + destination: str, + volume: float, + **kwargs + ) -> List[Dict[str, Any]]: + ``` + +2. **Docstring**:使用 Google 风格的文档字符串 + ```python + def initialize(self) -> bool: + """ + 初始化设备 + + Returns: + bool: 初始化是否成功 + """ + ``` + +3. **导入顺序**: + - 标准库 + - 第三方库 + - ROS 相关 (rclpy, unilabos_msgs) + - 项目内部模块 + +### 异步编程 + +1. 设备操作方法使用 `async def` +2. 使用 `await self._ros_node.sleep()` 而非 `asyncio.sleep()` +3. 长时间运行操作需提供进度反馈 + +```python +async def stir(self, stir_time: float, stir_speed: float, **kwargs) -> bool: + """执行搅拌操作""" + start_time = time_module.time() + while True: + elapsed = time_module.time() - start_time + remaining = max(0, stir_time - elapsed) + + self.data.update({ + "remaining_time": remaining, + "status": f"搅拌中: {stir_speed} RPM" + }) + + if remaining <= 0: + break + await self._ros_node.sleep(1.0) + return True +``` + +### 日志规范 + +使用项目自定义日志系统: + +```python +from unilabos.utils.log import logger, info, debug, warning, error, trace + +# 在设备类中使用 +self.logger = logging.getLogger(f"DeviceName.{self.device_id}") +self.logger.info("设备初始化完成") +``` + +## 设备驱动开发 + +### 设备类结构 + +```python +from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode + +class MyDevice: + """设备驱动类""" + + _ros_node: BaseROS2DeviceNode + + def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs): + self.device_id = device_id or "unknown_device" + self.config = config or {} + self.data = {} # 设备状态数据 + + def post_init(self, ros_node: BaseROS2DeviceNode): + """ROS节点注入""" + self._ros_node = ros_node + + async def initialize(self) -> bool: + """初始化设备""" + pass + + async def cleanup(self) -> bool: + """清理设备""" + pass + + # 状态属性 - 自动发布为 ROS Topic + @property + def status(self) -> str: + return self.data.get("status", "待机") +``` + +### 状态属性装饰器 + +```python +from unilabos.utils.decorator import topic_config + +class MyDevice: + @property + @topic_config(period=1.0, qos=10) # 每秒发布一次 + def temperature(self) -> float: + return self._temperature +``` + +### 虚拟设备 + +虚拟设备放置在 `unilabos/devices/virtual/` 目录下,命名为 `virtual_*.py` + +## 注册表配置 + +### 设备注册表 (YAML) + +位置: `unilabos/registry/devices/*.yaml` + +```yaml +my_device_type: + category: + - my_category + description: "设备描述" + version: "1.0.0" + class: + module: "unilabos.devices.my_device:MyDevice" + type: python + status_types: + status: String + temperature: Float64 + action_value_mappings: + auto-initialize: + type: UniLabJsonCommandAsync + goal: {} + feedback: {} + result: {} + schema: {...} +``` + +### 资源注册表 (YAML) + +位置: `unilabos/registry/resources/**/*.yaml` + +```yaml +my_container: + category: + - container + class: + module: "unilabos.resources.my_resource:MyContainer" + type: pylabrobot + version: "1.0.0" +``` + +## 协议编译器 + +位置: `unilabos/compile/*_protocol.py` + +### 协议生成函数模板 + +```python +from typing import List, Dict, Any, Union +import networkx as nx + +def generate_my_protocol( + G: nx.DiGraph, + vessel: Union[str, dict], + param1: float = 0.0, + **kwargs +) -> List[Dict[str, Any]]: + """ + 生成操作协议序列 + + Args: + G: 物理拓扑图 + vessel: 容器ID或字典 + param1: 参数1 + + Returns: + List[Dict]: 动作序列 + """ + # 提取vessel_id + vessel_id = vessel if isinstance(vessel, str) else vessel.get("id", "") + + # 查找设备 + device_id = find_connected_device(G, vessel_id) + + # 生成动作 + action_sequence = [{ + "device_id": device_id, + "action_name": "my_action", + "action_kwargs": { + "vessel": {"id": vessel_id}, + "param1": float(param1) + } + }] + + return action_sequence +``` + +## 测试规范 + +### 测试文件位置 + +- 单元测试: `tests/` 目录 +- 设备测试: `tests/devices/` +- 资源测试: `tests/resources/` +- ROS消息测试: `tests/ros/msgs/` + +### 测试命名 + +```python +# tests/devices/my_device/test_my_device.py + +import pytest + +def test_device_initialization(): + """测试设备初始化""" + pass + +def test_device_action(): + """测试设备动作""" + pass +``` + +## 错误处理 + +```python +from unilabos.utils.exception import UniLabException + +try: + result = await device.execute_action() +except ValueError as e: + self.logger.error(f"参数错误: {e}") + self.data["status"] = "错误: 参数无效" + return False +except Exception as e: + self.logger.error(f"执行失败: {e}") + raise +``` + +## 配置管理 + +```python +from unilabos.config.config import BasicConfig, HTTPConfig + +# 读取配置 +port = BasicConfig.port +is_host = BasicConfig.is_host_mode + +# 配置文件: local_config.py +``` + +## 常用工具 + +### 单例模式 + +```python +from unilabos.utils.decorator import singleton + +@singleton +class MyManager: + pass +``` + +### 类型检查 + +```python +from unilabos.utils.type_check import NoAliasDumper + +yaml.dump(data, f, Dumper=NoAliasDumper) +``` + +### 导入管理 + +```python +from unilabos.utils.import_manager import get_class + +device_class = get_class("unilabos.devices.my_device:MyDevice") +``` + +## Git 提交规范 + +提交信息格式: +``` +(): + + +``` + +类型: +- `feat`: 新功能 +- `fix`: 修复bug +- `docs`: 文档更新 +- `refactor`: 重构 +- `test`: 测试相关 +- `chore`: 构建/工具相关 + +示例: +``` +feat(devices): 添加虚拟搅拌器设备 + +- 实现VirtualStirrer类 +- 支持定时搅拌和持续搅拌模式 +- 添加速度验证逻辑 +``` diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 0000000..0bd258b --- /dev/null +++ b/.cursorignore @@ -0,0 +1,188 @@ +# ============================================================ +# Uni-Lab-OS Cursor Ignore 配置,控制 Cursor AI 的文件索引范围 +# ============================================================ + +# ==================== 敏感配置文件 ==================== +# 本地配置(可能包含密钥) +**/local_config.py +test_config.py +local_test*.py + +# 环境变量和密钥 +.env +.env.* +**/.certs/ +*.pem +*.key +credentials.json +secrets.yaml + +# ==================== 二进制和 3D 模型文件 ==================== +# 3D 模型文件(无需索引) +*.stl +*.dae +*.glb +*.gltf +*.obj +*.fbx +*.blend + +# URDF/Xacro 机器人描述文件(大型XML) +*.xacro + +# 图片文件 +*.png +*.jpg +*.jpeg +*.gif +*.webp +*.ico +*.svg +*.bmp + +# 压缩包 +*.zip +*.tar +*.tar.gz +*.tgz +*.bz2 +*.rar +*.7z + +# ==================== Python 生成文件 ==================== +__pycache__/ +*.py[cod] +*$py.class +*.so +*.pyd +*.egg +*.egg-info/ +.eggs/ +dist/ +build/ +*.manifest +*.spec + +# ==================== IDE 和编辑器 ==================== +.idea/ +.vscode/ +*.swp +*.swo +*~ +.#* + +# ==================== 测试和覆盖率 ==================== +.pytest_cache/ +.coverage +.coverage.* +htmlcov/ +.tox/ +.nox/ +coverage.xml +*.cover + +# ==================== 虚拟环境 ==================== +.venv/ +venv/ +env/ +ENV/ + +# ==================== ROS 2 生成文件 ==================== +# ROS 构建目录 +build/ +install/ +log/ +logs/ +devel/ + +# ROS 消息生成 +msg_gen/ +srv_gen/ +msg/*Action.msg +msg/*ActionFeedback.msg +msg/*ActionGoal.msg +msg/*ActionResult.msg +msg/*Feedback.msg +msg/*Goal.msg +msg/*Result.msg +msg/_*.py +srv/_*.py +build_isolated/ +devel_isolated/ + +# ROS 动态配置 +*.cfgc +/cfg/cpp/ +/cfg/*.py + +# ==================== 项目特定目录 ==================== +# 工作数据目录 +unilabos_data/ + +# 临时和输出目录 +temp/ +output/ +cursor_docs/ +configs/ + +# 文档构建 +docs/_build/ +/site + +# ==================== 大型数据文件 ==================== +# 点云数据 +*.pcd + +# GraphML 图形文件 +*.graphml + +# 日志文件 +*.log + +# 数据库 +*.sqlite3 +*.db + +# Jupyter 检查点 +.ipynb_checkpoints/ + +# ==================== 设备网格资源 ==================== +# 3D 网格文件目录(包含大量 STL/DAE 文件) +unilabos/device_mesh/devices/**/*.stl +unilabos/device_mesh/devices/**/*.dae +unilabos/device_mesh/resources/**/*.stl +unilabos/device_mesh/resources/**/*.glb +unilabos/device_mesh/resources/**/*.xacro + +# RViz 配置 +*.rviz + +# ==================== 系统文件 ==================== +.DS_Store +Thumbs.db +desktop.ini + +# ==================== 锁文件 ==================== +poetry.lock +Pipfile.lock +pdm.lock +package-lock.json +yarn.lock + +# ==================== 类型检查缓存 ==================== +.mypy_cache/ +.dmypy.json +.pytype/ +.pyre/ +pyrightconfig.json + +# ==================== 其他 ==================== +# Catkin +CATKIN_IGNORE + +# Eclipse/Qt +.project +.cproject +CMakeLists.txt.user +*.user +qtcreator-* diff --git a/.gitignore b/.gitignore index 838331e..610be61 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,6 @@ temp/ output/ unilabos_data/ pyrightconfig.json -.cursorignore ## Python # Byte-compiled / optimized / DLL files