Compare commits

...

41 Commits

Author SHA1 Message Date
qxw138
e840516ba4 Update biomek.py 2025-06-06 22:50:11 +08:00
qxw138
146d8c5296 Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-06 22:49:35 +08:00
qxw138
6573c9e02e biomek_test.py
biomek_test.py是最新的版本,运行它会生成complete_biomek_protocol.json
2025-06-06 22:42:06 +08:00
Xuwznln
c7b9c6a825 fix handles not as default entry 2025-06-06 18:13:53 +08:00
Xuwznln
48c43d3303 fix biomek startup
add action handles
2025-06-06 17:45:54 +08:00
Xuwznln
55be5e8188 registry 2025-06-06 17:21:19 +08:00
qxw138
1b9f3c666d 1 2025-06-06 14:44:17 +08:00
qxw138
097114d38c new actions 2025-06-06 14:31:10 +08:00
qxw138
5bec899479 new actions 2025-06-06 13:56:39 +08:00
Guangxin Zhang
5e86112ebf Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-06 13:25:34 +08:00
Guangxin Zhang
24ecb13b79 Update 2025-06-06 13:22:15 +08:00
qxw138
2573d34713 Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-06 13:18:42 +08:00
Guangxin Zhang
106d71e1db Refine 2025-06-06 11:11:17 +08:00
Guangxin Zhang
3c2a4a64ac Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-06 11:11:10 +08:00
Guangxin Zhang
1e00a66a65 New parameter for biomek to run. 2025-06-06 11:05:36 +08:00
qxw138
46da42deef Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-06 00:13:11 +08:00
Xuwznln
101c1bc3cc fix key name typo 2025-06-05 22:15:57 +08:00
qxw138
a62112ae26 new action 2025-06-05 17:26:36 +08:00
Xuwznln
dd5a7cab75 支持Biomek创建 2025-06-05 16:04:44 +08:00
Xuwznln
39de3ac58e 更新transfer_biomek的msg 2025-06-05 15:41:16 +08:00
Xuwznln
b99969278c 更新transfer_biomek的msg 2025-06-05 15:30:51 +08:00
Guangxin Zhang
b957ad2f71 Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-04 21:49:27 +08:00
Guangxin Zhang
e1a7c3a103 Updated transfer_biomek 2025-06-04 21:49:22 +08:00
Guangxin Zhang
e63c15997c New transfer_biomek 2025-06-04 21:29:54 +08:00
Xuwznln
c5a495f409 新增transfer_biomek的msg 2025-06-04 19:03:00 +08:00
Guangxin Zhang
5b240cb0ea Update biomek.py 2025-06-04 17:30:53 +08:00
Guangxin Zhang
147b8f47c0 Biomek test 2025-06-04 16:38:18 +08:00
Guangxin Zhang
6d2489af5f Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-06-04 13:27:11 +08:00
Guangxin Zhang
807dcdd226 Update biomek.py 2025-06-04 13:27:05 +08:00
Guangxin Zhang
8a29bc5597 Remove warnings 2025-06-04 13:20:12 +08:00
Guangxin Zhang
6f6c70ee57 delete 's' 2025-06-04 13:11:45 +08:00
Xuwznln
478a85951c 修复biomek缺少的字段 2025-05-31 00:00:55 +08:00
Xuwznln
0f2555c90c 注册表上报handle和schema (param input) 2025-05-31 00:00:39 +08:00
Guangxin Zhang
d2dda6ee03 Merge branch '37-biomek-i5i7' of https://github.com/dptech-corp/Uni-Lab-OS into 37-biomek-i5i7 2025-05-30 17:11:23 +08:00
Guangxin Zhang
208540b307 Update biomek.py 2025-05-30 17:08:19 +08:00
Guangxin Zhang
cb7c56a1d9 Convert LH action to biomek. 2025-05-30 17:00:06 +08:00
Xuwznln
ea2e9c3e3a fix biomek success type 2025-05-30 16:50:13 +08:00
Guangxin Zhang
0452a68180 Test 2025-05-30 16:03:49 +08:00
Xuwznln
90a0f3db9b merge 2025-05-30 15:40:14 +08:00
Junhan Chang
055d120ba8 更新LiquidHandlerBiomek类,添加资源创建功能,优化协议创建方法,修复部分代码格式问题,更新YAML配置以支持新功能。 2025-05-30 15:38:23 +08:00
Junhan Chang
a948f09f60 add biomek.py demo implementation 2025-05-30 13:33:10 +08:00
23 changed files with 13567 additions and 74 deletions

View File

