Compare commits

...

6 Commits

Author SHA1 Message Date
Xuwznln
9e214c56c1 Update runze_multiple_backbone 2025-09-14 01:04:50 +08:00
Xuwznln
bdf27a7e82 Correct runze multiple backbone 2025-09-14 00:40:29 +08:00
Xuwznln
2493fb9f94 Update runze pump format 2025-09-14 00:22:39 +08:00
Xuwznln
c7a0ff67a9 support multiple backbone
(cherry picked from commit 4771ff2347)
2025-09-14 00:21:54 +08:00
Xuwznln
711a7c65fa remove runze multiple software obtainer
(cherry picked from commit 8bcc92a394)
2025-09-14 00:21:53 +08:00
Xuwznln
cde7956896 runze multiple pump support
(cherry picked from commit 49354fcf39)
2025-09-14 00:21:52 +08:00
2 changed files with 463 additions and 81 deletions

View File

@@ -1,10 +1,9 @@
import asyncio
from threading import Lock, Event from threading import Lock, Event
from enum import Enum
from dataclasses import dataclass
import time import time
import traceback from dataclasses import dataclass
from typing import Any, Union, Optional, overload from enum import Enum
from threading import Lock, Event
from typing import Union, Optional
import serial.tools.list_ports import serial.tools.list_ports
from serial import Serial from serial import Serial
@@ -18,47 +17,47 @@ class RunzeSyringePumpMode(Enum):
pulse_freq_grades = { pulse_freq_grades = {
6000: "0" , 6000: "0",
5600: "1" , 5600: "1",
5000: "2" , 5000: "2",
4400: "3" , 4400: "3",
3800: "4" , 3800: "4",
3200: "5" , 3200: "5",
2600: "6" , 2600: "6",
2200: "7" , 2200: "7",
2000: "8" , 2000: "8",
1800: "9" , 1800: "9",
1600: "10", 1600: "10",
1400: "11", 1400: "11",
1200: "12", 1200: "12",
1000: "13", 1000: "13",
800 : "14", 800: "14",
600 : "15", 600: "15",
400 : "16", 400: "16",
200 : "17", 200: "17",
190 : "18", 190: "18",
180 : "19", 180: "19",
170 : "20", 170: "20",
160 : "21", 160: "21",
150 : "22", 150: "22",
140 : "23", 140: "23",
130 : "24", 130: "24",
120 : "25", 120: "25",
110 : "26", 110: "26",
100 : "27", 100: "27",
90 : "28", 90: "28",
80 : "29", 80: "29",
70 : "30", 70: "30",
60 : "31", 60: "31",
50 : "32", 50: "32",
40 : "33", 40: "33",
30 : "34", 30: "34",
20 : "35", 20: "35",
18 : "36", 18: "36",
16 : "37", 16: "37",
14 : "38", 14: "38",
12 : "39", 12: "39",
10 : "40", 10: "40",
} }
@@ -70,7 +69,7 @@ class RunzeSyringePumpConnectionError(Exception):
class RunzeSyringePumpInfo: class RunzeSyringePumpInfo:
port: str port: str
address: str = "1" address: str = "1"
max_volume: float = 25.0 max_volume: float = 25.0
mode: RunzeSyringePumpMode = RunzeSyringePumpMode.Normal mode: RunzeSyringePumpMode = RunzeSyringePumpMode.Normal
@@ -82,16 +81,16 @@ class RunzeSyringePump:
def __init__(self, port: str, address: str = "1", max_volume: float = 25.0, mode: RunzeSyringePumpMode = None): def __init__(self, port: str, address: str = "1", max_volume: float = 25.0, mode: RunzeSyringePumpMode = None):
self.port = port self.port = port
self.address = address self.address = address
self.max_volume = max_volume self.max_volume = max_volume
self.total_steps = self.total_steps_vel = 6000 self.total_steps = self.total_steps_vel = 6000
self._status = "Idle" self._status = "Idle"
self._mode = mode self._mode = mode
self._max_velocity = 0 self._max_velocity = 0
self._valve_position = "I" self._valve_position = "I"
self._position = 0 self._position = 0
try: try:
# if port in serial_ports and serial_ports[port].is_open: # if port in serial_ports and serial_ports[port].is_open:
# self.hardware_interface = serial_ports[port] # self.hardware_interface = serial_ports[port]
@@ -100,11 +99,8 @@ class RunzeSyringePump:
# baudrate=9600, # baudrate=9600,
# port=port # port=port
# ) # )
self.hardware_interface = Serial( self.hardware_interface = Serial(baudrate=9600, port=port)
baudrate=9600,
port=port
)
except (OSError, SerialException) as e: except (OSError, SerialException) as e:
# raise RunzeSyringePumpConnectionError from e # raise RunzeSyringePumpConnectionError from e
self.hardware_interface = port self.hardware_interface = port
@@ -114,13 +110,13 @@ class RunzeSyringePump:
self._error_event = Event() self._error_event = Event()
self._query_lock = Lock() self._query_lock = Lock()
self._run_lock = Lock() self._run_lock = Lock()
def _adjust_total_steps(self): def _adjust_total_steps(self):
self.total_steps = 6000 if self.mode == RunzeSyringePumpMode.Normal else 48000 self.total_steps = 6000 if self.mode == RunzeSyringePumpMode.Normal else 48000
self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000 self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000
def send_command(self, full_command: str): def send_command(self, full_command: str):
full_command_data = bytearray(full_command, 'ascii') full_command_data = bytearray(full_command, "ascii")
response = self.hardware_interface.write(full_command_data) response = self.hardware_interface.write(full_command_data)
time.sleep(0.05) time.sleep(0.05)
output = self._receive(self.hardware_interface.read_until(b"\n")) output = self._receive(self.hardware_interface.read_until(b"\n"))
@@ -131,9 +127,9 @@ class RunzeSyringePump:
if self._closing: if self._closing:
raise RunzeSyringePumpConnectionError raise RunzeSyringePumpConnectionError
run = 'R' if not "?" in command else '' run = "R" if "?" not in command else ""
full_command = f"/{self.address}{command}{run}\r\n" full_command = f"/{self.address}{command}{run}\r\n"
output = self.send_command(full_command)[3:-3] output = self.send_command(full_command)[3:-3]
return output return output
@@ -161,7 +157,7 @@ class RunzeSyringePump:
time.sleep(0.5) # Wait for 0.5 seconds before polling again time.sleep(0.5) # Wait for 0.5 seconds before polling again
status = self.get_status() status = self.get_status()
if status == 'Idle': if status == "Idle":
break break
finally: finally:
pass pass
@@ -177,7 +173,7 @@ class RunzeSyringePump:
# # self.set_mode(self.mode) # # self.set_mode(self.mode)
# self.mode = self.get_mode() # self.mode = self.get_mode()
return response return response
# Settings # Settings
def set_baudrate(self, baudrate): def set_baudrate(self, baudrate):
@@ -187,32 +183,32 @@ class RunzeSyringePump:
return self._run("U47") return self._run("U47")
else: else:
raise ValueError("Unsupported baudrate") raise ValueError("Unsupported baudrate")
# Device Status # Device Status
@property @property
def status(self) -> str: def status(self) -> str:
return self._status return self._status
def _standardize_status(self, status_raw): def _standardize_status(self, status_raw):
return "Idle" if status_raw == "`" else "Busy" return "Idle" if status_raw == "`" else "Busy"
def get_status(self): def get_status(self):
status_raw = self._query("Q") status_raw = self._query("Q")
self._status = self._standardize_status(status_raw) self._status = self._standardize_status(status_raw)
return self._status return self._status
# Mode Settings and Queries # Mode Settings and Queries
@property @property
def mode(self) -> int: def mode(self) -> int:
return self._mode return self._mode
# def set_mode(self, mode: RunzeSyringePumpMode): # def set_mode(self, mode: RunzeSyringePumpMode):
# self.mode = mode # self.mode = mode
# self._adjust_total_steps() # self._adjust_total_steps()
# command = f"N{mode.value}" # command = f"N{mode.value}"
# return self._run(command) # return self._run(command)
# def get_mode(self): # def get_mode(self):
# response = self._query("?28") # response = self._query("?28")
# status_raw, mode = response[0], int(response[1]) # status_raw, mode = response[0], int(response[1])
@@ -221,11 +217,11 @@ class RunzeSyringePump:
# return self.mode # return self.mode
# Speed Settings and Queries # Speed Settings and Queries
@property @property
def max_velocity(self) -> float: def max_velocity(self) -> float:
return self._max_velocity return self._max_velocity
def set_max_velocity(self, velocity: float): def set_max_velocity(self, velocity: float):
self._max_velocity = velocity self._max_velocity = velocity
pulse_freq = int(velocity / self.max_volume * self.total_steps_vel) pulse_freq = int(velocity / self.max_volume * self.total_steps_vel)
@@ -238,10 +234,10 @@ class RunzeSyringePump:
self._status = self._standardize_status(status_raw) self._status = self._standardize_status(status_raw)
self._max_velocity = pulse_freq / self.total_steps_vel * self.max_volume self._max_velocity = pulse_freq / self.total_steps_vel * self.max_volume
return self._max_velocity return self._max_velocity
def set_velocity_grade(self, velocity: Union[int, str]): def set_velocity_grade(self, velocity: Union[int, str]):
return self._run(f"S{velocity}") return self._run(f"S{velocity}")
def get_velocity_grade(self): def get_velocity_grade(self):
response = self._query("?2") response = self._query("?2")
status_raw, pulse_freq = response[0], int(response[1:]) status_raw, pulse_freq = response[0], int(response[1:])
@@ -265,21 +261,21 @@ class RunzeSyringePump:
self._status = self._standardize_status(status_raw) self._status = self._standardize_status(status_raw)
velocity = pulse_freq / self.total_steps_vel * self.max_volume velocity = pulse_freq / self.total_steps_vel * self.max_volume
return pulse_freq, velocity return pulse_freq, velocity
# Operations # Operations
# Valve Setpoint and Queries # Valve Setpoint and Queries
@property @property
def valve_position(self) -> str: def valve_position(self) -> str:
return self._valve_position return self._valve_position
def set_valve_position(self, position: Union[int, str, float]): def set_valve_position(self, position: Union[int, str, float]):
if type(position) == float: if isinstance(position, float):
position = round(position / 120) position = round(position / 120)
command = f"I{position}" if type(position) == int or ord(position) <= 57 else position.upper() command = f"I{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper()
response = self._run(command) response = self._run(command)
self._valve_position = f"{position}" if type(position) == int or ord(position) <= 57 else position.upper() self._valve_position = f"{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper()
return response return response
def get_valve_position(self) -> str: def get_valve_position(self) -> str:
@@ -288,9 +284,9 @@ class RunzeSyringePump:
self._valve_position = pos_valve self._valve_position = pos_valve
self._status = self._standardize_status(status_raw) self._status = self._standardize_status(status_raw)
return pos_valve return pos_valve
# Plunger Setpoint and Queries # Plunger Setpoint and Queries
@property @property
def position(self) -> float: def position(self) -> float:
return self._position return self._position
@@ -321,7 +317,7 @@ class RunzeSyringePump:
velocity_cmd = "" velocity_cmd = ""
pos_step = int(position / self.max_volume * self.total_steps) pos_step = int(position / self.max_volume * self.total_steps)
return self._run(f"{velocity_cmd}A{pos_step}") return self._run(f"{velocity_cmd}A{pos_step}")
def pull_plunger(self, volume: float): def pull_plunger(self, volume: float):
""" """
Pull a fixed volume (unit: ml) Pull a fixed volume (unit: ml)
@@ -334,7 +330,7 @@ class RunzeSyringePump:
""" """
pos_step = int(volume / self.max_volume * self.total_steps) pos_step = int(volume / self.max_volume * self.total_steps)
return self._run(f"P{pos_step}") return self._run(f"P{pos_step}")
def push_plunger(self, volume: float): def push_plunger(self, volume: float):
""" """
Push a fixed volume (unit: ml) Push a fixed volume (unit: ml)
@@ -355,7 +351,7 @@ class RunzeSyringePump:
def stop_operation(self): def stop_operation(self):
return self._run("T") return self._run("T")
# Queries # Queries
def query_command_buffer_status(self): def query_command_buffer_status(self):
@@ -391,4 +387,4 @@ class RunzeSyringePump:
if __name__ == "__main__": if __name__ == "__main__":
r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0) r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0)
r.initialize() r.initialize()

