mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 21:11:12 +00:00
Workshop bj (#99)
* Add LaiYu Liquid device integration and tests Introduce LaiYu Liquid device implementation, including backend, controllers, drivers, configuration, and resource files. Add hardware connection, tip pickup, and simplified test scripts, as well as experiment and registry configuration for LaiYu Liquid. Documentation and .gitignore for the device are also included. * feat(LaiYu_Liquid): 重构设备模块结构并添加硬件文档 refactor: 重新组织LaiYu_Liquid模块目录结构 docs: 添加SOPA移液器和步进电机控制指令文档 fix: 修正设备配置中的最大体积默认值 test: 新增工作台配置测试用例 chore: 删除过时的测试脚本和配置文件 * add * 重构: 将 LaiYu_Liquid.py 重命名为 laiyu_liquid_main.py 并更新所有导入引用 - 使用 git mv 将 LaiYu_Liquid.py 重命名为 laiyu_liquid_main.py - 更新所有相关文件中的导入引用 - 保持代码功能不变,仅改善命名一致性 - 测试确认所有导入正常工作 * 修复: 在 core/__init__.py 中添加 LaiYuLiquidBackend 导出 - 添加 LaiYuLiquidBackend 到导入列表 - 添加 LaiYuLiquidBackend 到 __all__ 导出列表 - 确保所有主要类都可以正确导入 * 修复大小写文件夹名字
This commit is contained in:
44
unilabos/devices/laiyu_liquid/core/__init__.py
Normal file
44
unilabos/devices/laiyu_liquid/core/__init__.py
Normal file
@@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
LaiYu液体处理设备核心模块
|
||||
|
||||
该模块包含LaiYu液体处理设备的核心功能组件:
|
||||
- LaiYu_Liquid.py: 主设备类和配置管理
|
||||
- abstract_protocol.py: 抽象协议定义
|
||||
- laiyu_liquid_res.py: 设备资源管理
|
||||
|
||||
作者: UniLab团队
|
||||
版本: 2.0.0
|
||||
"""
|
||||
|
||||
from .laiyu_liquid_main import (
|
||||
LaiYuLiquid,
|
||||
LaiYuLiquidConfig,
|
||||
LaiYuLiquidBackend,
|
||||
LaiYuLiquidDeck,
|
||||
LaiYuLiquidContainer,
|
||||
LaiYuLiquidTipRack,
|
||||
create_quick_setup
|
||||
)
|
||||
|
||||
from .laiyu_liquid_res import (
|
||||
LaiYuLiquidDeck,
|
||||
LaiYuLiquidContainer,
|
||||
LaiYuLiquidTipRack
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
# 主设备类
|
||||
'LaiYuLiquid',
|
||||
'LaiYuLiquidConfig',
|
||||
'LaiYuLiquidBackend',
|
||||
|
||||
# 设备资源
|
||||
'LaiYuLiquidDeck',
|
||||
'LaiYuLiquidContainer',
|
||||
'LaiYuLiquidTipRack',
|
||||
|
||||
# 工具函数
|
||||
'create_quick_setup'
|
||||
]
|
||||
529
unilabos/devices/laiyu_liquid/core/abstract_protocol.py
Normal file
529
unilabos/devices/laiyu_liquid/core/abstract_protocol.py
Normal file
@@ -0,0 +1,529 @@
|
||||
"""
|
||||
LaiYu_Liquid 抽象协议实现
|
||||
|
||||
该模块提供了液体资源管理和转移的抽象协议,包括:
|
||||
- MaterialResource: 液体资源管理类
|
||||
- transfer_liquid: 液体转移函数
|
||||
- 相关的辅助类和函数
|
||||
|
||||
主要功能:
|
||||
- 管理多孔位的液体资源
|
||||
- 计算和跟踪液体体积
|
||||
- 处理液体转移操作
|
||||
- 提供资源状态查询
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Union, Any, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
import uuid
|
||||
import time
|
||||
|
||||
# pylabrobot 导入
|
||||
from pylabrobot.resources import Resource, Well, Plate
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LiquidType(Enum):
|
||||
"""液体类型枚举"""
|
||||
WATER = "water"
|
||||
ETHANOL = "ethanol"
|
||||
DMSO = "dmso"
|
||||
BUFFER = "buffer"
|
||||
SAMPLE = "sample"
|
||||
REAGENT = "reagent"
|
||||
WASTE = "waste"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LiquidInfo:
|
||||
"""液体信息类"""
|
||||
liquid_type: LiquidType = LiquidType.UNKNOWN
|
||||
volume: float = 0.0 # 体积 (μL)
|
||||
concentration: Optional[float] = None # 浓度 (mg/ml, M等)
|
||||
ph: Optional[float] = None # pH值
|
||||
temperature: Optional[float] = None # 温度 (°C)
|
||||
viscosity: Optional[float] = None # 粘度 (cP)
|
||||
density: Optional[float] = None # 密度 (g/ml)
|
||||
description: str = "" # 描述信息
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.liquid_type.value}({self.description})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class WellContent:
|
||||
"""孔位内容类"""
|
||||
volume: float = 0.0 # 当前体积 (ul)
|
||||
max_volume: float = 1000.0 # 最大容量 (ul)
|
||||
liquid_info: LiquidInfo = field(default_factory=LiquidInfo)
|
||||
last_updated: float = field(default_factory=time.time)
|
||||
|
||||
@property
|
||||
def is_empty(self) -> bool:
|
||||
"""检查是否为空"""
|
||||
return self.volume <= 0.0
|
||||
|
||||
@property
|
||||
def is_full(self) -> bool:
|
||||
"""检查是否已满"""
|
||||
return self.volume >= self.max_volume
|
||||
|
||||
@property
|
||||
def available_volume(self) -> float:
|
||||
"""可用体积"""
|
||||
return max(0.0, self.max_volume - self.volume)
|
||||
|
||||
@property
|
||||
def fill_percentage(self) -> float:
|
||||
"""填充百分比"""
|
||||
return (self.volume / self.max_volume) * 100.0 if self.max_volume > 0 else 0.0
|
||||
|
||||
def can_add_volume(self, volume: float) -> bool:
|
||||
"""检查是否可以添加指定体积"""
|
||||
return (self.volume + volume) <= self.max_volume
|
||||
|
||||
def can_remove_volume(self, volume: float) -> bool:
|
||||
"""检查是否可以移除指定体积"""
|
||||
return self.volume >= volume
|
||||
|
||||
def add_volume(self, volume: float, liquid_info: Optional[LiquidInfo] = None) -> bool:
|
||||
"""
|
||||
添加液体体积
|
||||
|
||||
Args:
|
||||
volume: 要添加的体积 (ul)
|
||||
liquid_info: 液体信息
|
||||
|
||||
Returns:
|
||||
bool: 是否成功添加
|
||||
"""
|
||||
if not self.can_add_volume(volume):
|
||||
return False
|
||||
|
||||
self.volume += volume
|
||||
if liquid_info:
|
||||
self.liquid_info = liquid_info
|
||||
self.last_updated = time.time()
|
||||
return True
|
||||
|
||||
def remove_volume(self, volume: float) -> bool:
|
||||
"""
|
||||
移除液体体积
|
||||
|
||||
Args:
|
||||
volume: 要移除的体积 (ul)
|
||||
|
||||
Returns:
|
||||
bool: 是否成功移除
|
||||
"""
|
||||
if not self.can_remove_volume(volume):
|
||||
return False
|
||||
|
||||
self.volume -= volume
|
||||
self.last_updated = time.time()
|
||||
|
||||
# 如果完全清空,重置液体信息
|
||||
if self.volume <= 0.0:
|
||||
self.volume = 0.0
|
||||
self.liquid_info = LiquidInfo()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class MaterialResource:
|
||||
"""
|
||||
液体资源管理类
|
||||
|
||||
该类用于管理液体处理过程中的资源状态,包括:
|
||||
- 跟踪多个孔位的液体体积和类型
|
||||
- 计算总体积和可用体积
|
||||
- 处理液体的添加和移除
|
||||
- 提供资源状态查询
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
resource: Resource,
|
||||
wells: Optional[List[Well]] = None,
|
||||
default_max_volume: float = 1000.0
|
||||
):
|
||||
"""
|
||||
初始化材料资源
|
||||
|
||||
Args:
|
||||
resource: pylabrobot 资源对象
|
||||
wells: 孔位列表,如果为None则自动获取
|
||||
default_max_volume: 默认最大体积 (ul)
|
||||
"""
|
||||
self.resource = resource
|
||||
self.resource_id = str(uuid.uuid4())
|
||||
self.default_max_volume = default_max_volume
|
||||
|
||||
# 获取孔位列表
|
||||
if wells is None:
|
||||
if hasattr(resource, 'get_wells'):
|
||||
self.wells = resource.get_wells()
|
||||
elif hasattr(resource, 'wells'):
|
||||
self.wells = resource.wells
|
||||
else:
|
||||
# 如果没有孔位,创建一个虚拟孔位
|
||||
self.wells = [resource]
|
||||
else:
|
||||
self.wells = wells
|
||||
|
||||
# 初始化孔位内容
|
||||
self.well_contents: Dict[str, WellContent] = {}
|
||||
for well in self.wells:
|
||||
well_id = self._get_well_id(well)
|
||||
self.well_contents[well_id] = WellContent(
|
||||
max_volume=default_max_volume
|
||||
)
|
||||
|
||||
logger.info(f"初始化材料资源: {resource.name}, 孔位数: {len(self.wells)}")
|
||||
|
||||
def _get_well_id(self, well: Union[Well, Resource]) -> str:
|
||||
"""获取孔位ID"""
|
||||
if hasattr(well, 'name'):
|
||||
return well.name
|
||||
else:
|
||||
return str(id(well))
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""资源名称"""
|
||||
return self.resource.name
|
||||
|
||||
@property
|
||||
def total_volume(self) -> float:
|
||||
"""总液体体积"""
|
||||
return sum(content.volume for content in self.well_contents.values())
|
||||
|
||||
@property
|
||||
def total_max_volume(self) -> float:
|
||||
"""总最大容量"""
|
||||
return sum(content.max_volume for content in self.well_contents.values())
|
||||
|
||||
@property
|
||||
def available_volume(self) -> float:
|
||||
"""总可用体积"""
|
||||
return sum(content.available_volume for content in self.well_contents.values())
|
||||
|
||||
@property
|
||||
def well_count(self) -> int:
|
||||
"""孔位数量"""
|
||||
return len(self.wells)
|
||||
|
||||
@property
|
||||
def empty_wells(self) -> List[str]:
|
||||
"""空孔位列表"""
|
||||
return [well_id for well_id, content in self.well_contents.items()
|
||||
if content.is_empty]
|
||||
|
||||
@property
|
||||
def full_wells(self) -> List[str]:
|
||||
"""满孔位列表"""
|
||||
return [well_id for well_id, content in self.well_contents.items()
|
||||
if content.is_full]
|
||||
|
||||
@property
|
||||
def occupied_wells(self) -> List[str]:
|
||||
"""有液体的孔位列表"""
|
||||
return [well_id for well_id, content in self.well_contents.items()
|
||||
if not content.is_empty]
|
||||
|
||||
def get_well_content(self, well_id: str) -> Optional[WellContent]:
|
||||
"""获取指定孔位的内容"""
|
||||
return self.well_contents.get(well_id)
|
||||
|
||||
def get_well_volume(self, well_id: str) -> float:
|
||||
"""获取指定孔位的体积"""
|
||||
content = self.get_well_content(well_id)
|
||||
return content.volume if content else 0.0
|
||||
|
||||
def set_well_volume(
|
||||
self,
|
||||
well_id: str,
|
||||
volume: float,
|
||||
liquid_info: Optional[LiquidInfo] = None
|
||||
) -> bool:
|
||||
"""
|
||||
设置指定孔位的体积
|
||||
|
||||
Args:
|
||||
well_id: 孔位ID
|
||||
volume: 体积 (ul)
|
||||
liquid_info: 液体信息
|
||||
|
||||
Returns:
|
||||
bool: 是否成功设置
|
||||
"""
|
||||
if well_id not in self.well_contents:
|
||||
logger.error(f"孔位 {well_id} 不存在")
|
||||
return False
|
||||
|
||||
content = self.well_contents[well_id]
|
||||
if volume > content.max_volume:
|
||||
logger.error(f"体积 {volume} 超过最大容量 {content.max_volume}")
|
||||
return False
|
||||
|
||||
content.volume = max(0.0, volume)
|
||||
if liquid_info:
|
||||
content.liquid_info = liquid_info
|
||||
content.last_updated = time.time()
|
||||
|
||||
logger.info(f"设置孔位 {well_id} 体积: {volume}ul")
|
||||
return True
|
||||
|
||||
def add_liquid(
|
||||
self,
|
||||
well_id: str,
|
||||
volume: float,
|
||||
liquid_info: Optional[LiquidInfo] = None
|
||||
) -> bool:
|
||||
"""
|
||||
向指定孔位添加液体
|
||||
|
||||
Args:
|
||||
well_id: 孔位ID
|
||||
volume: 添加的体积 (ul)
|
||||
liquid_info: 液体信息
|
||||
|
||||
Returns:
|
||||
bool: 是否成功添加
|
||||
"""
|
||||
if well_id not in self.well_contents:
|
||||
logger.error(f"孔位 {well_id} 不存在")
|
||||
return False
|
||||
|
||||
content = self.well_contents[well_id]
|
||||
success = content.add_volume(volume, liquid_info)
|
||||
|
||||
if success:
|
||||
logger.info(f"向孔位 {well_id} 添加 {volume}ul 液体")
|
||||
else:
|
||||
logger.error(f"无法向孔位 {well_id} 添加 {volume}ul 液体")
|
||||
|
||||
return success
|
||||
|
||||
def remove_liquid(self, well_id: str, volume: float) -> bool:
|
||||
"""
|
||||
从指定孔位移除液体
|
||||
|
||||
Args:
|
||||
well_id: 孔位ID
|
||||
volume: 移除的体积 (ul)
|
||||
|
||||
Returns:
|
||||
bool: 是否成功移除
|
||||
"""
|
||||
if well_id not in self.well_contents:
|
||||
logger.error(f"孔位 {well_id} 不存在")
|
||||
return False
|
||||
|
||||
content = self.well_contents[well_id]
|
||||
success = content.remove_volume(volume)
|
||||
|
||||
if success:
|
||||
logger.info(f"从孔位 {well_id} 移除 {volume}ul 液体")
|
||||
else:
|
||||
logger.error(f"无法从孔位 {well_id} 移除 {volume}ul 液体")
|
||||
|
||||
return success
|
||||
|
||||
def find_wells_with_volume(self, min_volume: float) -> List[str]:
|
||||
"""
|
||||
查找具有指定最小体积的孔位
|
||||
|
||||
Args:
|
||||
min_volume: 最小体积 (ul)
|
||||
|
||||
Returns:
|
||||
List[str]: 符合条件的孔位ID列表
|
||||
"""
|
||||
return [well_id for well_id, content in self.well_contents.items()
|
||||
if content.volume >= min_volume]
|
||||
|
||||
def find_wells_with_space(self, min_space: float) -> List[str]:
|
||||
"""
|
||||
查找具有指定最小空间的孔位
|
||||
|
||||
Args:
|
||||
min_space: 最小空间 (ul)
|
||||
|
||||
Returns:
|
||||
List[str]: 符合条件的孔位ID列表
|
||||
"""
|
||||
return [well_id for well_id, content in self.well_contents.items()
|
||||
if content.available_volume >= min_space]
|
||||
|
||||
def get_status_summary(self) -> Dict[str, Any]:
|
||||
"""获取资源状态摘要"""
|
||||
return {
|
||||
"resource_name": self.name,
|
||||
"resource_id": self.resource_id,
|
||||
"well_count": self.well_count,
|
||||
"total_volume": self.total_volume,
|
||||
"total_max_volume": self.total_max_volume,
|
||||
"available_volume": self.available_volume,
|
||||
"fill_percentage": (self.total_volume / self.total_max_volume) * 100.0,
|
||||
"empty_wells": len(self.empty_wells),
|
||||
"full_wells": len(self.full_wells),
|
||||
"occupied_wells": len(self.occupied_wells)
|
||||
}
|
||||
|
||||
def get_detailed_status(self) -> Dict[str, Any]:
|
||||
"""获取详细状态信息"""
|
||||
well_details = {}
|
||||
for well_id, content in self.well_contents.items():
|
||||
well_details[well_id] = {
|
||||
"volume": content.volume,
|
||||
"max_volume": content.max_volume,
|
||||
"available_volume": content.available_volume,
|
||||
"fill_percentage": content.fill_percentage,
|
||||
"liquid_type": content.liquid_info.liquid_type.value,
|
||||
"description": content.liquid_info.description,
|
||||
"last_updated": content.last_updated
|
||||
}
|
||||
|
||||
return {
|
||||
"summary": self.get_status_summary(),
|
||||
"wells": well_details
|
||||
}
|
||||
|
||||
|
||||
def transfer_liquid(
|
||||
source: MaterialResource,
|
||||
target: MaterialResource,
|
||||
volume: float,
|
||||
source_well_id: Optional[str] = None,
|
||||
target_well_id: Optional[str] = None,
|
||||
liquid_info: Optional[LiquidInfo] = None
|
||||
) -> bool:
|
||||
"""
|
||||
在两个材料资源之间转移液体
|
||||
|
||||
Args:
|
||||
source: 源资源
|
||||
target: 目标资源
|
||||
volume: 转移体积 (ul)
|
||||
source_well_id: 源孔位ID,如果为None则自动选择
|
||||
target_well_id: 目标孔位ID,如果为None则自动选择
|
||||
liquid_info: 液体信息
|
||||
|
||||
Returns:
|
||||
bool: 转移是否成功
|
||||
"""
|
||||
try:
|
||||
# 自动选择源孔位
|
||||
if source_well_id is None:
|
||||
available_wells = source.find_wells_with_volume(volume)
|
||||
if not available_wells:
|
||||
logger.error(f"源资源 {source.name} 没有足够体积的孔位")
|
||||
return False
|
||||
source_well_id = available_wells[0]
|
||||
|
||||
# 自动选择目标孔位
|
||||
if target_well_id is None:
|
||||
available_wells = target.find_wells_with_space(volume)
|
||||
if not available_wells:
|
||||
logger.error(f"目标资源 {target.name} 没有足够空间的孔位")
|
||||
return False
|
||||
target_well_id = available_wells[0]
|
||||
|
||||
# 检查源孔位是否有足够液体
|
||||
if not source.get_well_content(source_well_id).can_remove_volume(volume):
|
||||
logger.error(f"源孔位 {source_well_id} 液体不足")
|
||||
return False
|
||||
|
||||
# 检查目标孔位是否有足够空间
|
||||
if not target.get_well_content(target_well_id).can_add_volume(volume):
|
||||
logger.error(f"目标孔位 {target_well_id} 空间不足")
|
||||
return False
|
||||
|
||||
# 获取源液体信息
|
||||
source_content = source.get_well_content(source_well_id)
|
||||
transfer_liquid_info = liquid_info or source_content.liquid_info
|
||||
|
||||
# 执行转移
|
||||
if source.remove_liquid(source_well_id, volume):
|
||||
if target.add_liquid(target_well_id, volume, transfer_liquid_info):
|
||||
logger.info(f"成功转移 {volume}ul 液体: {source.name}[{source_well_id}] -> {target.name}[{target_well_id}]")
|
||||
return True
|
||||
else:
|
||||
# 如果目标添加失败,回滚源操作
|
||||
source.add_liquid(source_well_id, volume, source_content.liquid_info)
|
||||
logger.error("目标添加失败,已回滚源操作")
|
||||
return False
|
||||
else:
|
||||
logger.error("源移除失败")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"液体转移失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def create_material_resource(
|
||||
name: str,
|
||||
resource: Resource,
|
||||
initial_volumes: Optional[Dict[str, float]] = None,
|
||||
liquid_info: Optional[LiquidInfo] = None,
|
||||
max_volume: float = 1000.0
|
||||
) -> MaterialResource:
|
||||
"""
|
||||
创建材料资源的便捷函数
|
||||
|
||||
Args:
|
||||
name: 资源名称
|
||||
resource: pylabrobot 资源对象
|
||||
initial_volumes: 初始体积字典 {well_id: volume}
|
||||
liquid_info: 液体信息
|
||||
max_volume: 最大体积
|
||||
|
||||
Returns:
|
||||
MaterialResource: 创建的材料资源
|
||||
"""
|
||||
material_resource = MaterialResource(
|
||||
resource=resource,
|
||||
default_max_volume=max_volume
|
||||
)
|
||||
|
||||
# 设置初始体积
|
||||
if initial_volumes:
|
||||
for well_id, volume in initial_volumes.items():
|
||||
material_resource.set_well_volume(well_id, volume, liquid_info)
|
||||
|
||||
return material_resource
|
||||
|
||||
|
||||
def batch_transfer_liquid(
|
||||
transfers: List[Tuple[MaterialResource, MaterialResource, float]],
|
||||
liquid_info: Optional[LiquidInfo] = None
|
||||
) -> List[bool]:
|
||||
"""
|
||||
批量液体转移
|
||||
|
||||
Args:
|
||||
transfers: 转移列表 [(source, target, volume), ...]
|
||||
liquid_info: 液体信息
|
||||
|
||||
Returns:
|
||||
List[bool]: 每个转移操作的结果
|
||||
"""
|
||||
results = []
|
||||
|
||||
for source, target, volume in transfers:
|
||||
result = transfer_liquid(source, target, volume, liquid_info=liquid_info)
|
||||
results.append(result)
|
||||
|
||||
if not result:
|
||||
logger.warning(f"批量转移中的操作失败: {source.name} -> {target.name}")
|
||||
|
||||
success_count = sum(results)
|
||||
logger.info(f"批量转移完成: {success_count}/{len(transfers)} 成功")
|
||||
|
||||
return results
|
||||
881
unilabos/devices/laiyu_liquid/core/laiyu_liquid_main.py
Normal file
881
unilabos/devices/laiyu_liquid/core/laiyu_liquid_main.py
Normal file
@@ -0,0 +1,881 @@
|
||||
"""
|
||||
LaiYu_Liquid 液体处理工作站主要集成文件
|
||||
|
||||
该模块实现了 LaiYu_Liquid 与 UniLabOS 系统的集成,提供标准化的液体处理接口。
|
||||
主要包含:
|
||||
- LaiYuLiquidBackend: 硬件通信后端
|
||||
- LaiYuLiquid: 主要接口类
|
||||
- 相关的异常类和容器类
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import List, Optional, Dict, Any, Union, Tuple
|
||||
from dataclasses import dataclass
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
# 基础导入
|
||||
try:
|
||||
from pylabrobot.resources import Deck, Plate, TipRack, Tip, Resource, Well
|
||||
PYLABROBOT_AVAILABLE = True
|
||||
except ImportError:
|
||||
# 如果 pylabrobot 不可用,创建基础的模拟类
|
||||
PYLABROBOT_AVAILABLE = False
|
||||
|
||||
class Resource:
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
|
||||
class Deck(Resource):
|
||||
pass
|
||||
|
||||
class Plate(Resource):
|
||||
pass
|
||||
|
||||
class TipRack(Resource):
|
||||
pass
|
||||
|
||||
class Tip(Resource):
|
||||
pass
|
||||
|
||||
class Well(Resource):
|
||||
pass
|
||||
|
||||
# LaiYu_Liquid 控制器导入
|
||||
try:
|
||||
from .controllers.pipette_controller import (
|
||||
PipetteController, TipStatus, LiquidClass, LiquidParameters
|
||||
)
|
||||
from .controllers.xyz_controller import (
|
||||
XYZController, MachineConfig, CoordinateOrigin, MotorAxis
|
||||
)
|
||||
CONTROLLERS_AVAILABLE = True
|
||||
except ImportError:
|
||||
CONTROLLERS_AVAILABLE = False
|
||||
# 创建模拟的控制器类
|
||||
class PipetteController:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def connect(self):
|
||||
return True
|
||||
|
||||
def initialize(self):
|
||||
return True
|
||||
|
||||
class XYZController:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def connect_device(self):
|
||||
return True
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LaiYuLiquidError(RuntimeError):
|
||||
"""LaiYu_Liquid 设备异常"""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LaiYuLiquidConfig:
|
||||
"""LaiYu_Liquid 设备配置"""
|
||||
port: str = "/dev/cu.usbserial-3130" # RS485转USB端口
|
||||
address: int = 1 # 设备地址
|
||||
baudrate: int = 9600 # 波特率
|
||||
timeout: float = 5.0 # 通信超时时间
|
||||
|
||||
# 工作台尺寸
|
||||
deck_width: float = 340.0 # 工作台宽度 (mm)
|
||||
deck_height: float = 250.0 # 工作台高度 (mm)
|
||||
deck_depth: float = 160.0 # 工作台深度 (mm)
|
||||
|
||||
# 移液参数
|
||||
max_volume: float = 1000.0 # 最大体积 (μL)
|
||||
min_volume: float = 0.1 # 最小体积 (μL)
|
||||
|
||||
# 运动参数
|
||||
max_speed: float = 100.0 # 最大速度 (mm/s)
|
||||
acceleration: float = 50.0 # 加速度 (mm/s²)
|
||||
|
||||
# 安全参数
|
||||
safe_height: float = 50.0 # 安全高度 (mm)
|
||||
tip_pickup_depth: float = 10.0 # 吸头拾取深度 (mm)
|
||||
liquid_detection: bool = True # 液面检测
|
||||
|
||||
# 取枪头相关参数
|
||||
tip_pickup_speed: int = 30 # 取枪头时的移动速度 (rpm)
|
||||
tip_pickup_acceleration: int = 500 # 取枪头时的加速度 (rpm/s)
|
||||
tip_approach_height: float = 5.0 # 接近枪头时的高度 (mm)
|
||||
tip_pickup_force_depth: float = 2.0 # 强制插入深度 (mm)
|
||||
tip_pickup_retract_height: float = 20.0 # 取枪头后的回退高度 (mm)
|
||||
|
||||
# 丢弃枪头相关参数
|
||||
tip_drop_height: float = 10.0 # 丢弃枪头时的高度 (mm)
|
||||
tip_drop_speed: int = 50 # 丢弃枪头时的移动速度 (rpm)
|
||||
trash_position: Tuple[float, float, float] = (300.0, 200.0, 0.0) # 垃圾桶位置 (mm)
|
||||
|
||||
# 安全范围配置
|
||||
deck_width: float = 300.0 # 工作台宽度 (mm)
|
||||
deck_height: float = 200.0 # 工作台高度 (mm)
|
||||
deck_depth: float = 100.0 # 工作台深度 (mm)
|
||||
safe_height: float = 50.0 # 安全高度 (mm)
|
||||
position_validation: bool = True # 启用位置验证
|
||||
emergency_stop_enabled: bool = True # 启用紧急停止
|
||||
|
||||
|
||||
class LaiYuLiquidDeck:
|
||||
"""LaiYu_Liquid 工作台管理"""
|
||||
|
||||
def __init__(self, config: LaiYuLiquidConfig):
|
||||
self.config = config
|
||||
self.resources: Dict[str, Resource] = {}
|
||||
self.positions: Dict[str, Tuple[float, float, float]] = {}
|
||||
|
||||
def add_resource(self, name: str, resource: Resource, position: Tuple[float, float, float]):
|
||||
"""添加资源到工作台"""
|
||||
self.resources[name] = resource
|
||||
self.positions[name] = position
|
||||
|
||||
def get_resource(self, name: str) -> Optional[Resource]:
|
||||
"""获取资源"""
|
||||
return self.resources.get(name)
|
||||
|
||||
def get_position(self, name: str) -> Optional[Tuple[float, float, float]]:
|
||||
"""获取资源位置"""
|
||||
return self.positions.get(name)
|
||||
|
||||
def list_resources(self) -> List[str]:
|
||||
"""列出所有资源"""
|
||||
return list(self.resources.keys())
|
||||
|
||||
|
||||
class LaiYuLiquidContainer:
|
||||
"""LaiYu_Liquid 容器类"""
|
||||
|
||||
def __init__(self, name: str, size_x: float = 0, size_y: float = 0, size_z: float = 0, container_type: str = "", volume: float = 0.0, max_volume: float = 1000.0, lid_height: float = 0.0):
|
||||
self.name = name
|
||||
self.size_x = size_x
|
||||
self.size_y = size_y
|
||||
self.size_z = size_z
|
||||
self.lid_height = lid_height
|
||||
self.container_type = container_type
|
||||
self.volume = volume
|
||||
self.max_volume = max_volume
|
||||
self.last_updated = time.time()
|
||||
self.child_resources = {} # 存储子资源
|
||||
|
||||
@property
|
||||
def is_empty(self) -> bool:
|
||||
return self.volume <= 0.0
|
||||
|
||||
@property
|
||||
def is_full(self) -> bool:
|
||||
return self.volume >= self.max_volume
|
||||
|
||||
@property
|
||||
def available_volume(self) -> float:
|
||||
return max(0.0, self.max_volume - self.volume)
|
||||
|
||||
def add_volume(self, volume: float) -> bool:
|
||||
"""添加体积"""
|
||||
if self.volume + volume <= self.max_volume:
|
||||
self.volume += volume
|
||||
self.last_updated = time.time()
|
||||
return True
|
||||
return False
|
||||
|
||||
def remove_volume(self, volume: float) -> bool:
|
||||
"""移除体积"""
|
||||
if self.volume >= volume:
|
||||
self.volume -= volume
|
||||
self.last_updated = time.time()
|
||||
return True
|
||||
return False
|
||||
|
||||
def assign_child_resource(self, resource, location=None):
|
||||
"""分配子资源 - 与 PyLabRobot 资源管理系统兼容"""
|
||||
if hasattr(resource, 'name'):
|
||||
self.child_resources[resource.name] = {
|
||||
'resource': resource,
|
||||
'location': location
|
||||
}
|
||||
|
||||
|
||||
class LaiYuLiquidTipRack:
|
||||
"""LaiYu_Liquid 吸头架类"""
|
||||
|
||||
def __init__(self, name: str, size_x: float = 0, size_y: float = 0, size_z: float = 0, tip_count: int = 96, tip_volume: float = 1000.0):
|
||||
self.name = name
|
||||
self.size_x = size_x
|
||||
self.size_y = size_y
|
||||
self.size_z = size_z
|
||||
self.tip_count = tip_count
|
||||
self.tip_volume = tip_volume
|
||||
self.tips_available = [True] * tip_count
|
||||
self.child_resources = {} # 存储子资源
|
||||
|
||||
@property
|
||||
def available_tips(self) -> int:
|
||||
return sum(self.tips_available)
|
||||
|
||||
@property
|
||||
def is_empty(self) -> bool:
|
||||
return self.available_tips == 0
|
||||
|
||||
def pick_tip(self, position: int) -> bool:
|
||||
"""拾取吸头"""
|
||||
if 0 <= position < self.tip_count and self.tips_available[position]:
|
||||
self.tips_available[position] = False
|
||||
return True
|
||||
return False
|
||||
|
||||
def has_tip(self, position: int) -> bool:
|
||||
"""检查位置是否有吸头"""
|
||||
if 0 <= position < self.tip_count:
|
||||
return self.tips_available[position]
|
||||
return False
|
||||
|
||||
def assign_child_resource(self, resource, location=None):
|
||||
"""分配子资源到指定位置"""
|
||||
self.child_resources[resource.name] = {
|
||||
'resource': resource,
|
||||
'location': location
|
||||
}
|
||||
|
||||
|
||||
def get_module_info():
|
||||
"""获取模块信息"""
|
||||
return {
|
||||
"name": "LaiYu_Liquid",
|
||||
"version": "1.0.0",
|
||||
"description": "LaiYu液体处理工作站模块,提供移液器控制、XYZ轴控制和资源管理功能",
|
||||
"author": "UniLabOS Team",
|
||||
"capabilities": [
|
||||
"移液器控制",
|
||||
"XYZ轴运动控制",
|
||||
"吸头架管理",
|
||||
"板和容器管理",
|
||||
"资源位置管理"
|
||||
],
|
||||
"dependencies": {
|
||||
"required": ["serial"],
|
||||
"optional": ["pylabrobot"]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class LaiYuLiquidBackend:
|
||||
"""LaiYu_Liquid 硬件通信后端"""
|
||||
|
||||
def __init__(self, config: LaiYuLiquidConfig, deck: Optional['LaiYuLiquidDeck'] = None):
|
||||
self.config = config
|
||||
self.deck = deck # 工作台引用,用于获取资源位置信息
|
||||
self.pipette_controller = None
|
||||
self.xyz_controller = None
|
||||
self.is_connected = False
|
||||
self.is_initialized = False
|
||||
|
||||
# 状态跟踪
|
||||
self.current_position = (0.0, 0.0, 0.0)
|
||||
self.tip_attached = False
|
||||
self.current_volume = 0.0
|
||||
|
||||
def _validate_position(self, x: float, y: float, z: float) -> bool:
|
||||
"""验证位置是否在安全范围内"""
|
||||
try:
|
||||
# 检查X轴范围
|
||||
if not (0 <= x <= self.config.deck_width):
|
||||
logger.error(f"X轴位置 {x:.2f}mm 超出范围 [0, {self.config.deck_width}]")
|
||||
return False
|
||||
|
||||
# 检查Y轴范围
|
||||
if not (0 <= y <= self.config.deck_height):
|
||||
logger.error(f"Y轴位置 {y:.2f}mm 超出范围 [0, {self.config.deck_height}]")
|
||||
return False
|
||||
|
||||
# 检查Z轴范围(负值表示向下,0为工作台表面)
|
||||
if not (-self.config.deck_depth <= z <= self.config.safe_height):
|
||||
logger.error(f"Z轴位置 {z:.2f}mm 超出安全范围 [{-self.config.deck_depth}, {self.config.safe_height}]")
|
||||
return False
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"位置验证失败: {e}")
|
||||
return False
|
||||
|
||||
def _check_hardware_ready(self) -> bool:
|
||||
"""检查硬件是否准备就绪"""
|
||||
if not self.is_connected:
|
||||
logger.error("设备未连接")
|
||||
return False
|
||||
|
||||
if CONTROLLERS_AVAILABLE:
|
||||
if self.xyz_controller is None:
|
||||
logger.error("XYZ控制器未初始化")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def emergency_stop(self) -> bool:
|
||||
"""紧急停止所有运动"""
|
||||
try:
|
||||
logger.warning("执行紧急停止")
|
||||
|
||||
if CONTROLLERS_AVAILABLE and self.xyz_controller:
|
||||
# 停止XYZ控制器
|
||||
await self.xyz_controller.stop_all_motion()
|
||||
logger.info("XYZ控制器已停止")
|
||||
|
||||
if self.pipette_controller:
|
||||
# 停止移液器控制器
|
||||
await self.pipette_controller.stop()
|
||||
logger.info("移液器控制器已停止")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"紧急停止失败: {e}")
|
||||
return False
|
||||
|
||||
async def move_to_safe_position(self) -> bool:
|
||||
"""移动到安全位置"""
|
||||
try:
|
||||
if not self._check_hardware_ready():
|
||||
return False
|
||||
|
||||
safe_position = (
|
||||
self.config.deck_width / 2, # 工作台中心X
|
||||
self.config.deck_height / 2, # 工作台中心Y
|
||||
self.config.safe_height # 安全高度Z
|
||||
)
|
||||
|
||||
if not self._validate_position(*safe_position):
|
||||
logger.error("安全位置无效")
|
||||
return False
|
||||
|
||||
if CONTROLLERS_AVAILABLE and self.xyz_controller:
|
||||
await self.xyz_controller.move_to_work_coord(*safe_position)
|
||||
self.current_position = safe_position
|
||||
logger.info(f"已移动到安全位置: {safe_position}")
|
||||
return True
|
||||
else:
|
||||
# 模拟模式
|
||||
self.current_position = safe_position
|
||||
logger.info("模拟移动到安全位置")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"移动到安全位置失败: {e}")
|
||||
return False
|
||||
|
||||
async def setup(self) -> bool:
|
||||
"""设置硬件连接"""
|
||||
try:
|
||||
if CONTROLLERS_AVAILABLE:
|
||||
# 初始化移液器控制器
|
||||
self.pipette_controller = PipetteController(
|
||||
port=self.config.port,
|
||||
address=self.config.address
|
||||
)
|
||||
|
||||
# 初始化XYZ控制器
|
||||
machine_config = MachineConfig()
|
||||
self.xyz_controller = XYZController(
|
||||
port=self.config.port,
|
||||
baudrate=self.config.baudrate,
|
||||
machine_config=machine_config
|
||||
)
|
||||
|
||||
# 连接设备
|
||||
pipette_connected = await asyncio.to_thread(self.pipette_controller.connect)
|
||||
xyz_connected = await asyncio.to_thread(self.xyz_controller.connect_device)
|
||||
|
||||
if pipette_connected and xyz_connected:
|
||||
self.is_connected = True
|
||||
logger.info("LaiYu_Liquid 硬件连接成功")
|
||||
return True
|
||||
else:
|
||||
logger.error("LaiYu_Liquid 硬件连接失败")
|
||||
return False
|
||||
else:
|
||||
# 模拟模式
|
||||
logger.info("LaiYu_Liquid 运行在模拟模式")
|
||||
self.is_connected = True
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"LaiYu_Liquid 设置失败: {e}")
|
||||
return False
|
||||
|
||||
async def stop(self):
|
||||
"""停止设备"""
|
||||
try:
|
||||
if self.pipette_controller and hasattr(self.pipette_controller, 'disconnect'):
|
||||
await asyncio.to_thread(self.pipette_controller.disconnect)
|
||||
|
||||
if self.xyz_controller and hasattr(self.xyz_controller, 'disconnect'):
|
||||
await asyncio.to_thread(self.xyz_controller.disconnect)
|
||||
|
||||
self.is_connected = False
|
||||
self.is_initialized = False
|
||||
logger.info("LaiYu_Liquid 已停止")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"LaiYu_Liquid 停止失败: {e}")
|
||||
|
||||
async def move_to(self, x: float, y: float, z: float) -> bool:
|
||||
"""移动到指定位置"""
|
||||
try:
|
||||
if not self.is_connected:
|
||||
raise LaiYuLiquidError("设备未连接")
|
||||
|
||||
# 模拟移动
|
||||
await asyncio.sleep(0.1) # 模拟移动时间
|
||||
self.current_position = (x, y, z)
|
||||
logger.debug(f"移动到位置: ({x}, {y}, {z})")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"移动失败: {e}")
|
||||
return False
|
||||
|
||||
async def pick_up_tip(self, tip_rack: str, position: int) -> bool:
|
||||
"""拾取吸头 - 包含真正的Z轴下降控制"""
|
||||
try:
|
||||
# 硬件准备检查
|
||||
if not self._check_hardware_ready():
|
||||
return False
|
||||
|
||||
if self.tip_attached:
|
||||
logger.warning("已有吸头附着,无法拾取新吸头")
|
||||
return False
|
||||
|
||||
logger.info(f"开始从 {tip_rack} 位置 {position} 拾取吸头")
|
||||
|
||||
# 获取枪头架位置信息
|
||||
if self.deck is None:
|
||||
logger.error("工作台未初始化")
|
||||
return False
|
||||
|
||||
tip_position = self.deck.get_position(tip_rack)
|
||||
if tip_position is None:
|
||||
logger.error(f"未找到枪头架 {tip_rack} 的位置信息")
|
||||
return False
|
||||
|
||||
# 计算具体枪头位置(这里简化处理,实际应根据position计算偏移)
|
||||
tip_x, tip_y, tip_z = tip_position
|
||||
|
||||
# 验证所有关键位置的安全性
|
||||
safe_z = tip_z + self.config.tip_approach_height
|
||||
pickup_z = tip_z - self.config.tip_pickup_force_depth
|
||||
retract_z = tip_z + self.config.tip_pickup_retract_height
|
||||
|
||||
if not (self._validate_position(tip_x, tip_y, safe_z) and
|
||||
self._validate_position(tip_x, tip_y, pickup_z) and
|
||||
self._validate_position(tip_x, tip_y, retract_z)):
|
||||
logger.error("枪头拾取位置超出安全范围")
|
||||
return False
|
||||
|
||||
if CONTROLLERS_AVAILABLE and self.xyz_controller:
|
||||
# 真实硬件控制流程
|
||||
logger.info("使用真实XYZ控制器进行枪头拾取")
|
||||
|
||||
try:
|
||||
# 1. 移动到枪头上方的安全位置
|
||||
safe_z = tip_z + self.config.tip_approach_height
|
||||
logger.info(f"移动到枪头上方安全位置: ({tip_x:.2f}, {tip_y:.2f}, {safe_z:.2f})")
|
||||
move_success = await asyncio.to_thread(
|
||||
self.xyz_controller.move_to_work_coord,
|
||||
tip_x, tip_y, safe_z
|
||||
)
|
||||
if not move_success:
|
||||
logger.error("移动到枪头上方失败")
|
||||
return False
|
||||
|
||||
# 2. Z轴下降到枪头位置
|
||||
pickup_z = tip_z - self.config.tip_pickup_force_depth
|
||||
logger.info(f"Z轴下降到枪头拾取位置: {pickup_z:.2f}mm")
|
||||
z_down_success = await asyncio.to_thread(
|
||||
self.xyz_controller.move_to_work_coord,
|
||||
tip_x, tip_y, pickup_z
|
||||
)
|
||||
if not z_down_success:
|
||||
logger.error("Z轴下降到枪头位置失败")
|
||||
return False
|
||||
|
||||
# 3. 等待一小段时间确保枪头牢固附着
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
# 4. Z轴上升到回退高度
|
||||
retract_z = tip_z + self.config.tip_pickup_retract_height
|
||||
logger.info(f"Z轴上升到回退高度: {retract_z:.2f}mm")
|
||||
z_up_success = await asyncio.to_thread(
|
||||
self.xyz_controller.move_to_work_coord,
|
||||
tip_x, tip_y, retract_z
|
||||
)
|
||||
if not z_up_success:
|
||||
logger.error("Z轴上升失败")
|
||||
return False
|
||||
|
||||
# 5. 更新当前位置
|
||||
self.current_position = (tip_x, tip_y, retract_z)
|
||||
|
||||
except Exception as move_error:
|
||||
logger.error(f"枪头拾取过程中发生错误: {move_error}")
|
||||
# 尝试移动到安全位置
|
||||
if self.config.emergency_stop_enabled:
|
||||
await self.emergency_stop()
|
||||
await self.move_to_safe_position()
|
||||
return False
|
||||
|
||||
else:
|
||||
# 模拟模式
|
||||
logger.info("模拟模式:执行枪头拾取动作")
|
||||
await asyncio.sleep(1.0) # 模拟整个拾取过程的时间
|
||||
self.current_position = (tip_x, tip_y, tip_z + self.config.tip_pickup_retract_height)
|
||||
|
||||
# 6. 标记枪头已附着
|
||||
self.tip_attached = True
|
||||
logger.info("吸头拾取成功")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"拾取吸头失败: {e}")
|
||||
return False
|
||||
|
||||
async def drop_tip(self, location: str = "trash") -> bool:
|
||||
"""丢弃吸头 - 包含真正的Z轴控制"""
|
||||
try:
|
||||
# 硬件准备检查
|
||||
if not self._check_hardware_ready():
|
||||
return False
|
||||
|
||||
if not self.tip_attached:
|
||||
logger.warning("没有吸头附着,无需丢弃")
|
||||
return True
|
||||
|
||||
logger.info(f"开始丢弃吸头到 {location}")
|
||||
|
||||
# 确定丢弃位置
|
||||
if location == "trash":
|
||||
# 使用配置中的垃圾桶位置
|
||||
drop_x, drop_y, drop_z = self.config.trash_position
|
||||
else:
|
||||
# 尝试从deck获取指定位置
|
||||
if self.deck is None:
|
||||
logger.error("工作台未初始化")
|
||||
return False
|
||||
|
||||
drop_position = self.deck.get_position(location)
|
||||
if drop_position is None:
|
||||
logger.error(f"未找到丢弃位置 {location} 的信息")
|
||||
return False
|
||||
drop_x, drop_y, drop_z = drop_position
|
||||
|
||||
# 验证丢弃位置的安全性
|
||||
safe_z = drop_z + self.config.safe_height
|
||||
drop_height_z = drop_z + self.config.tip_drop_height
|
||||
|
||||
if not (self._validate_position(drop_x, drop_y, safe_z) and
|
||||
self._validate_position(drop_x, drop_y, drop_height_z)):
|
||||
logger.error("枪头丢弃位置超出安全范围")
|
||||
return False
|
||||
|
||||
if CONTROLLERS_AVAILABLE and self.xyz_controller:
|
||||
# 真实硬件控制流程
|
||||
logger.info("使用真实XYZ控制器进行枪头丢弃")
|
||||
|
||||
try:
|
||||
# 1. 移动到丢弃位置上方的安全高度
|
||||
safe_z = drop_z + self.config.tip_drop_height
|
||||
logger.info(f"移动到丢弃位置上方: ({drop_x:.2f}, {drop_y:.2f}, {safe_z:.2f})")
|
||||
move_success = await asyncio.to_thread(
|
||||
self.xyz_controller.move_to_work_coord,
|
||||
drop_x, drop_y, safe_z
|
||||
)
|
||||
if not move_success:
|
||||
logger.error("移动到丢弃位置上方失败")
|
||||
return False
|
||||
|
||||
# 2. Z轴下降到丢弃高度
|
||||
logger.info(f"Z轴下降到丢弃高度: {drop_z:.2f}mm")
|
||||
z_down_success = await asyncio.to_thread(
|
||||
self.xyz_controller.move_to_work_coord,
|
||||
drop_x, drop_y, drop_z
|
||||
)
|
||||
if not z_down_success:
|
||||
logger.error("Z轴下降到丢弃位置失败")
|
||||
return False
|
||||
|
||||
# 3. 执行枪头弹出动作(如果有移液器控制器)
|
||||
if self.pipette_controller:
|
||||
try:
|
||||
# 发送弹出枪头命令
|
||||
await asyncio.to_thread(self.pipette_controller.eject_tip)
|
||||
logger.info("执行枪头弹出命令")
|
||||
except Exception as e:
|
||||
logger.warning(f"枪头弹出命令失败: {e}")
|
||||
|
||||
# 4. 等待一小段时间确保枪头完全脱离
|
||||
await asyncio.sleep(0.3)
|
||||
|
||||
# 5. Z轴上升到安全高度
|
||||
logger.info(f"Z轴上升到安全高度: {safe_z:.2f}mm")
|
||||
z_up_success = await asyncio.to_thread(
|
||||
self.xyz_controller.move_to_work_coord,
|
||||
drop_x, drop_y, safe_z
|
||||
)
|
||||
if not z_up_success:
|
||||
logger.error("Z轴上升失败")
|
||||
return False
|
||||
|
||||
# 6. 更新当前位置
|
||||
self.current_position = (drop_x, drop_y, safe_z)
|
||||
|
||||
except Exception as drop_error:
|
||||
logger.error(f"枪头丢弃过程中发生错误: {drop_error}")
|
||||
# 尝试移动到安全位置
|
||||
if self.config.emergency_stop_enabled:
|
||||
await self.emergency_stop()
|
||||
await self.move_to_safe_position()
|
||||
return False
|
||||
|
||||
else:
|
||||
# 模拟模式
|
||||
logger.info("模拟模式:执行枪头丢弃动作")
|
||||
await asyncio.sleep(0.8) # 模拟整个丢弃过程的时间
|
||||
self.current_position = (drop_x, drop_y, drop_z + self.config.tip_drop_height)
|
||||
|
||||
# 7. 标记枪头已脱离,清空体积
|
||||
self.tip_attached = False
|
||||
self.current_volume = 0.0
|
||||
logger.info("吸头丢弃成功")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"丢弃吸头失败: {e}")
|
||||
return False
|
||||
|
||||
async def aspirate(self, volume: float, location: str) -> bool:
|
||||
"""吸取液体"""
|
||||
try:
|
||||
if not self.is_connected:
|
||||
raise LaiYuLiquidError("设备未连接")
|
||||
|
||||
if not self.tip_attached:
|
||||
raise LaiYuLiquidError("没有吸头附着")
|
||||
|
||||
if volume <= 0 or volume > self.config.max_volume:
|
||||
raise LaiYuLiquidError(f"体积超出范围: {volume}")
|
||||
|
||||
# 模拟吸取
|
||||
await asyncio.sleep(0.3)
|
||||
self.current_volume += volume
|
||||
logger.debug(f"从 {location} 吸取 {volume} μL")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"吸取失败: {e}")
|
||||
return False
|
||||
|
||||
async def dispense(self, volume: float, location: str) -> bool:
|
||||
"""分配液体"""
|
||||
try:
|
||||
if not self.is_connected:
|
||||
raise LaiYuLiquidError("设备未连接")
|
||||
|
||||
if not self.tip_attached:
|
||||
raise LaiYuLiquidError("没有吸头附着")
|
||||
|
||||
if volume <= 0 or volume > self.current_volume:
|
||||
raise LaiYuLiquidError(f"分配体积无效: {volume}")
|
||||
|
||||
# 模拟分配
|
||||
await asyncio.sleep(0.3)
|
||||
self.current_volume -= volume
|
||||
logger.debug(f"向 {location} 分配 {volume} μL")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"分配失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class LaiYuLiquid:
|
||||
"""LaiYu_Liquid 主要接口类"""
|
||||
|
||||
def __init__(self, config: Optional[LaiYuLiquidConfig] = None, **kwargs):
|
||||
# 如果传入了关键字参数,创建配置对象
|
||||
if kwargs and config is None:
|
||||
# 从kwargs中提取配置参数
|
||||
config_params = {}
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(LaiYuLiquidConfig, key):
|
||||
config_params[key] = value
|
||||
self.config = LaiYuLiquidConfig(**config_params)
|
||||
else:
|
||||
self.config = config or LaiYuLiquidConfig()
|
||||
|
||||
# 先创建deck,然后传递给backend
|
||||
self.deck = LaiYuLiquidDeck(self.config)
|
||||
self.backend = LaiYuLiquidBackend(self.config, self.deck)
|
||||
self.is_setup = False
|
||||
|
||||
@property
|
||||
def current_position(self) -> Tuple[float, float, float]:
|
||||
"""获取当前位置"""
|
||||
return self.backend.current_position
|
||||
|
||||
@property
|
||||
def current_volume(self) -> float:
|
||||
"""获取当前体积"""
|
||||
return self.backend.current_volume
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""获取连接状态"""
|
||||
return self.backend.is_connected
|
||||
|
||||
@property
|
||||
def is_initialized(self) -> bool:
|
||||
"""获取初始化状态"""
|
||||
return self.backend.is_initialized
|
||||
|
||||
@property
|
||||
def tip_attached(self) -> bool:
|
||||
"""获取吸头附着状态"""
|
||||
return self.backend.tip_attached
|
||||
|
||||
async def setup(self) -> bool:
|
||||
"""设置液体处理器"""
|
||||
try:
|
||||
success = await self.backend.setup()
|
||||
if success:
|
||||
self.is_setup = True
|
||||
logger.info("LaiYu_Liquid 设置完成")
|
||||
return success
|
||||
except Exception as e:
|
||||
logger.error(f"LaiYu_Liquid 设置失败: {e}")
|
||||
return False
|
||||
|
||||
async def stop(self):
|
||||
"""停止液体处理器"""
|
||||
await self.backend.stop()
|
||||
self.is_setup = False
|
||||
|
||||
async def transfer(self, source: str, target: str, volume: float,
|
||||
tip_rack: str = "tip_rack_1", tip_position: int = 0) -> bool:
|
||||
"""液体转移"""
|
||||
try:
|
||||
if not self.is_setup:
|
||||
raise LaiYuLiquidError("设备未设置")
|
||||
|
||||
# 获取源和目标位置
|
||||
source_pos = self.deck.get_position(source)
|
||||
target_pos = self.deck.get_position(target)
|
||||
tip_pos = self.deck.get_position(tip_rack)
|
||||
|
||||
if not all([source_pos, target_pos, tip_pos]):
|
||||
raise LaiYuLiquidError("位置信息不完整")
|
||||
|
||||
# 执行转移步骤
|
||||
steps = [
|
||||
("移动到吸头架", self.backend.move_to(*tip_pos)),
|
||||
("拾取吸头", self.backend.pick_up_tip(tip_rack, tip_position)),
|
||||
("移动到源位置", self.backend.move_to(*source_pos)),
|
||||
("吸取液体", self.backend.aspirate(volume, source)),
|
||||
("移动到目标位置", self.backend.move_to(*target_pos)),
|
||||
("分配液体", self.backend.dispense(volume, target)),
|
||||
("丢弃吸头", self.backend.drop_tip())
|
||||
]
|
||||
|
||||
for step_name, step_coro in steps:
|
||||
logger.debug(f"执行步骤: {step_name}")
|
||||
success = await step_coro
|
||||
if not success:
|
||||
raise LaiYuLiquidError(f"步骤失败: {step_name}")
|
||||
|
||||
logger.info(f"液体转移完成: {source} -> {target}, {volume} μL")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"液体转移失败: {e}")
|
||||
return False
|
||||
|
||||
def add_resource(self, name: str, resource_type: str, position: Tuple[float, float, float]):
|
||||
"""添加资源到工作台"""
|
||||
if resource_type == "plate":
|
||||
resource = Plate(name)
|
||||
elif resource_type == "tip_rack":
|
||||
resource = TipRack(name)
|
||||
else:
|
||||
resource = Resource(name)
|
||||
|
||||
self.deck.add_resource(name, resource, position)
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取设备状态"""
|
||||
return {
|
||||
"connected": self.backend.is_connected,
|
||||
"setup": self.is_setup,
|
||||
"current_position": self.backend.current_position,
|
||||
"tip_attached": self.backend.tip_attached,
|
||||
"current_volume": self.backend.current_volume,
|
||||
"resources": self.deck.list_resources()
|
||||
}
|
||||
|
||||
|
||||
def create_quick_setup() -> LaiYuLiquidDeck:
|
||||
"""
|
||||
创建快速设置的LaiYu液体处理工作站
|
||||
|
||||
Returns:
|
||||
LaiYuLiquidDeck: 配置好的工作台实例
|
||||
"""
|
||||
# 创建默认配置
|
||||
config = LaiYuLiquidConfig()
|
||||
|
||||
# 创建工作台
|
||||
deck = LaiYuLiquidDeck(config)
|
||||
|
||||
# 导入资源创建函数
|
||||
try:
|
||||
from .laiyu_liquid_res import (
|
||||
create_tip_rack_1000ul,
|
||||
create_tip_rack_200ul,
|
||||
create_96_well_plate,
|
||||
create_waste_container
|
||||
)
|
||||
|
||||
# 添加基本资源
|
||||
tip_rack_1000 = create_tip_rack_1000ul("tip_rack_1000")
|
||||
tip_rack_200 = create_tip_rack_200ul("tip_rack_200")
|
||||
plate_96 = create_96_well_plate("plate_96")
|
||||
waste = create_waste_container("waste")
|
||||
|
||||
# 添加到工作台
|
||||
deck.add_resource("tip_rack_1000", tip_rack_1000, (50, 50, 0))
|
||||
deck.add_resource("tip_rack_200", tip_rack_200, (150, 50, 0))
|
||||
deck.add_resource("plate_96", plate_96, (250, 50, 0))
|
||||
deck.add_resource("waste", waste, (50, 150, 0))
|
||||
|
||||
except ImportError:
|
||||
# 如果资源模块不可用,创建空的工作台
|
||||
logger.warning("资源模块不可用,创建空的工作台")
|
||||
|
||||
return deck
|
||||
|
||||
|
||||
__all__ = [
|
||||
"LaiYuLiquid",
|
||||
"LaiYuLiquidBackend",
|
||||
"LaiYuLiquidConfig",
|
||||
"LaiYuLiquidDeck",
|
||||
"LaiYuLiquidContainer",
|
||||
"LaiYuLiquidTipRack",
|
||||
"LaiYuLiquidError",
|
||||
"create_quick_setup",
|
||||
"get_module_info"
|
||||
]
|
||||
954
unilabos/devices/laiyu_liquid/core/laiyu_liquid_res.py
Normal file
954
unilabos/devices/laiyu_liquid/core/laiyu_liquid_res.py
Normal file
@@ -0,0 +1,954 @@
|
||||
"""
|
||||
LaiYu_Liquid 资源定义模块
|
||||
|
||||
该模块提供了 LaiYu_Liquid 工作站专用的资源定义函数,包括:
|
||||
- 各种规格的枪头架
|
||||
- 不同类型的板和容器
|
||||
- 特殊功能位置
|
||||
- 资源创建的便捷函数
|
||||
|
||||
所有资源都基于 deck.json 中的配置参数创建。
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
from pathlib import Path
|
||||
|
||||
# PyLabRobot 资源导入
|
||||
try:
|
||||
from pylabrobot.resources import (
|
||||
Resource, Deck, Plate, TipRack, Container, Tip,
|
||||
Coordinate
|
||||
)
|
||||
from pylabrobot.resources.tip_rack import TipSpot
|
||||
from pylabrobot.resources.well import Well as PlateWell
|
||||
PYLABROBOT_AVAILABLE = True
|
||||
except ImportError:
|
||||
# 如果 PyLabRobot 不可用,创建模拟类
|
||||
PYLABROBOT_AVAILABLE = False
|
||||
|
||||
class Resource:
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
|
||||
class Deck(Resource):
|
||||
pass
|
||||
|
||||
class Plate(Resource):
|
||||
pass
|
||||
|
||||
class TipRack(Resource):
|
||||
pass
|
||||
|
||||
class Container(Resource):
|
||||
pass
|
||||
|
||||
class Tip(Resource):
|
||||
pass
|
||||
|
||||
class TipSpot(Resource):
|
||||
def __init__(self, name: str, **kwargs):
|
||||
super().__init__(name)
|
||||
# 忽略其他参数
|
||||
|
||||
class PlateWell(Resource):
|
||||
pass
|
||||
|
||||
class Coordinate:
|
||||
def __init__(self, x: float, y: float, z: float):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.z = z
|
||||
|
||||
# 本地导入
|
||||
from .laiyu_liquid_main import LaiYuLiquidDeck, LaiYuLiquidContainer, LaiYuLiquidTipRack
|
||||
|
||||
|
||||
def load_deck_config() -> Dict[str, Any]:
|
||||
"""
|
||||
加载工作台配置文件
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: 配置字典
|
||||
"""
|
||||
# 优先使用最新的deckconfig.json文件
|
||||
config_path = Path(__file__).parent / "controllers" / "deckconfig.json"
|
||||
|
||||
# 如果最新配置文件不存在,回退到旧配置文件
|
||||
if not config_path.exists():
|
||||
config_path = Path(__file__).parent / "config" / "deck.json"
|
||||
|
||||
try:
|
||||
with open(config_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
# 如果找不到配置文件,返回默认配置
|
||||
return {
|
||||
"name": "LaiYu_Liquid_Deck",
|
||||
"size_x": 340.0,
|
||||
"size_y": 250.0,
|
||||
"size_z": 160.0
|
||||
}
|
||||
|
||||
|
||||
# 加载配置
|
||||
DECK_CONFIG = load_deck_config()
|
||||
|
||||
|
||||
class LaiYuTipRack1000(LaiYuLiquidTipRack):
|
||||
"""1000μL 枪头架"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化1000μL枪头架
|
||||
|
||||
Args:
|
||||
name: 枪头架名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=127.76,
|
||||
size_y=85.48,
|
||||
size_z=30.0,
|
||||
tip_count=96,
|
||||
tip_volume=1000.0
|
||||
)
|
||||
|
||||
# 创建枪头位置
|
||||
self._create_tip_spots(
|
||||
tip_count=96,
|
||||
tip_spacing=9.0,
|
||||
tip_type="1000ul"
|
||||
)
|
||||
|
||||
def _create_tip_spots(self, tip_count: int, tip_spacing: float, tip_type: str):
|
||||
"""
|
||||
创建枪头位置 - 从配置文件中读取绝对坐标
|
||||
|
||||
Args:
|
||||
tip_count: 枪头数量
|
||||
tip_spacing: 枪头间距
|
||||
tip_type: 枪头类型
|
||||
"""
|
||||
# 从配置文件中获取枪头架的孔位信息
|
||||
config = DECK_CONFIG
|
||||
tip_module = None
|
||||
|
||||
# 查找枪头架模块
|
||||
for module in config.get("children", []):
|
||||
if module.get("type") == "tip_rack":
|
||||
tip_module = module
|
||||
break
|
||||
|
||||
if not tip_module:
|
||||
# 如果配置文件中没有找到,使用默认的相对坐标计算
|
||||
rows = 8
|
||||
cols = 12
|
||||
|
||||
for row in range(rows):
|
||||
for col in range(cols):
|
||||
spot_name = f"{chr(65 + row)}{col + 1:02d}"
|
||||
x = col * tip_spacing + tip_spacing / 2
|
||||
y = row * tip_spacing + tip_spacing / 2
|
||||
|
||||
# 创建枪头 - 根据PyLabRobot或模拟类使用不同参数
|
||||
if PYLABROBOT_AVAILABLE:
|
||||
# PyLabRobot的Tip需要特定参数
|
||||
tip = Tip(
|
||||
has_filter=False,
|
||||
total_tip_length=95.0, # 1000ul枪头长度
|
||||
maximal_volume=1000.0, # 最大体积
|
||||
fitting_depth=8.0 # 安装深度
|
||||
)
|
||||
else:
|
||||
# 模拟类只需要name
|
||||
tip = Tip(name=f"tip_{spot_name}")
|
||||
|
||||
# 创建枪头位置
|
||||
if PYLABROBOT_AVAILABLE:
|
||||
# PyLabRobot的TipSpot需要特定参数
|
||||
tip_spot = TipSpot(
|
||||
name=spot_name,
|
||||
size_x=9.0, # 枪头位置宽度
|
||||
size_y=9.0, # 枪头位置深度
|
||||
size_z=95.0, # 枪头位置高度
|
||||
make_tip=lambda: tip # 创建枪头的函数
|
||||
)
|
||||
else:
|
||||
# 模拟类只需要name
|
||||
tip_spot = TipSpot(name=spot_name)
|
||||
|
||||
# 将吸头位置分配到吸头架
|
||||
self.assign_child_resource(
|
||||
tip_spot,
|
||||
location=Coordinate(x, y, 0)
|
||||
)
|
||||
return
|
||||
|
||||
# 使用配置文件中的绝对坐标
|
||||
module_position = tip_module.get("position", {"x": 0, "y": 0, "z": 0})
|
||||
|
||||
for well_config in tip_module.get("wells", []):
|
||||
spot_name = well_config["id"]
|
||||
well_pos = well_config["position"]
|
||||
|
||||
# 计算相对于模块的坐标(绝对坐标减去模块位置)
|
||||
relative_x = well_pos["x"] - module_position["x"]
|
||||
relative_y = well_pos["y"] - module_position["y"]
|
||||
relative_z = well_pos["z"] - module_position["z"]
|
||||
|
||||
# 创建枪头 - 根据PyLabRobot或模拟类使用不同参数
|
||||
if PYLABROBOT_AVAILABLE:
|
||||
# PyLabRobot的Tip需要特定参数
|
||||
tip = Tip(
|
||||
has_filter=False,
|
||||
total_tip_length=95.0, # 1000ul枪头长度
|
||||
maximal_volume=1000.0, # 最大体积
|
||||
fitting_depth=8.0 # 安装深度
|
||||
)
|
||||
else:
|
||||
# 模拟类只需要name
|
||||
tip = Tip(name=f"tip_{spot_name}")
|
||||
|
||||
# 创建枪头位置
|
||||
if PYLABROBOT_AVAILABLE:
|
||||
# PyLabRobot的TipSpot需要特定参数
|
||||
tip_spot = TipSpot(
|
||||
name=spot_name,
|
||||
size_x=well_config.get("diameter", 9.0), # 使用配置中的直径
|
||||
size_y=well_config.get("diameter", 9.0),
|
||||
size_z=well_config.get("depth", 95.0), # 使用配置中的深度
|
||||
make_tip=lambda: tip # 创建枪头的函数
|
||||
)
|
||||
else:
|
||||
# 模拟类只需要name
|
||||
tip_spot = TipSpot(name=spot_name)
|
||||
|
||||
# 将吸头位置分配到吸头架
|
||||
self.assign_child_resource(
|
||||
tip_spot,
|
||||
location=Coordinate(relative_x, relative_y, relative_z)
|
||||
)
|
||||
|
||||
# 注意:在PyLabRobot中,Tip不是Resource,不需要分配给TipSpot
|
||||
# TipSpot的make_tip函数会在需要时创建Tip
|
||||
|
||||
|
||||
class LaiYuTipRack200(LaiYuLiquidTipRack):
|
||||
"""200μL 枪头架"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化200μL枪头架
|
||||
|
||||
Args:
|
||||
name: 枪头架名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=127.76,
|
||||
size_y=85.48,
|
||||
size_z=30.0,
|
||||
tip_count=96,
|
||||
tip_volume=200.0
|
||||
)
|
||||
|
||||
# 创建枪头位置
|
||||
self._create_tip_spots(
|
||||
tip_count=96,
|
||||
tip_spacing=9.0,
|
||||
tip_type="200ul"
|
||||
)
|
||||
|
||||
def _create_tip_spots(self, tip_count: int, tip_spacing: float, tip_type: str):
|
||||
"""
|
||||
创建枪头位置
|
||||
|
||||
Args:
|
||||
tip_count: 枪头数量
|
||||
tip_spacing: 枪头间距
|
||||
tip_type: 枪头类型
|
||||
"""
|
||||
rows = 8
|
||||
cols = 12
|
||||
|
||||
for row in range(rows):
|
||||
for col in range(cols):
|
||||
spot_name = f"{chr(65 + row)}{col + 1:02d}"
|
||||
x = col * tip_spacing + tip_spacing / 2
|
||||
y = row * tip_spacing + tip_spacing / 2
|
||||
|
||||
# 创建枪头 - 根据PyLabRobot或模拟类使用不同参数
|
||||
if PYLABROBOT_AVAILABLE:
|
||||
# PyLabRobot的Tip需要特定参数
|
||||
tip = Tip(
|
||||
has_filter=False,
|
||||
total_tip_length=72.0, # 200ul枪头长度
|
||||
maximal_volume=200.0, # 最大体积
|
||||
fitting_depth=8.0 # 安装深度
|
||||
)
|
||||
else:
|
||||
# 模拟类只需要name
|
||||
tip = Tip(name=f"tip_{spot_name}")
|
||||
|
||||
# 创建枪头位置
|
||||
if PYLABROBOT_AVAILABLE:
|
||||
# PyLabRobot的TipSpot需要特定参数
|
||||
tip_spot = TipSpot(
|
||||
name=spot_name,
|
||||
size_x=9.0, # 枪头位置宽度
|
||||
size_y=9.0, # 枪头位置深度
|
||||
size_z=72.0, # 枪头位置高度
|
||||
make_tip=lambda: tip # 创建枪头的函数
|
||||
)
|
||||
else:
|
||||
# 模拟类只需要name
|
||||
tip_spot = TipSpot(name=spot_name)
|
||||
|
||||
# 将吸头位置分配到吸头架
|
||||
self.assign_child_resource(
|
||||
tip_spot,
|
||||
location=Coordinate(x, y, 0)
|
||||
)
|
||||
|
||||
# 注意:在PyLabRobot中,Tip不是Resource,不需要分配给TipSpot
|
||||
# TipSpot的make_tip函数会在需要时创建Tip
|
||||
|
||||
|
||||
class LaiYu96WellPlate(LaiYuLiquidContainer):
|
||||
"""96孔板"""
|
||||
|
||||
def __init__(self, name: str, lid_height: float = 0.0):
|
||||
"""
|
||||
初始化96孔板
|
||||
|
||||
Args:
|
||||
name: 板名称
|
||||
lid_height: 盖子高度
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=127.76,
|
||||
size_y=85.48,
|
||||
size_z=14.22,
|
||||
container_type="96_well_plate",
|
||||
volume=0.0,
|
||||
max_volume=200.0,
|
||||
lid_height=lid_height
|
||||
)
|
||||
|
||||
# 创建孔位
|
||||
self._create_wells(
|
||||
well_count=96,
|
||||
well_volume=200.0,
|
||||
well_spacing=9.0
|
||||
)
|
||||
|
||||
def get_size_z(self) -> float:
|
||||
"""获取孔位深度"""
|
||||
return 10.0 # 96孔板孔位深度
|
||||
|
||||
def _create_wells(self, well_count: int, well_volume: float, well_spacing: float):
|
||||
"""
|
||||
创建孔位 - 从配置文件中读取绝对坐标
|
||||
|
||||
Args:
|
||||
well_count: 孔位数量
|
||||
well_volume: 孔位体积
|
||||
well_spacing: 孔位间距
|
||||
"""
|
||||
# 从配置文件中获取96孔板的孔位信息
|
||||
config = DECK_CONFIG
|
||||
plate_module = None
|
||||
|
||||
# 查找96孔板模块
|
||||
for module in config.get("children", []):
|
||||
if module.get("type") == "96_well_plate":
|
||||
plate_module = module
|
||||
break
|
||||
|
||||
if not plate_module:
|
||||
# 如果配置文件中没有找到,使用默认的相对坐标计算
|
||||
rows = 8
|
||||
cols = 12
|
||||
|
||||
for row in range(rows):
|
||||
for col in range(cols):
|
||||
well_name = f"{chr(65 + row)}{col + 1:02d}"
|
||||
x = col * well_spacing + well_spacing / 2
|
||||
y = row * well_spacing + well_spacing / 2
|
||||
|
||||
# 创建孔位
|
||||
well = PlateWell(
|
||||
name=well_name,
|
||||
size_x=well_spacing * 0.8,
|
||||
size_y=well_spacing * 0.8,
|
||||
size_z=self.get_size_z(),
|
||||
max_volume=well_volume
|
||||
)
|
||||
|
||||
# 添加到板
|
||||
self.assign_child_resource(
|
||||
well,
|
||||
location=Coordinate(x, y, 0)
|
||||
)
|
||||
return
|
||||
|
||||
# 使用配置文件中的绝对坐标
|
||||
module_position = plate_module.get("position", {"x": 0, "y": 0, "z": 0})
|
||||
|
||||
for well_config in plate_module.get("wells", []):
|
||||
well_name = well_config["id"]
|
||||
well_pos = well_config["position"]
|
||||
|
||||
# 计算相对于模块的坐标(绝对坐标减去模块位置)
|
||||
relative_x = well_pos["x"] - module_position["x"]
|
||||
relative_y = well_pos["y"] - module_position["y"]
|
||||
relative_z = well_pos["z"] - module_position["z"]
|
||||
|
||||
# 创建孔位
|
||||
well = PlateWell(
|
||||
name=well_name,
|
||||
size_x=well_config.get("diameter", 8.2) * 0.8, # 使用配置中的直径
|
||||
size_y=well_config.get("diameter", 8.2) * 0.8,
|
||||
size_z=well_config.get("depth", self.get_size_z()),
|
||||
max_volume=well_config.get("volume", well_volume)
|
||||
)
|
||||
|
||||
# 添加到板
|
||||
self.assign_child_resource(
|
||||
well,
|
||||
location=Coordinate(relative_x, relative_y, relative_z)
|
||||
)
|
||||
|
||||
|
||||
class LaiYuDeepWellPlate(LaiYuLiquidContainer):
|
||||
"""深孔板"""
|
||||
|
||||
def __init__(self, name: str, lid_height: float = 0.0):
|
||||
"""
|
||||
初始化深孔板
|
||||
|
||||
Args:
|
||||
name: 板名称
|
||||
lid_height: 盖子高度
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=127.76,
|
||||
size_y=85.48,
|
||||
size_z=41.3,
|
||||
container_type="deep_well_plate",
|
||||
volume=0.0,
|
||||
max_volume=2000.0,
|
||||
lid_height=lid_height
|
||||
)
|
||||
|
||||
# 创建孔位
|
||||
self._create_wells(
|
||||
well_count=96,
|
||||
well_volume=2000.0,
|
||||
well_spacing=9.0
|
||||
)
|
||||
|
||||
def get_size_z(self) -> float:
|
||||
"""获取孔位深度"""
|
||||
return 35.0 # 深孔板孔位深度
|
||||
|
||||
def _create_wells(self, well_count: int, well_volume: float, well_spacing: float):
|
||||
"""
|
||||
创建孔位 - 从配置文件中读取绝对坐标
|
||||
|
||||
Args:
|
||||
well_count: 孔位数量
|
||||
well_volume: 孔位体积
|
||||
well_spacing: 孔位间距
|
||||
"""
|
||||
# 从配置文件中获取深孔板的孔位信息
|
||||
config = DECK_CONFIG
|
||||
plate_module = None
|
||||
|
||||
# 查找深孔板模块(通常是第二个96孔板模块)
|
||||
plate_modules = []
|
||||
for module in config.get("children", []):
|
||||
if module.get("type") == "96_well_plate":
|
||||
plate_modules.append(module)
|
||||
|
||||
# 如果有多个96孔板模块,选择第二个作为深孔板
|
||||
if len(plate_modules) > 1:
|
||||
plate_module = plate_modules[1]
|
||||
elif len(plate_modules) == 1:
|
||||
plate_module = plate_modules[0]
|
||||
|
||||
if not plate_module:
|
||||
# 如果配置文件中没有找到,使用默认的相对坐标计算
|
||||
rows = 8
|
||||
cols = 12
|
||||
|
||||
for row in range(rows):
|
||||
for col in range(cols):
|
||||
well_name = f"{chr(65 + row)}{col + 1:02d}"
|
||||
x = col * well_spacing + well_spacing / 2
|
||||
y = row * well_spacing + well_spacing / 2
|
||||
|
||||
# 创建孔位
|
||||
well = PlateWell(
|
||||
name=well_name,
|
||||
size_x=well_spacing * 0.8,
|
||||
size_y=well_spacing * 0.8,
|
||||
size_z=self.get_size_z(),
|
||||
max_volume=well_volume
|
||||
)
|
||||
|
||||
# 添加到板
|
||||
self.assign_child_resource(
|
||||
well,
|
||||
location=Coordinate(x, y, 0)
|
||||
)
|
||||
return
|
||||
|
||||
# 使用配置文件中的绝对坐标
|
||||
module_position = plate_module.get("position", {"x": 0, "y": 0, "z": 0})
|
||||
|
||||
for well_config in plate_module.get("wells", []):
|
||||
well_name = well_config["id"]
|
||||
well_pos = well_config["position"]
|
||||
|
||||
# 计算相对于模块的坐标(绝对坐标减去模块位置)
|
||||
relative_x = well_pos["x"] - module_position["x"]
|
||||
relative_y = well_pos["y"] - module_position["y"]
|
||||
relative_z = well_pos["z"] - module_position["z"]
|
||||
|
||||
# 创建孔位
|
||||
well = PlateWell(
|
||||
name=well_name,
|
||||
size_x=well_config.get("diameter", 8.2) * 0.8, # 使用配置中的直径
|
||||
size_y=well_config.get("diameter", 8.2) * 0.8,
|
||||
size_z=well_config.get("depth", self.get_size_z()),
|
||||
max_volume=well_config.get("volume", well_volume)
|
||||
)
|
||||
|
||||
# 添加到板
|
||||
self.assign_child_resource(
|
||||
well,
|
||||
location=Coordinate(relative_x, relative_y, relative_z)
|
||||
)
|
||||
|
||||
|
||||
class LaiYuWasteContainer(Container):
|
||||
"""废液容器"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化废液容器
|
||||
|
||||
Args:
|
||||
name: 容器名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=100.0,
|
||||
size_y=100.0,
|
||||
size_z=50.0,
|
||||
max_volume=5000.0
|
||||
)
|
||||
|
||||
|
||||
class LaiYuWashContainer(Container):
|
||||
"""清洗容器"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化清洗容器
|
||||
|
||||
Args:
|
||||
name: 容器名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=100.0,
|
||||
size_y=100.0,
|
||||
size_z=50.0,
|
||||
max_volume=5000.0
|
||||
)
|
||||
|
||||
|
||||
class LaiYuReagentContainer(Container):
|
||||
"""试剂容器"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化试剂容器
|
||||
|
||||
Args:
|
||||
name: 容器名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=50.0,
|
||||
size_y=50.0,
|
||||
size_z=100.0,
|
||||
max_volume=2000.0
|
||||
)
|
||||
|
||||
|
||||
class LaiYu8TubeRack(LaiYuLiquidContainer):
|
||||
"""8管试管架"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化8管试管架
|
||||
|
||||
Args:
|
||||
name: 试管架名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=151.0,
|
||||
size_y=75.0,
|
||||
size_z=75.0,
|
||||
container_type="tube_rack",
|
||||
volume=0.0,
|
||||
max_volume=77000.0
|
||||
)
|
||||
|
||||
# 创建孔位
|
||||
self._create_wells(
|
||||
well_count=8,
|
||||
well_volume=77000.0,
|
||||
well_spacing=35.0
|
||||
)
|
||||
|
||||
def get_size_z(self) -> float:
|
||||
"""获取孔位深度"""
|
||||
return 117.0 # 试管深度
|
||||
|
||||
def _create_wells(self, well_count: int, well_volume: float, well_spacing: float):
|
||||
"""
|
||||
创建孔位 - 从配置文件中读取绝对坐标
|
||||
|
||||
Args:
|
||||
well_count: 孔位数量
|
||||
well_volume: 孔位体积
|
||||
well_spacing: 孔位间距
|
||||
"""
|
||||
# 从配置文件中获取8管试管架的孔位信息
|
||||
config = DECK_CONFIG
|
||||
tube_module = None
|
||||
|
||||
# 查找8管试管架模块
|
||||
for module in config.get("children", []):
|
||||
if module.get("type") == "tube_rack":
|
||||
tube_module = module
|
||||
break
|
||||
|
||||
if not tube_module:
|
||||
# 如果配置文件中没有找到,使用默认的相对坐标计算
|
||||
rows = 2
|
||||
cols = 4
|
||||
|
||||
for row in range(rows):
|
||||
for col in range(cols):
|
||||
well_name = f"{chr(65 + row)}{col + 1}"
|
||||
x = col * well_spacing + well_spacing / 2
|
||||
y = row * well_spacing + well_spacing / 2
|
||||
|
||||
# 创建孔位
|
||||
well = PlateWell(
|
||||
name=well_name,
|
||||
size_x=29.0,
|
||||
size_y=29.0,
|
||||
size_z=self.get_size_z(),
|
||||
max_volume=well_volume
|
||||
)
|
||||
|
||||
# 添加到试管架
|
||||
self.assign_child_resource(
|
||||
well,
|
||||
location=Coordinate(x, y, 0)
|
||||
)
|
||||
return
|
||||
|
||||
# 使用配置文件中的绝对坐标
|
||||
module_position = tube_module.get("position", {"x": 0, "y": 0, "z": 0})
|
||||
|
||||
for well_config in tube_module.get("wells", []):
|
||||
well_name = well_config["id"]
|
||||
well_pos = well_config["position"]
|
||||
|
||||
# 计算相对于模块的坐标(绝对坐标减去模块位置)
|
||||
relative_x = well_pos["x"] - module_position["x"]
|
||||
relative_y = well_pos["y"] - module_position["y"]
|
||||
relative_z = well_pos["z"] - module_position["z"]
|
||||
|
||||
# 创建孔位
|
||||
well = PlateWell(
|
||||
name=well_name,
|
||||
size_x=well_config.get("diameter", 29.0),
|
||||
size_y=well_config.get("diameter", 29.0),
|
||||
size_z=well_config.get("depth", self.get_size_z()),
|
||||
max_volume=well_config.get("volume", well_volume)
|
||||
)
|
||||
|
||||
# 添加到试管架
|
||||
self.assign_child_resource(
|
||||
well,
|
||||
location=Coordinate(relative_x, relative_y, relative_z)
|
||||
)
|
||||
|
||||
|
||||
class LaiYuTipDisposal(Resource):
|
||||
"""枪头废料位置"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化枪头废料位置
|
||||
|
||||
Args:
|
||||
name: 位置名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=100.0,
|
||||
size_y=100.0,
|
||||
size_z=50.0
|
||||
)
|
||||
|
||||
|
||||
class LaiYuMaintenancePosition(Resource):
|
||||
"""维护位置"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
"""
|
||||
初始化维护位置
|
||||
|
||||
Args:
|
||||
name: 位置名称
|
||||
"""
|
||||
super().__init__(
|
||||
name=name,
|
||||
size_x=50.0,
|
||||
size_y=50.0,
|
||||
size_z=100.0
|
||||
)
|
||||
|
||||
|
||||
# 资源创建函数
|
||||
def create_tip_rack_1000ul(name: str = "tip_rack_1000ul") -> LaiYuTipRack1000:
|
||||
"""
|
||||
创建1000μL枪头架
|
||||
|
||||
Args:
|
||||
name: 枪头架名称
|
||||
|
||||
Returns:
|
||||
LaiYuTipRack1000: 1000μL枪头架实例
|
||||
"""
|
||||
return LaiYuTipRack1000(name)
|
||||
|
||||
|
||||
def create_tip_rack_200ul(name: str = "tip_rack_200ul") -> LaiYuTipRack200:
|
||||
"""
|
||||
创建200μL枪头架
|
||||
|
||||
Args:
|
||||
name: 枪头架名称
|
||||
|
||||
Returns:
|
||||
LaiYuTipRack200: 200μL枪头架实例
|
||||
"""
|
||||
return LaiYuTipRack200(name)
|
||||
|
||||
|
||||
def create_96_well_plate(name: str = "96_well_plate", lid_height: float = 0.0) -> LaiYu96WellPlate:
|
||||
"""
|
||||
创建96孔板
|
||||
|
||||
Args:
|
||||
name: 板名称
|
||||
lid_height: 盖子高度
|
||||
|
||||
Returns:
|
||||
LaiYu96WellPlate: 96孔板实例
|
||||
"""
|
||||
return LaiYu96WellPlate(name, lid_height)
|
||||
|
||||
|
||||
def create_deep_well_plate(name: str = "deep_well_plate", lid_height: float = 0.0) -> LaiYuDeepWellPlate:
|
||||
"""
|
||||
创建深孔板
|
||||
|
||||
Args:
|
||||
name: 板名称
|
||||
lid_height: 盖子高度
|
||||
|
||||
Returns:
|
||||
LaiYuDeepWellPlate: 深孔板实例
|
||||
"""
|
||||
return LaiYuDeepWellPlate(name, lid_height)
|
||||
|
||||
|
||||
def create_8_tube_rack(name: str = "8_tube_rack") -> LaiYu8TubeRack:
|
||||
"""
|
||||
创建8管试管架
|
||||
|
||||
Args:
|
||||
name: 试管架名称
|
||||
|
||||
Returns:
|
||||
LaiYu8TubeRack: 8管试管架实例
|
||||
"""
|
||||
return LaiYu8TubeRack(name)
|
||||
|
||||
|
||||
def create_waste_container(name: str = "waste_container") -> LaiYuWasteContainer:
|
||||
"""
|
||||
创建废液容器
|
||||
|
||||
Args:
|
||||
name: 容器名称
|
||||
|
||||
Returns:
|
||||
LaiYuWasteContainer: 废液容器实例
|
||||
"""
|
||||
return LaiYuWasteContainer(name)
|
||||
|
||||
|
||||
def create_wash_container(name: str = "wash_container") -> LaiYuWashContainer:
|
||||
"""
|
||||
创建清洗容器
|
||||
|
||||
Args:
|
||||
name: 容器名称
|
||||
|
||||
Returns:
|
||||
LaiYuWashContainer: 清洗容器实例
|
||||
"""
|
||||
return LaiYuWashContainer(name)
|
||||
|
||||
|
||||
def create_reagent_container(name: str = "reagent_container") -> LaiYuReagentContainer:
|
||||
"""
|
||||
创建试剂容器
|
||||
|
||||
Args:
|
||||
name: 容器名称
|
||||
|
||||
Returns:
|
||||
LaiYuReagentContainer: 试剂容器实例
|
||||
"""
|
||||
return LaiYuReagentContainer(name)
|
||||
|
||||
|
||||
def create_tip_disposal(name: str = "tip_disposal") -> LaiYuTipDisposal:
|
||||
"""
|
||||
创建枪头废料位置
|
||||
|
||||
Args:
|
||||
name: 位置名称
|
||||
|
||||
Returns:
|
||||
LaiYuTipDisposal: 枪头废料位置实例
|
||||
"""
|
||||
return LaiYuTipDisposal(name)
|
||||
|
||||
|
||||
def create_maintenance_position(name: str = "maintenance_position") -> LaiYuMaintenancePosition:
|
||||
"""
|
||||
创建维护位置
|
||||
|
||||
Args:
|
||||
name: 位置名称
|
||||
|
||||
Returns:
|
||||
LaiYuMaintenancePosition: 维护位置实例
|
||||
"""
|
||||
return LaiYuMaintenancePosition(name)
|
||||
|
||||
|
||||
def create_standard_deck() -> LaiYuLiquidDeck:
|
||||
"""
|
||||
创建标准工作台配置
|
||||
|
||||
Returns:
|
||||
LaiYuLiquidDeck: 配置好的工作台实例
|
||||
"""
|
||||
# 从配置文件创建工作台
|
||||
deck = LaiYuLiquidDeck(config=DECK_CONFIG)
|
||||
|
||||
return deck
|
||||
|
||||
|
||||
def get_resource_by_name(deck: LaiYuLiquidDeck, name: str) -> Optional[Resource]:
|
||||
"""
|
||||
根据名称获取资源
|
||||
|
||||
Args:
|
||||
deck: 工作台实例
|
||||
name: 资源名称
|
||||
|
||||
Returns:
|
||||
Optional[Resource]: 找到的资源,如果不存在则返回None
|
||||
"""
|
||||
for child in deck.children:
|
||||
if child.name == name:
|
||||
return child
|
||||
return None
|
||||
|
||||
|
||||
def get_resources_by_type(deck: LaiYuLiquidDeck, resource_type: type) -> List[Resource]:
|
||||
"""
|
||||
根据类型获取资源列表
|
||||
|
||||
Args:
|
||||
deck: 工作台实例
|
||||
resource_type: 资源类型
|
||||
|
||||
Returns:
|
||||
List[Resource]: 匹配类型的资源列表
|
||||
"""
|
||||
return [child for child in deck.children if isinstance(child, resource_type)]
|
||||
|
||||
|
||||
def list_all_resources(deck: LaiYuLiquidDeck) -> Dict[str, List[str]]:
|
||||
"""
|
||||
列出所有资源
|
||||
|
||||
Args:
|
||||
deck: 工作台实例
|
||||
|
||||
Returns:
|
||||
Dict[str, List[str]]: 按类型分组的资源名称字典
|
||||
"""
|
||||
resources = {
|
||||
"tip_racks": [],
|
||||
"plates": [],
|
||||
"containers": [],
|
||||
"positions": []
|
||||
}
|
||||
|
||||
for child in deck.children:
|
||||
if isinstance(child, (LaiYuTipRack1000, LaiYuTipRack200)):
|
||||
resources["tip_racks"].append(child.name)
|
||||
elif isinstance(child, (LaiYu96WellPlate, LaiYuDeepWellPlate)):
|
||||
resources["plates"].append(child.name)
|
||||
elif isinstance(child, (LaiYuWasteContainer, LaiYuWashContainer, LaiYuReagentContainer)):
|
||||
resources["containers"].append(child.name)
|
||||
elif isinstance(child, (LaiYuTipDisposal, LaiYuMaintenancePosition)):
|
||||
resources["positions"].append(child.name)
|
||||
|
||||
return resources
|
||||
|
||||
|
||||
# 导出的类别名(向后兼容)
|
||||
TipRack1000ul = LaiYuTipRack1000
|
||||
TipRack200ul = LaiYuTipRack200
|
||||
Plate96Well = LaiYu96WellPlate
|
||||
Plate96DeepWell = LaiYuDeepWellPlate
|
||||
TubeRack8 = LaiYu8TubeRack
|
||||
WasteContainer = LaiYuWasteContainer
|
||||
WashContainer = LaiYuWashContainer
|
||||
ReagentContainer = LaiYuReagentContainer
|
||||
TipDisposal = LaiYuTipDisposal
|
||||
MaintenancePosition = LaiYuMaintenancePosition
|
||||
Reference in New Issue
Block a user