@@ -0,0 +1,907 @@
import requests
from typing import List, Sequence, Optional, Union, Literal
from geometry_msgs.msg import Point
from pylabrobot.liquid_handling import LiquidHandler
from unilabos_msgs.msg import Resource
from pylabrobot.resources import (
TipRack,
Container,
Coordinate,
)
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker # type: ignore
from .liquid_handler_abstract import LiquidHandlerAbstract
import json
from typing import Sequence, Optional, List, Union, Literal
class LiquidHandlerBiomek(LiquidHandlerAbstract):
"""
Biomek液体处理器的实现类继承自LiquidHandlerAbstract。
该类用于处理Biomek液体处理器的特定操作。
"""
def __init__(self, backend=None, deck=None, *args, **kwargs):
super().__init__(backend, deck, *args, **kwargs)
self._status = "Idle" # 初始状态为 Idle
self._success = False # 初始成功状态为 False
self._status_queue = kwargs.get("status_queue", None) # 状态队列
self.temp_protocol = {}
self.py32_path = "/opt/py32" # Biomek的Python 3.2路径
self.aspirate_techniques = {
'MC P300 high':{
'Position': 'P1',
'Height': -2.0,
'Volume': '50',
'liquidtype': 'Well Contents',
'WellsX': 12,
'LabwareClass': 'Matrix96_750uL',
'AutoSelectPrototype': True,
'ColsFirst': True,
'CustomHeight': False,
'DataSetPattern': False,
'HeightFrom': 0,
'LocalPattern': True,
'Operation': 'Aspirate',
'OverrideHeight': False,
'Pattern': (True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True),
'Prototype': 'MC P300 High',
'ReferencedPattern': '',
'RowsFirst': False,
'SectionExpression': '',
'SelectionInfo': (1,),
'SetMark': True,
'Source': True,
'StartAtMark': False,
'StartAtSelection': True,
'UseExpression': False},
}
self.dispense_techniques = {
'MC P300 high':{
'Position': 'P11',
'Height': -2.0,
'Volume': '50',
'liquidtype': 'Tip Contents',
'WellsX': 12,
'LabwareClass': 'Matrix96_750uL',
'AutoSelectPrototype': True,
'ColsFirst': True,
'CustomHeight': False,
'DataSetPattern': False,
'HeightFrom': 0,
'LocalPattern': True,
'Operation': 'Dispense',
'OverrideHeight': False,
'Pattern': (True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True),
'Prototype': 'MC P300 High',
'ReferencedPattern': '',
'RowsFirst': False,
'SectionExpression': '',
'SelectionInfo': (1,),
'SetMark': True,
'Source': False,
'StartAtMark': False,
'StartAtSelection': True,
'UseExpression': False}
}
@classmethod
def deserialize(cls, data: dict, allow_marshal: bool = False) -> LiquidHandler:
return LiquidHandler.deserialize(data, allow_marshal)
def create_protocol(
self,
protocol_name: str,
protocol_description: str,
protocol_version: str,
protocol_author: str,
protocol_date: str,
protocol_type: str,
none_keys: List[str] = [],
):
"""
创建一个新的协议。
Args:
protocol_name (str): 协议名称
protocol_description (str): 协议描述
protocol_version (str): 协议版本
protocol_author (str): 协议作者
protocol_date (str): 协议日期
protocol_type (str): 协议类型
none_keys (List[str]): 需要设置为None的键列表
Returns:
dict: 创建的协议字典
"""
self.temp_protocol = {
"meta": {
"name": protocol_name,
"description": protocol_description,
"version": protocol_version,
"author": protocol_author,
"date": protocol_date,
"type": protocol_type,
},
"labwares": [],
"steps": [],
}
return self.temp_protocol
def run_protocol(self):
"""
执行创建的实验流程。
工作站的完整执行流程是,
从 create_protocol 开始,创建新的 method
随后执行 transfer_liquid 等操作向实验流程添加步骤,
最后 run_protocol 执行整个方法。
Returns:
dict: 执行结果
"""
#use popen or subprocess to create py32 process and communicate send the temp protocol to it
if not self.temp_protocol:
raise ValueError("No protocol created. Please create a protocol first.")
# 模拟执行协议
self._status = "Running"
self._success = True
# 在这里可以添加实际执行协议的逻辑
response = requests.post("localhost:5000/api/protocols", json=self.temp_protocol)
def create_resource(
self,
resource_tracker: DeviceNodeResourceTracker,
resources: list[Resource],
bind_parent_id: str,
bind_location: dict[str, float],
liquid_input_slot: list[int],
liquid_type: list[str],
liquid_volume: list[int],
slot_on_deck: int,
):
"""
创建一个新的资源。
Args:
device_id (str): 设备ID
res_id (str): 资源ID
class_name (str): 资源类名
parent (str): 父级ID
bind_locations (Point): 绑定位置
liquid_input_slot (list[int]): 液体输入槽列表
liquid_type (list[str]): 液体类型列表
liquid_volume (list[int]): 液体体积列表
slot_on_deck (int): 甲板上的槽位
Returns:
dict: 创建的资源字典
"""
# TODO需要对好接口下面这个是临时的
for resource in resources:
res_id = resource.id
class_name = resource.class_name
parent = bind_parent_id
bind_locations = Coordinate.from_point(resource.bind_location)
liquid_input_slot = liquid_input_slot
liquid_type = liquid_type
liquid_volume = liquid_volume
slot_on_deck = slot_on_deck
resource = {
"id": res_id,
"class": class_name,
"parent": parent,
"bind_locations": bind_locations.to_dict(),
"liquid_input_slot": liquid_input_slot,
"liquid_type": liquid_type,
"liquid_volume": liquid_volume,
"slot_on_deck": slot_on_deck,
}
self.temp_protocol["labwares"].append(resource)
return resource
def transfer_liquid(
self,
sources: Sequence[Container],
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
*,
use_channels: Optional[List[int]] = None,
asp_vols: Union[List[float], float],
dis_vols: Union[List[float], float],
asp_flow_rates: Optional[List[Optional[float]]] = None,
dis_flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None,
touch_tip: bool = False,
liquid_height: Optional[List[Optional[float]]] = None,
blow_out_air_volume: Optional[List[Optional[float]]] = None,
spread: Literal["wide", "tight", "custom"] = "wide",
is_96_well: bool = False,
mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none",
mix_times: Optional[int] = None,
mix_vol: Optional[int] = None,
mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None,
delays: Optional[List[int]] = None,
none_keys: List[str] = []
):
transfer_params = {
"Span8": False,
"Pod": "Pod1",
"items": {},
"Wash": False,
"Dynamic?": True,
"AutoSelectActiveWashTechnique": False,
"ActiveWashTechnique": "",
"ChangeTipsBetweenDests": False,
"ChangeTipsBetweenSources": False,
"DefaultCaption": "",
"UseExpression": False,
"LeaveTipsOn": False,
"MandrelExpression": "",
"Repeats": "1",
"RepeatsByVolume": False,
"Replicates": "1",
"ShowTipHandlingDetails": False,
"ShowTransferDetails": True,
"Solvent": "Well Content",
"Span8Wash": False,
"Span8WashVolume": "2",
"Span8WasteVolume": "1",
"SplitVolume": False,
"SplitVolumeCleaning": False,
"Stop": "Destinations",
"TipLocation": "BC1025F",
"UseCurrentTips": False,
"UseDisposableTips": True,
"UseFixedTips": False,
"UseJIT": True,
"UseMandrelSelection": True,
"UseProbes": [True, True, True, True, True, True, True, True],
"WashCycles": "1",
"WashVolume": "110%",
"Wizard": False
}
items: dict = {}
for idx, (src, dst) in enumerate(zip(sources, targets)):
items[str(idx)] = {
"Source": str(src),
"Destination": str(dst),
"Volume": dis_vols[idx]
}
transfer_params["items"] = items
transfer_params["Solvent"] = "Water"
TipLocation = tip_racks[0].name
transfer_params["TipLocation"] = TipLocation
if len(tip_racks) == 1:
transfer_params['UseCurrentTips'] = True
elif len(tip_racks) > 1:
transfer_params["ChangeTipsBetweenDests"] = True
self.temp_protocol["steps"].append(transfer_params)
return
def transfer_biomek(
self,
source: str,
target: str,
tip_rack: str,
volume: float,
aspirate_techniques: str,
dispense_techniques: str,
):
"""
处理Biomek的液体转移操作。
"""
items = []
asp_params = self.aspirate_techniques.get(aspirate_techniques, {})
dis_params = self.dispense_techniques.get(dispense_techniques, {})
asp_params['Position'] = source
dis_params['Position'] = target
asp_params['Volume'] = str(volume)
dis_params['Volume'] = str(volume)
items.append(asp_params)
items.append(dis_params)
transfer_params = {
"Span8": False,
"Pod": "Pod1",
"items": [],
"Wash": False,
"Dynamic?": True,
"AutoSelectActiveWashTechnique": False,
"ActiveWashTechnique": "",
"ChangeTipsBetweenDests": True,
"ChangeTipsBetweenSources": False,
"DefaultCaption": "",
"UseExpression": False,
"LeaveTipsOn": False,
"MandrelExpression": "",
"Repeats": "1",
"RepeatsByVolume": False,
"Replicates": "1",
"ShowTipHandlingDetails": False,
"ShowTransferDetails": True,
"Solvent": "Water",
"Span8Wash": False,
"Span8WashVolume": "2",
"Span8WasteVolume": "1",
"SplitVolume": False,
"SplitVolumeCleaning": False,
"Stop": "Destinations",
"TipLocation": "BC230",
"UseCurrentTips": False,
"UseDisposableTips": False,
"UseFixedTips": False,
"UseJIT": True,
"UseMandrelSelection": True,
"UseProbes": [True, True, True, True, True, True, True, True],
"WashCycles": "4",
"WashVolume": "110%",
"Wizard": False
}
transfer_params["items"] = items
transfer_params["Solvent"] = 'Water'
transfer_params["TipLocation"] = tip_rack
self.temp_protocol["steps"].append(transfer_params)
return
def move_biomek(
self,
source: str,
target: str,
):
"""
处理Biomek移动板子的操作。
"""
move_params = {
"Pod": "Pod1",
"GripSide": "A1 near",
"Source": source,
"Target": target,
"LeaveBottomLabware": False,
}
self.temp_protocol["steps"].append(move_params)
return
def incubation_biomek(
self,
time: int,
):
"""
处理Biomek的孵育操作。
"""
incubation_params = {
"Message": "Paused",
"Location": "the whole system",
"Time": time,
"Mode": "TimedResource"
}
self.temp_protocol["steps"].append(incubation_params)
return
def oscillation_biomek(
self,
rpm: int,
time: int,
):
"""
处理Biomek的振荡操作。
"""
oscillation_params = {
'Device': 'OrbitalShaker0',
'Parameters': (str(rpm), '2', str(time), 'CounterClockwise'),
'Command': 'Timed Shake'
}
self.temp_protocol["steps"].append(oscillation_params)
return
if __name__ == "__main__":
steps_info = '''
{
"steps": [
{
"step_number": 1,
"operation": "transfer",
"description": "转移PCR产物或酶促反应液至0.05ml 96孔板中",
"parameters": {
"source": "P1",
"target": "P11",
"tip_rack": "BC230",
"volume": 50
}
},
{
"step_number": 2,
"operation": "transfer",
"description": "加入2倍体积Bind Beads BC至产物中",
"parameters": {
"source": "P2",
"target": "P11",
"tip_rack": "BC230",
"volume": 100
}
},
{
"step_number": 3,
"operation": "move_labware",
"description": "移动P11至Orbital1用于振荡混匀",
"parameters": {
"source": "P11",
"target": "Orbital1"
}
},
{
"step_number": 4,
"operation": "oscillation",
"description": "在Orbital1上振荡混匀Bind Beads BC与PCR产物700-900rpm300秒",
"parameters": {
"rpm": 800,
"time": 300
}
},
{
"step_number": 5,
"operation": "move_labware",
"description": "移动混匀后的板回P11",
"parameters": {
"source": "Orbital1",
"target": "P11"
}
},
{
"step_number": 6,
"operation": "move_labware",
"description": "将P11移动到磁力架P12吸附3分钟",
"parameters": {
"source": "P11",
"target": "P12"
}
},
{
"step_number": 7,
"operation": "incubation",
"description": "磁力架上室温静置3分钟完成吸附",
"parameters": {
"time": 180
}
},
{
"step_number": 8,
"operation": "transfer",
"description": "去除上清液至废液槽",
"parameters": {
"source": "P12",
"target": "P22",
"tip_rack": "BC230",
"volume": 150
}
},
{
"step_number": 9,
"operation": "transfer",
"description": "加入300-500μl 75%乙醇清洗",
"parameters": {
"source": "P3",
"target": "P12",
"tip_rack": "BC230",
"volume": 400
}
},
{
"step_number": 10,
"operation": "move_labware",
"description": "移动清洗板到Orbital1进行振荡",
"parameters": {
"source": "P12",
"target": "Orbital1"
}
},
{
"step_number": 11,
"operation": "oscillation",
"description": "乙醇清洗液振荡混匀700-900rpm, 45秒",
"parameters": {
"rpm": 800,
"time": 45
}
},
{
"step_number": 12,
"operation": "move_labware",
"description": "振荡后将板移回磁力架P12吸附",
"parameters": {
"source": "Orbital1",
"target": "P12"
}
},
{
"step_number": 13,
"operation": "incubation",
"description": "吸附3分钟",
"parameters": {
"time": 180
}
},
{
"step_number": 14,
"operation": "transfer",
"description": "去除乙醇上清液至废液槽",
"parameters": {
"source": "P12",
"target": "P22",
"tip_rack": "BC230",
"volume": 400
}
},
{
"step_number": 15,
"operation": "transfer",
"description": "第二次加入300-500μl 75%乙醇清洗",
"parameters": {
"source": "P3",
"target": "P12",
"tip_rack": "BC230",
"volume": 400
}
},
{
"step_number": 16,
"operation": "move_labware",
"description": "再次移动清洗板到Orbital1振荡",
"parameters": {
"source": "P12",
"target": "Orbital1"
}
},
{
"step_number": 17,
"operation": "oscillation",
"description": "再次乙醇清洗液振荡混匀700-900rpm, 45秒",
"parameters": {
"rpm": 800,
"time": 45
}
},
{
"step_number": 18,
"operation": "move_labware",
"description": "振荡后板送回磁力架P12吸附",
"parameters": {
"source": "Orbital1",
"target": "P12"
}
},
{
"step_number": 19,
"operation": "incubation",
"description": "再次吸附3分钟",
"parameters": {
"time": 180
}
},
{
"step_number": 20,
"operation": "transfer",
"description": "去除乙醇上清液至废液槽",
"parameters": {
"source": "P12",
"target": "P22",
"tip_rack": "BC230",
"volume": 400
}
},
{
"step_number": 21,
"operation": "incubation",
"description": "空气干燥15分钟",
"parameters": {
"time": 900
}
},
{
"step_number": 22,
"operation": "transfer",
"description": "加30-50μl Elution Buffer洗脱",
"parameters": {
"source": "P4",
"target": "P12",
"tip_rack": "BC230",
"volume": 40
}
},
{
"step_number": 23,
"operation": "move_labware",
"description": "移动到Orbital1振荡混匀60秒",
"parameters": {
"source": "P12",
"target": "Orbital1"
}
},
{
"step_number": 24,
"operation": "oscillation",
"description": "Elution Buffer振荡混匀700-900rpm, 60秒",
"parameters": {
"rpm": 800,
"time": 60
}
},
{
"step_number": 25,
"operation": "move_labware",
"description": "振荡后送回磁力架P12",
"parameters": {
"source": "Orbital1",
"target": "P12"
}
},
{
"step_number": 26,
"operation": "incubation",
"description": "室温静置3分钟洗脱反应",
"parameters": {
"time": 180
}
},
{
"step_number": 27,
"operation": "transfer",
"description": "将上清液DNA转移到新板P13",
"parameters": {
"source": "P12",
"target": "P13",
"tip_rack": "BC230",
"volume": 40
}
}
]
}
'''
labware_with_liquid = '''
[
{
"id": "Tip Rack BC230 on TL1",
"parent": "deck",
"slot_on_deck": "TL1",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on TL2",
"parent": "deck",
"slot_on_deck": "TL2",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on TL3",
"parent": "deck",
"slot_on_deck": "TL3",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on TL4",
"parent": "deck",
"slot_on_deck": "TL4",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on TL5",
"parent": "deck",
"slot_on_deck": "TL5",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on P5",
"parent": "deck",
"slot_on_deck": "P5",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on P6",
"parent": "deck",
"slot_on_deck": "P6",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on P15",
"parent": "deck",
"slot_on_deck": "P15",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "Tip Rack BC230 on P16",
"parent": "deck",
"slot_on_deck": "P16",
"class_name": "BC230",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "stock plate on P1",
"parent": "deck",
"slot_on_deck": "P1",
"class_name": "nest_12_reservoir_15ml",
"liquid_type": [
"master_mix"
],
"liquid_volume": [10000],
"liquid_input_wells": [
"A1"
]
},
{
"id": "stock plate on P2",
"parent": "deck",
"slot_on_deck": "P2",
"class_name": "nest_12_reservoir_15ml",
"liquid_type": [
"bind beads"
],
"liquid_volume": [10000],
"liquid_input_wells": [
"A1"
]
},
{
"id": "stock plate on P3",
"parent": "deck",
"slot_on_deck": "P3",
"class_name": "nest_12_reservoir_15ml",
"liquid_type": [
"ethyl alcohol"
],
"liquid_volume": [10000],
"liquid_input_wells": [
"A1"
]
},
{
"id": "elution buffer on P4",
"parent": "deck",
"slot_on_deck": "P4",
"class_name": "nest_12_reservoir_15ml",
"liquid_type": [
"elution buffer"
],
"liquid_volume": [5000],
"liquid_input_wells": [
"A1"
]
},
{
"id": "oscillation",
"parent": "deck",
"slot_on_deck": "Orbital1",
"class_name": "Orbital",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "working plate on P11",
"parent": "deck",
"slot_on_deck": "P11",
"class_name": "NEST 2ml Deep Well Plate",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "magnetics module on P12",
"parent": "deck",
"slot_on_deck": "P12",
"class_name": "magnetics module",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "working plate on P13",
"parent": "deck",
"slot_on_deck": "P13",
"class_name": "NEST 2ml Deep Well Plate",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
},
{
"id": "waste on P22",
"parent": "deck",
"slot_on_deck": "P22",
"class_name": "nest_1_reservoir_195ml",
"liquid_type": [],
"liquid_volume": [],
"liquid_input_wells": []
}
]
'''
handler = LiquidHandlerBiomek()
handler.temp_protocol = {
"meta": {},
"labwares": [],
"steps": []
}
input_steps = json.loads(steps_info)
labwares = json.loads(labware_with_liquid)
for step in input_steps['steps']:
operation = step['operation']
parameters = step['parameters']
if operation == 'transfer':
handler.transfer_biomek(source=parameters['source'],
target=parameters['target'],
volume=parameters['volume'],
tip_rack=parameters['tip_rack'],
aspirate_techniques='MC P300 high',
dispense_techniques='MC P300 high')
elif operation == 'move_labware':
handler.move_biomek(source=parameters['source'],
target=parameters['target'])
elif operation == 'oscillation':
handler.oscillation_biomek(rpm=parameters['rpm'],
time=parameters['time'])
elif operation == 'incubation':
handler.incubation_biomek(time=parameters['time'])
print(json.dumps(handler.temp_protocol, indent=4))

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -6,13 +6,8 @@ import asyncio
import time import time
from pylabrobot.liquid_handling import LiquidHandler from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.resources import ( from pylabrobot.resources import Resource, TipRack, Container, Coordinate, Well
Resource,
TipRack,
Container,
Coordinate,
Well
)
class LiquidHandlerAbstract(LiquidHandler): class LiquidHandlerAbstract(LiquidHandler):
"""Extended LiquidHandler with additional operations.""" """Extended LiquidHandler with additional operations."""
@@ -21,6 +16,19 @@ class LiquidHandlerAbstract(LiquidHandler):
# REMOVE LIQUID -------------------------------------------------- # REMOVE LIQUID --------------------------------------------------
# --------------------------------------------------------------- # ---------------------------------------------------------------
async def create_protocol(
self,
protocol_name: str,
protocol_description: str,
protocol_version: str,
protocol_author: str,
protocol_date: str,
protocol_type: str,
none_keys: List[str] = [],
):
"""Create a new protocol with the given metadata."""
pass
async def remove_liquid( async def remove_liquid(
self, self,
vols: List[float], vols: List[float],
@@ -35,26 +43,26 @@ class LiquidHandlerAbstract(LiquidHandler):
spread: Optional[Literal["wide", "tight", "custom"]] = "wide", spread: Optional[Literal["wide", "tight", "custom"]] = "wide",
delays: Optional[List[int]] = None, delays: Optional[List[int]] = None,
is_96_well: Optional[bool] = False, is_96_well: Optional[bool] = False,
top: Optional[List(float)] = None, top: Optional[List[float]] = None,
none_keys: List[str] = [] none_keys: List[str] = [],
): ):
"""A complete *remove* (aspirate → waste) operation.""" """A complete *remove* (aspirate → waste) operation."""
trash = self.deck.get_trash_area() trash = self.deck.get_trash_area()
try: try:
if is_96_well: if is_96_well:
pass # This mode is not verified pass # This mode is not verified
else: else:
if len(vols) != len(sources): if len(vols) != len(sources):
raise ValueError("Length of `vols` must match `sources`.") raise ValueError("Length of `vols` must match `sources`.")
for src, vol in zip(sources, vols): for src, vol in zip(sources, vols):
self.move_to(src, dis_to_top=top[0] if top else 0) await self.move_to(src, dis_to_top=top[0] if top else 0)
tip = next(self.current_tip) tip = next(self.current_tip)
await self.pick_up_tips(tip) await self.pick_up_tips(tip)
await self.aspirate( await self.aspirate(
resources=[src], resources=[src],
vols=[vol], vols=[vol],
use_channels=use_channels, # only aspirate96 used, default to None use_channels=use_channels, # only aspirate96 used, default to None
flow_rates=[flow_rates[0]] if flow_rates else None, flow_rates=[flow_rates[0]] if flow_rates else None,
offsets=[offsets[0]] if offsets else None, offsets=[offsets[0]] if offsets else None,
liquid_height=[liquid_height[0]] if liquid_height else None, liquid_height=[liquid_height[0]] if liquid_height else None,
@@ -64,15 +72,15 @@ class LiquidHandlerAbstract(LiquidHandler):
await self.custom_delay(seconds=delays[0] if delays else 0) await self.custom_delay(seconds=delays[0] if delays else 0)
await self.dispense( await self.dispense(
resources=waste_liquid, resources=waste_liquid,
vols=[vol], vols=[vol],
use_channels=use_channels, use_channels=use_channels,
flow_rates=[flow_rates[1]] if flow_rates else None, flow_rates=[flow_rates[1]] if flow_rates else None,
offsets=[offsets[1]] if offsets else None, offsets=[offsets[1]] if offsets else None,
liquid_height=[liquid_height[1]] if liquid_height else None, liquid_height=[liquid_height[1]] if liquid_height else None,
blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None, blow_out_air_volume=blow_out_air_volume[1] if blow_out_air_volume else None,
spread=spread, spread=spread,
) )
await self.discard_tips() # For now, each of tips is discarded after use await self.discard_tips() # For now, each of tips is discarded after use
except Exception as e: except Exception as e:
raise RuntimeError(f"Liquid removal failed: {e}") from e raise RuntimeError(f"Liquid removal failed: {e}") from e
@@ -100,13 +108,13 @@ class LiquidHandlerAbstract(LiquidHandler):
mix_vol: Optional[int] = None, mix_vol: Optional[int] = None,
mix_rate: Optional[int] = None, mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None, mix_liquid_height: Optional[float] = None,
none_keys: List[str] = [] none_keys: List[str] = [],
): ):
"""A complete *add* (aspirate reagent → dispense into targets) operation.""" """A complete *add* (aspirate reagent → dispense into targets) operation."""
try: try:
if is_96_well: if is_96_well:
pass # This mode is not verified. pass # This mode is not verified.
else: else:
if len(asp_vols) != len(targets): if len(asp_vols) != len(targets):
raise ValueError("Length of `vols` must match `targets`.") raise ValueError("Length of `vols` must match `targets`.")
@@ -122,7 +130,7 @@ class LiquidHandlerAbstract(LiquidHandler):
offsets=[offsets[0]] if offsets else None, offsets=[offsets[0]] if offsets else None,
liquid_height=[liquid_height[0]] if liquid_height else None, liquid_height=[liquid_height[0]] if liquid_height else None,
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
spread=spread spread=spread,
) )
if delays is not None: if delays is not None:
await self.custom_delay(seconds=delays[0]) await self.custom_delay(seconds=delays[0])
@@ -144,7 +152,8 @@ class LiquidHandlerAbstract(LiquidHandler):
mix_vol=mix_vol, mix_vol=mix_vol,
offsets=offsets if offsets else None, offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None) mix_rate=mix_rate if mix_rate else None,
)
if delays is not None: if delays is not None:
await self.custom_delay(seconds=delays[1]) await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_]) await self.touch_tip(targets[_])
@@ -158,13 +167,13 @@ class LiquidHandlerAbstract(LiquidHandler):
# --------------------------------------------------------------- # ---------------------------------------------------------------
async def transfer_liquid( async def transfer_liquid(
self, self,
asp_vols: Union[List[float], float],
dis_vols: Union[List[float], float],
sources: Sequence[Container], sources: Sequence[Container],
targets: Sequence[Container], targets: Sequence[Container],
tip_racks: Sequence[TipRack], tip_racks: Sequence[TipRack],
*, *,
use_channels: Optional[List[int]] = None, use_channels: Optional[List[int]] = None,
asp_vols: Union[List[float], float],
dis_vols: Union[List[float], float],
asp_flow_rates: Optional[List[Optional[float]]] = None, asp_flow_rates: Optional[List[Optional[float]]] = None,
dis_flow_rates: Optional[List[Optional[float]]] = None, dis_flow_rates: Optional[List[Optional[float]]] = None,
offsets: Optional[List[Coordinate]] = None, offsets: Optional[List[Coordinate]] = None,
@@ -179,7 +188,7 @@ class LiquidHandlerAbstract(LiquidHandler):
mix_rate: Optional[int] = None, mix_rate: Optional[int] = None,
mix_liquid_height: Optional[float] = None, mix_liquid_height: Optional[float] = None,
delays: Optional[List[int]] = None, delays: Optional[List[int]] = None,
none_keys: List[str] = [] none_keys: List[str] = [],
): ):
"""Transfer liquid from each *source* well/plate to the corresponding *target*. """Transfer liquid from each *source* well/plate to the corresponding *target*.
@@ -201,14 +210,15 @@ class LiquidHandlerAbstract(LiquidHandler):
# 96channel head mode # 96channel head mode
# ------------------------------------------------------------------ # ------------------------------------------------------------------
if is_96_well: if is_96_well:
pass # This mode is not verified pass # This mode is not verified
else: else:
if not (len(asp_vols) == len(sources) and len(dis_vols) == len(targets)): if not (len(asp_vols) == len(sources) and len(dis_vols) == len(targets)):
raise ValueError("`sources`, `targets`, and `vols` must have the same length.") raise ValueError("`sources`, `targets`, and `vols` must have the same length.")
tip_iter = self.iter_tips(tip_racks) tip_iter = self.iter_tips(tip_racks)
for src, tgt, asp_vol, asp_flow_rate, dis_vol, dis_flow_rate in ( for src, tgt, asp_vol, asp_flow_rate, dis_vol, dis_flow_rate in zip(
zip(sources, targets, asp_vols, asp_flow_rates, dis_vols, dis_flow_rates)): sources, targets, asp_vols, asp_flow_rates, dis_vols, dis_flow_rates
):
tip = next(tip_iter) tip = next(tip_iter)
await self.pick_up_tips(tip) await self.pick_up_tips(tip)
# Aspirate from source # Aspirate from source
@@ -247,9 +257,9 @@ class LiquidHandlerAbstract(LiquidHandler):
except Exception as exc: except Exception as exc:
raise RuntimeError(f"Liquid transfer failed: {exc}") from exc raise RuntimeError(f"Liquid transfer failed: {exc}") from exc
# --------------------------------------------------------------- # ---------------------------------------------------------------
# Helper utilities # Helper utilities
# --------------------------------------------------------------- # ---------------------------------------------------------------
async def custom_delay(self, seconds=0, msg=None): async def custom_delay(self, seconds=0, msg=None):
""" """
@@ -266,28 +276,26 @@ class LiquidHandlerAbstract(LiquidHandler):
print(f"Done: {msg}") print(f"Done: {msg}")
print(f"Current time: {time.strftime('%H:%M:%S')}") print(f"Current time: {time.strftime('%H:%M:%S')}")
async def touch_tip(self, async def touch_tip(self, targets: Sequence[Container]):
targets: Sequence[Container],
):
"""Touch the tip to the side of the well.""" """Touch the tip to the side of the well."""
await self.aspirate( await self.aspirate(
resources=[targets], resources=[targets],
vols=[0], vols=[0],
use_channels=None, use_channels=None,
flow_rates=None, flow_rates=None,
offsets=[Coordinate(x=-targets.get_size_x()/2,y=0,z=0)], offsets=[Coordinate(x=-targets.get_size_x() / 2, y=0, z=0)],
liquid_height=None, liquid_height=None,
blow_out_air_volume=None blow_out_air_volume=None,
) )
#await self.custom_delay(seconds=1) # In the simulation, we do not need to wait # await self.custom_delay(seconds=1) # In the simulation, we do not need to wait
await self.aspirate( await self.aspirate(
resources=[targets], resources=[targets],
vols=[0], vols=[0],
use_channels=None, use_channels=None,
flow_rates=None, flow_rates=None,
offsets=[Coordinate(x=targets.get_size_x()/2,y=0,z=0)], offsets=[Coordinate(x=targets.get_size_x() / 2, y=0, z=0)],
liquid_height=None, liquid_height=None,
blow_out_air_volume=None blow_out_air_volume=None,
) )
async def mix( async def mix(
@@ -298,9 +306,9 @@ class LiquidHandlerAbstract(LiquidHandler):
height_to_bottom: Optional[float] = None, height_to_bottom: Optional[float] = None,
offsets: Optional[Coordinate] = None, offsets: Optional[Coordinate] = None,
mix_rate: Optional[float] = None, mix_rate: Optional[float] = None,
none_keys: List[str] = [] none_keys: List[str] = [],
): ):
if mix_time is None: # No mixing required if mix_time is None: # No mixing required
return return
"""Mix the liquid in the target wells.""" """Mix the liquid in the target wells."""
for _ in range(mix_time): for _ in range(mix_time):
@@ -333,7 +341,7 @@ class LiquidHandlerAbstract(LiquidHandler):
tip_iter = self.iter_tips(tip_racks) tip_iter = self.iter_tips(tip_racks)
self.current_tip = tip_iter self.current_tip = tip_iter
async def move_to(self, well: Well, dis_to_top: float = 0 , channel: int = 0): async def move_to(self, well: Well, dis_to_top: float = 0, channel: int = 0):
""" """
Move a single channel to a specific well with a given z-height. Move a single channel to a specific well with a given z-height.
@@ -352,4 +360,3 @@ class LiquidHandlerAbstract(LiquidHandler):
await self.move_channel_x(channel, abs_loc.x) await self.move_channel_x(channel, abs_loc.x)
await self.move_channel_y(channel, abs_loc.y) await self.move_channel_y(channel, abs_loc.y)
await self.move_channel_z(channel, abs_loc.z + well_height + dis_to_top) await self.move_channel_z(channel, abs_loc.z + well_height + dis_to_top)

