diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 7a492939..9bc56d4d 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -147,6 +147,9 @@ class LiquidHandlerMiddleware(LiquidHandler): offsets: Optional[List[Coordinate]] = None, **backend_kwargs, ): + # 如果 use_channels 为 None,使用默认值(所有通道) + if use_channels is None: + use_channels = list(range(self.channel_num)) if not offsets or (isinstance(offsets, list) and len(offsets) != len(use_channels)): offsets = [Coordinate.zero()] * len(use_channels) if self._simulator: @@ -759,7 +762,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): blow_out_air_volume=current_dis_blow_out_air_volume, spread=spread, ) - if delays is not None: + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) await self.touch_tip(current_targets) await self.discard_tips() @@ -833,17 +836,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): spread=spread, ) - if delays is not None: + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - await self.mix( - targets=[targets[_]], - mix_time=mix_time, - mix_vol=mix_vol, - offsets=offsets if offsets else None, - height_to_bottom=mix_liquid_height if mix_liquid_height else None, - mix_rate=mix_rate if mix_rate else None, - ) - if delays is not None: + # 只有在 mix_time 有效时才调用 mix + if mix_time is not None and mix_time > 0: + await self.mix( + targets=[targets[_]], + mix_time=mix_time, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) await self.touch_tip(targets[_]) await self.discard_tips() @@ -893,18 +898,20 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): blow_out_air_volume=current_dis_blow_out_air_volume, spread=spread, ) - if delays is not None: + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - await self.mix( - targets=current_targets, - mix_time=mix_time, - mix_vol=mix_vol, - offsets=offsets if offsets else None, - height_to_bottom=mix_liquid_height if mix_liquid_height else None, - mix_rate=mix_rate if mix_rate else None, - ) - if delays is not None: + # 只有在 mix_time 有效时才调用 mix + if mix_time is not None and mix_time > 0: + await self.mix( + targets=current_targets, + mix_time=mix_time, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) await self.touch_tip(current_targets) await self.discard_tips() @@ -942,60 +949,146 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): delays: Optional[List[int]] = None, none_keys: List[str] = [], ): - """Transfer liquid from each *source* well/plate to the corresponding *target*. + """Transfer liquid with automatic mode detection. + + Supports three transfer modes: + 1. One-to-many (1 source -> N targets): Distribute from one source to multiple targets + 2. One-to-one (N sources -> N targets): Standard transfer, each source to corresponding target + 3. Many-to-one (N sources -> 1 target): Combine multiple sources into one target Parameters ---------- asp_vols, dis_vols - Single volume (µL) or list matching the number of transfers. + Single volume (µL) or list. Automatically expanded based on transfer mode. sources, targets - Same‑length sequences of containers (wells or plates). In 96‑well mode - each must contain exactly one plate. + Containers (wells or plates). Length determines transfer mode: + - len(sources) == 1, len(targets) > 1: One-to-many mode + - len(sources) == len(targets): One-to-one mode + - len(sources) > 1, len(targets) == 1: Many-to-one mode tip_racks One or more TipRacks providing fresh tips. is_96_well Set *True* to use the 96‑channel head. """ - - + + # 确保 use_channels 有默认值 + if use_channels is None: + use_channels = [0] if self.channel_num >= 1 else list(range(self.channel_num)) + if is_96_well: pass # This mode is not verified. else: - if len(asp_vols) != len(targets): - raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") + # 转换体积参数为列表 + if isinstance(asp_vols, (int, float)): + asp_vols = [float(asp_vols)] + else: + asp_vols = [float(v) for v in asp_vols] + + if isinstance(dis_vols, (int, float)): + dis_vols = [float(dis_vols)] + else: + dis_vols = [float(v) for v in dis_vols] + + # 识别传输模式 + num_sources = len(sources) + num_targets = len(targets) + + if num_sources == 1 and num_targets > 1: + # 模式1: 一对多 (1 source -> N targets) + await self._transfer_one_to_many( + sources[0], targets, tip_racks, use_channels, + asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, + offsets, touch_tip, liquid_height, blow_out_air_volume, + spread, mix_stage, mix_times, mix_vol, mix_rate, + mix_liquid_height, delays + ) + elif num_sources > 1 and num_targets == 1: + # 模式2: 多对一 (N sources -> 1 target) + await self._transfer_many_to_one( + sources, targets[0], tip_racks, use_channels, + asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, + offsets, touch_tip, liquid_height, blow_out_air_volume, + spread, mix_stage, mix_times, mix_vol, mix_rate, + mix_liquid_height, delays + ) + elif num_sources == num_targets: + # 模式3: 一对一 (N sources -> N targets) - 原有逻辑 + await self._transfer_one_to_one( + sources, targets, tip_racks, use_channels, + asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, + offsets, touch_tip, liquid_height, blow_out_air_volume, + spread, mix_stage, mix_times, mix_vol, mix_rate, + mix_liquid_height, delays + ) + else: + raise ValueError( + f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. " + "Supported modes: 1->N, N->1, or N->N." + ) - # 首先应该对任务分组,然后每次1个/8个进行操作处理 - if len(use_channels) == 1: - for _ in range(len(targets)): - tip = [] - for ___ in range(len(use_channels)): - tip.extend(next(self.current_tip)) - await self.pick_up_tips(tip) + async def _transfer_one_to_one( + self, + sources: Sequence[Container], + targets: Sequence[Container], + tip_racks: Sequence[TipRack], + use_channels: List[int], + asp_vols: List[float], + dis_vols: List[float], + asp_flow_rates: Optional[List[Optional[float]]], + dis_flow_rates: Optional[List[Optional[float]]], + offsets: Optional[List[Coordinate]], + touch_tip: bool, + liquid_height: Optional[List[Optional[float]]], + blow_out_air_volume: Optional[List[Optional[float]]], + spread: Literal["wide", "tight", "custom"], + mix_stage: Optional[Literal["none", "before", "after", "both"]], + mix_times: Optional[int], + mix_vol: Optional[int], + mix_rate: Optional[int], + mix_liquid_height: Optional[float], + delays: Optional[List[int]], + ): + """一对一传输模式:N sources -> N targets""" + # 验证参数长度 + if len(asp_vols) != len(targets): + raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") + if len(dis_vols) != len(targets): + raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.") + if len(sources) != len(targets): + raise ValueError(f"Length of `sources` {len(sources)} must match `targets` {len(targets)}.") - await self.aspirate( - resources=[sources[_]], - vols=[asp_vols[_]], - use_channels=use_channels, - flow_rates=[asp_flow_rates[0]] if asp_flow_rates else None, - offsets=[offsets[0]] if offsets else None, - liquid_height=[liquid_height[0]] if liquid_height else None, - blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, - spread=spread, - ) - if delays is not None: - await self.custom_delay(seconds=delays[0]) - await self.dispense( - resources=[targets[_]], - vols=[dis_vols[_]], - use_channels=use_channels, - flow_rates=[dis_flow_rates[1]] if dis_flow_rates else None, - offsets=[offsets[1]] if offsets else None, - blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, - liquid_height=[liquid_height[1]] if liquid_height else None, - spread=spread, - ) - if delays is not None: - await self.custom_delay(seconds=delays[1]) + if len(use_channels) == 1: + for _ in range(len(targets)): + tip = [] + for ___ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + await self.aspirate( + resources=[sources[_]], + vols=[asp_vols[_]], + use_channels=use_channels, + flow_rates=[asp_flow_rates[_]] if asp_flow_rates and len(asp_flow_rates) > _ else None, + offsets=[offsets[_]] if offsets and len(offsets) > _ else None, + liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None, + blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=[targets[_]], + vols=[dis_vols[_]], + use_channels=use_channels, + flow_rates=[dis_flow_rates[_]] if dis_flow_rates and len(dis_flow_rates) > _ else None, + offsets=[offsets[_]] if offsets and len(offsets) > _ else None, + blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None, + liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None, + spread=spread, + ) + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: await self.mix( targets=[targets[_]], mix_time=mix_times, @@ -1004,63 +1097,60 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - if delays is not None: - await self.custom_delay(seconds=delays[1]) - await self.touch_tip(targets[_]) - await self.discard_tips() + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + await self.touch_tip(targets[_]) + await self.discard_tips(use_channels=use_channels) - elif len(use_channels) == 8: - # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 - if len(targets) % 8 != 0: - raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") + elif len(use_channels) == 8: + if len(targets) % 8 != 0: + raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") - # 8个8个来取任务序列 + for i in range(0, len(targets), 8): + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + current_targets = targets[i:i + 8] + current_reagent_sources = sources[i:i + 8] + current_asp_vols = asp_vols[i:i + 8] + current_dis_vols = dis_vols[i:i + 8] + current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None + current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None - for i in range(0, len(targets), 8): - # 取出8个任务 - tip = [] - for _ in range(len(use_channels)): - tip.extend(next(self.current_tip)) - await self.pick_up_tips(tip) - current_targets = targets[i:i + 8] - current_reagent_sources = sources[i:i + 8] - current_asp_vols = asp_vols[i:i + 8] - current_dis_vols = dis_vols[i:i + 8] - current_asp_flow_rates = asp_flow_rates[i:i + 8] - current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8 - current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8 - current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8 - current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8 + await self.aspirate( + resources=current_reagent_sources, + vols=current_asp_vols, + use_channels=use_channels, + flow_rates=current_asp_flow_rates, + offsets=current_asp_offset, + blow_out_air_volume=current_asp_blow_out_air_volume, + liquid_height=current_asp_liquid_height, + spread=spread, + ) - await self.aspirate( - resources=current_reagent_sources, - vols=current_asp_vols, - use_channels=use_channels, - flow_rates=current_asp_flow_rates, - offsets=current_asp_offset, - blow_out_air_volume=current_asp_blow_out_air_volume, - liquid_height=current_asp_liquid_height, - spread=spread, - ) - - if delays is not None: - await self.custom_delay(seconds=delays[0]) - await self.dispense( - resources=current_targets, - vols=current_dis_vols, - use_channels=use_channels, - flow_rates=current_dis_flow_rates, - offsets=current_dis_offset, - blow_out_air_volume=current_dis_blow_out_air_volume, - liquid_height=current_dis_liquid_height, - spread=spread, - ) - if delays is not None: - await self.custom_delay(seconds=delays[1]) + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=current_targets, + vols=current_dis_vols, + use_channels=use_channels, + flow_rates=current_dis_flow_rates, + offsets=current_dis_offset, + blow_out_air_volume=current_dis_blow_out_air_volume, + liquid_height=current_dis_liquid_height, + spread=spread, + ) + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: await self.mix( targets=current_targets, mix_time=mix_times, @@ -1069,10 +1159,363 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - if delays is not None: - await self.custom_delay(seconds=delays[1]) + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + await self.touch_tip(current_targets) + await self.discard_tips([0,1,2,3,4,5,6,7]) + + async def _transfer_one_to_many( + self, + source: Container, + targets: Sequence[Container], + tip_racks: Sequence[TipRack], + use_channels: List[int], + asp_vols: List[float], + dis_vols: List[float], + asp_flow_rates: Optional[List[Optional[float]]], + dis_flow_rates: Optional[List[Optional[float]]], + offsets: Optional[List[Coordinate]], + touch_tip: bool, + liquid_height: Optional[List[Optional[float]]], + blow_out_air_volume: Optional[List[Optional[float]]], + spread: Literal["wide", "tight", "custom"], + mix_stage: Optional[Literal["none", "before", "after", "both"]], + mix_times: Optional[int], + mix_vol: Optional[int], + mix_rate: Optional[int], + mix_liquid_height: Optional[float], + delays: Optional[List[int]], + ): + """一对多传输模式:1 source -> N targets""" + # 验证和扩展体积参数 + if len(asp_vols) == 1: + # 如果只提供一个吸液体积,计算总吸液体积(所有分液体积之和) + total_asp_vol = sum(dis_vols) + asp_vol = asp_vols[0] if asp_vols[0] >= total_asp_vol else total_asp_vol + else: + raise ValueError("For one-to-many mode, `asp_vols` should be a single value or list with one element.") + + if len(dis_vols) != len(targets): + raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.") + + if len(use_channels) == 1: + # 单通道模式:一次吸液,多次分液 + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + # 从源容器吸液(总体积) + await self.aspirate( + resources=[source], + vols=[asp_vol], + use_channels=use_channels, + flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None, + offsets=[offsets[0]] if offsets and len(offsets) > 0 else None, + liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None, + blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None, + spread=spread, + ) + + if delays is not None: + await self.custom_delay(seconds=delays[0]) + + # 分多次分液到不同的目标容器 + for idx, target in enumerate(targets): + await self.dispense( + resources=[target], + vols=[dis_vols[idx]], + use_channels=use_channels, + flow_rates=[dis_flow_rates[idx]] if dis_flow_rates and len(dis_flow_rates) > idx else None, + offsets=[offsets[idx]] if offsets and len(offsets) > idx else None, + blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None, + liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None, + spread=spread, + ) + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=[target], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[idx:idx+1] if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if touch_tip: + await self.touch_tip([target]) + + await self.discard_tips(use_channels=use_channels) + + elif len(use_channels) == 8: + # 8通道模式:需要确保目标数量是8的倍数 + if len(targets) % 8 != 0: + raise ValueError(f"For 8-channel mode, number of targets {len(targets)} must be a multiple of 8.") + + # 每次处理8个目标 + for i in range(0, len(targets), 8): + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + current_targets = targets[i:i + 8] + current_dis_vols = dis_vols[i:i + 8] + + # 8个通道都从同一个源容器吸液,每个通道的吸液体积等于对应的分液体积 + current_asp_flow_rates = asp_flow_rates[0:1] * 8 if asp_flow_rates and len(asp_flow_rates) > 0 else None + current_asp_offset = offsets[0:1] * 8 if offsets and len(offsets) > 0 else [None] * 8 + current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8 + + # 从源容器吸液(8个通道都从同一个源,但每个通道的吸液体积不同) + await self.aspirate( + resources=[source] * 8, # 8个通道都从同一个源 + vols=current_dis_vols, # 每个通道的吸液体积等于对应的分液体积 + use_channels=use_channels, + flow_rates=current_asp_flow_rates, + offsets=current_asp_offset, + liquid_height=current_asp_liquid_height, + blow_out_air_volume=current_asp_blow_out_air_volume, + spread=spread, + ) + + if delays is not None: + await self.custom_delay(seconds=delays[0]) + + # 分液到8个目标 + current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None + current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + + await self.dispense( + resources=current_targets, + vols=current_dis_vols, + use_channels=use_channels, + flow_rates=current_dis_flow_rates, + offsets=current_dis_offset, + blow_out_air_volume=current_dis_blow_out_air_volume, + liquid_height=current_dis_liquid_height, + spread=spread, + ) + + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=current_targets, + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + + if touch_tip: await self.touch_tip(current_targets) - await self.discard_tips([0,1,2,3,4,5,6,7]) + + await self.discard_tips([0,1,2,3,4,5,6,7]) + + async def _transfer_many_to_one( + self, + sources: Sequence[Container], + target: Container, + tip_racks: Sequence[TipRack], + use_channels: List[int], + asp_vols: List[float], + dis_vols: List[float], + asp_flow_rates: Optional[List[Optional[float]]], + dis_flow_rates: Optional[List[Optional[float]]], + offsets: Optional[List[Coordinate]], + touch_tip: bool, + liquid_height: Optional[List[Optional[float]]], + blow_out_air_volume: Optional[List[Optional[float]]], + spread: Literal["wide", "tight", "custom"], + mix_stage: Optional[Literal["none", "before", "after", "both"]], + mix_times: Optional[int], + mix_vol: Optional[int], + mix_rate: Optional[int], + mix_liquid_height: Optional[float], + delays: Optional[List[int]], + ): + """多对一传输模式:N sources -> 1 target(汇总/混合)""" + # 验证和扩展体积参数 + if len(asp_vols) != len(sources): + raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `sources` {len(sources)}.") + + # 支持两种模式: + # 1. dis_vols 为单个值:所有源汇总,使用总吸液体积或指定分液体积 + # 2. dis_vols 长度等于 asp_vols:每个源按不同比例分液(按比例混合) + if len(dis_vols) == 1: + # 模式1:使用单个分液体积 + total_dis_vol = sum(asp_vols) + dis_vol = dis_vols[0] if dis_vols[0] >= total_dis_vol else total_dis_vol + use_proportional_mixing = False + elif len(dis_vols) == len(asp_vols): + # 模式2:按不同比例混合 + use_proportional_mixing = True + else: + raise ValueError( + f"For many-to-one mode, `dis_vols` should be a single value or list with length {len(asp_vols)} " + f"(matching `asp_vols`). Got length {len(dis_vols)}." + ) + + if len(use_channels) == 1: + # 单通道模式:多次吸液,一次分液 + # 先混合前(如果需要) + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: + # 注意:在吸液前混合源容器通常不常见,这里跳过 + pass + + # 从每个源容器吸液并分液到目标容器 + for idx, source in enumerate(sources): + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + await self.aspirate( + resources=[source], + vols=[asp_vols[idx]], + use_channels=use_channels, + flow_rates=[asp_flow_rates[idx]] if asp_flow_rates and len(asp_flow_rates) > idx else None, + offsets=[offsets[idx]] if offsets and len(offsets) > idx else None, + liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None, + blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None, + spread=spread, + ) + + if delays is not None: + await self.custom_delay(seconds=delays[0]) + + # 分液到目标容器 + if use_proportional_mixing: + # 按不同比例混合:使用对应的 dis_vols + dis_vol = dis_vols[idx] + dis_flow_rate = dis_flow_rates[idx] if dis_flow_rates and len(dis_flow_rates) > idx else None + dis_offset = offsets[idx] if offsets and len(offsets) > idx else None + dis_liquid_height = liquid_height[idx] if liquid_height and len(liquid_height) > idx else None + dis_blow_out = blow_out_air_volume[idx] if blow_out_air_volume and len(blow_out_air_volume) > idx else None + else: + # 标准模式:分液体积等于吸液体积 + dis_vol = asp_vols[idx] + dis_flow_rate = dis_flow_rates[0] if dis_flow_rates and len(dis_flow_rates) > 0 else None + dis_offset = offsets[0] if offsets and len(offsets) > 0 else None + dis_liquid_height = liquid_height[0] if liquid_height and len(liquid_height) > 0 else None + dis_blow_out = blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None + + await self.dispense( + resources=[target], + vols=[dis_vol], + use_channels=use_channels, + flow_rates=[dis_flow_rate] if dis_flow_rate is not None else None, + offsets=[dis_offset] if dis_offset is not None else None, + blow_out_air_volume=[dis_blow_out] if dis_blow_out is not None else None, + liquid_height=[dis_liquid_height] if dis_liquid_height is not None else None, + spread=spread, + ) + + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + + await self.discard_tips(use_channels=use_channels) + + # 最后在目标容器中混合(如果需要) + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=[target], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[0:1] if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + + if touch_tip: + await self.touch_tip([target]) + + elif len(use_channels) == 8: + # 8通道模式:需要确保源数量是8的倍数 + if len(sources) % 8 != 0: + raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.") + + # 每次处理8个源 + for i in range(0, len(sources), 8): + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + current_sources = sources[i:i + 8] + current_asp_vols = asp_vols[i:i + 8] + current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None + current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + + # 从8个源容器吸液 + await self.aspirate( + resources=current_sources, + vols=current_asp_vols, + use_channels=use_channels, + flow_rates=current_asp_flow_rates, + offsets=current_asp_offset, + blow_out_air_volume=current_asp_blow_out_air_volume, + liquid_height=current_asp_liquid_height, + spread=spread, + ) + + if delays is not None: + await self.custom_delay(seconds=delays[0]) + + # 分液到目标容器(每个通道分液到同一个目标) + if use_proportional_mixing: + # 按比例混合:使用对应的 dis_vols + current_dis_vols = dis_vols[i:i + 8] + current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None + current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + else: + # 标准模式:每个通道分液体积等于其吸液体积 + current_dis_vols = current_asp_vols + current_dis_flow_rates = dis_flow_rates[0:1] * 8 if dis_flow_rates else None + current_dis_offset = offsets[0:1] * 8 if offsets else [None] * 8 + current_dis_liquid_height = liquid_height[0:1] * 8 if liquid_height else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume else [None] * 8 + + await self.dispense( + resources=[target] * 8, # 8个通道都分到同一个目标 + vols=current_dis_vols, + use_channels=use_channels, + flow_rates=current_dis_flow_rates, + offsets=current_dis_offset, + blow_out_air_volume=current_dis_blow_out_air_volume, + liquid_height=current_dis_liquid_height, + spread=spread, + ) + + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + + await self.discard_tips([0,1,2,3,4,5,6,7]) + + # 最后在目标容器中混合(如果需要) + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: + await self.mix( + targets=[target], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets[0:1] if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + + if touch_tip: + await self.touch_tip([target]) # except Exception as e: # traceback.print_exc() diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index fb19af03..ae6db267 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -1144,7 +1144,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): queried_resources = [] for resource_data in resource_inputs: plr_resource = await self.get_resource_with_dir( - resource_ids=resource_data["id"], with_children=True + resource_id=resource_data["id"], with_children=True ) queried_resources.append(plr_resource)