From 896f287d9286cec38bfcc1b0d4fa9d390b7496cd Mon Sep 17 00:00:00 2001 From: zhangshixiang <@zhangshixiang> Date: Wed, 10 Dec 2025 15:10:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9laiyu=E7=A7=BB=E6=B6=B2=E7=AB=99?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E9=83=A8=E5=88=86=E4=BF=AE=E6=94=B9=EF=BC=8C?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E5=A4=9A=E6=AC=A1=E5=88=9D=E5=A7=8B=E5=8C=96?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../laiyu/backend/laiyu_v_backend.py | 8 ++-- .../laiyu/controllers/pipette_controller.py | 10 ++++- .../liquid_handler_abstract.py | 39 ++++++++++++++++++- 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py b/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py index d5636b2..9e824e1 100644 --- a/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py +++ b/unilabos/devices/liquid_handling/laiyu/backend/laiyu_v_backend.py @@ -153,7 +153,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend): if self.hardware_interface.tip_status == TipStatus.TIP_ATTACHED: print("已有枪头,无需重复拾取") return - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=100) + self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height,speed=100) # self.joint_state_publisher.send_resource_action(ops[0].resource.name, x, y, z, "pick",channels=use_channels) # goback() @@ -202,7 +202,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend): if self.hardware_interface.tip_status == TipStatus.NO_TIP: print("无枪头,无需丢弃") return - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z) + self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) self.hardware_interface.eject_tip self.hardware_interface.xyz_controller.move_to_work_coord_safe(z=self.hardware_interface.xyz_controller.machine_config.safe_z_height) @@ -267,7 +267,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend): return # 移动到吸液位置 - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z) + self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) self.pipette_aspirate(volume=ops[0].volume, flow_rate=flow_rate) @@ -340,7 +340,7 @@ class UniLiquidHandlerLaiyuBackend(LiquidHandlerBackend): # 移动到排液位置 - self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z) + self.hardware_interface.xyz_controller.move_to_work_coord_safe(x=x, y=-y, z=z,speed=200) self.pipette_dispense(volume=ops[0].volume, flow_rate=flow_rate) diff --git a/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py b/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py index e6ddd4f..17a47df 100644 --- a/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py +++ b/unilabos/devices/liquid_handling/laiyu/controllers/pipette_controller.py @@ -128,6 +128,7 @@ class PipetteController: baudrate=115200 ) self.pipette = SOPAPipette(self.config) + self.pipette_port = port self.tip_status = TipStatus.NO_TIP self.current_volume = 0.0 self.max_volume = 1000.0 # 默认1000ul @@ -154,7 +155,7 @@ class PipetteController: logger.info("移液器连接成功") # 连接XYZ步进电机控制器(如果提供了端口) - if self.xyz_port: + if self.xyz_port != self.pipette_port: try: self.xyz_controller = XYZController(self.xyz_port) if self.xyz_controller.connect(): @@ -168,7 +169,12 @@ class PipetteController: self.xyz_controller = None self.xyz_connected = False else: - logger.info("未配置XYZ步进电机端口,跳过运动控制器连接") + try: + self.xyz_controller = XYZController(self.xyz_port, auto_connect=False) + self.xyz_controller.serial_conn = self.pipette.serial_port + self.xyz_controller.is_connected = True + except Exception as e: + logger.info("未配置XYZ步进电机端口,跳过运动控制器连接") return True except Exception as e: diff --git a/unilabos/devices/liquid_handling/liquid_handler_abstract.py b/unilabos/devices/liquid_handling/liquid_handler_abstract.py index 66c3cef..41078ae 100644 --- a/unilabos/devices/liquid_handling/liquid_handler_abstract.py +++ b/unilabos/devices/liquid_handling/liquid_handler_abstract.py @@ -581,16 +581,51 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware): support_touch_tip = True _ros_node: BaseROS2DeviceNode - def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8): + def __init__(self, backend: LiquidHandlerBackend, deck: Deck, simulator: bool=False, channel_num:int = 8, total_height:float = 310): """Initialize a LiquidHandler. Args: backend: Backend to use. deck: Deck to use. """ + backend_type = None + if isinstance(backend, dict) and "type" in backend: + backend_dict = backend.copy() + type_str = backend_dict.pop("type") + try: + # Try to get class from string using globals (current module), or fallback to pylabrobot or unilabos namespaces + backend_cls = None + if type_str in globals(): + backend_cls = globals()[type_str] + else: + # Try resolving dotted notation, e.g. "xxx.yyy.ClassName" + components = type_str.split(".") + mod = None + if len(components) > 1: + module_name = ".".join(components[:-1]) + try: + import importlib + mod = importlib.import_module(module_name) + except ImportError: + mod = None + if mod is not None: + backend_cls = getattr(mod, components[-1], None) + if backend_cls is None: + # Try pylabrobot style import (if available) + try: + import pylabrobot + backend_cls = getattr(pylabrobot, type_str, None) + except Exception: + backend_cls = None + if backend_cls is not None and isinstance(backend_cls, type): + backend_type = backend_cls(**backend_dict) # pass the rest of dict as kwargs + except Exception as exc: + raise RuntimeError(f"Failed to convert backend type '{type_str}' to class: {exc}") + else: + backend_type = backend self._simulator = simulator self.group_info = dict() - super().__init__(backend, deck, simulator, channel_num) + super().__init__(backend_type, deck, simulator, channel_num) def post_init(self, ros_node: BaseROS2DeviceNode): self._ros_node = ros_node