View File

@@ -0,0 +1,154 @@
import json
from typing import Sequence, Optional, List, Union, Literal
json_path = "/Users/guangxinzhang/Documents/Deep Potential/opentrons/convert/protocols/enriched_steps/sci-lucif-assay4.json"
with open(json_path, "r") as f:
data = json.load(f)
transfer_example = data[0]
#print(transfer_example)
temp_protocol = []
TipLocation = "BC1025F" # Assuming this is a fixed tip location for the transfer
sources = transfer_example["sources"] # Assuming sources is a list of Container objects
targets = transfer_example["targets"] # Assuming targets is a list of Container objects
tip_racks = transfer_example["tip_racks"] # Assuming tip_racks is a list of TipRack objects
asp_vols = transfer_example["asp_vols"] # Assuming asp_vols is a list of volumes
solvent = "PBS"
def transfer_liquid(
#self,
sources,#: Sequence[Container],
targets,#: Sequence[Container],
tip_racks,#: Sequence[TipRack],
TipLocation,
# *,
# use_channels: Optional[List[int]] = None,
asp_vols: Union[List[float], float],
solvent: Optional[str] = None,
# dis_vols: Union[List[float], float],
# asp_flow_rates: Optional[List[Optional[float]]] = None,
# dis_flow_rates: Optional[List[Optional[float]]] = None,
# offsets,#: Optional[List[]] = None,
# touch_tip: bool = False,
# liquid_height: Optional[List[Optional[float]]] = None,
# blow_out_air_volume: Optional[List[Optional[float]]] = None,
# spread: Literal["wide", "tight", "custom"] = "wide",
# is_96_well: bool = False,
# mix_stage: Optional[Literal["none", "before", "after", "both"]] = "none",
# mix_times,#: Optional[list() = None,
# mix_vol: Optional[int] = None,
# mix_rate: Optional[int] = None,
# mix_liquid_height: Optional[float] = None,
# delays: Optional[List[int]] = None,
# none_keys: List[str] = []
):
# -------- Build Biomek transfer step --------
# 1) Construct default parameter scaffold (values mirror Biomek “Transfer” block).
transfer_params = {
"Span8": False,
"Pod": "Pod1",
"items": {}, # to be filled below
"Wash": False,
"Dynamic?": True,
"AutoSelectActiveWashTechnique": False,
"ActiveWashTechnique": "",
"ChangeTipsBetweenDests": False,
"ChangeTipsBetweenSources": False,
"DefaultCaption": "", # filled after we know first pair/vol
"UseExpression": False,
"LeaveTipsOn": False,
"MandrelExpression": "",
"Repeats": "1",
"RepeatsByVolume": False,
"Replicates": "1",
"ShowTipHandlingDetails": False,
"ShowTransferDetails": True,
"Solvent": "Water",
"Span8Wash": False,
"Span8WashVolume": "2",
"Span8WasteVolume": "1",
"SplitVolume": False,
"SplitVolumeCleaning": False,
"Stop": "Destinations",
"TipLocation": "BC1025F",
"UseCurrentTips": False,
"UseDisposableTips": True,
"UseFixedTips": False,
"UseJIT": True,
"UseMandrelSelection": True,
"UseProbes": [True, True, True, True, True, True, True, True],
"WashCycles": "1",
"WashVolume": "110%",
"Wizard": False
}
items: dict = {}
for idx, (src, dst) in enumerate(zip(sources, targets)):
items[str(idx)] = {
"Source": str(src),
"Destination": str(dst),
"Volume": asp_vols[idx]
}
transfer_params["items"] = items
transfer_params["Solvent"] = solvent if solvent else "Water"
transfer_params["TipLocation"] = TipLocation
if len(tip_racks) == 1:
transfer_params['UseCurrentTips'] = True
elif len(tip_racks) > 1:
transfer_params["ChangeTipsBetweenDests"] = True
return transfer_params
action = transfer_liquid(sources=sources,targets=targets,tip_racks=tip_racks, asp_vols=asp_vols,solvent = solvent, TipLocation=TipLocation)
print(json.dumps(action,indent=2))
# print(action)
"""
"transfer": {
"items": {},
"Wash": false,
"Dynamic?": true,
"AutoSelectActiveWashTechnique": false,
"ActiveWashTechnique": "",
"ChangeTipsBetweenDests": true,
"ChangeTipsBetweenSources": false,
"DefaultCaption": "Transfer 100 µL from P13 to P3",
"UseExpression": false,
"LeaveTipsOn": false,
"MandrelExpression": "",
"Repeats": "1",
"RepeatsByVolume": false,
"Replicates": "1",
"ShowTipHandlingDetails": true,
"ShowTransferDetails": true,
"Span8Wash": false,
"Span8WashVolume": "2",
"Span8WasteVolume": "1",
"SplitVolume": false,
"SplitVolumeCleaning": false,
"Stop": "Destinations",
"TipLocation": "BC1025F",
"UseCurrentTips": false,
"UseDisposableTips": false,
"UseFixedTips": false,
"UseJIT": true,
"UseMandrelSelection": true,
"UseProbes": [true, true, true, true, true, true, true, true],
"WashCycles": "3",
"WashVolume": "110%",
"Wizard": false
"""

