diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 7cefc8e..0ecf460 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -359,9 +359,7 @@ class HTTPClient: Returns: Dict: API响应数据,包含 code 和 data (uuid, name) """ - # target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取 payload = { - "target_lab_uuid": "cf44e98c-7f3e-4175-b526-1fa338b43f65", "name": name, "data": { "workflow_uuid": workflow_uuid, diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index d02129c..aa695a0 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -28,21 +28,40 @@ from pylabrobot.resources import ( Tip, ) +from unilabos.registry.placeholder_type import ResourceSlot from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode +from unilabos.resources.resource_tracker import ResourceTreeSet + + class SimpleReturn(TypedDict): samples: list volumes: list + +class SetLiquidReturn(TypedDict): + wells: list + volumes: list + + +class SetLiquidFromPlateReturn(TypedDict): + plate: list + wells: list + volumes: list + + class LiquidHandlerMiddleware(LiquidHandler): - def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs): + def __init__( + self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool = False, channel_num: int = 8, **kwargs + ): self._simulator = simulator self.channel_num = channel_num self.pending_liquids_dict = {} joint_config = kwargs.get("joint_config", None) if simulator: if joint_config: - self._simulate_backend = UniLiquidHandlerRvizBackend(channel_num, kwargs["total_height"], - joint_config=joint_config, lh_device_id=deck.name) + self._simulate_backend = UniLiquidHandlerRvizBackend( + channel_num, kwargs["total_height"], joint_config=joint_config, lh_device_id=deck.name + ) else: self._simulate_backend = LiquidHandlerChatterboxBackend(channel_num) self._simulate_handler = LiquidHandlerAbstract(self._simulate_backend, deck, False) @@ -137,7 +156,7 @@ class LiquidHandlerMiddleware(LiquidHandler): ) await super().drop_tips(tip_spots, use_channels, offsets, allow_nonzero_volume, **backend_kwargs) self.pending_liquids_dict = {} - return + return async def return_tips( self, use_channels: Optional[list[int]] = None, allow_nonzero_volume: bool = False, **backend_kwargs @@ -159,11 +178,13 @@ class LiquidHandlerMiddleware(LiquidHandler): if not offsets or (isinstance(offsets, list) and len(offsets) != len(use_channels)): offsets = [Coordinate.zero()] * len(use_channels) if self._simulator: - return await self._simulate_handler.discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs) + return await self._simulate_handler.discard_tips( + use_channels, allow_nonzero_volume, offsets, **backend_kwargs + ) await super().discard_tips(use_channels, allow_nonzero_volume, offsets, **backend_kwargs) self.pending_liquids_dict = {} - return - + return + def _check_containers(self, resources: Sequence[Resource]): super()._check_containers(resources) @@ -180,7 +201,6 @@ class LiquidHandlerMiddleware(LiquidHandler): **backend_kwargs, ): - if self._simulator: return await self._simulate_handler.aspirate( resources, @@ -208,15 +228,16 @@ class LiquidHandlerMiddleware(LiquidHandler): res_samples = [] res_volumes = [] for resource, volume, channel in zip(resources, vols, use_channels): - res_samples.append({"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)}) + res_samples.append( + {"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)} + ) res_volumes.append(volume) self.pending_liquids_dict[channel] = { "sample_uuid": resource.unilabos_extra.get("sample_uuid", None), - "volume": volume + "volume": volume, } return SimpleReturn(samples=res_samples, volumes=res_volumes) - async def dispense( self, resources: Sequence[Container], @@ -261,7 +282,7 @@ class LiquidHandlerMiddleware(LiquidHandler): res_volumes.append(volume) return SimpleReturn(samples=res_samples, volumes=res_volumes) - + async def transfer( self, source: Well, @@ -578,10 +599,18 @@ class LiquidHandlerMiddleware(LiquidHandler): class LiquidHandlerAbstract(LiquidHandlerMiddleware): """Extended LiquidHandler with additional operations.""" + support_touch_tip = True _ros_node: BaseROS2DeviceNode - def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8, total_height:float = 310): + def __init__( + self, + backend: LiquidHandlerBackend, + deck: Deck, + simulator: bool = False, + channel_num: int = 8, + total_height: float = 310, + ): """Initialize a LiquidHandler. Args: @@ -605,6 +634,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): module_name = ".".join(components[:-1]) try: import importlib + mod = importlib.import_module(module_name) except ImportError: mod = None @@ -614,6 +644,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): # Try pylabrobot style import (if available) try: import pylabrobot + backend_cls = getattr(pylabrobot, type_str, None) except Exception: backend_cls = None @@ -631,16 +662,56 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): self._ros_node = ros_node @classmethod - def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SimpleReturn: - """Set the liquid in a well.""" - res_samples = [] + def set_liquid(cls, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SetLiquidReturn: + """Set the liquid in a well. + + 如果 liquid_names 和 volumes 为空,但 wells 不为空,直接返回 wells。 + """ res_volumes = [] + # 如果 liquid_names 和 volumes 都为空,直接返回 wells + if not liquid_names and not volumes: + return SetLiquidReturn( + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore + ) + for well, liquid_name, volume in zip(wells, liquid_names, volumes): well.set_liquids([(liquid_name, volume)]) # type: ignore - res_samples.append({"name": well.name, "sample_uuid": well.unilabos_extra.get("sample_uuid", None)}) res_volumes.append(volume) - - return SimpleReturn(samples=res_samples, volumes=res_volumes) + + return SetLiquidReturn( + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore + ) + + @classmethod + def set_liquid_from_plate( + cls, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float] + ) -> SetLiquidFromPlateReturn: + """Set the liquid in wells of a plate by well names (e.g., A1, A2, B3). + + 如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。 + """ + # 根据 well_names 获取对应的 Well 对象 + wells = [plate.get_well(name) for name in well_names] + res_volumes = [] + + # 如果 liquid_names 和 volumes 都为空,直接返回 + if not liquid_names and not volumes: + return SetLiquidFromPlateReturn( + plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore + volumes=res_volumes, + ) + + for well, liquid_name, volume in zip(wells, liquid_names, volumes): + well.set_liquids([(liquid_name, volume)]) # type: ignore + res_volumes.append(volume) + + return SetLiquidFromPlateReturn( + plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore + wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore + volumes=res_volumes, + ) + # --------------------------------------------------------------- # REMOVE LIQUID -------------------------------------------------- # --------------------------------------------------------------- @@ -655,7 +726,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): source_wells = self.group_info.get(source_group_name, []) target_wells = self.group_info.get(target_group_name, []) - + rack_info = dict() for child in self.deck.children: if issubclass(child.__class__, TipRack): @@ -666,17 +737,17 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): break else: rack_info[rack.name] = (rack, tip.maximal_volume - unit_volume) - + if len(rack_info) == 0: raise ValueError(f"No tip rack can support volume {unit_volume}.") - + rack_info = sorted(rack_info.items(), key=lambda x: x[1][1]) for child in self.deck.children: if child.name == rack_info[0][0]: target_rack = child target_rack = cast(TipRack, target_rack) available_tips = {} - for (idx, tipSpot) in enumerate(target_rack.get_all_items()): + for idx, tipSpot in enumerate(target_rack.get_all_items()): if tipSpot.has_tip(): available_tips[idx] = tipSpot continue @@ -684,10 +755,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): print("channel_num", self.channel_num) if self.channel_num == 8: - tip_prefix = list(available_tips.values())[0].name.split('_')[0] - colnum_list = [int(tip.name.split('_')[-1][1:]) for tip in available_tips.values()] + tip_prefix = list(available_tips.values())[0].name.split("_")[0] + colnum_list = [int(tip.name.split("_")[-1][1:]) for tip in available_tips.values()] available_cols = [colnum for colnum, count in dict(Counter(colnum_list)).items() if count == 8] - available_cols.sort() + available_cols.sort() available_tips_dict = {tip.name: tip for tip in available_tips.values()} tips_to_use = [available_tips_dict[f"{tip_prefix}_{chr(65 + i)}{available_cols[0]}"] for i in range(8)] print("tips_to_use", tips_to_use) @@ -698,16 +769,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): await self.dispense(target_wells, [unit_volume] * 8, use_channels=list(range(0, 8))) await self.discard_tips(use_channels=list(range(0, 8))) - elif self.channel_num == 1: - + elif self.channel_num == 1: + for num_well in range(len(target_wells)): - tip_to_use = available_tips[list(available_tips.keys())[num_well]] + tip_to_use = available_tips[list(available_tips.keys())[num_well]] print("tip_to_use", tip_to_use) await self.pick_up_tips([tip_to_use], use_channels=[0]) print("source_wells", source_wells) print("target_wells", target_wells) if len(source_wells) == 1: - await self.aspirate([source_wells[0]], [unit_volume], use_channels=[0]) + await self.aspirate([source_wells[0]], [unit_volume], use_channels=[0]) else: await self.aspirate([source_wells[num_well]], [unit_volume], use_channels=[0]) await self.dispense([target_wells[num_well]], [unit_volume], use_channels=[0]) @@ -729,7 +800,6 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): """Create a new protocol with the given metadata.""" pass - async def remove_liquid( self, vols: List[float], @@ -787,11 +857,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): await self.discard_tips() elif len(use_channels) == 8 and self.backend.num_channels == 8: - - + # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 if len(sources) % 8 != 0: - raise ValueError(f"Length of `sources` {len(sources)} must be a multiple of 8 for 8-channel mode.") + raise ValueError( + f"Length of `sources` {len(sources)} must be a multiple of 8 for 8-channel mode." + ) # 8个8个来取任务序列 @@ -800,18 +871,28 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for _ in range(len(use_channels)): tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) - current_targets = waste_liquid[i:i + 8] - current_reagent_sources = sources[i:i + 8] - current_asp_vols = vols[i:i + 8] - current_dis_vols = vols[i:i + 8] - current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8 - current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8 - current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8 - current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8 - current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8 + current_targets = waste_liquid[i : i + 8] + current_reagent_sources = sources[i : i + 8] + current_asp_vols = vols[i : i + 8] + current_dis_vols = vols[i : i + 8] + current_asp_flow_rates = flow_rates[i : i + 8] if flow_rates else [None] * 8 + current_dis_flow_rates = ( + flow_rates[-i * 8 - 8 : len(flow_rates) - i * 8] if flow_rates else [None] * 8 + ) + current_asp_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_dis_offset = offsets[-i * 8 - 8 : len(offsets) - i * 8] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_dis_liquid_height = ( + liquid_height[-i * 8 - 8 : len(liquid_height) - i * 8] if liquid_height else [None] * 8 + ) + current_asp_blow_out_air_volume = ( + blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + ) + current_dis_blow_out_air_volume = ( + blow_out_air_volume[-i * 8 - 8 : len(blow_out_air_volume) - i * 8] + if blow_out_air_volume + else [None] * 8 + ) await self.aspirate( resources=current_reagent_sources, @@ -838,7 +919,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) await self.touch_tip(current_targets) - await self.discard_tips() + await self.discard_tips() except Exception as e: traceback.print_exc() @@ -872,127 +953,136 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): # """A complete *add* (aspirate reagent → dispense into targets) operation.""" # # try: - if is_96_well: - pass # This mode is not verified. - else: - if len(asp_vols) != len(targets): - raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") - # 首先应该对任务分组,然后每次1个/8个进行操作处理 - if len(use_channels) == 1: - for _ in range(len(targets)): - tip = [] - for x in range(len(use_channels)): - tip.extend(next(self.current_tip)) - await self.pick_up_tips(tip) + if is_96_well: + pass # This mode is not verified. + else: + if len(asp_vols) != len(targets): + raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.") + # 首先应该对任务分组,然后每次1个/8个进行操作处理 + if len(use_channels) == 1: + for _ in range(len(targets)): + tip = [] + for x in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) - await self.aspirate( - resources=[reagent_sources[_]], - vols=[asp_vols[_]], - use_channels=use_channels, - flow_rates=[flow_rates[0]] if flow_rates else None, - offsets=[offsets[0]] if offsets else None, - liquid_height=[liquid_height[0]] if liquid_height else None, - blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, - spread=spread, + await self.aspirate( + resources=[reagent_sources[_]], + vols=[asp_vols[_]], + use_channels=use_channels, + flow_rates=[flow_rates[0]] if flow_rates else None, + offsets=[offsets[0]] if offsets else None, + liquid_height=[liquid_height[0]] if liquid_height else None, + blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None, + spread=spread, + ) + + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=[targets[_]], + vols=[dis_vols[_]], + use_channels=use_channels, + flow_rates=[flow_rates[1]] if flow_rates else None, + offsets=[offsets[1]] if offsets else None, + blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, + liquid_height=[liquid_height[1]] if liquid_height else None, + spread=spread, + ) + + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + # 只有在 mix_time 有效时才调用 mix + if mix_time is not None and mix_time > 0: + await self.mix( + targets=[targets[_]], + mix_time=mix_time, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, ) - - if delays is not None: - await self.custom_delay(seconds=delays[0]) - await self.dispense( - resources=[targets[_]], - vols=[dis_vols[_]], - use_channels=use_channels, - flow_rates=[flow_rates[1]] if flow_rates else None, - offsets=[offsets[1]] if offsets else None, - blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None, - liquid_height=[liquid_height[1]] if liquid_height else None, - spread=spread, + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + await self.touch_tip(targets[_]) + await self.discard_tips() + + elif len(use_channels) == 8: + # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 + if len(targets) % 8 != 0: + raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") + + for i in range(0, len(targets), 8): + tip = [] + for _ in range(len(use_channels)): + tip.extend(next(self.current_tip)) + await self.pick_up_tips(tip) + current_targets = targets[i : i + 8] + current_reagent_sources = reagent_sources[i : i + 8] + current_asp_vols = asp_vols[i : i + 8] + current_dis_vols = dis_vols[i : i + 8] + current_asp_flow_rates = flow_rates[i : i + 8] if flow_rates else [None] * 8 + current_dis_flow_rates = ( + flow_rates[-i * 8 - 8 : len(flow_rates) - i * 8] if flow_rates else [None] * 8 + ) + current_asp_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_dis_offset = offsets[-i * 8 - 8 : len(offsets) - i * 8] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_dis_liquid_height = ( + liquid_height[-i * 8 - 8 : len(liquid_height) - i * 8] if liquid_height else [None] * 8 + ) + current_asp_blow_out_air_volume = ( + blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + ) + current_dis_blow_out_air_volume = ( + blow_out_air_volume[-i * 8 - 8 : len(blow_out_air_volume) - i * 8] + if blow_out_air_volume + else [None] * 8 + ) + + await self.aspirate( + resources=current_reagent_sources, + vols=current_asp_vols, + use_channels=use_channels, + flow_rates=current_asp_flow_rates, + offsets=current_asp_offset, + liquid_height=current_asp_liquid_height, + blow_out_air_volume=current_asp_blow_out_air_volume, + spread=spread, + ) + if delays is not None: + await self.custom_delay(seconds=delays[0]) + await self.dispense( + resources=current_targets, + vols=current_dis_vols, + use_channels=use_channels, + flow_rates=current_dis_flow_rates, + offsets=current_dis_offset, + liquid_height=current_dis_liquid_height, + blow_out_air_volume=current_dis_blow_out_air_volume, + spread=spread, + ) + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + + # 只有在 mix_time 有效时才调用 mix + if mix_time is not None and mix_time > 0: + await self.mix( + targets=current_targets, + mix_time=mix_time, + mix_vol=mix_vol, + offsets=offsets if offsets else None, + height_to_bottom=mix_liquid_height if mix_liquid_height else None, + mix_rate=mix_rate if mix_rate else None, ) + if delays is not None and len(delays) > 1: + await self.custom_delay(seconds=delays[1]) + await self.touch_tip(current_targets) + await self.discard_tips() - if delays is not None and len(delays) > 1: - await self.custom_delay(seconds=delays[1]) - # 只有在 mix_time 有效时才调用 mix - if mix_time is not None and mix_time > 0: - await self.mix( - targets=[targets[_]], - mix_time=mix_time, - mix_vol=mix_vol, - offsets=offsets if offsets else None, - height_to_bottom=mix_liquid_height if mix_liquid_height else None, - mix_rate=mix_rate if mix_rate else None, - ) - if delays is not None and len(delays) > 1: - await self.custom_delay(seconds=delays[1]) - await self.touch_tip(targets[_]) - await self.discard_tips() - - elif len(use_channels) == 8: - # 对于8个的情况,需要判断此时任务是不是能被8通道移液站来成功处理 - if len(targets) % 8 != 0: - raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.") - - for i in range(0, len(targets), 8): - tip = [] - for _ in range(len(use_channels)): - tip.extend(next(self.current_tip)) - await self.pick_up_tips(tip) - current_targets = targets[i:i + 8] - current_reagent_sources = reagent_sources[i:i + 8] - current_asp_vols = asp_vols[i:i + 8] - current_dis_vols = dis_vols[i:i + 8] - current_asp_flow_rates = flow_rates[i:i + 8] if flow_rates else [None] * 8 - current_dis_flow_rates = flow_rates[-i*8-8:len(flow_rates)-i*8] if flow_rates else [None] * 8 - current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8 - current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8 - current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8 - - await self.aspirate( - resources=current_reagent_sources, - vols=current_asp_vols, - use_channels=use_channels, - flow_rates=current_asp_flow_rates, - offsets=current_asp_offset, - liquid_height=current_asp_liquid_height, - blow_out_air_volume=current_asp_blow_out_air_volume, - spread=spread, - ) - if delays is not None: - await self.custom_delay(seconds=delays[0]) - await self.dispense( - resources=current_targets, - vols=current_dis_vols, - use_channels=use_channels, - flow_rates=current_dis_flow_rates, - offsets=current_dis_offset, - liquid_height=current_dis_liquid_height, - blow_out_air_volume=current_dis_blow_out_air_volume, - spread=spread, - ) - if delays is not None and len(delays) > 1: - await self.custom_delay(seconds=delays[1]) - - # 只有在 mix_time 有效时才调用 mix - if mix_time is not None and mix_time > 0: - await self.mix( - targets=current_targets, - mix_time=mix_time, - mix_vol=mix_vol, - offsets=offsets if offsets else None, - height_to_bottom=mix_liquid_height if mix_liquid_height else None, - mix_rate=mix_rate if mix_rate else None, - ) - if delays is not None and len(delays) > 1: - await self.custom_delay(seconds=delays[1]) - await self.touch_tip(current_targets) - await self.discard_tips() - - - # except Exception as e: - # traceback.print_exc() - # raise RuntimeError(f"Liquid addition failed: {e}") from e + # except Exception as e: + # traceback.print_exc() + # raise RuntimeError(f"Liquid addition failed: {e}") from e # --------------------------------------------------------------- # TRANSFER LIQUID ------------------------------------------------ @@ -1050,12 +1140,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): Number of mix cycles. If *None* (default) no mixing occurs regardless of mix_stage. """ - + # 确保 use_channels 有默认值 if use_channels is None: # 默认使用设备所有通道(例如 8 通道移液站默认就是 0-7) use_channels = list(range(self.channel_num)) if self.channel_num > 0 else [0] - + if is_96_well: pass # This mode is not verified. else: @@ -1064,7 +1154,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): asp_vols = [float(asp_vols)] else: asp_vols = [float(v) for v in asp_vols] - + if isinstance(dis_vols, (int, float)): dis_vols = [float(dis_vols)] else: @@ -1081,37 +1171,79 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): pass if mix_times is not None: mix_times = int(mix_times) - + # 识别传输模式(mix_times 为 None 也应该能正常移液,只是不做 mix) num_sources = len(sources) num_targets = len(targets) - + if num_sources == 1 and num_targets > 1: # 模式1: 一对多 (1 source -> N targets) await self._transfer_one_to_many( - sources[0], targets, tip_racks, use_channels, - asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, - offsets, touch_tip, liquid_height, blow_out_air_volume, - spread, mix_stage, mix_times, mix_vol, mix_rate, - mix_liquid_height, delays + sources[0], + targets, + tip_racks, + use_channels, + asp_vols, + dis_vols, + asp_flow_rates, + dis_flow_rates, + offsets, + touch_tip, + liquid_height, + blow_out_air_volume, + spread, + mix_stage, + mix_times, + mix_vol, + mix_rate, + mix_liquid_height, + delays, ) elif num_sources > 1 and num_targets == 1: # 模式2: 多对一 (N sources -> 1 target) await self._transfer_many_to_one( - sources, targets[0], tip_racks, use_channels, - asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, - offsets, touch_tip, liquid_height, blow_out_air_volume, - spread, mix_stage, mix_times, mix_vol, mix_rate, - mix_liquid_height, delays + sources, + targets[0], + tip_racks, + use_channels, + asp_vols, + dis_vols, + asp_flow_rates, + dis_flow_rates, + offsets, + touch_tip, + liquid_height, + blow_out_air_volume, + spread, + mix_stage, + mix_times, + mix_vol, + mix_rate, + mix_liquid_height, + delays, ) elif num_sources == num_targets: # 模式3: 一对一 (N sources -> N targets) await self._transfer_one_to_one( - sources, targets, tip_racks, use_channels, - asp_vols, dis_vols, asp_flow_rates, dis_flow_rates, - offsets, touch_tip, liquid_height, blow_out_air_volume, - spread, mix_stage, mix_times, mix_vol, mix_rate, - mix_liquid_height, delays + sources, + targets, + tip_racks, + use_channels, + asp_vols, + dis_vols, + asp_flow_rates, + dis_flow_rates, + offsets, + touch_tip, + liquid_height, + blow_out_air_volume, + spread, + mix_stage, + mix_times, + mix_vol, + mix_rate, + mix_liquid_height, + delays, ) else: raise ValueError( @@ -1174,7 +1306,9 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): flow_rates=[asp_flow_rates[_]] if asp_flow_rates and len(asp_flow_rates) > _ else None, offsets=[offsets[_]] if offsets and len(offsets) > _ else None, liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None, - blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None, + blow_out_air_volume=( + [blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None + ), spread=spread, ) if delays is not None: @@ -1185,7 +1319,9 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): use_channels=use_channels, flow_rates=[dis_flow_rates[_]] if dis_flow_rates and len(dis_flow_rates) > _ else None, offsets=[offsets[_]] if offsets and len(offsets) > _ else None, - blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None, + blow_out_air_volume=( + [blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None + ), liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None, spread=spread, ) @@ -1214,18 +1350,18 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for _ in range(len(use_channels)): tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) - current_targets = targets[i:i + 8] - current_reagent_sources = sources[i:i + 8] - current_asp_vols = asp_vols[i:i + 8] - current_dis_vols = dis_vols[i:i + 8] - current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None - current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None + current_targets = targets[i : i + 8] + current_reagent_sources = sources[i : i + 8] + current_asp_vols = asp_vols[i : i + 8] + current_dis_vols = dis_vols[i : i + 8] + current_asp_flow_rates = asp_flow_rates[i : i + 8] if asp_flow_rates else None + current_asp_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_dis_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_dis_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + current_dis_flow_rates = dis_flow_rates[i : i + 8] if dis_flow_rates else None if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: await self.mix( @@ -1275,7 +1411,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) await self.touch_tip(current_targets) - await self.discard_tips([0,1,2,3,4,5,6,7]) + await self.discard_tips([0, 1, 2, 3, 4, 5, 6, 7]) async def _transfer_one_to_many( self, @@ -1307,7 +1443,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): asp_vol = asp_vols[0] if asp_vols[0] >= total_asp_vol else total_asp_vol else: raise ValueError("For one-to-many mode, `asp_vols` should be a single value or list with one element.") - + if len(dis_vols) != len(targets): raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.") @@ -1324,7 +1460,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): targets=[target], mix_time=mix_times, mix_vol=mix_vol, - offsets=offsets[idx:idx + 1] if offsets and len(offsets) > idx else None, + offsets=offsets[idx : idx + 1] if offsets and len(offsets) > idx else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) @@ -1337,13 +1473,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None, offsets=[offsets[0]] if offsets and len(offsets) > 0 else None, liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None, - blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None, + blow_out_air_volume=( + [blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None + ), spread=spread, ) - + if delays is not None: await self.custom_delay(seconds=delays[0]) - + # 分多次分液到不同的目标容器 for idx, target in enumerate(targets): await self.dispense( @@ -1352,7 +1490,9 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): use_channels=use_channels, flow_rates=[dis_flow_rates[idx]] if dis_flow_rates and len(dis_flow_rates) > idx else None, offsets=[offsets[idx]] if offsets and len(offsets) > idx else None, - blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None, + blow_out_air_volume=( + [blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None + ), liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None, spread=spread, ) @@ -1363,46 +1503,54 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): targets=[target], mix_time=mix_times, mix_vol=mix_vol, - offsets=offsets[idx:idx+1] if offsets else None, + offsets=offsets[idx : idx + 1] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) if touch_tip: await self.touch_tip([target]) - + await self.discard_tips(use_channels=use_channels) - + elif len(use_channels) == 8: # 8通道模式:需要确保目标数量是8的倍数 if len(targets) % 8 != 0: raise ValueError(f"For 8-channel mode, number of targets {len(targets)} must be a multiple of 8.") - + # 每次处理8个目标 for i in range(0, len(targets), 8): tip = [] for _ in range(len(use_channels)): tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) - - current_targets = targets[i:i + 8] - current_dis_vols = dis_vols[i:i + 8] - + + current_targets = targets[i : i + 8] + current_dis_vols = dis_vols[i : i + 8] + # 8个通道都从同一个源容器吸液,每个通道的吸液体积等于对应的分液体积 - current_asp_flow_rates = asp_flow_rates[0:1] * 8 if asp_flow_rates and len(asp_flow_rates) > 0 else None + current_asp_flow_rates = ( + asp_flow_rates[0:1] * 8 if asp_flow_rates and len(asp_flow_rates) > 0 else None + ) current_asp_offset = offsets[0:1] * 8 if offsets and len(offsets) > 0 else [None] * 8 - current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8 - current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8 - + current_asp_liquid_height = ( + liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8 + ) + current_asp_blow_out_air_volume = ( + blow_out_air_volume[0:1] * 8 + if blow_out_air_volume and len(blow_out_air_volume) > 0 + else [None] * 8 + ) + if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: await self.mix( targets=current_targets, mix_time=mix_times, mix_vol=mix_vol, - offsets=offsets[i:i + 8] if offsets else None, + offsets=offsets[i : i + 8] if offsets else None, height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - + # 从源容器吸液(8个通道都从同一个源,但每个通道的吸液体积不同) await self.aspirate( resources=[source] * 8, # 8个通道都从同一个源 @@ -1414,16 +1562,16 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): blow_out_air_volume=current_asp_blow_out_air_volume, spread=spread, ) - + if delays is not None: await self.custom_delay(seconds=delays[0]) - + # 分液到8个目标 - current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None - current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - + current_dis_flow_rates = dis_flow_rates[i : i + 8] if dis_flow_rates else None + current_dis_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_dis_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_dis_blow_out_air_volume = blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + await self.dispense( resources=current_targets, vols=current_dis_vols, @@ -1434,10 +1582,10 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): liquid_height=current_dis_liquid_height, spread=spread, ) - + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - + if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: await self.mix( targets=current_targets, @@ -1447,11 +1595,11 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - + if touch_tip: await self.touch_tip(current_targets) - - await self.discard_tips([0,1,2,3,4,5,6,7]) + + await self.discard_tips([0, 1, 2, 3, 4, 5, 6, 7]) async def _transfer_many_to_one( self, @@ -1479,7 +1627,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): # 验证和扩展体积参数 if len(asp_vols) != len(sources): raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `sources` {len(sources)}.") - + # 支持两种模式: # 1. dis_vols 为单个值:所有源汇总,使用总吸液体积或指定分液体积 # 2. dis_vols 长度等于 asp_vols:每个源按不同比例分液(按比例混合) @@ -1509,7 +1657,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - + # 从每个源容器吸液并分液到目标容器 for idx, source in enumerate(sources): tip = [] @@ -1524,13 +1672,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): flow_rates=[asp_flow_rates[idx]] if asp_flow_rates and len(asp_flow_rates) > idx else None, offsets=[offsets[idx]] if offsets and len(offsets) > idx else None, liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None, - blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None, + blow_out_air_volume=( + [blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None + ), spread=spread, ) - + if delays is not None: await self.custom_delay(seconds=delays[0]) - + # 分液到目标容器 if use_proportional_mixing: # 按不同比例混合:使用对应的 dis_vols @@ -1538,15 +1688,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): dis_flow_rate = dis_flow_rates[idx] if dis_flow_rates and len(dis_flow_rates) > idx else None dis_offset = offsets[idx] if offsets and len(offsets) > idx else None dis_liquid_height = liquid_height[idx] if liquid_height and len(liquid_height) > idx else None - dis_blow_out = blow_out_air_volume[idx] if blow_out_air_volume and len(blow_out_air_volume) > idx else None + dis_blow_out = ( + blow_out_air_volume[idx] if blow_out_air_volume and len(blow_out_air_volume) > idx else None + ) else: # 标准模式:分液体积等于吸液体积 dis_vol = asp_vols[idx] dis_flow_rate = dis_flow_rates[0] if dis_flow_rates and len(dis_flow_rates) > 0 else None dis_offset = offsets[0] if offsets and len(offsets) > 0 else None dis_liquid_height = liquid_height[0] if liquid_height and len(liquid_height) > 0 else None - dis_blow_out = blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None - + dis_blow_out = ( + blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None + ) + await self.dispense( resources=[target], vols=[dis_vol], @@ -1557,12 +1711,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): liquid_height=[dis_liquid_height] if dis_liquid_height is not None else None, spread=spread, ) - + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - + await self.discard_tips(use_channels=use_channels) - + # 最后在目标容器中混合(如果需要) if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: await self.mix( @@ -1573,15 +1727,15 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - + if touch_tip: await self.touch_tip([target]) - + elif len(use_channels) == 8: # 8通道模式:需要确保源数量是8的倍数 if len(sources) % 8 != 0: raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.") - + # 每次处理8个源 if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0: await self.mix( @@ -1598,14 +1752,14 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): for _ in range(len(use_channels)): tip.extend(next(self.current_tip)) await self.pick_up_tips(tip) - - current_sources = sources[i:i + 8] - current_asp_vols = asp_vols[i:i + 8] - current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None - current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 - + + current_sources = sources[i : i + 8] + current_asp_vols = asp_vols[i : i + 8] + current_asp_flow_rates = asp_flow_rates[i : i + 8] if asp_flow_rates else None + current_asp_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_asp_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_asp_blow_out_air_volume = blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + # 从8个源容器吸液 await self.aspirate( resources=current_sources, @@ -1617,26 +1771,30 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): liquid_height=current_asp_liquid_height, spread=spread, ) - + if delays is not None: await self.custom_delay(seconds=delays[0]) - + # 分液到目标容器(每个通道分液到同一个目标) if use_proportional_mixing: # 按比例混合:使用对应的 dis_vols - current_dis_vols = dis_vols[i:i + 8] - current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None - current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8 - current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8 + current_dis_vols = dis_vols[i : i + 8] + current_dis_flow_rates = dis_flow_rates[i : i + 8] if dis_flow_rates else None + current_dis_offset = offsets[i : i + 8] if offsets else [None] * 8 + current_dis_liquid_height = liquid_height[i : i + 8] if liquid_height else [None] * 8 + current_dis_blow_out_air_volume = ( + blow_out_air_volume[i : i + 8] if blow_out_air_volume else [None] * 8 + ) else: # 标准模式:每个通道分液体积等于其吸液体积 current_dis_vols = current_asp_vols current_dis_flow_rates = dis_flow_rates[0:1] * 8 if dis_flow_rates else None current_dis_offset = offsets[0:1] * 8 if offsets else [None] * 8 current_dis_liquid_height = liquid_height[0:1] * 8 if liquid_height else [None] * 8 - current_dis_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume else [None] * 8 - + current_dis_blow_out_air_volume = ( + blow_out_air_volume[0:1] * 8 if blow_out_air_volume else [None] * 8 + ) + await self.dispense( resources=[target] * 8, # 8个通道都分到同一个目标 vols=current_dis_vols, @@ -1647,12 +1805,12 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): liquid_height=current_dis_liquid_height, spread=spread, ) - + if delays is not None and len(delays) > 1: await self.custom_delay(seconds=delays[1]) - - await self.discard_tips([0,1,2,3,4,5,6,7]) - + + await self.discard_tips([0, 1, 2, 3, 4, 5, 6, 7]) + # 最后在目标容器中混合(如果需要) if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0: await self.mix( @@ -1663,7 +1821,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): height_to_bottom=mix_liquid_height if mix_liquid_height else None, mix_rate=mix_rate if mix_rate else None, ) - + if touch_tip: await self.touch_tip([target]) @@ -1671,7 +1829,6 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): # traceback.print_exc() # raise RuntimeError(f"Liquid addition failed: {e}") from e - # --------------------------------------------------------------- # Helper utilities # --------------------------------------------------------------- @@ -1692,7 +1849,6 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): print(f"Current time: {time.strftime('%H:%M:%S')}") async def touch_tip(self, targets: Sequence[Container]): - """Touch the tip to the side of the well.""" if not self.support_touch_tip: diff --git a/unilabos/devices/liquid_handling/prcxi/prcxi.py b/unilabos/devices/liquid_handling/prcxi/prcxi.py index e0c7e80..4f96255 100644 --- a/unilabos/devices/liquid_handling/prcxi/prcxi.py +++ b/unilabos/devices/liquid_handling/prcxi/prcxi.py @@ -30,9 +30,30 @@ from pylabrobot.liquid_handling.standard import ( ResourceMove, ResourceDrop, ) -from pylabrobot.resources import ResourceHolder, ResourceStack, Tip, Deck, Plate, Well, TipRack, Resource, Container, Coordinate, TipSpot, Trash, PlateAdapter, TubeRack +from pylabrobot.resources import ( + ResourceHolder, + ResourceStack, + Tip, + Deck, + Plate, + Well, + TipRack, + Resource, + Container, + Coordinate, + TipSpot, + Trash, + PlateAdapter, + TubeRack, +) -from unilabos.devices.liquid_handling.liquid_handler_abstract import LiquidHandlerAbstract, SimpleReturn +from unilabos.devices.liquid_handling.liquid_handler_abstract import ( + LiquidHandlerAbstract, + SimpleReturn, + SetLiquidReturn, + SetLiquidFromPlateReturn, +) +from unilabos.registry.placeholder_type import ResourceSlot from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode @@ -80,6 +101,7 @@ class PRCXI9300Deck(Deck): self.slots[slot - 1] = resource super().assign_child_resource(resource, location=self.slot_locations[slot - 1]) + class PRCXI9300Container(Plate): """PRCXI 9300 的专用 Container 类,继承自 Plate,用于槽位定位和未知模块。 @@ -108,20 +130,29 @@ class PRCXI9300Container(Plate): def serialize_state(self) -> Dict[str, Dict[str, Any]]: data = super().serialize_state() data.update(self._unilabos_state) - return data + return data + + class PRCXI9300Plate(Plate): - """ + """ 专用孔板类: 1. 继承自 PLR 原生 Plate,保留所有物理特性。 2. 增加 material_info 参数,用于在初始化时直接绑定 Unilab UUID。 """ - def __init__(self, name: str, size_x: float, size_y: float, size_z: float, - category: str = "plate", - ordered_items: collections.OrderedDict = None, - ordering: Optional[collections.OrderedDict] = None, - model: Optional[str] = None, - material_info: Optional[Dict[str, Any]] = None, - **kwargs): + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "plate", + ordered_items: collections.OrderedDict = None, + ordering: Optional[collections.OrderedDict] = None, + model: Optional[str] = None, + material_info: Optional[Dict[str, Any]] = None, + **kwargs, + ): # 如果 ordered_items 不为 None,直接使用 if ordered_items is not None: items = ordered_items @@ -142,40 +173,34 @@ class PRCXI9300Plate(Plate): else: items = None ordering_param = None - + # 根据情况传递不同的参数 if items is not None: - super().__init__(name, size_x, size_y, size_z, - ordered_items=items, - category=category, - model=model, **kwargs) + super().__init__( + name, size_x, size_y, size_z, ordered_items=items, category=category, model=model, **kwargs + ) elif ordering_param is not None: # 传递 ordering 参数,让 Plate 自己创建 Well 对象 - super().__init__(name, size_x, size_y, size_z, - ordering=ordering_param, - category=category, - model=model, **kwargs) + super().__init__( + name, size_x, size_y, size_z, ordering=ordering_param, category=category, model=model, **kwargs + ) else: - super().__init__(name, size_x, size_y, size_z, - category=category, - model=model, **kwargs) - + super().__init__(name, size_x, size_y, size_z, category=category, model=model, **kwargs) + self._unilabos_state = {} if material_info: self._unilabos_state["Material"] = material_info - def load_state(self, state: Dict[str, Any]) -> None: super().load_state(state) self._unilabos_state = state - def serialize_state(self) -> Dict[str, Dict[str, Any]]: try: data = super().serialize_state() except AttributeError: data = {} - if hasattr(self, '_unilabos_state') and self._unilabos_state: + if hasattr(self, "_unilabos_state") and self._unilabos_state: safe_state = {} for k, v in self._unilabos_state.items(): # 如果是 Material 字典,深入检查 @@ -188,23 +213,32 @@ class PRCXI9300Plate(Plate): else: # 打印日志提醒(可选) # print(f"Warning: Removing non-serializable key {mk} from {self.name}") - pass + pass safe_state[k] = safe_material # 其他顶层属性也进行类型检查 elif isinstance(v, (str, int, float, bool, list, dict, type(None))): safe_state[k] = v - + data.update(safe_state) - return data # 其他顶层属性也进行类型检查 + return data # 其他顶层属性也进行类型检查 + + class PRCXI9300TipRack(TipRack): - """ 专用吸头盒类 """ - def __init__(self, name: str, size_x: float, size_y: float, size_z: float, - category: str = "tip_rack", - ordered_items: collections.OrderedDict = None, - ordering: Optional[collections.OrderedDict] = None, - model: Optional[str] = None, - material_info: Optional[Dict[str, Any]] = None, - **kwargs): + """专用吸头盒类""" + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "tip_rack", + ordered_items: collections.OrderedDict = None, + ordering: Optional[collections.OrderedDict] = None, + model: Optional[str] = None, + material_info: Optional[Dict[str, Any]] = None, + **kwargs, + ): # 如果 ordered_items 不为 None,直接使用 if ordered_items is not None: items = ordered_items @@ -225,27 +259,23 @@ class PRCXI9300TipRack(TipRack): else: items = None ordering_param = None - + # 根据情况传递不同的参数 if items is not None: - super().__init__(name, size_x, size_y, size_z, - ordered_items=items, - category=category, - model=model, **kwargs) + super().__init__( + name, size_x, size_y, size_z, ordered_items=items, category=category, model=model, **kwargs + ) elif ordering_param is not None: # 传递 ordering 参数,让 TipRack 自己创建 Tip 对象 - super().__init__(name, size_x, size_y, size_z, - ordering=ordering_param, - category=category, - model=model, **kwargs) + super().__init__( + name, size_x, size_y, size_z, ordering=ordering_param, category=category, model=model, **kwargs + ) else: - super().__init__(name, size_x, size_y, size_z, - category=category, - model=model, **kwargs) + super().__init__(name, size_x, size_y, size_z, category=category, model=model, **kwargs) self._unilabos_state = {} if material_info: self._unilabos_state["Material"] = material_info - + def load_state(self, state: Dict[str, Any]) -> None: super().load_state(state) self._unilabos_state = state @@ -255,7 +285,7 @@ class PRCXI9300TipRack(TipRack): data = super().serialize_state() except AttributeError: data = {} - if hasattr(self, '_unilabos_state') and self._unilabos_state: + if hasattr(self, "_unilabos_state") and self._unilabos_state: safe_state = {} for k, v in self._unilabos_state.items(): # 如果是 Material 字典,深入检查 @@ -268,26 +298,33 @@ class PRCXI9300TipRack(TipRack): else: # 打印日志提醒(可选) # print(f"Warning: Removing non-serializable key {mk} from {self.name}") - pass + pass safe_state[k] = safe_material # 其他顶层属性也进行类型检查 elif isinstance(v, (str, int, float, bool, list, dict, type(None))): safe_state[k] = v - + data.update(safe_state) return data - + + class PRCXI9300Trash(Trash): """PRCXI 9300 的专用 Trash 类,继承自 Trash。 该类定义了 PRCXI 9300 的工作台布局和槽位信息。 """ - def __init__(self, name: str, size_x: float, size_y: float, size_z: float, - category: str = "trash", - material_info: Optional[Dict[str, Any]] = None, - **kwargs): - + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "trash", + material_info: Optional[Dict[str, Any]] = None, + **kwargs, + ): + if name != "trash": print(f"Warning: PRCXI9300Trash usually expects name='trash' for backend logic, but got '{name}'.") super().__init__(name, size_x, size_y, size_z, **kwargs) @@ -306,7 +343,7 @@ class PRCXI9300Trash(Trash): data = super().serialize_state() except AttributeError: data = {} - if hasattr(self, '_unilabos_state') and self._unilabos_state: + if hasattr(self, "_unilabos_state") and self._unilabos_state: safe_state = {} for k, v in self._unilabos_state.items(): # 如果是 Material 字典,深入检查 @@ -319,29 +356,37 @@ class PRCXI9300Trash(Trash): else: # 打印日志提醒(可选) # print(f"Warning: Removing non-serializable key {mk} from {self.name}") - pass + pass safe_state[k] = safe_material # 其他顶层属性也进行类型检查 elif isinstance(v, (str, int, float, bool, list, dict, type(None))): safe_state[k] = v - + data.update(safe_state) return data + class PRCXI9300TubeRack(TubeRack): """ 专用管架类:用于 EP 管架、试管架等。 继承自 PLR 的 TubeRack,并支持注入 material_info (UUID)。 """ - def __init__(self, name: str, size_x: float, size_y: float, size_z: float, - category: str = "tube_rack", - items: Optional[Dict[str, Any]] = None, - ordered_items: Optional[OrderedDict] = None, - ordering: Optional[OrderedDict] = None, - model: Optional[str] = None, - material_info: Optional[Dict[str, Any]] = None, - **kwargs): - + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "tube_rack", + items: Optional[Dict[str, Any]] = None, + ordered_items: Optional[OrderedDict] = None, + ordering: Optional[OrderedDict] = None, + model: Optional[str] = None, + material_info: Optional[Dict[str, Any]] = None, + **kwargs, + ): + # 如果 ordered_items 不为 None,直接使用 if ordered_items is not None: items_to_pass = ordered_items @@ -367,24 +412,16 @@ class PRCXI9300TubeRack(TubeRack): else: items_to_pass = None ordering_param = None - + # 根据情况传递不同的参数 if items_to_pass is not None: - super().__init__(name, size_x, size_y, size_z, - ordered_items=items_to_pass, - model=model, - **kwargs) + super().__init__(name, size_x, size_y, size_z, ordered_items=items_to_pass, model=model, **kwargs) elif ordering_param is not None: # 传递 ordering 参数,让 TubeRack 自己创建 Tube 对象 - super().__init__(name, size_x, size_y, size_z, - ordering=ordering_param, - model=model, - **kwargs) + super().__init__(name, size_x, size_y, size_z, ordering=ordering_param, model=model, **kwargs) else: - super().__init__(name, size_x, size_y, size_z, - model=model, - **kwargs) - + super().__init__(name, size_x, size_y, size_z, model=model, **kwargs) + self._unilabos_state = {} if material_info: self._unilabos_state["Material"] = material_info @@ -394,7 +431,7 @@ class PRCXI9300TubeRack(TubeRack): data = super().serialize_state() except AttributeError: data = {} - if hasattr(self, '_unilabos_state') and self._unilabos_state: + if hasattr(self, "_unilabos_state") and self._unilabos_state: safe_state = {} for k, v in self._unilabos_state.items(): # 如果是 Material 字典,深入检查 @@ -407,33 +444,41 @@ class PRCXI9300TubeRack(TubeRack): else: # 打印日志提醒(可选) # print(f"Warning: Removing non-serializable key {mk} from {self.name}") - pass + pass safe_state[k] = safe_material # 其他顶层属性也进行类型检查 elif isinstance(v, (str, int, float, bool, list, dict, type(None))): safe_state[k] = v - + data.update(safe_state) return data + class PRCXI9300PlateAdapter(PlateAdapter): """ 专用板式适配器类:用于承载 Plate 的底座(如 PCR 适配器、磁吸架等)。 支持注入 material_info (UUID)。 """ - def __init__(self, name: str, size_x: float, size_y: float, size_z: float, - category: str = "plate_adapter", - model: Optional[str] = None, - material_info: Optional[Dict[str, Any]] = None, - # 参数给予默认值 (标准96孔板尺寸) - adapter_hole_size_x: float = 127.76, - adapter_hole_size_y: float = 85.48, - adapter_hole_size_z: float = 10.0, # 假设凹槽深度或板子放置高度 - dx: Optional[float] = None, - dy: Optional[float] = None, - dz: float = 0.0, # 默认Z轴偏移 - **kwargs): - + + def __init__( + self, + name: str, + size_x: float, + size_y: float, + size_z: float, + category: str = "plate_adapter", + model: Optional[str] = None, + material_info: Optional[Dict[str, Any]] = None, + # 参数给予默认值 (标准96孔板尺寸) + adapter_hole_size_x: float = 127.76, + adapter_hole_size_y: float = 85.48, + adapter_hole_size_z: float = 10.0, # 假设凹槽深度或板子放置高度 + dx: Optional[float] = None, + dy: Optional[float] = None, + dz: float = 0.0, # 默认Z轴偏移 + **kwargs, + ): + # 自动居中计算:如果未指定 dx/dy,则根据适配器尺寸和孔尺寸计算居中位置 if dx is None: dx = (size_x - adapter_hole_size_x) / 2 @@ -441,20 +486,20 @@ class PRCXI9300PlateAdapter(PlateAdapter): dy = (size_y - adapter_hole_size_y) / 2 super().__init__( - name=name, - size_x=size_x, - size_y=size_y, - size_z=size_z, + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, dx=dx, dy=dy, dz=dz, adapter_hole_size_x=adapter_hole_size_x, adapter_hole_size_y=adapter_hole_size_y, adapter_hole_size_z=adapter_hole_size_z, - model=model, - **kwargs + model=model, + **kwargs, ) - + self._unilabos_state = {} if material_info: self._unilabos_state["Material"] = material_info @@ -464,7 +509,7 @@ class PRCXI9300PlateAdapter(PlateAdapter): data = super().serialize_state() except AttributeError: data = {} - if hasattr(self, '_unilabos_state') and self._unilabos_state: + if hasattr(self, "_unilabos_state") and self._unilabos_state: safe_state = {} for k, v in self._unilabos_state.items(): # 如果是 Material 字典,深入检查 @@ -477,15 +522,16 @@ class PRCXI9300PlateAdapter(PlateAdapter): else: # 打印日志提醒(可选) # print(f"Warning: Removing non-serializable key {mk} from {self.name}") - pass + pass safe_state[k] = safe_material # 其他顶层属性也进行类型检查 elif isinstance(v, (str, int, float, bool, list, dict, type(None))): safe_state[k] = v - + data.update(safe_state) return data + class PRCXI9300Handler(LiquidHandlerAbstract): support_touch_tip = False @@ -518,7 +564,9 @@ class PRCXI9300Handler(LiquidHandlerAbstract): if "Material" in child.children[0]._unilabos_state: number = int(child.name.replace("T", "")) tablets_info.append( - WorkTablets(Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"]) + WorkTablets( + Number=number, Code=f"T{number}", Material=child.children[0]._unilabos_state["Material"] + ) ) if is_9320: print("当前设备是9320") @@ -538,9 +586,14 @@ class PRCXI9300Handler(LiquidHandlerAbstract): super().post_init(ros_node) self._unilabos_backend.post_init(ros_node) - def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SimpleReturn: + def set_liquid(self, wells: list[Well], liquid_names: list[str], volumes: list[float]) -> SetLiquidReturn: return super().set_liquid(wells, liquid_names, volumes) + def set_liquid_from_plate( + self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float] + ) -> SetLiquidFromPlateReturn: + return super().set_liquid_from_plate(plate, well_names, liquid_names, volumes) + def set_group(self, group_name: str, wells: List[Well], volumes: List[float]): return super().set_group(group_name, wells, volumes) @@ -799,7 +852,8 @@ class PRCXI9300Handler(LiquidHandlerAbstract): return await self._unilabos_backend.shaker_action(time, module_no, amplitude, is_wait) async def heater_action(self, temperature: float, time: int): - return await self._unilabos_backend.heater_action(temperature, time) + return await self._unilabos_backend.heater_action(temperature, time) + async def move_plate( self, plate: Plate, @@ -822,10 +876,11 @@ class PRCXI9300Handler(LiquidHandlerAbstract): drop_direction, pickup_direction, pickup_distance_from_top, - target_plate_number = to, + target_plate_number=to, **backend_kwargs, ) + class PRCXI9300Backend(LiquidHandlerBackend): """PRCXI 9300 的后端实现,继承自 LiquidHandlerBackend。 @@ -878,31 +933,28 @@ class PRCXI9300Backend(LiquidHandlerBackend): self.steps_todo_list.append(step) return step - async def pick_up_resource(self, pickup: ResourcePickup, **backend_kwargs): - - resource=pickup.resource - offset=pickup.offset - pickup_distance_from_top=pickup.pickup_distance_from_top - direction=pickup.direction + + resource = pickup.resource + offset = pickup.offset + pickup_distance_from_top = pickup.pickup_distance_from_top + direction = pickup.direction plate_number = int(resource.parent.name.replace("T", "")) is_whole_plate = True balance_height = 0 step = self.api_client.clamp_jaw_pick_up(plate_number, is_whole_plate, balance_height) - + self.steps_todo_list.append(step) return step async def drop_resource(self, drop: ResourceDrop, **backend_kwargs): - plate_number = None target_plate_number = backend_kwargs.get("target_plate_number", None) if target_plate_number is not None: plate_number = int(target_plate_number.name.replace("T", "")) - is_whole_plate = True balance_height = 0 if plate_number is None: @@ -911,7 +963,6 @@ class PRCXI9300Backend(LiquidHandlerBackend): self.steps_todo_list.append(step) return step - async def heater_action(self, temperature: float, time: int): print(f"\n\nHeater action: temperature={temperature}, time={time}\n\n") # return await self.api_client.heater_action(temperature, time) @@ -968,7 +1019,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): error_code = self.api_client.get_error_code() if error_code: print(f"PRCXI9300 error code detected: {error_code}") - + # 清除错误代码 self.api_client.clear_error_code() print("PRCXI9300 error code cleared.") @@ -976,11 +1027,11 @@ class PRCXI9300Backend(LiquidHandlerBackend): # 执行重置 print("Starting PRCXI9300 reset...") self.api_client.call("IAutomation", "Reset") - + # 检查重置状态并等待完成 while not self.is_reset_ok: print("Waiting for PRCXI9300 to reset...") - if hasattr(self, '_ros_node') and self._ros_node is not None: + if hasattr(self, "_ros_node") and self._ros_node is not None: await self._ros_node.sleep(1) else: await asyncio.sleep(1) @@ -998,7 +1049,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): """Pick up tips from the specified resource.""" # INSERT_YOUR_CODE # Ensure use_channels is converted to a list of ints if it's an array - if hasattr(use_channels, 'tolist'): + if hasattr(use_channels, "tolist"): _use_channels = use_channels.tolist() else: _use_channels = list(use_channels) if use_channels is not None else None @@ -1052,7 +1103,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): async def drop_tips(self, ops: List[Drop], use_channels: List[int] = None): """Pick up tips from the specified resource.""" - if hasattr(use_channels, 'tolist'): + if hasattr(use_channels, "tolist"): _use_channels = use_channels.tolist() else: _use_channels = list(use_channels) if use_channels is not None else None @@ -1135,7 +1186,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): none_keys: List[str] = [], ): """Mix liquid in the specified resources.""" - + plate_indexes = [] for op in targets: deck = op.parent.parent.parent @@ -1178,7 +1229,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int] = None): """Aspirate liquid from the specified resources.""" - if hasattr(use_channels, 'tolist'): + if hasattr(use_channels, "tolist"): _use_channels = use_channels.tolist() else: _use_channels = list(use_channels) if use_channels is not None else None @@ -1235,7 +1286,7 @@ class PRCXI9300Backend(LiquidHandlerBackend): async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int] = None): """Dispense liquid into the specified resources.""" - if hasattr(use_channels, 'tolist'): + if hasattr(use_channels, "tolist"): _use_channels = use_channels.tolist() else: _use_channels = list(use_channels) if use_channels is not None else None @@ -1416,7 +1467,6 @@ class PRCXI9300Api: time.sleep(1) return success - def call(self, service: str, method: str, params: Optional[list] = None) -> Any: payload = json.dumps( {"ServiceName": service, "MethodName": method, "Paramters": params or []}, separators=(",", ":") @@ -1543,7 +1593,7 @@ class PRCXI9300Api: assist_fun5: str = "", liquid_method: str = "NormalDispense", axis: str = "Left", - ) -> Dict[str, Any]: + ) -> Dict[str, Any]: return { "StepAxis": axis, "Function": "Imbibing", @@ -1621,7 +1671,7 @@ class PRCXI9300Api: assist_fun5: str = "", liquid_method: str = "NormalDispense", axis: str = "Left", - ) -> Dict[str, Any]: + ) -> Dict[str, Any]: return { "StepAxis": axis, "Function": "Blending", @@ -1681,11 +1731,11 @@ class PRCXI9300Api: "LiquidDispensingMethod": liquid_method, } - def clamp_jaw_pick_up(self, + def clamp_jaw_pick_up( + self, plate_no: int, is_whole_plate: bool, balance_height: int, - ) -> Dict[str, Any]: return { "StepAxis": "ClampingJaw", @@ -1695,7 +1745,7 @@ class PRCXI9300Api: "HoleRow": 1, "HoleCol": 1, "BalanceHeight": balance_height, - "PlateOrHoleNum": f"T{plate_no}" + "PlateOrHoleNum": f"T{plate_no}", } def clamp_jaw_drop( @@ -1703,7 +1753,6 @@ class PRCXI9300Api: plate_no: int, is_whole_plate: bool, balance_height: int, - ) -> Dict[str, Any]: return { "StepAxis": "ClampingJaw", @@ -1713,7 +1762,7 @@ class PRCXI9300Api: "HoleRow": 1, "HoleCol": 1, "BalanceHeight": balance_height, - "PlateOrHoleNum": f"T{plate_no}" + "PlateOrHoleNum": f"T{plate_no}", } def shaker_action(self, time: int, module_no: int, amplitude: int, is_wait: bool): @@ -1726,6 +1775,7 @@ class PRCXI9300Api: "AssistFun4": is_wait, } + class DefaultLayout: def __init__(self, product_name: str = "PRCXI9300"): @@ -2104,7 +2154,9 @@ if __name__ == "__main__": size_y=50, size_z=10, category="tip_rack", - ordered_items=collections.OrderedDict({k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()}), + ordered_items=collections.OrderedDict( + {k: f"{child_prefix}_{k}" for k, v in tip_racks["ordering"].items()} + ), ) tip_rack_serialized = tip_rack.serialize() tip_rack_serialized["parent_name"] = deck.name @@ -2299,43 +2351,37 @@ if __name__ == "__main__": A = tree_to_list([resource_plr_to_ulab(deck)]) with open("deck.json", "w", encoding="utf-8") as f: - A.insert(0, { - "id": "PRCXI", - "name": "PRCXI", - "parent": None, - "type": "device", - "class": "liquid_handler.prcxi", - "position": { - "x": 0, - "y": 0, - "z": 0 - }, - "config": { - "deck": { - "_resource_child_name": "PRCXI_Deck", - "_resource_type": "unilabos.devices.liquid_handling.prcxi.prcxi:PRCXI9300Deck" + A.insert( + 0, + { + "id": "PRCXI", + "name": "PRCXI", + "parent": None, + "type": "device", + "class": "liquid_handler.prcxi", + "position": {"x": 0, "y": 0, "z": 0}, + "config": { + "deck": { + "_resource_child_name": "PRCXI_Deck", + "_resource_type": "unilabos.devices.liquid_handling.prcxi.prcxi:PRCXI9300Deck", + }, + "host": "192.168.0.121", + "port": 9999, + "timeout": 10.0, + "axis": "Right", + "channel_num": 1, + "setup": False, + "debug": True, + "simulator": True, + "matrix_id": "5de524d0-3f95-406c-86dd-f83626ebc7cb", + "is_9320": True, }, - "host": "192.168.0.121", - "port": 9999, - "timeout": 10.0, - "axis": "Right", - "channel_num": 1, - "setup": False, - "debug": True, - "simulator": True, - "matrix_id": "5de524d0-3f95-406c-86dd-f83626ebc7cb", - "is_9320": True + "data": {}, + "children": ["PRCXI_Deck"], }, - "data": {}, - "children": [ - "PRCXI_Deck" - ] - }) + ) A[1]["parent"] = "PRCXI" - json.dump({ - "nodes": A, - "links": [] - }, f, indent=4, ensure_ascii=False) + json.dump({"nodes": A, "links": []}, f, indent=4, ensure_ascii=False) handler = PRCXI9300Handler( deck=deck, @@ -2377,7 +2423,6 @@ if __name__ == "__main__": time.sleep(5) os._exit(0) - prcxi_api = PRCXI9300Api(host="192.168.0.121", port=9999) prcxi_api.list_matrices() prcxi_api.get_all_materials() diff --git a/unilabos/registry/devices/liquid_handler.yaml b/unilabos/registry/devices/liquid_handler.yaml index 7e3aaeb..b2612e7 100644 --- a/unilabos/registry/devices/liquid_handler.yaml +++ b/unilabos/registry/devices/liquid_handler.yaml @@ -9284,7 +9284,13 @@ liquid_handler.prcxi: data_source: handle data_type: resource handler_key: input_wells - label: InputWells + label: 待设定液体孔 + output: + - data_key: wells.@flatten + data_source: executor + data_type: resource + handler_key: output_wells + label: 已设定液体孔 placeholder_keys: wells: unilabos_resources result: {} @@ -9400,6 +9406,163 @@ liquid_handler.prcxi: title: LiquidHandlerSetLiquid type: object type: LiquidHandlerSetLiquid + set_liquid_from_plate: + feedback: {} + goal: {} + goal_default: + liquid_names: null + plate: null + volumes: null + well_names: null + handles: + input: + - data_key: plate + data_source: handle + data_type: resource + handler_key: input_plate + label: 待设定液体板 + output: + - data_key: plate.@flatten + data_source: executor + data_type: resource + handler_key: output_plate + label: 已设定液体板 + - data_key: wells.@flatten + data_source: executor + data_type: resource + handler_key: output_wells + label: 已设定液体孔 + - data_key: volumes + data_source: executor + data_type: number_array + handler_key: output_volumes + label: 各孔设定体积 + placeholder_keys: + plate: unilabos_resources + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + liquid_names: + items: + type: string + type: array + plate: + properties: + category: + type: string + children: + items: + type: string + type: array + config: + type: string + data: + type: string + id: + type: string + name: + type: string + parent: + type: string + pose: + properties: + orientation: + properties: + w: + type: number + x: + type: number + y: + type: number + z: + type: number + required: + - x + - y + - z + - w + title: orientation + type: object + position: + properties: + x: + type: number + y: + type: number + z: + type: number + required: + - x + - y + - z + title: position + type: object + required: + - position + - orientation + title: pose + type: object + sample_id: + type: string + type: + type: string + required: + - id + - name + - sample_id + - children + - parent + - type + - category + - pose + - config + - data + title: plate + type: object + volumes: + items: + type: number + type: array + well_names: + items: + type: string + type: array + required: + - plate + - well_names + - liquid_names + - volumes + type: object + result: + properties: + plate: + items: {} + title: Plate + type: array + volumes: + items: + type: number + title: Volumes + type: array + wells: + items: {} + title: Wells + type: array + required: + - plate + - wells + - volumes + title: SetLiquidFromPlateReturn + type: object + required: + - goal + title: set_liquid_from_plate参数 + type: object + type: UniLabJsonCommand set_tiprack: feedback: {} goal: diff --git a/unilabos/resources/resource_tracker.py b/unilabos/resources/resource_tracker.py index d1bc3a5..8a0fef3 100644 --- a/unilabos/resources/resource_tracker.py +++ b/unilabos/resources/resource_tracker.py @@ -997,7 +997,7 @@ class DeviceNodeResourceTracker(object): extra = name_to_extra_map[resource_name] self.set_resource_extra(res, extra) if len(extra): - logger.debug(f"设置资源Extra: {resource_name} -> {extra}") + logger.trace(f"设置资源Extra: {resource_name} -> {extra}") return 1 return 0 diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index c1cf002..ad073d9 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -1,3 +1,89 @@ +""" +工作流转换模块 - JSON 到 WorkflowGraph 的转换流程 + +==================== 输入格式 (JSON) ==================== + +{ + "workflow": [ + {"action": "transfer_liquid", "action_args": {"sources": "cell_lines", "targets": "Liquid_1", "asp_vol": 100.0, "dis_vol": 74.75, ...}}, + ... + ], + "reagent": { + "cell_lines": {"slot": 4, "well": ["A1", "A3", "A5"], "labware": "DRUG + YOYO-MEDIA"}, + "Liquid_1": {"slot": 1, "well": ["A4", "A7", "A10"], "labware": "rep 1"}, + ... + } +} + +==================== 转换步骤 ==================== + +第一步: 按 slot 去重创建 create_resource 节点(创建板子) +-------------------------------------------------------------------------------- +- 遍历所有 reagent,按 slot 去重,为每个唯一的 slot 创建一个板子 +- 生成参数: + res_id: plate_slot_{slot} + device_id: /PRCXI + class_name: PRCXI_BioER_96_wellplate + parent: /PRCXI/PRCXI_Deck/T{slot} + slot_on_deck: "{slot}" +- 输出端口: labware(用于连接 set_liquid_from_plate) +- 控制流: create_resource 之间通过 ready 端口串联 + +示例: slot=1, slot=4 -> 创建 2 个 create_resource 节点 + +第二步: 为每个 reagent 创建 set_liquid_from_plate 节点(设置液体) +-------------------------------------------------------------------------------- +- 遍历所有 reagent,为每个试剂创建 set_liquid_from_plate 节点 +- 生成参数: + plate: [](通过连接传递,来自 create_resource 的 labware) + well_names: ["A1", "A3", "A5"](来自 reagent 的 well 数组) + liquid_names: ["cell_lines", "cell_lines", "cell_lines"](与 well 数量一致) + volumes: [1e5, 1e5, 1e5](与 well 数量一致,默认体积) +- 输入连接: create_resource (labware) -> set_liquid_from_plate (input_plate) +- 输出端口: output_wells(用于连接 transfer_liquid) +- 控制流: set_liquid_from_plate 连接在所有 create_resource 之后,通过 ready 端口串联 + +第三步: 解析 workflow,创建 transfer_liquid 等动作节点 +-------------------------------------------------------------------------------- +- 遍历 workflow 数组,为每个动作创建步骤节点 +- 参数重命名: asp_vol -> asp_vols, dis_vol -> dis_vols, asp_flow_rate -> asp_flow_rates, dis_flow_rate -> dis_flow_rates +- 参数扩展: 根据 targets 的 wells 数量,将单值扩展为数组 + 例: asp_vol=100.0, targets 有 3 个 wells -> asp_vols=[100.0, 100.0, 100.0] +- 连接处理: 如果 sources/targets 已通过 set_liquid_from_plate 连接,参数值改为 [] +- 输入连接: set_liquid_from_plate (output_wells) -> transfer_liquid (sources_identifier / targets_identifier) +- 输出端口: sources_out, targets_out(用于连接下一个 transfer_liquid) + +==================== 连接关系图 ==================== + +控制流 (ready 端口串联): + create_resource_1 -> create_resource_2 -> ... -> set_liquid_1 -> set_liquid_2 -> ... -> transfer_liquid_1 -> transfer_liquid_2 -> ... + +物料流: + [create_resource] --labware--> [set_liquid_from_plate] --output_wells--> [transfer_liquid] --sources_out/targets_out--> [下一个 transfer_liquid] + (slot=1) (cell_lines) (input_plate) (sources_identifier) (sources_identifier) + (slot=4) (Liquid_1) (targets_identifier) (targets_identifier) + +==================== 端口映射 ==================== + +create_resource: + 输出: labware + +set_liquid_from_plate: + 输入: input_plate + 输出: output_plate, output_wells + +transfer_liquid: + 输入: sources -> sources_identifier, targets -> targets_identifier + 输出: sources -> sources_out, targets -> targets_out + +==================== 校验规则 ==================== + +- 检查 sources/targets 是否在 reagent 中定义 +- 检查 sources 和 targets 的 wells 数量是否匹配 +- 检查参数数组长度是否与 wells 数量一致 +- 如有问题,在 footer 中添加 [WARN: ...] 标记 +""" + import re import uuid @@ -14,13 +100,21 @@ Json = Dict[str, Any] # create_resource 节点默认参数 CREATE_RESOURCE_DEFAULTS = { "device_id": "/PRCXI", - "parent": "/PRCXI/PRCXI_Deck", + "parent_template": "/PRCXI/PRCXI_Deck/T{slot}", # {slot} 会被替换为实际的 slot 值 "class_name": "PRCXI_BioER_96_wellplate", } # 默认液体体积 (uL) DEFAULT_LIQUID_VOLUME = 1e5 +# 参数重命名映射:单数 -> 复数(用于 transfer_liquid 等动作) +PARAM_RENAME_MAPPING = { + "asp_vol": "asp_vols", + "dis_vol": "dis_vols", + "asp_flow_rate": "asp_flow_rates", + "dis_flow_rate": "dis_flow_rates", +} + # ---------------- Graph ---------------- @@ -256,73 +350,108 @@ def build_protocol_graph( action_resource_mapping: action 到 resource_name 的映射字典,可选 """ G = WorkflowGraph() - resource_last_writer = {} + resource_last_writer = {} # reagent_name -> "node_id:port" + slot_to_create_resource = {} # slot -> create_resource node_id protocol_steps = refactor_data(protocol_steps, action_resource_mapping) - # 有机化学&移液站协议图构建 - WORKSTATION_ID = workstation_name - # 为所有labware创建资源节点 - res_index = 0 + # ==================== 第一步:按 slot 去重创建 create_resource 节点 ==================== + # 收集所有唯一的 slot + slots_info = {} # slot -> {labware, res_id} for labware_id, item in labware_info.items(): + slot = str(item.get("slot", "")) + if slot and slot not in slots_info: + res_id = f"plate_slot_{slot}" + slots_info[slot] = { + "labware": item.get("labware", ""), + "res_id": res_id, + } + + # 为每个唯一的 slot 创建 create_resource 节点 + res_index = 0 + last_create_resource_id = None + for slot, info in slots_info.items(): node_id = str(uuid.uuid4()) - - # res_id 不能有空格,替换为下划线 - res_id = str(labware_id).replace(" ", "_") - - # 从 reagent 数据中获取 well 信息 - wells = item.get("well", []) - slot = str(item.get("slot", "")) # slot_on_deck 是字符串 - well_count = len(wells) if wells else 1 - - # 判断节点类型 - if "Rack" in str(labware_id) or "Tip" in str(labware_id): - lab_node_type = "Labware" - description = f"Prepare Labware: {labware_id}" - liquid_input_slot = wells if wells else [-1] - liquid_type = [] - liquid_volume = [] - elif item.get("type") == "hardware" or "reactor" in str(labware_id).lower(): - if "reactor" not in str(labware_id).lower(): - continue - lab_node_type = "Sample" - description = f"Prepare Reactor: {labware_id}" - liquid_input_slot = wells if wells else [-1] - liquid_type = [] - liquid_volume = [] - else: - lab_node_type = "Reagent" - description = f"Add Reagent to Flask: {labware_id}" - # liquid_input_slot, liquid_type, liquid_volume 数量与 wells 保持一致 - liquid_input_slot = wells if wells else [-1] - liquid_type = [res_id] * well_count - liquid_volume = [DEFAULT_LIQUID_VOLUME] * well_count + res_id = info["res_id"] res_index += 1 G.add_node( node_id, template_name="create_resource", resource_name="host_node", - name=f"Res {res_index}", - description=description, - lab_node_type=lab_node_type, + name=f"Plate {res_index}", + description=f"Create plate on slot {slot}", + lab_node_type="Labware", footer="create_resource-host_node", param={ "res_id": res_id, "device_id": CREATE_RESOURCE_DEFAULTS["device_id"], "class_name": CREATE_RESOURCE_DEFAULTS["class_name"], - "parent": CREATE_RESOURCE_DEFAULTS["parent"], + "parent": CREATE_RESOURCE_DEFAULTS["parent_template"].format(slot=slot), "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, - "liquid_input_slot": liquid_input_slot, - "liquid_type": liquid_type, - "liquid_volume": liquid_volume, "slot_on_deck": slot, }, ) - # create_resource 节点输出 liquid_slots,用于连接 transfer_liquid 的 sources/targets - resource_last_writer[labware_id] = f"{node_id}:liquid_slots" + slot_to_create_resource[slot] = node_id - last_control_node_id = None + # create_resource 之间通过 ready 串联 + if last_create_resource_id is not None: + G.add_edge(last_create_resource_id, node_id, source_port="ready", target_port="ready") + last_create_resource_id = node_id + + # ==================== 第二步:为每个 reagent 创建 set_liquid_from_plate 节点 ==================== + set_liquid_index = 0 + last_set_liquid_id = last_create_resource_id # set_liquid_from_plate 连接在 create_resource 之后 + + for labware_id, item in labware_info.items(): + # 跳过 Tip/Rack 类型 + if "Rack" in str(labware_id) or "Tip" in str(labware_id): + continue + if item.get("type") == "hardware": + continue + + slot = str(item.get("slot", "")) + wells = item.get("well", []) + if not wells or not slot: + continue + + # res_id 不能有空格 + res_id = str(labware_id).replace(" ", "_") + well_count = len(wells) + + node_id = str(uuid.uuid4()) + set_liquid_index += 1 + + G.add_node( + node_id, + template_name="set_liquid_from_plate", + resource_name="liquid_handler.prcxi", + name=f"SetLiquid {set_liquid_index}", + description=f"Set liquid: {labware_id}", + lab_node_type="Reagent", + footer="set_liquid_from_plate-liquid_handler.prcxi", + param={ + "plate": [], # 通过连接传递 + "well_names": wells, # 孔位名数组,如 ["A1", "A3", "A5"] + "liquid_names": [res_id] * well_count, + "volumes": [DEFAULT_LIQUID_VOLUME] * well_count, + }, + ) + + # ready 连接:上一个节点 -> set_liquid_from_plate + if last_set_liquid_id is not None: + G.add_edge(last_set_liquid_id, node_id, source_port="ready", target_port="ready") + last_set_liquid_id = node_id + + # 物料流:create_resource 的 labware -> set_liquid_from_plate 的 input_plate + create_res_node_id = slot_to_create_resource.get(slot) + if create_res_node_id: + G.add_edge(create_res_node_id, node_id, source_port="labware", target_port="input_plate") + + # set_liquid_from_plate 的输出 output_wells 用于连接 transfer_liquid + resource_last_writer[labware_id] = f"{node_id}:output_wells" + + last_control_node_id = last_set_liquid_id # 端口名称映射:JSON 字段名 -> 实际 handle key INPUT_PORT_MAPPING = { @@ -348,18 +477,20 @@ def build_protocol_graph( "compound": "compound", } + # 需要根据 wells 数量扩展的参数列表(复数形式) + EXPAND_BY_WELLS_PARAMS = ["asp_vols", "dis_vols", "asp_flow_rates", "dis_flow_rates"] + # 处理协议步骤 for step in protocol_steps: node_id = str(uuid.uuid4()) - G.add_node(node_id, **step) + params = step.get("param", {}).copy() # 复制一份,避免修改原数据 + connected_params = set() # 记录被连接的参数 + warnings = [] # 收集警告信息 - # 控制流 - if last_control_node_id is not None: - G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") - last_control_node_id = node_id - - # 物料流 - params = step.get("param", {}) + # 参数重命名:单数 -> 复数 + for old_name, new_name in PARAM_RENAME_MAPPING.items(): + if old_name in params: + params[new_name] = params.pop(old_name) # 处理输入连接 for param_key, target_port in INPUT_PORT_MAPPING.items(): @@ -367,10 +498,71 @@ def build_protocol_graph( if resource_name and resource_name in resource_last_writer: source_node, source_port = resource_last_writer[resource_name].split(":") G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) + connected_params.add(param_key) + elif resource_name and resource_name not in resource_last_writer: + # 资源名在 labware_info 中不存在 + warnings.append(f"{param_key}={resource_name} 未找到") + + # 获取 targets 对应的 wells 数量,用于扩展参数 + targets_name = params.get("targets") + sources_name = params.get("sources") + targets_wells_count = 1 + sources_wells_count = 1 + + if targets_name and targets_name in labware_info: + target_wells = labware_info[targets_name].get("well", []) + targets_wells_count = len(target_wells) if target_wells else 1 + elif targets_name: + warnings.append(f"targets={targets_name} 未在 reagent 中定义") + + if sources_name and sources_name in labware_info: + source_wells = labware_info[sources_name].get("well", []) + sources_wells_count = len(source_wells) if source_wells else 1 + elif sources_name: + warnings.append(f"sources={sources_name} 未在 reagent 中定义") + + # 检查 sources 和 targets 的 wells 数量是否匹配 + if targets_wells_count != sources_wells_count and targets_name and sources_name: + warnings.append(f"wells 数量不匹配: sources={sources_wells_count}, targets={targets_wells_count}") + + # 使用 targets 的 wells 数量来扩展参数 + wells_count = targets_wells_count + + # 扩展单值参数为数组(根据 targets 的 wells 数量) + for expand_param in EXPAND_BY_WELLS_PARAMS: + if expand_param in params: + value = params[expand_param] + # 如果是单个值,扩展为数组 + if not isinstance(value, list): + params[expand_param] = [value] * wells_count + # 如果已经是数组但长度不对,记录警告 + elif len(value) != wells_count: + warnings.append(f"{expand_param} 数量({len(value)})与 wells({wells_count})不匹配") + + # 如果 sources/targets 已通过连接传递,将参数值改为空数组 + for param_key in connected_params: + if param_key in params: + params[param_key] = [] + + # 更新 step 的 param 和 footer + step_copy = step.copy() + step_copy["param"] = params + + # 如果有警告,修改 footer 添加警告标记(警告放前面) + if warnings: + original_footer = step.get("footer", "") + step_copy["footer"] = f"[WARN: {'; '.join(warnings)}] {original_footer}" + + G.add_node(node_id, **step_copy) + + # 控制流 + if last_control_node_id is not None: + G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") + last_control_node_id = node_id # 处理输出:更新 resource_last_writer for param_key, output_port in OUTPUT_PORT_MAPPING.items(): - resource_name = params.get(param_key) + resource_name = step.get("param", {}).get(param_key) # 使用原始参数值 if resource_name: resource_last_writer[resource_name] = f"{node_id}:{output_port}"