对于PRCXI9320的transfer_group,一对多和多对多

This commit is contained in:
Guangxin Zhang
2025-09-15 00:29:16 +08:00
parent 1cd07915e7
commit 94cdcbf24e
4 changed files with 844 additions and 254 deletions

View File

@@ -1,5 +1,5 @@
from __future__ import annotations
import re
import traceback
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from collections import Counter
@@ -558,14 +558,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
# ---------------------------------------------------------------
def set_group(self, group_name: str, wells: List[Well], volumes: List[float]):
if len(wells) != 8:
if self.channel_num == 8 and len(wells) != 8:
raise RuntimeError(f"Expected 8 wells, got {len(wells)}")
self.group_info[group_name] = wells
self.set_liquid(wells, [group_name] * len(wells), volumes)
async def transfer_group(self, source_group_name: str, target_group_name: str, unit_volume: float):
source_wells = self.group_info.get(source_group_name, [])
target_wells = self.group_info.get(target_group_name, [])
rack_info = dict()
for child in self.deck.children:
if issubclass(child.__class__, TipRack):
@@ -576,6 +578,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
break
else:
rack_info[rack.name] = (rack, tip.maximal_volume - unit_volume)
if len(rack_info) == 0:
raise ValueError(f"No tip rack can support volume {unit_volume}.")
@@ -583,24 +586,40 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
for child in self.deck.children:
if child.name == rack_info[0][0]:
target_rack = child
target_rack = cast(TipRack, target_rack)
available_tips = {}
for (idx, name), tipSpot in zip(target_rack._ordering.items(), target_rack.get_all_items()):
for (idx, tipSpot) in enumerate(target_rack.get_all_items()):
if tipSpot.has_tip():
available_tips[idx] = tipSpot
continue
# 一般移动液体有两种方式,一对多和多对多
if self.channel_num == 8:
colnum_list = [int(holename[1:]) for holename in available_tips.keys()]
available_cols = [colnum for colnum, count in dict(Counter(colnum_list)).items() if count == 8]
available_cols.sort() # 这样就确定了列号
tips_to_use = [available_tips[f"{chr(65 + i)}{available_cols[0]}"] for i in range(8)]
tip_prefix = list(available_tips.values())[0].name.split('_')[0]
colnum_list = [int(tip.name.split('_')[-1][1:]) for tip in available_tips.values()]
available_cols = [colnum for colnum, count in dict(Counter(colnum_list)).items() if count == 8]
available_cols.sort()
available_tips_dict = {tip.name: tip for tip in available_tips.values()}
tips_to_use = [available_tips_dict[f"{tip_prefix}_{chr(65 + i)}{available_cols[0]}"] for i in range(8)]
await self.pick_up_tips(tips_to_use, use_channels=list(range(0, 8)))
await self.aspirate(source_wells, [unit_volume] * 8, use_channels=list(range(0, 8)))
await self.dispense(target_wells, [unit_volume] * 8, use_channels=list(range(0, 8)))
await self.discard_tips(use_channels=list(range(0, 8)))
await self.pick_up_tips(tips_to_use, use_channels=list(range(0, 8)))
await self.aspirate(source_wells, [10] * 8, use_channels=list(range(0, 8)))
await self.dispense(target_wells, [10] * 8, use_channels=list(range(0, 8)))
await self.discard_tips(use_channels=list(range(0, 8)))
elif self.channel_num == 1:
for num_well in range(len(target_wells)):
tip_to_use = available_tips[list(available_tips.keys())[num_well]]
await self.pick_up_tips([tip_to_use], use_channels=[0])
if len(source_wells) == 1:
await self.aspirate([source_wells[0]], [unit_volume], use_channels=[0])
else:
await self.aspirate([source_wells[num_well]], [unit_volume], use_channels=[0])
await self.dispense([target_wells[num_well]], [unit_volume], use_channels=[0])
await self.discard_tips(use_channels=[0])
else:
raise ValueError(f"Unsupported channel number {self.channel_num}.")
async def create_protocol(
self,