File diff suppressed because it is too large Load Diff

View File

@@ -5,22 +5,22 @@ class SolenoidValveMock:
def __init__(self, port: str = "COM6"): def __init__(self, port: str = "COM6"):
self._status = "Idle" self._status = "Idle"
self._valve_position = "OPEN" self._valve_position = "OPEN"
@property @property
def status(self) -> str: def status(self) -> str:
return self._status return self._status
@property @property
def valve_position(self) -> str: def valve_position(self) -> str:
return self._valve_position return self._valve_position
def get_valve_position(self) -> str: def get_valve_position(self) -> str:
return self._valve_position return self._valve_position
def set_valve_position(self, position): def set_valve_position(self, position):
self._status = "Busy" self._status = "Busy"
time.sleep(5) time.sleep(5)
self._valve_position = position self._valve_position = position
time.sleep(5) time.sleep(5)
self._status = "Idle" self._status = "Idle"

View File

@@ -4,17 +4,17 @@ import time
class VacuumPumpMock: class VacuumPumpMock:
def __init__(self, port: str = "COM6"): def __init__(self, port: str = "COM6"):
self._status = "OPEN" self._status = "OPEN"
@property @property
def status(self) -> str: def status(self) -> str:
return self._status return self._status
def get_status(self) -> str: def get_status(self) -> str:
return self._status return self._status
def set_status(self, position): def set_status(self, position):
time.sleep(5) time.sleep(5)
self._status = position self._status = position
time.sleep(5) time.sleep(5)

