From 2493fb9f948dffbd3694fba15bddc51c7a0d9f52 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sun, 14 Sep 2025 00:22:39 +0800 Subject: [PATCH] Update runze pump format --- .../devices/pump_and_valve/runze_backbone.py | 158 +++++++++--------- .../pump_and_valve/runze_multiple_backbone.py | 3 + 2 files changed, 80 insertions(+), 81 deletions(-) diff --git a/unilabos/devices/pump_and_valve/runze_backbone.py b/unilabos/devices/pump_and_valve/runze_backbone.py index 36cb235a..ce6d834c 100644 --- a/unilabos/devices/pump_and_valve/runze_backbone.py +++ b/unilabos/devices/pump_and_valve/runze_backbone.py @@ -1,10 +1,9 @@ -import asyncio from threading import Lock, Event -from enum import Enum -from dataclasses import dataclass import time -import traceback -from typing import Any, Union, Optional, overload +from dataclasses import dataclass +from enum import Enum +from threading import Lock, Event +from typing import Union, Optional import serial.tools.list_ports from serial import Serial @@ -18,47 +17,47 @@ class RunzeSyringePumpMode(Enum): pulse_freq_grades = { - 6000: "0" , - 5600: "1" , - 5000: "2" , - 4400: "3" , - 3800: "4" , - 3200: "5" , - 2600: "6" , - 2200: "7" , - 2000: "8" , - 1800: "9" , + 6000: "0", + 5600: "1", + 5000: "2", + 4400: "3", + 3800: "4", + 3200: "5", + 2600: "6", + 2200: "7", + 2000: "8", + 1800: "9", 1600: "10", 1400: "11", 1200: "12", 1000: "13", - 800 : "14", - 600 : "15", - 400 : "16", - 200 : "17", - 190 : "18", - 180 : "19", - 170 : "20", - 160 : "21", - 150 : "22", - 140 : "23", - 130 : "24", - 120 : "25", - 110 : "26", - 100 : "27", - 90 : "28", - 80 : "29", - 70 : "30", - 60 : "31", - 50 : "32", - 40 : "33", - 30 : "34", - 20 : "35", - 18 : "36", - 16 : "37", - 14 : "38", - 12 : "39", - 10 : "40", + 800: "14", + 600: "15", + 400: "16", + 200: "17", + 190: "18", + 180: "19", + 170: "20", + 160: "21", + 150: "22", + 140: "23", + 130: "24", + 120: "25", + 110: "26", + 100: "27", + 90: "28", + 80: "29", + 70: "30", + 60: "31", + 50: "32", + 40: "33", + 30: "34", + 20: "35", + 18: "36", + 16: "37", + 14: "38", + 12: "39", + 10: "40", } @@ -70,7 +69,7 @@ class RunzeSyringePumpConnectionError(Exception): class RunzeSyringePumpInfo: port: str address: str = "1" - + max_volume: float = 25.0 mode: RunzeSyringePumpMode = RunzeSyringePumpMode.Normal @@ -82,16 +81,16 @@ class RunzeSyringePump: def __init__(self, port: str, address: str = "1", max_volume: float = 25.0, mode: RunzeSyringePumpMode = None): self.port = port self.address = address - + self.max_volume = max_volume self.total_steps = self.total_steps_vel = 6000 - + self._status = "Idle" self._mode = mode self._max_velocity = 0 self._valve_position = "I" self._position = 0 - + try: # if port in serial_ports and serial_ports[port].is_open: # self.hardware_interface = serial_ports[port] @@ -100,11 +99,8 @@ class RunzeSyringePump: # baudrate=9600, # port=port # ) - self.hardware_interface = Serial( - baudrate=9600, - port=port - ) - + self.hardware_interface = Serial(baudrate=9600, port=port) + except (OSError, SerialException) as e: # raise RunzeSyringePumpConnectionError from e self.hardware_interface = port @@ -114,13 +110,13 @@ class RunzeSyringePump: self._error_event = Event() self._query_lock = Lock() self._run_lock = Lock() - + def _adjust_total_steps(self): self.total_steps = 6000 if self.mode == RunzeSyringePumpMode.Normal else 48000 self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000 - + def send_command(self, full_command: str): - full_command_data = bytearray(full_command, 'ascii') + full_command_data = bytearray(full_command, "ascii") response = self.hardware_interface.write(full_command_data) time.sleep(0.05) output = self._receive(self.hardware_interface.read_until(b"\n")) @@ -131,9 +127,9 @@ class RunzeSyringePump: if self._closing: raise RunzeSyringePumpConnectionError - run = 'R' if not "?" in command else '' + run = "R" if "?" not in command else "" full_command = f"/{self.address}{command}{run}\r\n" - + output = self.send_command(full_command)[3:-3] return output @@ -161,7 +157,7 @@ class RunzeSyringePump: time.sleep(0.5) # Wait for 0.5 seconds before polling again status = self.get_status() - if status == 'Idle': + if status == "Idle": break finally: pass @@ -177,7 +173,7 @@ class RunzeSyringePump: # # self.set_mode(self.mode) # self.mode = self.get_mode() return response - + # Settings def set_baudrate(self, baudrate): @@ -187,32 +183,32 @@ class RunzeSyringePump: return self._run("U47") else: raise ValueError("Unsupported baudrate") - + # Device Status @property def status(self) -> str: return self._status - + def _standardize_status(self, status_raw): return "Idle" if status_raw == "`" else "Busy" - + def get_status(self): status_raw = self._query("Q") self._status = self._standardize_status(status_raw) return self._status - + # Mode Settings and Queries - + @property def mode(self) -> int: return self._mode - + # def set_mode(self, mode: RunzeSyringePumpMode): # self.mode = mode # self._adjust_total_steps() # command = f"N{mode.value}" # return self._run(command) - + # def get_mode(self): # response = self._query("?28") # status_raw, mode = response[0], int(response[1]) @@ -221,11 +217,11 @@ class RunzeSyringePump: # return self.mode # Speed Settings and Queries - + @property def max_velocity(self) -> float: return self._max_velocity - + def set_max_velocity(self, velocity: float): self._max_velocity = velocity pulse_freq = int(velocity / self.max_volume * self.total_steps_vel) @@ -238,10 +234,10 @@ class RunzeSyringePump: self._status = self._standardize_status(status_raw) self._max_velocity = pulse_freq / self.total_steps_vel * self.max_volume return self._max_velocity - + def set_velocity_grade(self, velocity: Union[int, str]): return self._run(f"S{velocity}") - + def get_velocity_grade(self): response = self._query("?2") status_raw, pulse_freq = response[0], int(response[1:]) @@ -265,21 +261,21 @@ class RunzeSyringePump: self._status = self._standardize_status(status_raw) velocity = pulse_freq / self.total_steps_vel * self.max_volume return pulse_freq, velocity - + # Operations - + # Valve Setpoint and Queries @property def valve_position(self) -> str: return self._valve_position - + def set_valve_position(self, position: Union[int, str, float]): - if type(position) == float: + if isinstance(position, float): position = round(position / 120) - command = f"I{position}" if type(position) == int or ord(position) <= 57 else position.upper() + command = f"I{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper() response = self._run(command) - self._valve_position = f"{position}" if type(position) == int or ord(position) <= 57 else position.upper() + self._valve_position = f"{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper() return response def get_valve_position(self) -> str: @@ -288,9 +284,9 @@ class RunzeSyringePump: self._valve_position = pos_valve self._status = self._standardize_status(status_raw) return pos_valve - + # Plunger Setpoint and Queries - + @property def position(self) -> float: return self._position @@ -321,7 +317,7 @@ class RunzeSyringePump: velocity_cmd = "" pos_step = int(position / self.max_volume * self.total_steps) return self._run(f"{velocity_cmd}A{pos_step}") - + def pull_plunger(self, volume: float): """ Pull a fixed volume (unit: ml) @@ -334,7 +330,7 @@ class RunzeSyringePump: """ pos_step = int(volume / self.max_volume * self.total_steps) return self._run(f"P{pos_step}") - + def push_plunger(self, volume: float): """ Push a fixed volume (unit: ml) @@ -355,7 +351,7 @@ class RunzeSyringePump: def stop_operation(self): return self._run("T") - + # Queries def query_command_buffer_status(self): @@ -391,4 +387,4 @@ class RunzeSyringePump: if __name__ == "__main__": r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0) - r.initialize() \ No newline at end of file + r.initialize() diff --git a/unilabos/devices/pump_and_valve/runze_multiple_backbone.py b/unilabos/devices/pump_and_valve/runze_multiple_backbone.py index f829796a..36cc414c 100644 --- a/unilabos/devices/pump_and_valve/runze_multiple_backbone.py +++ b/unilabos/devices/pump_and_valve/runze_multiple_backbone.py @@ -176,6 +176,9 @@ class RunzeMultiplePump: return output def _receive(self, data: bytes) -> str: + """ + Do not change this method. + """ if not data: return "" ascii_string = "".join(chr(byte) for byte in data)