From 23cf713a8055dade91caaecc4d8d2165ce70a28e Mon Sep 17 00:00:00 2001 From: Guangxin Zhang Date: Wed, 16 Jul 2025 12:52:35 +0800 Subject: [PATCH] Update prcxi.py --- .../liquid_handler_abstract.py | 307 +++++++++++++----- .../devices/liquid_handling/prcxi/prcxi.py | 59 +++- 2 files changed, 265 insertions(+), 101 deletions(-) diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index dc8757b..045f5a1 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -119,46 +119,114 @@ class LiquidHandlerAbstract(LiquidHandler): else: if len(asp_vols) != len(targets): raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") - tip = next(self.current_tip) - await self.pick_up_tips(tip) + + # 首先应该对任务分组,然后每次1个/8个进行操作处理 + if len(use_channels) == 1: + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) - for _ in range(len(targets)): - await self.aspirate( - resources=reagent_sources, - vols=[asp_vols[_]], - use_channels=use_channels, - flow_rates=[flow_rates[0]] if flow_rates else None, - offsets=[offsets[0]] if offsets else None, - liquid_height=[liquid_height[0]] if liquid_height else None, - blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, - spread=spread, - ) - if delays is not None: - await self.custom_delay(seconds=delays[0]) - await self.dispense( - resources=[targets[_]], - vols=[dis_vols[_]], - use_channels=use_channels, - flow_rates=[flow_rates[1]] if flow_rates else None, - offsets=[offsets[1]] if offsets else None, - blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, - liquid_height=[liquid_height[1]] if liquid_height else None, - spread=spread, - ) - if delays is not None: - await self.custom_delay(seconds=delays[1]) - await self.mix( - targets=targets[_], - mix_time=mix_time, - mix_vol=mix_vol, - offsets=offsets if offsets else None, - height_to_bottom=mix_liquid_height if mix_liquid_height else None, - mix_rate=mix_rate if mix_rate else None, - ) - if delays is not None: - await self.custom_delay(seconds=delays[1]) - await self.touch_tip(targets[_]) - await self.discard_tips() + for _ in range(len(targets)): + await self.aspirate( + resources=reagent_sources, + vols=[asp_vols[_]], + use_channels=use_channels, + flow_rates=[flow_rates[0]] if flow_rates else None, + offsets=[offsets[0]] if offsets else None, + liquid_height=[liquid_height[0]] if liquid_height else None, + blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=[targets[_]], + vols=[dis_vols[_]], + use_channels=use_channels, + flow_rates=[flow_rates[1]] if flow_rates else None, + offsets=[offsets[1]] if offsets else None, + blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, + liquid_height=[liquid_height[1]] if liquid_height else None, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + await self.mix( + targets=targets[_], + mix_time=mix_time, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + await self.touch_tip(targets[_]) + await self.discard_tips() + elif len(use_channels) == 8: + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 + if len(targets) % 8 != 0: + raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") + + # 8个8个来取任务序列 + + for i in range(0, len(targets), 8): + # 取出8个任务 + current_targets = targets[i:i + 8] + current_reagent_sources = reagent_sources[i:i + 8] + current_asp_vols = asp_vols[i:i + 8] + current_dis_vols = dis_vols[i:i + 8] + current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8 + current_dis_flow_rates = flow_rates[i+8 :i + 16] if flow_rates else [None] * 8 + current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8 + + await self.aspirate( + resources=current_reagent_sources, + vols=current_asp_vols, + use_channels=use_channels, + flow_rates=current_asp_flow_rates, + offsets=current_asp_offset, + liquid_height=current_asp_liquid_height, + blow_out_air_volume=current_asp_blow_out_air_volume, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=current_targets, + vols=current_dis_vols, + use_channels=use_channels, + flow_rates=current_dis_flow_rates, + offsets=current_dis_offset, + liquid_height=current_dis_liquid_height, + blow_out_air_volume=current_dis_blow_out_air_volume, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + + await self.mix( + targets=current_targets, + mix_time=mix_time, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + #await self.touch_tip(current_targets) except Exception as e: traceback.print_exc() @@ -208,56 +276,126 @@ class LiquidHandlerAbstract(LiquidHandler): """ try: - # ------------------------------------------------------------------ - # 96‑channel head mode - # ------------------------------------------------------------------ if is_96_well: - pass # This mode is not verified + pass # This mode is not verified. else: - if not (len(asp_vols) == len(sources) and len(dis_vols) == len(targets)): - raise ValueError("`sources`, `targets`, and `vols` must have the same length.") - - tip_iter = self.iter_tips(tip_racks) - for src, tgt, asp_vol, asp_flow_rate, dis_vol, dis_flow_rate in zip( - sources, targets, asp_vols, asp_flow_rates, dis_vols, dis_flow_rates - ): - tip = next(tip_iter) + if len(asp_vols) != len(targets): + raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") + + # 首先应该对任务分组,然后每次1个/8个进行操作处理 + if len(use_channels) == 1: + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) - # Aspirate from source - await self.aspirate( - resources=[src], - vols=[asp_vol], - use_channels=use_channels, - flow_rates=[asp_flow_rate], - offsets=offsets, - liquid_height=liquid_height, - blow_out_air_volume=blow_out_air_volume, - spread=spread, - ) - self.custom_delay(seconds=delays[0] if delays else 0) - # Dispense into target - await self.dispense( - resources=[tgt], - vols=[dis_vol], - use_channels=use_channels, - flow_rates=[dis_flow_rate], - offsets=offsets, - liquid_height=liquid_height, - blow_out_air_volume=blow_out_air_volume, - spread=spread, - ) - await self.mix( - targets=[tgt], - mix_time=mix_times[0] if mix_times else None, - mix_vol=mix_vol[0] if mix_vol else None, - mix_rate=mix_rate[0] if mix_rate else None, - ) - if touch_tip: - await self.touch_tip(tgt) + + for _ in range(len(targets)): + await self.aspirate( + resources=sources, + vols=[asp_vols[_]], + use_channels=use_channels, + flow_rates=[asp_flow_rates[0]] if asp_flow_rates else None, + offsets=[offsets[0]] if offsets else None, + liquid_height=[liquid_height[0]] if liquid_height else None, + blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=[targets[_]], + vols=[dis_vols[_]], + use_channels=use_channels, + flow_rates=[dis_flow_rates[1]] if dis_flow_rates else None, + offsets=[offsets[1]] if offsets else None, + blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, + liquid_height=[liquid_height[1]] if liquid_height else None, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + await self.mix( + targets=targets[_], + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + await self.touch_tip(targets[_]) await self.discard_tips() - except Exception as exc: - raise RuntimeError(f"Liquid transfer failed: {exc}") from exc + elif len(use_channels) == 8: + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + + # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 + if len(targets) % 8 != 0: + raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") + + # 8个8个来取任务序列 + + for i in range(0, len(targets), 8): + # 取出8个任务 + current_targets = targets[i:i + 8] + current_reagent_sources = sources[i:i + 8] + current_asp_vols = asp_vols[i:i + 8] + current_dis_vols = dis_vols[i:i + 8] + current_asp_flow_rates = asp_flow_rates[i:i + 8] + current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 + current_dis_offset = offsets[i+8 :i + 16] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 + current_dis_liquid_height = liquid_height[i+8 :i + 16] if liquid_height else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i+8 :i + 16] if blow_out_air_volume else [None] * 8 + current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8 + + await self.aspirate( + resources=current_reagent_sources, + vols=current_asp_vols, + use_channels=use_channels, + flow_rates=current_asp_flow_rates, + offsets=current_asp_offset, + blow_out_air_volume=current_asp_blow_out_air_volume, + liquid_height=current_asp_liquid_height, + spread=spread, + ) + + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=current_targets, + vols=current_dis_vols, + use_channels=use_channels, + flow_rates=current_dis_flow_rates, + offsets=current_dis_offset, + blow_out_air_volume=current_dis_blow_out_air_volume, + liquid_height=current_dis_liquid_height, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + + await self.mix( + targets=current_targets, + mix_time=mix_times, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, + ) + if delays is not None: + await self.custom_delay(seconds=delays[1]) + + #await self.touch_tip(current_targets) + except Exception as e: + traceback.print_exc() + raise RuntimeError(f"Liquid addition failed: {e}") from e + # --------------------------------------------------------------- # Helper utilities @@ -339,6 +477,7 @@ class LiquidHandlerAbstract(LiquidHandler): def set_tiprack(self, tip_racks: Sequence[TipRack]): """Set the tip racks for the liquid handler.""" + self.tip_racks = tip_racks tip_iter = self.iter_tips(tip_racks) self.current_tip = tip_iter diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index 56bd12e..0d68654 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -392,7 +392,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int] = None): """Pick up tips from the specified resource.""" - + if len(ops) != 8: raise ValueError(f"PRCXI9300Backend pick_up_tips: Expected 8 pickups, got {len(ops)}") @@ -430,7 +430,6 @@ class PRCXI9300Backend(LiquidHandlerBackend): ) self.steps_todo_list.append(step) - async def drop_tips(self, ops: List[Drop], use_channels: List[int] = None): """Pick up tips from the specified resource.""" @@ -569,6 +568,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): hole_numbers="1,2,3,4,5,6,7,8", ) self.steps_todo_list.append(step) + async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int] = None): @@ -1027,26 +1027,51 @@ if __name__ == "__main__": handler.set_tiprack([tip_rack]) # Set the tip rack for the handler asyncio.run(handler.setup()) # Initialize the handler and setup the connection asyncio.run(handler.create_protocol(protocol_name="Test Protocol")) # Initialize the backend and setup the connection - asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7])) - asyncio.run(handler.aspirate(well_containers.children[:8],[50]*8, [0,1,2,3,4,5,6,7])) - asyncio.run(handler.dispense(well_containers.children[:8],[50]*8,[0,1,2,3,4,5,6,7])) - asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7])) - asyncio.run(handler.mix(well_containers.children[:8], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100)) - print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info + # asyncio.run(handler.pick_up_tips(tip_rack.children[:8],[0,1,2,3,4,5,6,7])) + # asyncio.run(handler.aspirate(well_containers.children[:8],[50]*8, [0,1,2,3,4,5,6,7])) + # asyncio.run(handler.dispense(well_containers.children[:8],[50]*8,[0,1,2,3,4,5,6,7])) + # asyncio.run(handler.drop_tips(tip_rack.children[8:16],[0,1,2,3,4,5,6,7])) + # asyncio.run(handler.mix(well_containers.children[:8], mix_time=3, mix_vol=50, height_to_bottom=0.5, offsets=Coordinate(0, 0, 0), mix_rate=100)) + #print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info asyncio.run(handler.add_liquid( - asp_vols=[100]*8, - dis_vols=[100]*8, - reagent_sources=well_containers.children[-8:], - targets=well_containers.children[:8], + asp_vols=[100]*16, + dis_vols=[100]*16, + reagent_sources=well_containers.children[-16:], + targets=well_containers.children[:16], use_channels=[0, 1, 2, 3, 4, 5, 6, 7], - flow_rates=[None] * 8, - offsets=[Coordinate(0, 0, 0)] * 8, - liquid_height=[None] * 8, - blow_out_air_volume=[None] * 8, + flow_rates=[None] * 16, + offsets=[Coordinate(0, 0, 0)] * 16, + liquid_height=[None] * 16, + blow_out_air_volume=[None] * 16, + delays=None, + mix_time=3, + mix_vol=50, spread="wide", )) + + # asyncio.run(handler.transfer_liquid( + # asp_vols=[100]*16, + # dis_vols=[100]*16, + # tip_racks=[tip_rack], + # sources=well_containers.children[-16:], + # targets=well_containers.children[:16], + # use_channels=[0, 1, 2, 3, 4, 5, 6, 7], + # offsets=[Coordinate(0, 0, 0)] * 32, + # asp_flow_rates=[None] * 16, + # dis_flow_rates=[None] * 16, + # liquid_height=[None] * 32, + # blow_out_air_volume=[None] * 32, + # mix_times=3, + # mix_vol=50, + # spread="wide", + # )) + print(json.dumps(handler._unilabos_backend.steps_todo_list, indent=2)) # Print matrix info # input("pick_up_tips add step") - asyncio.run(handler.run_protocol()) # Run the protocol + #asyncio.run(handler.run_protocol()) # Run the protocol # input("Running protocol...") # input("Press Enter to continue...") # Wait for user input before proceeding # print("PRCXI9300Handler initialized with deck and host settings.") + + + +# 但是怎么丢tip?这个需要手动设置一下? \ No newline at end of file