View File

@@ -1,5 +1,6 @@
liquid_handler: liquid_handler:
description: Liquid handler device controlled by pylabrobot description: Liquid handler device controlled by pylabrobot
icon: icon_yiyezhan.webp
class: class:
module: unilabos.devices.liquid_handling.liquid_handler_abstract:LiquidHandlerAbstract module: unilabos.devices.liquid_handling.liquid_handler_abstract:LiquidHandlerAbstract
type: python type: python
@@ -22,8 +23,8 @@ liquid_handler:
is_96_well: is_96_well is_96_well: is_96_well
top: top top: top
none_keys: none_keys none_keys: none_keys
feedback: { } feedback: {}
result: { } result: {}
add_liquid: add_liquid:
type: LiquidHandlerAdd type: LiquidHandlerAdd
goal: goal:
@@ -43,8 +44,8 @@ liquid_handler:
mix_rate: mix_rate mix_rate: mix_rate
mix_liquid_height: mix_liquid_height mix_liquid_height: mix_liquid_height
none_keys: none_keys none_keys: none_keys
feedback: { } feedback: {}
result: { } result: {}
transfer_liquid: transfer_liquid:
type: LiquidHandlerTransfer type: LiquidHandlerTransfer
goal: goal:
@@ -69,8 +70,8 @@ liquid_handler:
mix_liquid_height: mix_liquid_height mix_liquid_height: mix_liquid_height
delays: delays delays: delays
none_keys: none_keys none_keys: none_keys
feedback: { } feedback: {}
result: { } result: {}
mix: mix:
type: LiquidHandlerMix type: LiquidHandlerMix
goal: goal:
@@ -81,16 +82,16 @@ liquid_handler:
offsets: offsets offsets: offsets
mix_rate: mix_rate mix_rate: mix_rate
none_keys: none_keys none_keys: none_keys
feedback: { } feedback: {}
result: { } result: {}
move_to: move_to:
type: LiquidHandlerMoveTo type: LiquidHandlerMoveTo
goal: goal:
well: well well: well
dis_to_top: dis_to_top dis_to_top: dis_to_top
channel: channel channel: channel
feedback: { } feedback: {}
result: { } result: {}
aspirate: aspirate:
type: LiquidHandlerAspirate type: LiquidHandlerAspirate
goal: goal:
@@ -245,6 +246,21 @@ liquid_handler:
target_vols: target_vols target_vols: target_vols
aspiration_flow_rate: aspiration_flow_rate aspiration_flow_rate: aspiration_flow_rate
dispense_flow_rates: dispense_flow_rates dispense_flow_rates: dispense_flow_rates
handles:
input:
- handler_key: liquid-input
label: Liquid Input
data_type: resource
io_type: target
data_source: handle
data_key: liquid
output:
- handler_key: liquid-output
label: Liquid Output
data_type: resource
io_type: source
data_source: executor
data_key: liquid
schema: schema:
type: object type: object
properties: properties:
@@ -272,3 +288,101 @@ liquid_handler.revvity:
status: status status: status
result: result:
success: success success: success
liquid_handler.biomek:
description: Biomek液体处理器设备基于pylabrobot控制
icon: icon_yiyezhan.webp
class:
module: unilabos.devices.liquid_handling.biomek:LiquidHandlerBiomek
type: python
status_types: {}
action_value_mappings:
create_protocol:
type: LiquidHandlerProtocolCreation
goal:
protocol_name: protocol_name
protocol_description: protocol_description
protocol_version: protocol_version
protocol_author: protocol_author
protocol_date: protocol_date
protocol_type: protocol_type
none_keys: none_keys
feedback: {}
result: {}
run_protocol:
type: EmptyIn
goal: {}
feedback: {}
result: {}
transfer_liquid:
type: LiquidHandlerTransfer
goal:
asp_vols: asp_vols
dis_vols: dis_vols
sources: sources
targets: targets
tip_racks: tip_racks
use_channels: use_channels
asp_flow_rates: asp_flow_rates
dis_flow_rates: dis_flow_rates
offsets: offsets
touch_tip: touch_tip
liquid_height: liquid_height
blow_out_air_volume: blow_out_air_volume
spread: spread
is_96_well: is_96_well
mix_stage: mix_stage
mix_times: mix_times
mix_vol: mix_vol
mix_rate: mix_rate
mix_liquid_height: mix_liquid_height
delays: delays
none_keys: none_keys
feedback: {}
result: {}
handles:
input:
- handler_key: liquid-input
label: Liquid Input
data_type: resource
io_type: target
data_source: handle
data_key: liquid
output:
- handler_key: liquid-output
label: Liquid Output
data_type: resource
io_type: source
data_source: executor
data_key: liquid
transfer_biomek:
type: LiquidHandlerTransferBiomek
goal:
source: source
target: target
tip_rack: tip_rack
volume: volume
aspirate_techniques: aspirate_techniques
dispense_techniques: dispense_techniques
feedback: {}
result: {}
handles:
input:
- handler_key: liquid-input
label: Liquid Input
data_type: resource
io_type: target
data_source: handle
data_key: liquid
output:
- handler_key: liquid-output
label: Liquid Output
data_type: resource
io_type: source
data_source: executor
data_key: liquid
schema:
type: object
properties: {}
required: []
additionalProperties: false

