Compare commits

...

6 Commits

Author SHA1 Message Date
Xuwznln
9e214c56c1 Update runze_multiple_backbone 2025-09-14 01:04:50 +08:00
Xuwznln
bdf27a7e82 Correct runze multiple backbone 2025-09-14 00:40:29 +08:00
Xuwznln
2493fb9f94 Update runze pump format 2025-09-14 00:22:39 +08:00
Xuwznln
c7a0ff67a9 support multiple backbone
(cherry picked from commit 4771ff2347)
2025-09-14 00:21:54 +08:00
Xuwznln
711a7c65fa remove runze multiple software obtainer
(cherry picked from commit 8bcc92a394)
2025-09-14 00:21:53 +08:00
Xuwznln
cde7956896 runze multiple pump support
(cherry picked from commit 49354fcf39)
2025-09-14 00:21:52 +08:00
2 changed files with 463 additions and 81 deletions

View File

@@ -1,10 +1,9 @@
import asyncio
from threading import Lock, Event
from enum import Enum
from dataclasses import dataclass
import time
import traceback
from typing import Any, Union, Optional, overload
from dataclasses import dataclass
from enum import Enum
from threading import Lock, Event
from typing import Union, Optional
import serial.tools.list_ports
from serial import Serial
@@ -18,47 +17,47 @@ class RunzeSyringePumpMode(Enum):
pulse_freq_grades = {
6000: "0" ,
5600: "1" ,
5000: "2" ,
4400: "3" ,
3800: "4" ,
3200: "5" ,
2600: "6" ,
2200: "7" ,
2000: "8" ,
1800: "9" ,
6000: "0",
5600: "1",
5000: "2",
4400: "3",
3800: "4",
3200: "5",
2600: "6",
2200: "7",
2000: "8",
1800: "9",
1600: "10",
1400: "11",
1200: "12",
1000: "13",
800 : "14",
600 : "15",
400 : "16",
200 : "17",
190 : "18",
180 : "19",
170 : "20",
160 : "21",
150 : "22",
140 : "23",
130 : "24",
120 : "25",
110 : "26",
100 : "27",
90 : "28",
80 : "29",
70 : "30",
60 : "31",
50 : "32",
40 : "33",
30 : "34",
20 : "35",
18 : "36",
16 : "37",
14 : "38",
12 : "39",
10 : "40",
800: "14",
600: "15",
400: "16",
200: "17",
190: "18",
180: "19",
170: "20",
160: "21",
150: "22",
140: "23",
130: "24",
120: "25",
110: "26",
100: "27",
90: "28",
80: "29",
70: "30",
60: "31",
50: "32",
40: "33",
30: "34",
20: "35",
18: "36",
16: "37",
14: "38",
12: "39",
10: "40",
}
@@ -70,7 +69,7 @@ class RunzeSyringePumpConnectionError(Exception):
class RunzeSyringePumpInfo:
port: str
address: str = "1"
max_volume: float = 25.0
mode: RunzeSyringePumpMode = RunzeSyringePumpMode.Normal
@@ -82,16 +81,16 @@ class RunzeSyringePump:
def __init__(self, port: str, address: str = "1", max_volume: float = 25.0, mode: RunzeSyringePumpMode = None):
self.port = port
self.address = address
self.max_volume = max_volume
self.total_steps = self.total_steps_vel = 6000
self._status = "Idle"
self._mode = mode
self._max_velocity = 0
self._valve_position = "I"
self._position = 0
try:
# if port in serial_ports and serial_ports[port].is_open:
# self.hardware_interface = serial_ports[port]
@@ -100,11 +99,8 @@ class RunzeSyringePump:
# baudrate=9600,
# port=port
# )
self.hardware_interface = Serial(
baudrate=9600,
port=port
)
self.hardware_interface = Serial(baudrate=9600, port=port)
except (OSError, SerialException) as e:
# raise RunzeSyringePumpConnectionError from e
self.hardware_interface = port
@@ -114,13 +110,13 @@ class RunzeSyringePump:
self._error_event = Event()
self._query_lock = Lock()
self._run_lock = Lock()
def _adjust_total_steps(self):
self.total_steps = 6000 if self.mode == RunzeSyringePumpMode.Normal else 48000
self.total_steps_vel = 48000 if self.mode == RunzeSyringePumpMode.AccuratePosVel else 6000
def send_command(self, full_command: str):
full_command_data = bytearray(full_command, 'ascii')
full_command_data = bytearray(full_command, "ascii")
response = self.hardware_interface.write(full_command_data)
time.sleep(0.05)
output = self._receive(self.hardware_interface.read_until(b"\n"))
@@ -131,9 +127,9 @@ class RunzeSyringePump:
if self._closing:
raise RunzeSyringePumpConnectionError
run = 'R' if not "?" in command else ''
run = "R" if "?" not in command else ""
full_command = f"/{self.address}{command}{run}\r\n"
output = self.send_command(full_command)[3:-3]
return output
@@ -161,7 +157,7 @@ class RunzeSyringePump:
time.sleep(0.5) # Wait for 0.5 seconds before polling again
status = self.get_status()
if status == 'Idle':
if status == "Idle":
break
finally:
pass
@@ -177,7 +173,7 @@ class RunzeSyringePump:
# # self.set_mode(self.mode)
# self.mode = self.get_mode()
return response
# Settings
def set_baudrate(self, baudrate):
@@ -187,32 +183,32 @@ class RunzeSyringePump:
return self._run("U47")
else:
raise ValueError("Unsupported baudrate")
# Device Status
@property
def status(self) -> str:
return self._status
def _standardize_status(self, status_raw):
return "Idle" if status_raw == "`" else "Busy"
def get_status(self):
status_raw = self._query("Q")
self._status = self._standardize_status(status_raw)
return self._status
# Mode Settings and Queries
@property
def mode(self) -> int:
return self._mode
# def set_mode(self, mode: RunzeSyringePumpMode):
# self.mode = mode
# self._adjust_total_steps()
# command = f"N{mode.value}"
# return self._run(command)
# def get_mode(self):
# response = self._query("?28")
# status_raw, mode = response[0], int(response[1])
@@ -221,11 +217,11 @@ class RunzeSyringePump:
# return self.mode
# Speed Settings and Queries
@property
def max_velocity(self) -> float:
return self._max_velocity
def set_max_velocity(self, velocity: float):
self._max_velocity = velocity
pulse_freq = int(velocity / self.max_volume * self.total_steps_vel)
@@ -238,10 +234,10 @@ class RunzeSyringePump:
self._status = self._standardize_status(status_raw)
self._max_velocity = pulse_freq / self.total_steps_vel * self.max_volume
return self._max_velocity
def set_velocity_grade(self, velocity: Union[int, str]):
return self._run(f"S{velocity}")
def get_velocity_grade(self):
response = self._query("?2")
status_raw, pulse_freq = response[0], int(response[1:])
@@ -265,21 +261,21 @@ class RunzeSyringePump:
self._status = self._standardize_status(status_raw)
velocity = pulse_freq / self.total_steps_vel * self.max_volume
return pulse_freq, velocity
# Operations
# Valve Setpoint and Queries
@property
def valve_position(self) -> str:
return self._valve_position
def set_valve_position(self, position: Union[int, str, float]):
if type(position) == float:
if isinstance(position, float):
position = round(position / 120)
command = f"I{position}" if type(position) == int or ord(position) <= 57 else position.upper()
command = f"I{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper()
response = self._run(command)
self._valve_position = f"{position}" if type(position) == int or ord(position) <= 57 else position.upper()
self._valve_position = f"{position}" if isinstance(position, int) or ord(position) <= 57 else position.upper()
return response
def get_valve_position(self) -> str:
@@ -288,9 +284,9 @@ class RunzeSyringePump:
self._valve_position = pos_valve
self._status = self._standardize_status(status_raw)
return pos_valve
# Plunger Setpoint and Queries
@property
def position(self) -> float:
return self._position
@@ -321,7 +317,7 @@ class RunzeSyringePump:
velocity_cmd = ""
pos_step = int(position / self.max_volume * self.total_steps)
return self._run(f"{velocity_cmd}A{pos_step}")
def pull_plunger(self, volume: float):
"""
Pull a fixed volume (unit: ml)
@@ -334,7 +330,7 @@ class RunzeSyringePump:
"""
pos_step = int(volume / self.max_volume * self.total_steps)
return self._run(f"P{pos_step}")
def push_plunger(self, volume: float):
"""
Push a fixed volume (unit: ml)
@@ -355,7 +351,7 @@ class RunzeSyringePump:
def stop_operation(self):
return self._run("T")
# Queries
def query_command_buffer_status(self):
@@ -391,4 +387,4 @@ class RunzeSyringePump:
if __name__ == "__main__":
r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0)
r.initialize()
r.initialize()

View File

@@ -0,0 +1,386 @@
"""
Runze Syringe Pump Controller (SY-03B-T08)
本模块用于控制润泽注射泵 SY-03B-T08 型号的多泵系统。
支持通过串口同时控制多个具有不同地址的泵。
泵每次连接前要先进行初始化。
基础用法:
# 创建控制器实例
pump_controller = RunzeMultiplePump("COM3") # 或 "/dev/ttyUSB0" (Linux)
# 初始化特定地址的泵
pump_controller.initialize("1")
# 设置阀门位置
pump_controller.set_valve_position("1", 1) # 设置到位置1
# 移动到绝对位置
pump_controller.set_position("1", 10.0) # 移动到10ml位置
# 推拉柱塞操作
pump_controller.pull_plunger("1", 5.0) # 吸取5ml
pump_controller.push_plunger("1", 5.0) # 推出5ml
# 关闭连接
pump_controller.close()
支持的泵地址: 1-8 (字符串格式,如 "1", "2", "3" 等)
默认最大容量: 25.0 ml
通信协议: RS485, 9600波特率
"""
from threading import Lock, Event
import time
from dataclasses import dataclass
from enum import Enum
from typing import Union, Optional, List, Dict
import serial.tools.list_ports
from serial import Serial
from serial.serialutil import SerialException
class RunzeSyringePumpMode(Enum):
Normal = 0
AccuratePos = 1
AccuratePosVel = 2
pulse_freq_grades = {
6000: "0",
5600: "1",
5000: "2",
4400: "3",
3800: "4",
3200: "5",
2600: "6",
2200: "7",
2000: "8",
1800: "9",
1600: "10",
1400: "11",
1200: "12",
1000: "13",
800: "14",
600: "15",
400: "16",
200: "17",
190: "18",
180: "19",
170: "20",
160: "21",
150: "22",
140: "23",
130: "24",
120: "25",
110: "26",
100: "27",
90: "28",
80: "29",
70: "30",
60: "31",
50: "32",
40: "33",
30: "34",
20: "35",
18: "36",
16: "37",
14: "38",
12: "39",
10: "40",
}
class RunzeSyringePumpConnectionError(Exception):
pass
@dataclass
class PumpConfig:
address: str
max_volume: float = 25.0
mode: RunzeSyringePumpMode = RunzeSyringePumpMode.Normal
class RunzeMultiplePump:
"""
Multi-address Runze Syringe Pump Controller
Supports controlling multiple pumps on the same serial port with different addresses.
"""
def __init__(self, port: str):
"""
Initialize multiple pump controller
Args:
port (str): Serial port path
"""
self.port = port
# Default pump parameters
self.max_volume = 25.0
self.total_steps = 6000
self.total_steps_vel = 6000
# Connection management
try:
self.hardware_interface = Serial(baudrate=9600, port=port, timeout=1.0)
print(f"✓ 成功连接到串口: {port}")
except (OSError, SerialException) as e:
print(f"✗ 串口连接失败: {e}")
raise RunzeSyringePumpConnectionError(f"无法连接到串口 {port}: {e}") from e
# Thread safety
self._query_lock = Lock()
self._run_lock = Lock()
self._closing = False
# Pump status tracking
self._pump_status: Dict[str, str] = {} # address -> status
def _adjust_total_steps(self, mode: RunzeSyringePumpMode):
total_steps = 6000 if mode == RunzeSyringePumpMode.Normal else 48000
total_steps_vel = 48000 if mode == RunzeSyringePumpMode.AccuratePosVel else 6000
return total_steps, total_steps_vel
def send_command(self, full_command: str) -> str:
"""Send command to hardware and get response"""
full_command_data = bytearray(full_command, "ascii")
self.hardware_interface.write(full_command_data)
time.sleep(0.05)
response = self.hardware_interface.read_until(b"\n")
output = self._receive(response)
return output
def _query(self, address: str, command: str) -> str:
"""
Send query command to specific pump
Args:
address (str): Pump address (e.g., "1", "2", "3")
command (str): Command to send
Returns:
str: Response from pump
"""
with self._query_lock:
if self._closing:
raise RunzeSyringePumpConnectionError("Connection is closing")
run = "R" if "?" not in command else ""
full_command = f"/{address}{command}{run}\r\n" # \r\n should direct use, not \\r\\n
output = self.send_command(full_command)[3:-3]
return output
def _receive(self, data: bytes) -> str:
if not data:
return ""
ascii_string = "".join(chr(byte) for byte in data) # *Do not use decode('ascii', errors='ignore')
return ascii_string
def _run(self, address: str, command: str) -> str:
"""
Run command and wait for completion
Args:
address (str): Pump address
command (str): Command to execute
Returns:
str: Command response
"""
with self._run_lock:
try:
print(f"[泵 {address}] 执行命令: {command}")
response = self._query(address, command)
# Wait for operation completion
while True:
time.sleep(0.5)
status = self.get_status(address)
if status == "Idle":
break
except Exception as e:
print(f"[泵 {address}] 命令执行错误: {e}")
response = ""
return response
def _standardize_status(self, status_raw: str) -> str:
"""Convert raw status to standard format"""
return "Idle" if status_raw == "`" else "Busy"
# === Core Operations ===
def initialize(self, address: str) -> str:
"""Initialize specific pump"""
print(f"[泵 {address}] 正在初始化...")
response = self._run(address, "Z")
print(f"[泵 {address}] 初始化完成")
return response
# === Status Queries ===
def get_status(self, address: str) -> str:
"""Get pump status"""
try:
status_raw = self._query(address, "Q")
status = self._standardize_status(status_raw)
self._pump_status[address] = status
return status
except Exception:
return "Error"
# === Velocity Control ===
def set_max_velocity(self, address: str, velocity: float, max_volume: float = None) -> str:
"""Set maximum velocity for pump"""
if max_volume is None:
max_volume = self.max_volume
pulse_freq = int(velocity / max_volume * self.total_steps_vel)
pulse_freq = min(6000, pulse_freq)
return self._run(address, f"V{pulse_freq}")
def get_max_velocity(self, address: str, max_volume: float = None) -> float:
"""Get maximum velocity of pump"""
if max_volume is None:
max_volume = self.max_volume
response = self._query(address, "?2")
status_raw, pulse_freq = response[0], int(response[1:])
velocity = pulse_freq / self.total_steps_vel * max_volume
return velocity
def set_velocity_grade(self, address: str, velocity: Union[int, str]) -> str:
"""Set velocity grade"""
return self._run(address, f"S{velocity}")
# === Position Control ===
def get_position(self, address: str, max_volume: float = None) -> float:
"""Get current plunger position in ml"""
if max_volume is None:
max_volume = self.max_volume
response = self._query(address, "?0")
status_raw, pos_step = response[0], int(response[1:])
position = pos_step / self.total_steps * max_volume
return position
def set_position(self, address: str, position: float, max_velocity: float = None, max_volume: float = None) -> str:
"""
Move to absolute volume position
Args:
address (str): Pump address
position (float): Target position in ml
max_velocity (float): Maximum velocity in ml/s
max_volume (float): Maximum syringe volume in ml
"""
if max_volume is None:
max_volume = self.max_volume
velocity_cmd = ""
if max_velocity is not None:
pulse_freq = int(max_velocity / max_volume * self.total_steps_vel)
pulse_freq = min(6000, pulse_freq)
velocity_cmd = f"V{pulse_freq}"
pos_step = int(position / max_volume * self.total_steps)
return self._run(address, f"{velocity_cmd}A{pos_step}")
def pull_plunger(self, address: str, volume: float, max_volume: float = None) -> str:
"""Pull plunger by specified volume"""
if max_volume is None:
max_volume = self.max_volume
pos_step = int(volume / max_volume * self.total_steps)
return self._run(address, f"P{pos_step}")
def push_plunger(self, address: str, volume: float, max_volume: float = None) -> str:
"""Push plunger by specified volume"""
if max_volume is None:
max_volume = self.max_volume
pos_step = int(volume / max_volume * self.total_steps)
return self._run(address, f"D{pos_step}")
# === Valve Control ===
def set_valve_position(self, address: str, position: Union[int, str, float]) -> str:
"""Set valve position"""
if isinstance(position, float):
position = round(position / 120)
command = f"I{position}" if isinstance(position, int) or ord(str(position)) <= 57 else str(position).upper()
return self._run(address, command)
def get_valve_position(self, address: str) -> str:
"""Get current valve position"""
response = self._query(address, "?6")
status_raw, pos_valve = response[0], response[1].upper()
return pos_valve
# === Utility Functions ===
def stop_operation(self, address: str) -> str:
"""Stop current operation"""
return self._run(address, "T")
def close(self):
"""Close connection"""
if self._closing:
raise RunzeSyringePumpConnectionError("Already closing")
self._closing = True
self.hardware_interface.close()
print("✓ 串口连接已关闭")
if __name__ == "__main__":
"""
示例初始化3个泵地址1、2、3然后断开连接
"""
try:
# 请根据实际串口修改端口号
# Windows: "COM3", "COM4", 等
# Linux/Mac: "/dev/ttyUSB0", "/dev/ttyACM0", 等
port = "/dev/cn." # 修改为实际使用的串口
print("正在创建泵控制器...")
pump_controller = RunzeMultiplePump(port)
# 初始化3个泵 (地址: 1, 2, 3)
pump_addresses = ["1", "2", "3"]
for address in pump_addresses:
try:
print(f"\n正在初始化泵 {address}...")
pump_controller.initialize(address)
# 检查泵状态
status = pump_controller.get_status(address)
print(f"{address} 状态: {status}")
except Exception as e:
print(f"{address} 初始化失败: {e}")
print("\n所有泵初始化完成!")
# 断开连接
print("\n正在断开连接...")
pump_controller.close()
print("程序结束")
except RunzeSyringePumpConnectionError as e:
print(f"连接错误: {e}")
print("请检查:")
print("1. 串口是否正确")
print("2. 设备是否已连接")
print("3. 串口是否被其他程序占用")
except Exception as e:
print(f"未知错误: {e}")