View File

@@ -0,0 +1,386 @@
"""
Runze Syringe Pump Controller (SY-03B-T08)
本模块用于控制润泽注射泵 SY-03B-T08 型号的多泵系统。
支持通过串口同时控制多个具有不同地址的泵。
泵每次连接前要先进行初始化。
基础用法:
# 创建控制器实例
pump_controller = RunzeMultiplePump("COM3") # 或 "/dev/ttyUSB0" (Linux)
# 初始化特定地址的泵
pump_controller.initialize("1")
# 设置阀门位置
pump_controller.set_valve_position("1", 1) # 设置到位置1
# 移动到绝对位置
pump_controller.set_position("1", 10.0) # 移动到10ml位置
# 推拉柱塞操作
pump_controller.pull_plunger("1", 5.0) # 吸取5ml
pump_controller.push_plunger("1", 5.0) # 推出5ml
# 关闭连接
pump_controller.close()
支持的泵地址: 1-8 (字符串格式,如 "1", "2", "3" 等)
默认最大容量: 25.0 ml
通信协议: RS485, 9600波特率
"""
from threading import Lock, Event
import time
from dataclasses import dataclass
from enum import Enum
from typing import Union, Optional, List, Dict
import serial.tools.list_ports
from serial import Serial
from serial.serialutil import SerialException
class RunzeSyringePumpMode(Enum):
Normal = 0
AccuratePos = 1
AccuratePosVel = 2
pulse_freq_grades = {
6000: "0",
5600: "1",
5000: "2",
4400: "3",
3800: "4",
3200: "5",
2600: "6",
2200: "7",
2000: "8",
1800: "9",
1600: "10",
1400: "11",
1200: "12",
1000: "13",
800: "14",
600: "15",
400: "16",
200: "17",
190: "18",
180: "19",
170: "20",
160: "21",
150: "22",
140: "23",
130: "24",
120: "25",
110: "26",
100: "27",
90: "28",
80: "29",
70: "30",
60: "31",
50: "32",
40: "33",
30: "34",
20: "35",
18: "36",
16: "37",
14: "38",
12: "39",
10: "40",
}
class RunzeSyringePumpConnectionError(Exception):
pass
@dataclass
class PumpConfig:
address: str
max_volume: float = 25.0
mode: RunzeSyringePumpMode = RunzeSyringePumpMode.Normal
class RunzeMultiplePump:
"""
Multi-address Runze Syringe Pump Controller
Supports controlling multiple pumps on the same serial port with different addresses.
"""
def __init__(self, port: str):
"""
Initialize multiple pump controller
Args:
port (str): Serial port path
"""
self.port = port
# Default pump parameters
self.max_volume = 25.0
self.total_steps = 6000
self.total_steps_vel = 6000
# Connection management
try:
self.hardware_interface = Serial(baudrate=9600, port=port, timeout=1.0)
print(f"✓ 成功连接到串口: {port}")
except (OSError, SerialException) as e:
print(f"✗ 串口连接失败: {e}")
raise RunzeSyringePumpConnectionError(f"无法连接到串口 {port}: {e}") from e
# Thread safety
self._query_lock = Lock()
self._run_lock = Lock()
self._closing = False
# Pump status tracking
self._pump_status: Dict[str, str] = {} # address -> status
def _adjust_total_steps(self, mode: RunzeSyringePumpMode):
total_steps = 6000 if mode == RunzeSyringePumpMode.Normal else 48000
total_steps_vel = 48000 if mode == RunzeSyringePumpMode.AccuratePosVel else 6000
return total_steps, total_steps_vel
def send_command(self, full_command: str) -> str:
"""Send command to hardware and get response"""
full_command_data = bytearray(full_command, "ascii")
self.hardware_interface.write(full_command_data)
time.sleep(0.05)
response = self.hardware_interface.read_until(b"\n")
output = self._receive(response)
return output
def _query(self, address: str, command: str) -> str:
"""
Send query command to specific pump
Args:
address (str): Pump address (e.g., "1", "2", "3")
command (str): Command to send
Returns:
str: Response from pump
"""
with self._query_lock:
if self._closing:
raise RunzeSyringePumpConnectionError("Connection is closing")
run = "R" if "?" not in command else ""
full_command = f"/{address}{command}{run}\r\n" # \r\n should direct use, not \\r\\n
output = self.send_command(full_command)[3:-3]
return output
def _receive(self, data: bytes) -> str:
if not data:
return ""
ascii_string = "".join(chr(byte) for byte in data) # *Do not use decode('ascii', errors='ignore')
return ascii_string
def _run(self, address: str, command: str) -> str:
"""
Run command and wait for completion
Args:
address (str): Pump address
command (str): Command to execute
Returns:
str: Command response
"""
with self._run_lock:
try:
print(f"[泵 {address}] 执行命令: {command}")
response = self._query(address, command)
# Wait for operation completion
while True:
time.sleep(0.5)
status = self.get_status(address)
if status == "Idle":
break
except Exception as e:
print(f"[泵 {address}] 命令执行错误: {e}")
response = ""
return response
def _standardize_status(self, status_raw: str) -> str:
"""Convert raw status to standard format"""
return "Idle" if status_raw == "`" else "Busy"
# === Core Operations ===
def initialize(self, address: str) -> str:
"""Initialize specific pump"""
print(f"[泵 {address}] 正在初始化...")
response = self._run(address, "Z")
print(f"[泵 {address}] 初始化完成")
return response
# === Status Queries ===
def get_status(self, address: str) -> str:
"""Get pump status"""
try:
status_raw = self._query(address, "Q")
status = self._standardize_status(status_raw)
self._pump_status[address] = status
return status
except Exception:
return "Error"
# === Velocity Control ===
def set_max_velocity(self, address: str, velocity: float, max_volume: float = None) -> str:
"""Set maximum velocity for pump"""
if max_volume is None:
max_volume = self.max_volume
pulse_freq = int(velocity / max_volume * self.total_steps_vel)
pulse_freq = min(6000, pulse_freq)
return self._run(address, f"V{pulse_freq}")
def get_max_velocity(self, address: str, max_volume: float = None) -> float:
"""Get maximum velocity of pump"""
if max_volume is None:
max_volume = self.max_volume
response = self._query(address, "?2")
status_raw, pulse_freq = response[0], int(response[1:])
velocity = pulse_freq / self.total_steps_vel * max_volume
return velocity
def set_velocity_grade(self, address: str, velocity: Union[int, str]) -> str:
"""Set velocity grade"""
return self._run(address, f"S{velocity}")
# === Position Control ===
def get_position(self, address: str, max_volume: float = None) -> float:
"""Get current plunger position in ml"""
if max_volume is None:
max_volume = self.max_volume
response = self._query(address, "?0")
status_raw, pos_step = response[0], int(response[1:])
position = pos_step / self.total_steps * max_volume
return position
def set_position(self, address: str, position: float, max_velocity: float = None, max_volume: float = None) -> str:
"""
Move to absolute volume position
Args:
address (str): Pump address
position (float): Target position in ml
max_velocity (float): Maximum velocity in ml/s
max_volume (float): Maximum syringe volume in ml
"""
if max_volume is None:
max_volume = self.max_volume
velocity_cmd = ""
if max_velocity is not None:
pulse_freq = int(max_velocity / max_volume * self.total_steps_vel)
pulse_freq = min(6000, pulse_freq)
velocity_cmd = f"V{pulse_freq}"
pos_step = int(position / max_volume * self.total_steps)
return self._run(address, f"{velocity_cmd}A{pos_step}")
def pull_plunger(self, address: str, volume: float, max_volume: float = None) -> str:
"""Pull plunger by specified volume"""
if max_volume is None:
max_volume = self.max_volume
pos_step = int(volume / max_volume * self.total_steps)
return self._run(address, f"P{pos_step}")
def push_plunger(self, address: str, volume: float, max_volume: float = None) -> str:
"""Push plunger by specified volume"""
if max_volume is None:
max_volume = self.max_volume
pos_step = int(volume / max_volume * self.total_steps)
return self._run(address, f"D{pos_step}")
# === Valve Control ===
def set_valve_position(self, address: str, position: Union[int, str, float]) -> str:
"""Set valve position"""
if isinstance(position, float):
position = round(position / 120)
command = f"I{position}" if isinstance(position, int) or ord(str(position)) <= 57 else str(position).upper()
return self._run(address, command)
def get_valve_position(self, address: str) -> str:
"""Get current valve position"""
response = self._query(address, "?6")
status_raw, pos_valve = response[0], response[1].upper()
return pos_valve
# === Utility Functions ===
def stop_operation(self, address: str) -> str:
"""Stop current operation"""
return self._run(address, "T")
def close(self):
"""Close connection"""
if self._closing:
raise RunzeSyringePumpConnectionError("Already closing")
self._closing = True
self.hardware_interface.close()
print("✓ 串口连接已关闭")
if __name__ == "__main__":
"""
示例初始化3个泵地址1、2、3然后断开连接
"""
try:
# 请根据实际串口修改端口号
# Windows: "COM3", "COM4", 等
# Linux/Mac: "/dev/ttyUSB0", "/dev/ttyACM0", 等
port = "/dev/cn." # 修改为实际使用的串口
print("正在创建泵控制器...")
pump_controller = RunzeMultiplePump(port)
# 初始化3个泵 (地址: 1, 2, 3)
pump_addresses = ["1", "2", "3"]
for address in pump_addresses:
try:
print(f"\n正在初始化泵 {address}...")
pump_controller.initialize(address)
# 检查泵状态
status = pump_controller.get_status(address)
print(f"{address} 状态: {status}")
except Exception as e:
print(f"{address} 初始化失败: {e}")
print("\n所有泵初始化完成!")
# 断开连接
print("\n正在断开连接...")
pump_controller.close()
print("程序结束")
except RunzeSyringePumpConnectionError as e:
print(f"连接错误: {e}")
print("请检查:")
print("1. 串口是否正确")
print("2. 设备是否已连接")
print("3. 串口是否被其他程序占用")
except Exception as e:
print(f"未知错误: {e}")