View File

@@ -23,20 +23,51 @@ syringe_pump_with_valve.runze:
type: string type: string
description: The position of the valve description: The position of the valve
required: required:
- status - status
- position - position
- valve_position - valve_position
additionalProperties: false additionalProperties: false
solenoid_valve.mock: solenoid_valve.mock:
description: Mock solenoid valve description: Mock solenoid valve
class: class:
module: unilabos.devices.pump_and_valve.solenoid_valve_mock:SolenoidValveMock module: unilabos.devices.pump_and_valve.solenoid_valve_mock:SolenoidValveMock
type: python type: python
status_types:
status: String
valve_position: String
action_value_mappings:
open:
type: EmptyIn
goal: {}
feedback: {}
result: {}
close:
type: EmptyIn
goal: {}
feedback: {}
result: {}
handles:
input:
- handler_key: fluid-input
label: Fluid Input
data_type: fluid
output:
- handler_key: fluid-output
label: Fluid Output
data_type: fluid
init_param_schema:
type: object
properties:
port:
type: string
description: "通信端口"
default: "COM6"
required:
- port
solenoid_valve: solenoid_valve:
description: Solenoid valve description: Solenoid valve
class: class:
module: unilabos.devices.pump_and_valve.solenoid_valve:SolenoidValve module: unilabos.devices.pump_and_valve.solenoid_valve:SolenoidValve
type: python type: python

