添加单枪头的多对多移液判定

This commit is contained in:
q434343
2026-02-13 13:46:27 +08:00
parent 5c2da9b793
commit 2e5fac26b3

View File

@@ -1159,11 +1159,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
Number of mix cycles. If *None* (default) no mixing occurs regardless of Number of mix cycles. If *None* (default) no mixing occurs regardless of
mix_stage. mix_stage.
""" """
num_sources = len(sources)
num_targets = len(targets)
len_asp_vols = len(asp_vols)
len_dis_vols = len(dis_vols)
# 确保 use_channels 有默认值 # 确保 use_channels 有默认值
if use_channels is None: if use_channels is None:
# 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7 # 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7
use_channels = list(range(self.channel_num)) if self.channel_num > 0 else [0] use_channels = list(range(self.channel_num)) if self.channel_num == 8 else [0]
elif len(use_channels) == 8:
if self.channel_num != 8:
raise ValueError(f"if channel_num is 8, use_channels length must be 8, but got {len(use_channels)}")
if num_sources%8 != 0 or num_targets%8 != 0 or len_asp_vols%8 != 0 or len_dis_vols%8 != 0:
raise ValueError(f"if channel_num is 8, sources, targets, asp_vols, and dis_vols length must be divisible by 8, but got {num_sources}, {num_targets}, {len_asp_vols}, and {len_dis_vols}")
if is_96_well: if is_96_well:
pass # This mode is not verified. pass # This mode is not verified.
@@ -1191,89 +1199,233 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
if mix_times is not None: if mix_times is not None:
mix_times = int(mix_times) mix_times = int(mix_times)
# 设置tip racks
self.set_tiprack(tip_racks)
# 识别传输模式mix_times 为 None 也应该能正常移液,只是不做 mix # 识别传输模式mix_times 为 None 也应该能正常移液,只是不做 mix
num_sources = len(sources) num_sources = len(sources)
num_targets = len(targets) num_targets = len(targets)
len_asp_vols = len(asp_vols)
len_dis_vols = len(dis_vols)
if num_sources == 1 and num_targets > 1: if num_targets != 1 and num_sources != 1:
# 模式1: 一对多 (1 source -> N targets) if len_asp_vols != num_sources and len_asp_vols != num_targets:
await self._transfer_one_to_many( raise ValueError(f"asp_vols length must be equal to sources or targets length, but got {len_asp_vols} and {num_sources} and {num_targets}")
sources[0], if len_dis_vols != num_sources and len_dis_vols != num_targets:
targets, raise ValueError(f"dis_vols length must be equal to sources or targets length, but got {len_dis_vols} and {num_sources} and {num_targets}")
tip_racks,
use_channels, if len(use_channels) == 1:
asp_vols, max_len = max(num_sources, num_targets)
dis_vols, for i in range(max_len):
asp_flow_rates,
dis_flow_rates, # 辅助函数安全地从列表中获取元素如果列表为空则返回None
offsets, def safe_get(lst, idx, default=None):
touch_tip, return [lst[idx]] if lst else default
liquid_height,
blow_out_air_volume, # 动态构建参数字典,只传递实际提供的参数
spread, kwargs = {
mix_stage, 'sources': [sources[i%num_sources]],
mix_times, 'targets': [targets[i%num_targets]],
mix_vol, 'tip_racks': tip_racks,
mix_rate, 'use_channels': use_channels,
mix_liquid_height, 'asp_vols': [asp_vols[i%len_asp_vols]],
delays, 'dis_vols': [dis_vols[i%len_dis_vols]],
) }
elif num_sources > 1 and num_targets == 1:
# 模式2: 多对一 (N sources -> 1 target) # 条件性添加可选参数
await self._transfer_many_to_one( if asp_flow_rates is not None:
sources, kwargs['asp_flow_rates'] = [asp_flow_rates[i%len_asp_vols]]
targets[0], if dis_flow_rates is not None:
tip_racks, kwargs['dis_flow_rates'] = [dis_flow_rates[i%len_dis_vols]]
use_channels, if offsets is not None:
asp_vols, kwargs['offsets'] = safe_get(offsets, i)
dis_vols, if touch_tip is not None:
asp_flow_rates, kwargs['touch_tip'] = touch_tip if touch_tip else False
dis_flow_rates, if liquid_height is not None:
offsets, kwargs['liquid_height'] = safe_get(liquid_height, i)
touch_tip, if blow_out_air_volume is not None:
liquid_height, kwargs['blow_out_air_volume'] = safe_get(blow_out_air_volume, i)
blow_out_air_volume, if spread is not None:
spread, kwargs['spread'] = spread
mix_stage, if mix_stage is not None:
mix_times, kwargs['mix_stage'] = safe_get(mix_stage, i)
mix_vol, if mix_times is not None:
mix_rate, kwargs['mix_times'] = safe_get(mix_times, i)
mix_liquid_height, if mix_vol is not None:
delays, kwargs['mix_vol'] = safe_get(mix_vol, i)
) if mix_rate is not None:
elif num_sources == num_targets: kwargs['mix_rate'] = safe_get(mix_rate, i)
# 模式3: 一对一 (N sources -> N targets) if mix_liquid_height is not None:
await self._transfer_one_to_one( kwargs['mix_liquid_height'] = safe_get(mix_liquid_height, i)
sources, if delays is not None:
targets, kwargs['delays'] = safe_get(delays, i)
tip_racks,
use_channels, await self._transfer_base_method(**kwargs)
asp_vols,
dis_vols,
asp_flow_rates,
dis_flow_rates, # if num_sources == 1 and num_targets > 1:
offsets, # # 模式1: 一对多 (1 source -> N targets)
touch_tip, # await self._transfer_one_to_many(
liquid_height, # sources,
blow_out_air_volume, # targets,
spread, # tip_racks,
mix_stage, # use_channels,
mix_times, # asp_vols,
mix_vol, # dis_vols,
mix_rate, # asp_flow_rates,
mix_liquid_height, # dis_flow_rates,
delays, # offsets,
) # touch_tip,
else: # liquid_height,
raise ValueError( # blow_out_air_volume,
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. " # spread,
"Supported modes: 1->N, N->1, or N->N." # mix_stage,
) # mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# elif num_sources > 1 and num_targets == 1:
# # 模式2: 多对一 (N sources -> 1 target)
# await self._transfer_many_to_one(
# sources,
# targets[0],
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# elif num_sources == num_targets:
# # 模式3: 一对一 (N sources -> N targets)
# await self._transfer_one_to_one(
# sources,
# targets,
# tip_racks,
# use_channels,
# asp_vols,
# dis_vols,
# asp_flow_rates,
# dis_flow_rates,
# offsets,
# touch_tip,
# liquid_height,
# blow_out_air_volume,
# spread,
# mix_stage,
# mix_times,
# mix_vol,
# mix_rate,
# mix_liquid_height,
# delays,
# )
# else:
# raise ValueError(
# f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
# "Supported modes: 1->N, N->1, or N->N."
# )
return TransferLiquidReturn( return TransferLiquidReturn(
sources=ResourceTreeSet.from_plr_resources(list(sources), known_newly_created=False).dump(), # type: ignore sources=ResourceTreeSet.from_plr_resources(list(sources), known_newly_created=False).dump(), # type: ignore
targets=ResourceTreeSet.from_plr_resources(list(targets), known_newly_created=False).dump(), # type: ignore targets=ResourceTreeSet.from_plr_resources(list(targets), known_newly_created=False).dump(), # type: ignore
) )
async def _transfer_base_method(
self,
sources: Sequence[Container],
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
**kwargs
):
# 从kwargs中提取参数提供默认值
asp_flow_rates = kwargs.get('asp_flow_rates')
dis_flow_rates = kwargs.get('dis_flow_rates')
offsets = kwargs.get('offsets')
touch_tip = kwargs.get('touch_tip', False)
liquid_height = kwargs.get('liquid_height')
blow_out_air_volume = kwargs.get('blow_out_air_volume')
spread = kwargs.get('spread', 'wide')
mix_stage = kwargs.get('mix_stage')
mix_times = kwargs.get('mix_times')
mix_vol = kwargs.get('mix_vol')
mix_rate = kwargs.get('mix_rate')
mix_liquid_height = kwargs.get('mix_liquid_height')
delays = kwargs.get('delays')
tip = []
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
await self.aspirate(
resources=[sources[0]],
vols=[asp_vols[0]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
),
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[targets[0]],
vols=[dis_vols[0]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[0]] if dis_flow_rates and len(dis_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
blow_out_air_volume=(
[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
),
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[0]],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
use_channels=use_channels,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[0])
await self.touch_tip(targets[0])
await self.discard_tips(use_channels=use_channels)
async def _transfer_one_to_one( async def _transfer_one_to_one(
self, self,