@@ -147,6 +147,9 @@ class LiquidHandlerMiddleware(LiquidHandler):
offsets : Optional [ List [ Coordinate ] ] = None ,
* * backend_kwargs ,
) :
# 如果 use_channels 为 None, 使用默认值( 所有通道)
if use_channels is None :
use_channels = list ( range ( self . channel_num ) )
if not offsets or ( isinstance ( offsets , list ) and len ( offsets ) != len ( use_channels ) ) :
offsets = [ Coordinate . zero ( ) ] * len ( use_channels )
if self . _simulator :
@@ -759,7 +762,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume = current_dis_blow_out_air_volume ,
spread = spread ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . touch_tip ( current_targets )
await self . discard_tips ( )
@@ -833,8 +836,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
spread = spread ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
# 只有在 mix_time 有效时才调用 mix
if mix_time is not None and mix_time > 0 :
await self . mix (
targets = [ targets [ _ ] ] ,
mix_time = mix_time ,
@@ -843,7 +848,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . touch_tip ( targets [ _ ] )
await self . discard_tips ( )
@@ -893,9 +898,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume = current_dis_blow_out_air_volume ,
spread = spread ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
# 只有在 mix_time 有效时才调用 mix
if mix_time is not None and mix_time > 0 :
await self . mix (
targets = current_targets ,
mix_time = mix_time ,
@@ -904,7 +911,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . touch_tip ( current_targets )
await self . discard_tips ( )
@@ -942,29 +949,114 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
delays : Optional [ List [ int ] ] = None ,
none_keys : List [ str ] = [ ] ,
) :
""" Transfer liquid from each *source* well/plate to the corresponding *target* .
""" Transfer liquid with automatic mode detection .
Supports three transfer modes:
1. One-to-many (1 source -> N targets): Distribute from one source to multiple targets
2. One-to-one (N sources -> N targets): Standard transfer, each source to corresponding target
3. Many-to-one (N sources -> 1 target): Combine multiple sources into one target
Parameters
----------
asp_vols, dis_vols
Single volume (µL) or list matching the number of transfers .
Single volume (µL) or list. Automatically expanded based on transfer mode .
sources, targets
Same‑ length sequences of c ontainers (wells or plates). In 96‑ well mode
each must contain exactly one plate.
C ontainers (wells or plates). Length determines transfer mode:
- len(sources) == 1, len(targets) > 1: One-to-many mode
- len(sources) == len(targets): One-to-one mode
- len(sources) > 1, len(targets) == 1: Many-to-one mode
tip_racks
One or more TipRacks providing fresh tips.
is_96_well
Set *True* to use the 96‑ channel head.
"""
# 确保 use_channels 有默认值
if use_channels is None :
use_channels = [ 0 ] if self . channel_num > = 1 else list ( range ( self . channel_num ) )
if is_96_well :
pass # This mode is not verified.
else :
# 转换体积参数为列表
if isinstance ( asp_vols , ( int , float ) ) :
asp_vols = [ float ( asp_vols ) ]
else :
asp_vols = [ float ( v ) for v in asp_vols ]
if isinstance ( dis_vols , ( int , float ) ) :
dis_vols = [ float ( dis_vols ) ]
else :
dis_vols = [ float ( v ) for v in dis_vols ]
# 识别传输模式
num_sources = len ( sources )
num_targets = len ( targets )
if num_sources == 1 and num_targets > 1 :
# 模式1: 一对多 (1 source -> N targets)
await self . _transfer_one_to_many (
sources [ 0 ] , targets , tip_racks , use_channels ,
asp_vols , dis_vols , asp_flow_rates , dis_flow_rates ,
offsets , touch_tip , liquid_height , blow_out_air_volume ,
spread , mix_stage , mix_times , mix_vol , mix_rate ,
mix_liquid_height , delays
)
elif num_sources > 1 and num_targets == 1 :
# 模式2: 多对一 (N sources -> 1 target)
await self . _transfer_many_to_one (
sources , targets [ 0 ] , tip_racks , use_channels ,
asp_vols , dis_vols , asp_flow_rates , dis_flow_rates ,
offsets , touch_tip , liquid_height , blow_out_air_volume ,
spread , mix_stage , mix_times , mix_vol , mix_rate ,
mix_liquid_height , delays
)
elif num_sources == num_targets :
# 模式3: 一对一 (N sources -> N targets) - 原有逻辑
await self . _transfer_one_to_one (
sources , targets , tip_racks , use_channels ,
asp_vols , dis_vols , asp_flow_rates , dis_flow_rates ,
offsets , touch_tip , liquid_height , blow_out_air_volume ,
spread , mix_stage , mix_times , mix_vol , mix_rate ,
mix_liquid_height , delays
)
else :
raise ValueError (
f " Unsupported transfer mode: { num_sources } sources -> { num_targets } targets. "
" Supported modes: 1->N, N->1, or N->N. "
)
async def _transfer_one_to_one (
self ,
sources : Sequence [ Container ] ,
targets : Sequence [ Container ] ,
tip_racks : Sequence [ TipRack ] ,
use_channels : List [ int ] ,
asp_vols : List [ float ] ,
dis_vols : List [ float ] ,
asp_flow_rates : Optional [ List [ Optional [ float ] ] ] ,
dis_flow_rates : Optional [ List [ Optional [ float ] ] ] ,
offsets : Optional [ List [ Coordinate ] ] ,
touch_tip : bool ,
liquid_height : Optional [ List [ Optional [ float ] ] ] ,
blow_out_air_volume : Optional [ List [ Optional [ float ] ] ] ,
spread : Literal [ " wide " , " tight " , " custom " ] ,
mix_stage : Optional [ Literal [ " none " , " before " , " after " , " both " ] ] ,
mix_times : Optional [ int ] ,
mix_vol : Optional [ int ] ,
mix_rate : Optional [ int ] ,
mix_liquid_height : Optional [ float ] ,
delays : Optional [ List [ int ] ] ,
) :
""" 一对一传输模式: N sources -> N targets """
# 验证参数长度
if len ( asp_vols ) != len ( targets ) :
raise ValueError ( f " Length of `asp_vols` { len ( asp_vols ) } must match `targets` { len ( targets ) } . " )
if len ( dis_vols ) != len ( targets ) :
raise ValueError ( f " Length of `dis_vols` { len ( dis_vols ) } must match `targets` { len ( targets ) } . " )
if len ( sources ) != len ( targets ) :
raise ValueError ( f " Length of `sources` { len ( sources ) } must match `targets` { len ( targets ) } . " )
# 首先应该对任务分组, 然后每次1个/8个进行操作处理
if len ( use_channels ) == 1 :
for _ in range ( len ( targets ) ) :
tip = [ ]
@@ -976,10 +1068,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
resources = [ sources [ _ ] ] ,
vols = [ asp_vols [ _ ] ] ,
use_channels = use_channels ,
flow_rates = [ asp_flow_rates [ 0 ] ] if asp_flow_rates else None ,
offsets = [ offsets [ 0 ] ] if offsets else None ,
liquid_height = [ liquid_height [ 0 ] ] if liquid_height else None ,
blow_out_air_volume = [ blow_out_air_volume [ 0 ] ] if blow_out_air_volume else None ,
flow_rates = [ asp_flow_rates [ _ ] ] if asp_flow_rates and len ( asp_flow_rates ) > _ else None ,
offsets = [ offsets [ _ ] ] if offsets and len ( offsets ) > _ else None ,
liquid_height = [ liquid_height [ _ ] ] if liquid_height and len ( liquid_height ) > _ else None ,
blow_out_air_volume = [ blow_out_air_volume [ _ ] ] if blow_out_air_volume and len ( blow_out_air_volume ) > _ else None ,
spread = spread ,
)
if delays is not None :
@@ -988,14 +1080,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
resources = [ targets [ _ ] ] ,
vols = [ dis_vols [ _ ] ] ,
use_channels = use_channels ,
flow_rates = [ dis_flow_rates [ 1 ] ] if dis_flow_rates else None ,
offsets = [ offsets [ 1 ] ] if offsets else None ,
blow_out_air_volume = [ blow_out_air_volume [ 1 ] ] if blow_out_air_volume else None ,
liquid_height = [ liquid_height [ 1 ] ] if liquid_height else None ,
flow_rates = [ dis_flow_rates [ _ ] ] if dis_flow_rates and len ( dis_flow_rates ) > _ else None ,
offsets = [ offsets [ _ ] ] if offsets and len ( offsets ) > _ else None ,
blow_out_air_volume = [ blow_out_air_volume [ _ ] ] if blow_out_air_volume and len ( blow_out_air_volume ) > _ else None ,
liquid_height = [ liquid_height [ _ ] ] if liquid_height and len ( liquid_height ) > _ else None ,
spread = spread ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
if mix_stage in [ " after " , " both " ] and mix_times is not None and mix_times > 0 :
await self . mix (
targets = [ targets [ _ ] ] ,
mix_time = mix_times ,
@@ -1004,20 +1097,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . touch_tip ( targets [ _ ] )
await self . discard_tips ( )
await self . discard_tips ( use_channels = use_channels )
elif len ( use_channels ) == 8 :
# 对于8个的情况, 需要判断此时任务是不是能被8通道移液站来成功处理
if len ( targets ) % 8 != 0 :
raise ValueError ( f " Length of `targets` { len ( targets ) } must be a multiple of 8 for 8-channel mode. " )
# 8个8个来取任务序列
for i in range ( 0 , len ( targets ) , 8 ) :
# 取出8个任务
tip = [ ]
for _ in range ( len ( use_channels ) ) :
tip . extend ( next ( self . current_tip ) )
@@ -1026,14 +1115,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
current_reagent_sources = sources [ i : i + 8 ]
current_asp_vols = asp_vols [ i : i + 8 ]
current_dis_vols = dis_vols [ i : i + 8 ]
current_asp_flow_rates = asp_flow_rates [ i : i + 8 ]
current_asp_flow_rates = asp_flow_rates [ i : i + 8 ] if asp_flow_rates else None
current_asp_offset = offsets [ i : i + 8 ] if offsets else [ None ] * 8
current_dis_offset = offsets [ - i * 8 - 8 : len ( offsets ) - i * 8 ] if offsets else [ None ] * 8
current_dis_offset = offsets [ i : i + 8 ] if offsets else [ None ] * 8
current_asp_liquid_height = liquid_height [ i : i + 8 ] if liquid_height else [ None ] * 8
current_dis_liquid_height = liquid_height [ - i * 8 - 8 : len ( liquid_height ) - i * 8 ] if liquid_height else [ None ] * 8
current_dis_liquid_height = liquid_height [ i : i + 8 ] if liquid_height else [ None ] * 8
current_asp_blow_out_air_volume = blow_out_air_volume [ i : i + 8 ] if blow_out_air_volume else [ None ] * 8
current_dis_blow_out_air_volume = blow_out_air_volume [ - i * 8 - 8 : len ( blow_out_air_volume ) - i * 8 ] if blow_out_air_volume else [ None ] * 8
current_dis_flow_rates = dis_flow_rates [ i : i + 8 ] if dis_flow_rates else [ None ] * 8
current_dis_blow_out_air_volume = blow_out_air_volume [ i : i + 8 ] if blow_out_air_volume else [ None ] * 8
current_dis_flow_rates = dis_flow_rates [ i : i + 8 ] if dis_flow_rates else None
await self . aspirate (
resources = current_reagent_sources ,
@@ -1058,9 +1147,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
liquid_height = current_dis_liquid_height ,
spread = spread ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
if mix_stage in [ " after " , " both " ] and mix_times is not None and mix_times > 0 :
await self . mix (
targets = current_targets ,
mix_time = mix_times ,
@@ -1069,11 +1159,364 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if delays is not None :
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . touch_tip ( current_targets )
await self . discard_tips ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] )
async def _transfer_one_to_many (
self ,
source : Container ,
targets : Sequence [ Container ] ,
tip_racks : Sequence [ TipRack ] ,
use_channels : List [ int ] ,
asp_vols : List [ float ] ,
dis_vols : List [ float ] ,
asp_flow_rates : Optional [ List [ Optional [ float ] ] ] ,
dis_flow_rates : Optional [ List [ Optional [ float ] ] ] ,
offsets : Optional [ List [ Coordinate ] ] ,
touch_tip : bool ,
liquid_height : Optional [ List [ Optional [ float ] ] ] ,
blow_out_air_volume : Optional [ List [ Optional [ float ] ] ] ,
spread : Literal [ " wide " , " tight " , " custom " ] ,
mix_stage : Optional [ Literal [ " none " , " before " , " after " , " both " ] ] ,
mix_times : Optional [ int ] ,
mix_vol : Optional [ int ] ,
mix_rate : Optional [ int ] ,
mix_liquid_height : Optional [ float ] ,
delays : Optional [ List [ int ] ] ,
) :
""" 一对多传输模式: 1 source -> N targets """
# 验证和扩展体积参数
if len ( asp_vols ) == 1 :
# 如果只提供一个吸液体积,计算总吸液体积(所有分液体积之和)
total_asp_vol = sum ( dis_vols )
asp_vol = asp_vols [ 0 ] if asp_vols [ 0 ] > = total_asp_vol else total_asp_vol
else :
raise ValueError ( " For one-to-many mode, `asp_vols` should be a single value or list with one element. " )
if len ( dis_vols ) != len ( targets ) :
raise ValueError ( f " Length of `dis_vols` { len ( dis_vols ) } must match `targets` { len ( targets ) } . " )
if len ( use_channels ) == 1 :
# 单通道模式:一次吸液,多次分液
tip = [ ]
for _ in range ( len ( use_channels ) ) :
tip . extend ( next ( self . current_tip ) )
await self . pick_up_tips ( tip )
# 从源容器吸液(总体积)
await self . aspirate (
resources = [ source ] ,
vols = [ asp_vol ] ,
use_channels = use_channels ,
flow_rates = [ asp_flow_rates [ 0 ] ] if asp_flow_rates and len ( asp_flow_rates ) > 0 else None ,
offsets = [ offsets [ 0 ] ] if offsets and len ( offsets ) > 0 else None ,
liquid_height = [ liquid_height [ 0 ] ] if liquid_height and len ( liquid_height ) > 0 else None ,
blow_out_air_volume = [ blow_out_air_volume [ 0 ] ] if blow_out_air_volume and len ( blow_out_air_volume ) > 0 else None ,
spread = spread ,
)
if delays is not None :
await self . custom_delay ( seconds = delays [ 0 ] )
# 分多次分液到不同的目标容器
for idx , target in enumerate ( targets ) :
await self . dispense (
resources = [ target ] ,
vols = [ dis_vols [ idx ] ] ,
use_channels = use_channels ,
flow_rates = [ dis_flow_rates [ idx ] ] if dis_flow_rates and len ( dis_flow_rates ) > idx else None ,
offsets = [ offsets [ idx ] ] if offsets and len ( offsets ) > idx else None ,
blow_out_air_volume = [ blow_out_air_volume [ idx ] ] if blow_out_air_volume and len ( blow_out_air_volume ) > idx else None ,
liquid_height = [ liquid_height [ idx ] ] if liquid_height and len ( liquid_height ) > idx else None ,
spread = spread ,
)
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
if mix_stage in [ " after " , " both " ] and mix_times is not None and mix_times > 0 :
await self . mix (
targets = [ target ] ,
mix_time = mix_times ,
mix_vol = mix_vol ,
offsets = offsets [ idx : idx + 1 ] if offsets else None ,
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if touch_tip :
await self . touch_tip ( [ target ] )
await self . discard_tips ( use_channels = use_channels )
elif len ( use_channels ) == 8 :
# 8通道模式: 需要确保目标数量是8的倍数
if len ( targets ) % 8 != 0 :
raise ValueError ( f " For 8-channel mode, number of targets { len ( targets ) } must be a multiple of 8. " )
# 每次处理8个目标
for i in range ( 0 , len ( targets ) , 8 ) :
tip = [ ]
for _ in range ( len ( use_channels ) ) :
tip . extend ( next ( self . current_tip ) )
await self . pick_up_tips ( tip )
current_targets = targets [ i : i + 8 ]
current_dis_vols = dis_vols [ i : i + 8 ]
# 8个通道都从同一个源容器吸液, 每个通道的吸液体积等于对应的分液体积
current_asp_flow_rates = asp_flow_rates [ 0 : 1 ] * 8 if asp_flow_rates and len ( asp_flow_rates ) > 0 else None
current_asp_offset = offsets [ 0 : 1 ] * 8 if offsets and len ( offsets ) > 0 else [ None ] * 8
current_asp_liquid_height = liquid_height [ 0 : 1 ] * 8 if liquid_height and len ( liquid_height ) > 0 else [ None ] * 8
current_asp_blow_out_air_volume = blow_out_air_volume [ 0 : 1 ] * 8 if blow_out_air_volume and len ( blow_out_air_volume ) > 0 else [ None ] * 8
# 从源容器吸液( 8个通道都从同一个源, 但每个通道的吸液体积不同)
await self . aspirate (
resources = [ source ] * 8 , # 8个通道都从同一个源
vols = current_dis_vols , # 每个通道的吸液体积等于对应的分液体积
use_channels = use_channels ,
flow_rates = current_asp_flow_rates ,
offsets = current_asp_offset ,
liquid_height = current_asp_liquid_height ,
blow_out_air_volume = current_asp_blow_out_air_volume ,
spread = spread ,
)
if delays is not None :
await self . custom_delay ( seconds = delays [ 0 ] )
# 分液到8个目标
current_dis_flow_rates = dis_flow_rates [ i : i + 8 ] if dis_flow_rates else None
current_dis_offset = offsets [ i : i + 8 ] if offsets else [ None ] * 8
current_dis_liquid_height = liquid_height [ i : i + 8 ] if liquid_height else [ None ] * 8
current_dis_blow_out_air_volume = blow_out_air_volume [ i : i + 8 ] if blow_out_air_volume else [ None ] * 8
await self . dispense (
resources = current_targets ,
vols = current_dis_vols ,
use_channels = use_channels ,
flow_rates = current_dis_flow_rates ,
offsets = current_dis_offset ,
blow_out_air_volume = current_dis_blow_out_air_volume ,
liquid_height = current_dis_liquid_height ,
spread = spread ,
)
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
if mix_stage in [ " after " , " both " ] and mix_times is not None and mix_times > 0 :
await self . mix (
targets = current_targets ,
mix_time = mix_times ,
mix_vol = mix_vol ,
offsets = offsets if offsets else None ,
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if touch_tip :
await self . touch_tip ( current_targets )
await self . discard_tips ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] )
async def _transfer_many_to_one (
self ,
sources : Sequence [ Container ] ,
target : Container ,
tip_racks : Sequence [ TipRack ] ,
use_channels : List [ int ] ,
asp_vols : List [ float ] ,
dis_vols : List [ float ] ,
asp_flow_rates : Optional [ List [ Optional [ float ] ] ] ,
dis_flow_rates : Optional [ List [ Optional [ float ] ] ] ,
offsets : Optional [ List [ Coordinate ] ] ,
touch_tip : bool ,
liquid_height : Optional [ List [ Optional [ float ] ] ] ,
blow_out_air_volume : Optional [ List [ Optional [ float ] ] ] ,
spread : Literal [ " wide " , " tight " , " custom " ] ,
mix_stage : Optional [ Literal [ " none " , " before " , " after " , " both " ] ] ,
mix_times : Optional [ int ] ,
mix_vol : Optional [ int ] ,
mix_rate : Optional [ int ] ,
mix_liquid_height : Optional [ float ] ,
delays : Optional [ List [ int ] ] ,
) :
""" 多对一传输模式: N sources -> 1 target( 汇总/混合) """
# 验证和扩展体积参数
if len ( asp_vols ) != len ( sources ) :
raise ValueError ( f " Length of `asp_vols` { len ( asp_vols ) } must match `sources` { len ( sources ) } . " )
# 支持两种模式:
# 1. dis_vols 为单个值:所有源汇总,使用总吸液体积或指定分液体积
# 2. dis_vols 长度等于 asp_vols: 每个源按不同比例分液( 按比例混合)
if len ( dis_vols ) == 1 :
# 模式1: 使用单个分液体积
total_dis_vol = sum ( asp_vols )
dis_vol = dis_vols [ 0 ] if dis_vols [ 0 ] > = total_dis_vol else total_dis_vol
use_proportional_mixing = False
elif len ( dis_vols ) == len ( asp_vols ) :
# 模式2: 按不同比例混合
use_proportional_mixing = True
else :
raise ValueError (
f " For many-to-one mode, `dis_vols` should be a single value or list with length { len ( asp_vols ) } "
f " (matching `asp_vols`). Got length { len ( dis_vols ) } . "
)
if len ( use_channels ) == 1 :
# 单通道模式:多次吸液,一次分液
# 先混合前(如果需要)
if mix_stage in [ " before " , " both " ] and mix_times is not None and mix_times > 0 :
# 注意:在吸液前混合源容器通常不常见,这里跳过
pass
# 从每个源容器吸液并分液到目标容器
for idx , source in enumerate ( sources ) :
tip = [ ]
for _ in range ( len ( use_channels ) ) :
tip . extend ( next ( self . current_tip ) )
await self . pick_up_tips ( tip )
await self . aspirate (
resources = [ source ] ,
vols = [ asp_vols [ idx ] ] ,
use_channels = use_channels ,
flow_rates = [ asp_flow_rates [ idx ] ] if asp_flow_rates and len ( asp_flow_rates ) > idx else None ,
offsets = [ offsets [ idx ] ] if offsets and len ( offsets ) > idx else None ,
liquid_height = [ liquid_height [ idx ] ] if liquid_height and len ( liquid_height ) > idx else None ,
blow_out_air_volume = [ blow_out_air_volume [ idx ] ] if blow_out_air_volume and len ( blow_out_air_volume ) > idx else None ,
spread = spread ,
)
if delays is not None :
await self . custom_delay ( seconds = delays [ 0 ] )
# 分液到目标容器
if use_proportional_mixing :
# 按不同比例混合:使用对应的 dis_vols
dis_vol = dis_vols [ idx ]
dis_flow_rate = dis_flow_rates [ idx ] if dis_flow_rates and len ( dis_flow_rates ) > idx else None
dis_offset = offsets [ idx ] if offsets and len ( offsets ) > idx else None
dis_liquid_height = liquid_height [ idx ] if liquid_height and len ( liquid_height ) > idx else None
dis_blow_out = blow_out_air_volume [ idx ] if blow_out_air_volume and len ( blow_out_air_volume ) > idx else None
else :
# 标准模式:分液体积等于吸液体积
dis_vol = asp_vols [ idx ]
dis_flow_rate = dis_flow_rates [ 0 ] if dis_flow_rates and len ( dis_flow_rates ) > 0 else None
dis_offset = offsets [ 0 ] if offsets and len ( offsets ) > 0 else None
dis_liquid_height = liquid_height [ 0 ] if liquid_height and len ( liquid_height ) > 0 else None
dis_blow_out = blow_out_air_volume [ 0 ] if blow_out_air_volume and len ( blow_out_air_volume ) > 0 else None
await self . dispense (
resources = [ target ] ,
vols = [ dis_vol ] ,
use_channels = use_channels ,
flow_rates = [ dis_flow_rate ] if dis_flow_rate is not None else None ,
offsets = [ dis_offset ] if dis_offset is not None else None ,
blow_out_air_volume = [ dis_blow_out ] if dis_blow_out is not None else None ,
liquid_height = [ dis_liquid_height ] if dis_liquid_height is not None else None ,
spread = spread ,
)
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . discard_tips ( use_channels = use_channels )
# 最后在目标容器中混合(如果需要)
if mix_stage in [ " after " , " both " ] and mix_times is not None and mix_times > 0 :
await self . mix (
targets = [ target ] ,
mix_time = mix_times ,
mix_vol = mix_vol ,
offsets = offsets [ 0 : 1 ] if offsets else None ,
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if touch_tip :
await self . touch_tip ( [ target ] )
elif len ( use_channels ) == 8 :
# 8通道模式: 需要确保源数量是8的倍数
if len ( sources ) % 8 != 0 :
raise ValueError ( f " For 8-channel mode, number of sources { len ( sources ) } must be a multiple of 8. " )
# 每次处理8个源
for i in range ( 0 , len ( sources ) , 8 ) :
tip = [ ]
for _ in range ( len ( use_channels ) ) :
tip . extend ( next ( self . current_tip ) )
await self . pick_up_tips ( tip )
current_sources = sources [ i : i + 8 ]
current_asp_vols = asp_vols [ i : i + 8 ]
current_asp_flow_rates = asp_flow_rates [ i : i + 8 ] if asp_flow_rates else None
current_asp_offset = offsets [ i : i + 8 ] if offsets else [ None ] * 8
current_asp_liquid_height = liquid_height [ i : i + 8 ] if liquid_height else [ None ] * 8
current_asp_blow_out_air_volume = blow_out_air_volume [ i : i + 8 ] if blow_out_air_volume else [ None ] * 8
# 从8个源容器吸液
await self . aspirate (
resources = current_sources ,
vols = current_asp_vols ,
use_channels = use_channels ,
flow_rates = current_asp_flow_rates ,
offsets = current_asp_offset ,
blow_out_air_volume = current_asp_blow_out_air_volume ,
liquid_height = current_asp_liquid_height ,
spread = spread ,
)
if delays is not None :
await self . custom_delay ( seconds = delays [ 0 ] )
# 分液到目标容器(每个通道分液到同一个目标)
if use_proportional_mixing :
# 按比例混合:使用对应的 dis_vols
current_dis_vols = dis_vols [ i : i + 8 ]
current_dis_flow_rates = dis_flow_rates [ i : i + 8 ] if dis_flow_rates else None
current_dis_offset = offsets [ i : i + 8 ] if offsets else [ None ] * 8
current_dis_liquid_height = liquid_height [ i : i + 8 ] if liquid_height else [ None ] * 8
current_dis_blow_out_air_volume = blow_out_air_volume [ i : i + 8 ] if blow_out_air_volume else [ None ] * 8
else :
# 标准模式:每个通道分液体积等于其吸液体积
current_dis_vols = current_asp_vols
current_dis_flow_rates = dis_flow_rates [ 0 : 1 ] * 8 if dis_flow_rates else None
current_dis_offset = offsets [ 0 : 1 ] * 8 if offsets else [ None ] * 8
current_dis_liquid_height = liquid_height [ 0 : 1 ] * 8 if liquid_height else [ None ] * 8
current_dis_blow_out_air_volume = blow_out_air_volume [ 0 : 1 ] * 8 if blow_out_air_volume else [ None ] * 8
await self . dispense (
resources = [ target ] * 8 , # 8个通道都分到同一个目标
vols = current_dis_vols ,
use_channels = use_channels ,
flow_rates = current_dis_flow_rates ,
offsets = current_dis_offset ,
blow_out_air_volume = current_dis_blow_out_air_volume ,
liquid_height = current_dis_liquid_height ,
spread = spread ,
)
if delays is not None and len ( delays ) > 1 :
await self . custom_delay ( seconds = delays [ 1 ] )
await self . discard_tips ( [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 ] )
# 最后在目标容器中混合(如果需要)
if mix_stage in [ " after " , " both " ] and mix_times is not None and mix_times > 0 :
await self . mix (
targets = [ target ] ,
mix_time = mix_times ,
mix_vol = mix_vol ,
offsets = offsets [ 0 : 1 ] if offsets else None ,
height_to_bottom = mix_liquid_height if mix_liquid_height else None ,
mix_rate = mix_rate if mix_rate else None ,
)
if touch_tip :
await self . touch_tip ( [ target ] )
# except Exception as e:
# traceback.print_exc()
# raise RuntimeError(f"Liquid addition failed: {e}") from e