View File

@@ -22,9 +22,76 @@ vacuum_pump.mock:
string: string string: string
feedback: {} feedback: {}
result: {} result: {}
handles:
input:
- handler_key: fluid-input
label: Fluid Input
data_type: fluid
io_type: target
data_source: handle
data_key: fluid_in
output:
- handler_key: fluid-output
label: Fluid Output
data_type: fluid
io_type: source
data_source: executor
data_key: fluid_out
init_param_schema:
type: object
properties:
port:
type: string
description: "通信端口"
default: "COM6"
required:
- port
gas_source.mock: gas_source.mock:
description: Mock gas source description: Mock gas source
class: class:
module: unilabos.devices.pump_and_valve.vacuum_pump_mock:VacuumPumpMock module: unilabos.devices.pump_and_valve.vacuum_pump_mock:VacuumPumpMock
type: python type: python
status_types:
status: String
action_value_mappings:
open:
type: EmptyIn
goal: {}
feedback: {}
result: {}
close:
type: EmptyIn
goal: {}
feedback: {}
result: {}
set_status:
type: StrSingleInput
goal:
string: string
feedback: {}
result: {}
handles:
input:
- handler_key: fluid-input
label: Fluid Input
data_type: fluid
io_type: target
data_source: handle
data_key: fluid_in
output:
- handler_key: fluid-output
label: Fluid Output
data_type: fluid
io_type: source
data_source: executor
data_key: fluid_out
init_param_schema:
type: object
properties:
port:
type: string
description: "通信端口"
default: "COM6"
required:
- port

View File

@@ -4,4 +4,4 @@ workstation:
module: unilabos.ros.nodes.presets.protocol_node:ROS2ProtocolNode module: unilabos.ros.nodes.presets.protocol_node:ROS2ProtocolNode
type: ros2 type: ros2
schema: schema:
properties: {} properties: {}

View File

@@ -25,9 +25,7 @@ class Registry:
self.ResourceCreateFromOuterEasy = self._replace_type_with_class( self.ResourceCreateFromOuterEasy = self._replace_type_with_class(
"ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource" "ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource"
) )
self.EmptyIn = self._replace_type_with_class( self.EmptyIn = self._replace_type_with_class("EmptyIn", "host_node", f"")
"EmptyIn", "host_node", f""
)
self.device_type_registry = {} self.device_type_registry = {}
self.resource_type_registry = {} self.resource_type_registry = {}
self._setup_called = False # 跟踪setup是否已调用 self._setup_called = False # 跟踪setup是否已调用
@@ -66,6 +64,7 @@ class Registry:
"goal_default": yaml.safe_load( "goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal)) io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuter.Goal))
), ),
"handles": {},
}, },
"create_resource": { "create_resource": {
"type": self.ResourceCreateFromOuterEasy, "type": self.ResourceCreateFromOuterEasy,
@@ -86,6 +85,7 @@ class Registry:
"goal_default": yaml.safe_load( "goal_default": yaml.safe_load(
io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal)) io.StringIO(get_yaml_from_goal_type(self.ResourceCreateFromOuterEasy.Goal))
), ),
"handles": {},
}, },
"test_latency": { "test_latency": {
"type": self.EmptyIn, "type": self.EmptyIn,
@@ -94,11 +94,14 @@ class Registry:
"result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"}, "result": {"latency_ms": "latency_ms", "time_diff_ms": "time_diff_ms"},
"schema": ros_action_to_json_schema(self.EmptyIn), "schema": ros_action_to_json_schema(self.EmptyIn),
"goal_default": {}, "goal_default": {},
"handles": {},
}, },
}, },
}, },
"icon": "icon_device.webp", "icon": "icon_device.webp",
"registry_type": "device", "registry_type": "device",
"handles": [],
"init_param_schema": {},
"schema": {"properties": {}, "additionalProperties": False, "type": "object"}, "schema": {"properties": {}, "additionalProperties": False, "type": "object"},
"file_path": "/", "file_path": "/",
} }
@@ -132,6 +135,10 @@ class Registry:
resource_info["description"] = "" resource_info["description"] = ""
if "icon" not in resource_info: if "icon" not in resource_info:
resource_info["icon"] = "" resource_info["icon"] = ""
if "handles" not in resource_info:
resource_info["handles"] = []
if "init_param_schema" not in resource_info:
resource_info["init_param_schema"] = {}
resource_info["registry_type"] = "resource" resource_info["registry_type"] = "resource"
self.resource_type_registry.update(data) self.resource_type_registry.update(data)
logger.debug( logger.debug(
@@ -194,6 +201,10 @@ class Registry:
device_config["description"] = "" device_config["description"] = ""
if "icon" not in device_config: if "icon" not in device_config:
device_config["icon"] = "" device_config["icon"] = ""
if "handles" not in device_config:
device_config["handles"] = []
if "init_param_schema" not in device_config:
device_config["init_param_schema"] = {}
device_config["registry_type"] = "device" device_config["registry_type"] = "device"
if "class" in device_config: if "class" in device_config:
# 处理状态类型 # 处理状态类型
@@ -206,6 +217,8 @@ class Registry:
# 处理动作值映射 # 处理动作值映射
if "action_value_mappings" in device_config["class"]: if "action_value_mappings" in device_config["class"]:
for action_name, action_config in device_config["class"]["action_value_mappings"].items(): for action_name, action_config in device_config["class"]["action_value_mappings"].items():
if "handles" not in action_config:
action_config["handles"] = []
if "type" in action_config: if "type" in action_config:
action_config["type"] = self._replace_type_with_class( action_config["type"] = self._replace_type_with_class(
action_config["type"], device_id, f"动作 {action_name}" action_config["type"], device_id, f"动作 {action_name}"

View File

@@ -131,7 +131,7 @@ _msg_converter: Dict[Type, Any] = {
Bool: lambda x: Bool(data=bool(x)), Bool: lambda x: Bool(data=bool(x)),
str: str, str: str,
String: lambda x: String(data=str(x)), String: lambda x: String(data=str(x)),
Point: lambda x: Point(x=x.x, y=x.y, z=x.z), Point: lambda x: Point(x=x.x, y=x.y, z=x.z) if not isinstance(x, dict) else Point(x=x.get("x", 0), y=x.get("y", 0), z=x.get("z", 0)),
Resource: lambda x: Resource( Resource: lambda x: Resource(
id=x.get("id", ""), id=x.get("id", ""),
name=x.get("name", ""), name=x.get("name", ""),

View File

@@ -349,6 +349,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
response = rclient.call(request) response = rclient.call(request)
# 应该先add_resource了 # 应该先add_resource了
res.response = "OK" res.response = "OK"
# 如果driver自己就有assign的方法那就使用driver自己的assign方法
if hasattr(self.driver_instance, "create_resource"):
create_resource_func = getattr(self.driver_instance, "create_resource")
create_resource_func(
resource_tracker=self.resource_tracker,
resources=request.resources,
bind_parent_id=bind_parent_id,
bind_location=location,
liquid_input_slot=LIQUID_INPUT_SLOT,
liquid_type=ADD_LIQUID_TYPE,
liquid_volume=LIQUID_VOLUME,
slot_on_deck=slot,
)
return res
# 接下来该根据bind_parent_id进行assign了目前只有plr可以进行assign不然没有办法输入到物料系统中 # 接下来该根据bind_parent_id进行assign了目前只有plr可以进行assign不然没有办法输入到物料系统中
resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) resource = self.resource_tracker.figure_resource({"name": bind_parent_id})
# request.resources = [convert_to_ros_msg(Resource, resources)] # request.resources = [convert_to_ros_msg(Resource, resources)]
@@ -791,7 +805,9 @@ class ROS2DeviceNode:
self.resource_tracker = DeviceNodeResourceTracker() self.resource_tracker = DeviceNodeResourceTracker()
# use_pylabrobot_creator 使用 cls的包路径检测 # use_pylabrobot_creator 使用 cls的包路径检测
use_pylabrobot_creator = driver_class.__module__.startswith("pylabrobot") or driver_class.__name__ == "LiquidHandlerAbstract" use_pylabrobot_creator = (driver_class.__module__.startswith("pylabrobot")
or driver_class.__name__ == "LiquidHandlerAbstract"
or driver_class.__name__ == "LiquidHandlerBiomek")
# TODO: 要在创建之前预先请求服务器是否有当前id的物料放到resource_tracker中让pylabrobot进行创建 # TODO: 要在创建之前预先请求服务器是否有当前id的物料放到resource_tracker中让pylabrobot进行创建
# 创建设备类实例 # 创建设备类实例

View File

@@ -29,6 +29,8 @@ set(action_files
"action/HeatChillStart.action" "action/HeatChillStart.action"
"action/HeatChillStop.action" "action/HeatChillStop.action"
"action/LiquidHandlerProtocolCreation.action"
"action/LiquidHandlerAspirate.action" "action/LiquidHandlerAspirate.action"
"action/LiquidHandlerDiscardTips.action" "action/LiquidHandlerDiscardTips.action"
"action/LiquidHandlerDispense.action" "action/LiquidHandlerDispense.action"
@@ -43,6 +45,7 @@ set(action_files
"action/LiquidHandlerReturnTips96.action" "action/LiquidHandlerReturnTips96.action"
"action/LiquidHandlerStamp.action" "action/LiquidHandlerStamp.action"
"action/LiquidHandlerTransfer.action" "action/LiquidHandlerTransfer.action"
"action/LiquidHandlerTransferBiomek.action"
"action/LiquidHandlerAdd.action" "action/LiquidHandlerAdd.action"
"action/LiquidHandlerMix.action" "action/LiquidHandlerMix.action"

View File

@@ -0,0 +1,6 @@
string source
string target
---
bool success
---

View File

@@ -0,0 +1,6 @@
int32 rpm
int32 time
---
bool success
---

View File

@@ -0,0 +1,5 @@
int32 time
---
bool success
---

View File

@@ -0,0 +1,9 @@
string protocol_name
string protocol_description
string protocol_version
string protocol_author
string protocol_date
string protocol_type
string[] none_keys
---
---

View File

@@ -0,0 +1,10 @@
string source
string target
string tip_rack
float64 volume
string aspirate_technique
string dispense_technique
---
bool success
---