feat: Add SyringePump (SY-03B) driver with unified serial/TCP transport for chinwe device, including registry and test configurations.

This commit is contained in:
ZiWei
2025-12-19 03:05:11 +08:00
parent 152d3a7563
commit 39cc280c91
3 changed files with 968 additions and 257 deletions

View File

@@ -1,282 +1,636 @@
import sys # -*- coding: utf-8 -*-
import threading """
Contains drivers for:
1. SyringePump: Runze Fluid SY-03B (ASCII)
2. EmmMotor: Emm V5.0 Closed-loop Stepper (Modbus-RTU variant)
3. XKCSensor: XKC Non-contact Level Sensor (Modbus-RTU)
"""
import socket
import serial import serial
import serial.tools.list_ports
import re
import time import time
from typing import Optional, List, Dict, Tuple import threading
import struct
import re
import traceback
import queue
from typing import Optional, Dict, List, Any
class ChinweDevice: try:
""" from unilabos.device_comms.universal_driver import UniversalDriver
ChinWe设备控制类 except ImportError:
提供串口通信、电机控制、传感器数据读取等功能 import logging
""" class UniversalDriver:
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
def __init__(self, port: str, baudrate: int = 115200, debug: bool = False): def execute_command_from_outer(self, command: str):
""" pass
初始化ChinWe设备
Args: # ==============================================================================
port: 串口名称如果为None则自动检测 # 1. Transport Layer (通信层)
baudrate: 波特率默认115200 # ==============================================================================
class TransportManager:
""" """
self.debug = debug 统一通信管理类。
自动识别 串口 (Serial) 或 网络 (TCP) 连接。
"""
def __init__(self, port: str, baudrate: int = 9600, timeout: float = 3.0, logger=None):
self.port = port self.port = port
self.baudrate = baudrate self.baudrate = baudrate
self.serial_port: Optional[serial.Serial] = None self.timeout = timeout
self._voltage: float = 0.0 self.logger = logger
self._ec_value: float = 0.0 self.lock = threading.RLock() # 线程锁,确保多设备共用一个连接时不冲突
self._ec_adc_value: int = 0
self.is_tcp = False
self.serial = None
self.socket = None
# 简单判断: 如果包含 ':' (如 192.168.1.1:8899) 或者看起来像 IP则认为是 TCP
if ':' in self.port or (self.port.count('.') == 3 and not self.port.startswith('/')):
self.is_tcp = True
self._connect_tcp()
else:
self._connect_serial()
def _log(self, msg):
if self.logger:
pass
# self.logger.debug(f"[Transport] {msg}")
def _connect_tcp(self):
try:
if ':' in self.port:
host, p = self.port.split(':')
self.tcp_host = host
self.tcp_port = int(p)
else:
self.tcp_host = self.port
self.tcp_port = 8899 # 默认端口
# if self.logger: self.logger.info(f"Connecting TCP {self.tcp_host}:{self.tcp_port} ...")
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout)
self.socket.connect((self.tcp_host, self.tcp_port))
except Exception as e:
raise ConnectionError(f"TCP connection failed: {e}")
def _connect_serial(self):
try:
# if self.logger: self.logger.info(f"Opening Serial {self.port} (Baud: {self.baudrate}) ...")
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=self.timeout
)
except Exception as e:
raise ConnectionError(f"Serial open failed: {e}")
def close(self):
"""关闭连接"""
if self.is_tcp and self.socket:
try: self.socket.close()
except: pass
elif not self.is_tcp and self.serial and self.serial.is_open:
self.serial.close()
def clear_buffer(self):
"""清空缓冲区 (Thread-safe)"""
with self.lock:
if self.is_tcp:
self.socket.setblocking(False)
try:
while True:
if not self.socket.recv(1024): break
except: pass
finally: self.socket.settimeout(self.timeout)
else:
self.serial.reset_input_buffer()
def write(self, data: bytes):
"""发送原始字节"""
with self.lock:
if self.is_tcp:
self.socket.sendall(data)
else:
self.serial.write(data)
def read(self, size: int) -> bytes:
"""读取指定长度字节"""
if self.is_tcp:
data = b''
start = time.time()
while len(data) < size:
if time.time() - start > self.timeout: break
try:
chunk = self.socket.recv(size - len(data))
if not chunk: break
data += chunk
except socket.timeout: break
return data
else:
return self.serial.read(size)
def send_ascii_command(self, command: str) -> str:
"""
发送 ASCII 字符串命令 (如注射泵指令),读取直到 '\r'
"""
with self.lock:
data = command.encode('ascii') if isinstance(command, str) else command
self.clear_buffer()
self.write(data)
# Read until \r
if self.is_tcp:
resp = b''
start = time.time()
while True:
if time.time() - start > self.timeout: break
try:
char = self.socket.recv(1)
if not char: break
resp += char
if char == b'\r': break
except: break
return resp.decode('ascii', errors='ignore').strip()
else:
return self.serial.read_until(b'\r').decode('ascii', errors='ignore').strip()
# ==============================================================================
# 2. Syringe Pump Driver (注射泵)
# ==============================================================================
class SyringePump:
"""SY-03B 注射泵驱动 (ASCII协议)"""
CMD_INITIALIZE = "Z{speed},{drain_port},{output_port}R"
CMD_SWITCH_VALVE = "I{port}R"
CMD_ASPIRATE = "P{vol}R"
CMD_DISPENSE = "D{vol}R"
CMD_DISPENSE_ALL = "A0R"
CMD_STOP = "TR"
CMD_QUERY_STATUS = "Q"
CMD_QUERY_PLUNGER = "?0"
def __init__(self, device_id: int, transport: TransportManager):
if not 1 <= device_id <= 15:
pass # Allow all IDs for now
self.id = str(device_id)
self.transport = transport
def _send(self, template: str, **kwargs) -> str:
cmd = f"/{self.id}" + template.format(**kwargs) + "\r"
return self.transport.send_ascii_command(cmd)
def is_busy(self) -> bool:
"""查询繁忙状态"""
resp = self._send(self.CMD_QUERY_STATUS)
# 响应如 /0` (Ready, 0x60) 或 /0@ (Busy, 0x40)
if len(resp) >= 3:
status_byte = ord(resp[2])
# Bit 5: 1=Ready, 0=Busy
return (status_byte & 0x20) == 0
return False
def wait_until_idle(self, timeout=30):
"""阻塞等待直到空闲"""
start = time.time()
while time.time() - start < timeout:
if not self.is_busy(): return
time.sleep(0.5)
# raise TimeoutError(f"Pump {self.id} wait idle timeout")
pass
def initialize(self, drain_port=0, output_port=0, speed=10):
"""初始化"""
self._send(self.CMD_INITIALIZE, speed=speed, drain_port=drain_port, output_port=output_port)
def switch_valve(self, port: int):
"""切换阀门 (1-8)"""
self._send(self.CMD_SWITCH_VALVE, port=port)
def aspirate(self, steps: int):
"""吸液 (相对步数)"""
self._send(self.CMD_ASPIRATE, vol=steps)
def dispense(self, steps: int):
"""排液 (相对步数)"""
self._send(self.CMD_DISPENSE, vol=steps)
def stop(self):
"""停止"""
self._send(self.CMD_STOP)
def get_position(self) -> int:
"""获取柱塞位置 (步数)"""
resp = self._send(self.CMD_QUERY_PLUNGER)
m = re.search(r'\d+', resp)
return int(m.group()) if m else -1
# ==============================================================================
# 3. Stepper Motor Driver (步进电机)
# ==============================================================================
class EmmMotor:
"""Emm V5.0 闭环步进电机驱动"""
def __init__(self, device_id: int, transport: TransportManager):
self.id = device_id
self.transport = transport
def _send(self, func_code: int, payload: list) -> bytes:
with self.transport.lock:
self.transport.clear_buffer()
# 格式: [ID] [Func] [Data...] [Check=0x6B]
body = [self.id, func_code] + payload
body.append(0x6B) # Checksum
self.transport.write(bytes(body))
# 根据指令不同,读取不同长度响应
read_len = 10 if func_code in [0x31, 0x32, 0x35, 0x24, 0x27] else 4
return self.transport.read(read_len)
def enable(self, on=True):
"""使能 (True=锁轴, False=松轴)"""
state = 1 if on else 0
self._send(0xF3, [0xAB, state, 0])
def run_speed(self, speed_rpm: int, direction=0, acc=10):
"""速度模式运行"""
sp = struct.pack('>H', int(speed_rpm))
self._send(0xF6, [direction, sp[0], sp[1], acc, 0])
def run_position(self, pulses: int, speed_rpm: int, direction=0, acc=10, absolute=False):
"""位置模式运行"""
sp = struct.pack('>H', int(speed_rpm))
pl = struct.pack('>I', int(pulses))
is_abs = 1 if absolute else 0
self._send(0xFD, [direction, sp[0], sp[1], acc, pl[0], pl[1], pl[2], pl[3], is_abs, 0])
def stop(self):
"""停止"""
self._send(0xFE, [0x98, 0])
def set_zero(self):
"""清零位置"""
self._send(0x0A, [])
def get_position(self) -> int:
"""获取当前脉冲位置"""
resp = self._send(0x32, [])
if len(resp) >= 8:
sign = resp[2]
val = struct.unpack('>I', resp[3:7])[0]
return -val if sign == 1 else val
return 0
# ==============================================================================
# 4. Liquid Sensor Driver (液位传感器)
# ==============================================================================
class XKCSensor:
"""XKC RS485 液位传感器 (Modbus RTU)"""
def __init__(self, device_id: int, transport: TransportManager, threshold: int = 300):
self.id = device_id
self.transport = transport
self.threshold = threshold
def _crc(self, data: bytes) -> bytes:
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001: crc = (crc >> 1) ^ 0xA001
else: crc >>= 1
return struct.pack('<H', crc)
def read_level(self) -> Optional[Dict[str, Any]]:
"""
读取液位。
返回: {'level': bool, 'rssi': int}
"""
with self.transport.lock:
self.transport.clear_buffer()
# Modbus Read Registers: 01 03 00 01 00 02 CRC
payload = struct.pack('>HH', 0x0001, 0x0002)
msg = struct.pack('BB', self.id, 0x03) + payload
msg += self._crc(msg)
self.transport.write(msg)
# Read header
h = self.transport.read(3) # Addr, Func, Len
if len(h) < 3: return None
length = h[2]
# Read body + CRC
body = self.transport.read(length + 2)
if len(body) < length + 2:
# Firmware bug fix specific to some modules
if len(body) == 4 and length == 4:
pass
else:
return None
data = body[:-2]
if len(data) == 2:
rssi = data[1]
elif len(data) >= 4:
rssi = (data[2] << 8) | data[3]
else:
return None
return {
'level': rssi > self.threshold,
'rssi': rssi
}
# ==============================================================================
# 5. Main Device Class (ChinweDevice)
# ==============================================================================
class ChinweDevice(UniversalDriver):
"""
ChinWe 工作站主驱动
继承自 UniversalDriver管理所有子设备泵、电机、传感器
"""
def __init__(self, port: str = "192.168.1.200:8899", baudrate: int = 9600,
pump_ids: List[int] = None, motor_ids: List[int] = None,
sensor_id: int = 6, sensor_threshold: int = 300):
"""
初始化 ChinWe 工作站
:param port: 串口号 或 IP:Port
:param baudrate: 串口波特率
:param pump_ids: 注射泵 ID列表 (默认 [1, 2, 3])
:param motor_ids: 步进电机 ID列表 (默认 [4, 5])
:param sensor_id: 液位传感器 ID (默认 6)
"""
super().__init__()
self.port = port
self.baudrate = baudrate
self.mgr = None
self._is_connected = False self._is_connected = False
# 默认配置
if pump_ids is None: pump_ids = [1, 2, 3]
if motor_ids is None: motor_ids = [4, 5]
# 配置信息
self.pump_ids = pump_ids
self.motor_ids = motor_ids
self.sensor_id = sensor_id
self.sensor_threshold = sensor_threshold
# 子设备实例容器
self.pumps: Dict[int, SyringePump] = {}
self.motors: Dict[int, EmmMotor] = {}
self.sensor: Optional[XKCSensor] = None
# 轮询线程控制
self._stop_event = threading.Event()
self._poll_thread = None
# 实时状态缓存
self.status_cache = {
"sensor_rssi": 0,
"sensor_level": False,
"connected": False
}
# 自动连接
if self.port:
self.connect() self.connect()
def connect(self) -> bool:
if self._is_connected: return True
try:
self.logger.info(f"Connecting to {self.port}...")
self.mgr = TransportManager(self.port, baudrate=self.baudrate, logger=self.logger)
# 初始化所有泵
for pid in self.pump_ids:
self.pumps[pid] = SyringePump(pid, self.mgr)
# 初始化所有电机
for mid in self.motor_ids:
self.motors[mid] = EmmMotor(mid, self.mgr)
# 初始化传感器
self.sensor = XKCSensor(self.sensor_id, self.mgr, self.sensor_threshold)
self._is_connected = True
self.status_cache["connected"] = True
# 启动轮询线程
self._start_polling()
return True
except Exception as e:
self.logger.error(f"Connection failed: {e}")
self._is_connected = False
self.status_cache["connected"] = False
return False
def disconnect(self):
self._stop_event.set()
if self._poll_thread:
self._poll_thread.join(timeout=2.0)
if self.mgr:
self.mgr.close()
self._is_connected = False
self.status_cache["connected"] = False
self.logger.info("Disconnected.")
def _start_polling(self):
"""启动传感器轮询线程"""
if self._poll_thread and self._poll_thread.is_alive():
return
self._stop_event.clear()
self._poll_thread = threading.Thread(target=self._polling_loop, daemon=True, name="ChinwePoll")
self._poll_thread.start()
def _polling_loop(self):
"""轮询主循环"""
self.logger.info("Sensor polling started.")
error_count = 0
while not self._stop_event.is_set():
if not self._is_connected or not self.sensor:
time.sleep(1)
continue
try:
# 获取传感器数据
data = self.sensor.read_level()
if data:
self.status_cache["sensor_rssi"] = data['rssi']
self.status_cache["sensor_level"] = data['level']
error_count = 0
else:
error_count += 1
# 降低轮询频率防止总线拥塞
time.sleep(0.2)
except Exception as e:
error_count += 1
if error_count > 10: # 连续错误记录日志
# self.logger.error(f"Polling error: {e}")
error_count = 0
time.sleep(1)
# --- 对外暴露属性 (Properties) ---
@property
def sensor_level(self) -> bool:
return self.status_cache["sensor_level"]
@property
def sensor_rssi(self) -> int:
return self.status_cache["sensor_rssi"]
@property @property
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""获取连接状态""" return self._is_connected
return self._is_connected and self.serial_port and self.serial_port.is_open
@property # --- 对外功能指令 (Actions) ---
def voltage(self) -> float:
"""获取电源电压值"""
return self._voltage
@property def pump_initialize(self, pump_id: int, drain_port=0, output_port=0, speed=10):
def ec_value(self) -> float: """指定泵初始化"""
"""获取电导率值 (ms/cm)""" pump_id = int(pump_id)
return self._ec_value if pump_id in self.pumps:
self.pumps[pump_id].initialize(drain_port, output_port, speed)
@property self.pumps[pump_id].wait_until_idle()
def ec_adc_value(self) -> int:
"""获取EC ADC原始值"""
return self._ec_adc_value
@property
def device_status(self) -> Dict[str, any]:
"""
获取设备状态信息
Returns:
包含设备状态的字典
"""
return {
"connected": self.is_connected,
"port": self.port,
"baudrate": self.baudrate,
"voltage": self.voltage,
"ec_value": self.ec_value,
"ec_adc_value": self.ec_adc_value
}
def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None) -> bool:
"""
连接到串口设备
Args:
port: 串口名称如果为None则使用初始化时的port或自动检测
baudrate: 波特率如果为None则使用初始化时的baudrate
Returns:
连接是否成功
"""
if self.is_connected:
return True return True
target_port = port or self.port
target_baudrate = baudrate or self.baudrate
try:
self.serial_port = serial.Serial(target_port, target_baudrate, timeout=0.5)
self._is_connected = True
self.port = target_port
self.baudrate = target_baudrate
connect_allow_times = 5
while not self.serial_port.is_open and connect_allow_times > 0:
time.sleep(0.5)
connect_allow_times -= 1
print(f"尝试连接到 {target_port} @ {target_baudrate},剩余尝试次数: {connect_allow_times}", self.debug)
raise ValueError("串口未打开,请检查设备连接")
print(f"已连接到 {target_port} @ {target_baudrate}", self.debug)
threading.Thread(target=self._read_data, daemon=True).start()
return True
except Exception as e:
print(f"ChinweDevice连接失败: {e}")
self._is_connected = False
return False return False
def disconnect(self) -> bool: def pump_aspirate(self, pump_id: int, volume: int, valve_port: int):
""" """
断开串口连接 泵吸液 (阻塞)
:param valve_port: 阀门端口 (1-8)
Returns:
断开是否成功
""" """
if self.serial_port and self.serial_port.is_open: pump_id = int(pump_id)
try: valve_port = int(valve_port)
self.serial_port.close() if pump_id in self.pumps:
self._is_connected = False pump = self.pumps[pump_id]
print("已断开串口连接") # 1. 切换阀门
pump.switch_valve(valve_port)
pump.wait_until_idle()
# 2. 吸液
pump.aspirate(volume)
pump.wait_until_idle()
return True return True
except Exception as e:
print(f"断开连接失败: {e}")
return False
return True
def _send_motor_command(self, command: str) -> bool:
"""
发送电机控制命令
Args:
command: 电机命令字符串,例如 "M 1 CW 1.5"
Returns:
发送是否成功
"""
if not self.is_connected:
print("设备未连接")
return False return False
try: def pump_dispense(self, pump_id: int, volume: int, valve_port: int):
self.serial_port.write((command + "\n").encode('utf-8')) """
print(f"发送命令: {command}") 泵排液 (阻塞)
:param valve_port: 阀门端口 (1-8)
"""
pump_id = int(pump_id)
valve_port = int(valve_port)
if pump_id in self.pumps:
pump = self.pumps[pump_id]
# 1. 切换阀门
pump.switch_valve(valve_port)
pump.wait_until_idle()
# 2. 排液
pump.dispense(volume)
pump.wait_until_idle()
return True return True
except Exception as e:
print(f"发送命令失败: {e}")
return False return False
def rotate_motor(self, motor_id: int, turns: float, clockwise: bool = True) -> bool: def pump_valve(self, pump_id: int, port: int):
"""泵切换阀门 (阻塞)"""
pump_id = int(pump_id)
port = int(port)
if pump_id in self.pumps:
pump = self.pumps[pump_id]
pump.switch_valve(port)
pump.wait_until_idle()
return True
return False
def motor_run_continuous(self, motor_id: int, speed: int, direction: str = "顺时针"):
""" """
使电机转动指定圈数 电机一直旋转 (速度模式)
:param direction: "顺时针" or "逆时针"
Args:
motor_id: 电机ID1, 2, 3...
turns: 转动圈数,支持小数
clockwise: True为顺时针False为逆时针
Returns:
命令发送是否成功
""" """
if clockwise: motor_id = int(motor_id)
command = f"M {motor_id} CW {turns}" if motor_id not in self.motors: return False
else:
command = f"M {motor_id} CCW {turns}"
return self._send_motor_command(command)
def set_motor_speed(self, motor_id: int, speed: float) -> bool: dir_val = 0 if direction == "顺时针" else 1
self.motors[motor_id].run_speed(speed, dir_val)
return True
def motor_rotate_quarter(self, motor_id: int, speed: int = 60, direction: str = "顺时针"):
""" """
设置电机转速(如果设备支持) 电机旋转1/4圈 (阻塞)
假设电机设置为 3200 脉冲/圈1/4圈 = 800脉冲
Args:
motor_id: 电机ID1, 2, 3...
speed: 转速值
Returns:
命令发送是否成功
""" """
command = f"M {motor_id} SPEED {speed}" motor_id = int(motor_id)
return self._send_motor_command(command) if motor_id not in self.motors: return False
def _read_data(self) -> List[str]: pulses = 800
dir_val = 0 if direction == "顺时针" else 1
self.motors[motor_id].run_position(pulses, speed, dir_val, absolute=False)
# 预估时间阻塞 (单位: 分钟 -> 秒)
# Time(s) = revs / (RPM/60). revs = 0.25. time = 15 / RPM.
estimated_time = 15.0 / max(1, speed)
time.sleep(estimated_time + 0.5)
return True
def motor_stop(self, motor_id: int):
"""电机停止"""
motor_id = int(motor_id)
if motor_id in self.motors:
self.motors[motor_id].stop()
return True
return False
def wait_sensor_level(self, target_state: str = "有液", timeout: int = 30) -> bool:
""" """
读取串口数据并解析 等待传感器达到指定电平
:param target_state: "有液" or "无液"
Returns:
读取到的数据行列表
""" """
print("开始读取串口数据...") target_bool = True if target_state == "有液" else False
if not self.is_connected:
return []
data_lines = [] self.logger.info(f"Wait sensor: {target_state} ({target_bool}), timeout: {timeout}")
try: start = time.time()
while self.serial_port.in_waiting: while time.time() - start < timeout:
time.sleep(0.1) # 等待数据稳定 if self.sensor_level == target_bool:
try: return True
line = self.serial_port.readline().decode('utf-8', errors='ignore').strip() time.sleep(0.1)
if line: self.logger.warning("Wait sensor level timeout")
data_lines.append(line) return False
self._parse_sensor_data(line)
except Exception as ex:
print(f"解码数据错误: {ex}")
except Exception as e:
print(f"读取串口数据错误: {e}")
return data_lines def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool:
"""支持标准 JSON 指令调用"""
return super().execute_command_from_outer(command_dict)
def _parse_sensor_data(self, line: str) -> None:
"""
解析传感器数据
Args:
line: 接收到的数据行
"""
# 解析电源电压
if "电源电压" in line:
try:
val = float(line.split("")[1].replace("V", "").strip())
self._voltage = val
if self.debug:
print(f"电源电压更新: {val}V")
except Exception:
pass
# 解析电导率和ADC原始值支持两种格式
if "电导率" in line and "ADC原始值" in line:
try:
# 支持格式如电导率2.50ms/cm, ADC原始值2052
ec_match = re.search(r"电导率[:]\s*([\d\.]+)", line)
adc_match = re.search(r"ADC原始值[:]\s*(\d+)", line)
if ec_match:
ec_val = float(ec_match.group(1))
self._ec_value = ec_val
if self.debug:
print(f"电导率更新: {ec_val:.2f} ms/cm")
if adc_match:
adc_val = int(adc_match.group(1))
self._ec_adc_value = adc_val
if self.debug:
print(f"EC ADC原始值更新: {adc_val}")
except Exception:
pass
# 仅电导率无ADC原始值
elif "电导率" in line:
try:
val = float(line.split("")[1].replace("ms/cm", "").strip())
self._ec_value = val
if self.debug:
print(f"电导率更新: {val:.2f} ms/cm")
except Exception:
pass
# 仅ADC原始值如有分开回传场景
elif "ADC原始值" in line:
try:
adc_val = int(line.split("")[1].strip())
self._ec_adc_value = adc_val
if self.debug:
print(f"EC ADC原始值更新: {adc_val}")
except Exception:
pass
def spin_when_ec_ge_0():
pass
def main():
"""测试函数"""
print("=== ChinWe设备测试 ===")
# 创建设备实例
device = ChinweDevice("/dev/tty.usbserial-A5069RR4", debug=True)
try:
# 测试5: 发送电机命令
print("\n5. 发送电机命令测试:")
print(" 5.3 使用通用函数控制电机20顺时针转2圈:")
device.rotate_motor(2, 20.0, clockwise=True)
time.sleep(0.5)
finally:
time.sleep(10)
# 测试7: 断开连接
print("\n7. 断开连接:")
device.disconnect()
if __name__ == "__main__": if __name__ == "__main__":
main() # Test
logging.basicConfig(level=logging.INFO)
dev = ChinweDevice(port="192.168.31.201:8899")
try:
if dev.is_connected:
print(f"Status: Level={dev.sensor_level}, RSSI={dev.sensor_rssi}")
# Test pump 1
# dev.pump_valve(1, 1)
# dev.pump_move(1, 1000, "aspirate")
# Test motor 4
# dev.motor_run(4, 60, 0, 2)
for _ in range(5):
print(f"Level={dev.sensor_level}, RSSI={dev.sensor_rssi}")
time.sleep(1)
finally:
dev.disconnect()

View File

@@ -0,0 +1,323 @@
separator.chinwe:
category:
- separator
- chinwe
class:
action_value_mappings:
motor_rotate_quarter:
goal:
direction: 顺时针
motor_id: 4
speed: 60
handles: {}
schema:
description: 电机旋转 1/4 圈
properties:
goal:
properties:
direction:
default: 顺时针
description: 旋转方向
enum:
- 顺时针
- 逆时针
type: string
motor_id:
default: '4'
description: 选择电机 (4:搅拌, 5:旋钮)
enum:
- '4'
- '5'
type: string
speed:
default: 60
description: 速度 (RPM)
type: integer
required:
- motor_id
- speed
type: object
type: UniLabJsonCommand
motor_run_continuous:
goal:
direction: 顺时针
motor_id: 4
speed: 60
handles: {}
schema:
description: 电机一直旋转 (速度模式)
properties:
goal:
properties:
direction:
default: 顺时针
description: 旋转方向
enum:
- 顺时针
- 逆时针
type: string
motor_id:
default: '4'
description: 选择电机 (4:搅拌, 5:旋钮)
enum:
- '4'
- '5'
type: string
speed:
default: 60
description: 速度 (RPM)
type: integer
required:
- motor_id
- speed
type: object
type: UniLabJsonCommand
motor_stop:
goal:
motor_id: 4
handles: {}
schema:
description: 停止指定步进电机
properties:
goal:
properties:
motor_id:
default: '4'
description: 选择电机
enum:
- '4'
- '5'
title: '注: 4=搅拌, 5=旋钮'
type: string
required:
- motor_id
type: object
type: UniLabJsonCommand
pump_aspirate:
goal:
pump_id: 1
valve_port: 1
volume: 1000
handles: {}
schema:
description: 注射泵吸液
properties:
goal:
properties:
pump_id:
default: '1'
description: 选择泵
enum:
- '1'
- '2'
- '3'
type: string
valve_port:
default: '1'
description: 阀门端口
enum:
- '1'
- '2'
- '3'
- '4'
- '5'
- '6'
- '7'
- '8'
type: string
volume:
default: 1000
description: 吸液步数
type: integer
required:
- pump_id
- volume
- valve_port
type: object
type: UniLabJsonCommand
pump_dispense:
goal:
pump_id: 1
valve_port: 1
volume: 1000
handles: {}
schema:
description: 注射泵排液
properties:
goal:
properties:
pump_id:
default: '1'
description: 选择泵
enum:
- '1'
- '2'
- '3'
type: string
valve_port:
default: '1'
description: 阀门端口
enum:
- '1'
- '2'
- '3'
- '4'
- '5'
- '6'
- '7'
- '8'
type: string
volume:
default: 1000
description: 排液步数
type: integer
required:
- pump_id
- volume
- valve_port
type: object
type: UniLabJsonCommand
pump_initialize:
goal:
drain_port: 0
output_port: 0
pump_id: 1
speed: 10
handles: {}
schema:
description: 初始化指定注射泵
properties:
goal:
properties:
drain_port:
default: 0
description: 排液口索引
type: integer
output_port:
default: 0
description: 输出口索引
type: integer
pump_id:
default: '1'
description: 选择泵
enum:
- '1'
- '2'
- '3'
title: '注: 1号泵, 2号泵, 3号泵'
type: string
speed:
default: 10
description: 运动速度
type: integer
required:
- pump_id
type: object
type: UniLabJsonCommand
pump_valve:
goal:
port: 1
pump_id: 1
handles: {}
schema:
description: 切换指定泵的阀门端口
properties:
goal:
properties:
port:
default: '1'
description: 阀门端口号 (1-8)
enum:
- '1'
- '2'
- '3'
- '4'
- '5'
- '6'
- '7'
- '8'
type: string
pump_id:
default: '1'
description: 选择泵
enum:
- '1'
- '2'
- '3'
type: string
required:
- pump_id
- port
type: object
type: UniLabJsonCommand
wait_sensor_level:
goal:
target_state: 有液
timeout: 30
handles: {}
schema:
description: 等待传感器液位条件
properties:
goal:
properties:
target_state:
default: 有液
description: 目标液位状态
enum:
- 有液
- 无液
type: string
timeout:
default: 30
description: 超时时间 (秒)
type: integer
required:
- target_state
type: object
type: UniLabJsonCommand
module: unilabos.devices.separator.chinwe:ChinweDevice
status_types:
is_connected: bool
sensor_level: bool
sensor_rssi: int
type: python
config_info: []
description: ChinWe 简易工作站控制器 (3泵, 2电机, 1传感器)
handles: []
icon: ''
init_param_schema:
goal:
baudrate:
default: 9600
description: 串口波特率
type: integer
motor_ids:
default:
- 4
- 5
description: 步进电机ID列表
items:
type: integer
type: array
port:
default: 192.168.1.200:8899
description: 串口号或 IP:Port
type: string
pump_ids:
default:
- 1
- 2
- 3
description: 注射泵ID列表
items:
type: integer
type: array
sensor_id:
default: 6
description: XKC传感器ID
type: integer
sensor_threshold:
default: 300
description: 传感器液位判定阈值
type: integer
version: 2.1.0

View File

@@ -0,0 +1,34 @@
{
"nodes": [
{
"id": "ChinWeStation",
"name": "分液工作站",
"children": [],
"parent": null,
"type": "device",
"class": "separator.chinwe",
"position": {
"x": 0,
"y": 0,
"z": 0
},
"config": {
"port": "192.168.31.13:8899",
"baudrate": 9600,
"pump_ids": [
1,
2,
3
],
"motor_ids": [
4,
5
],
"sensor_id": 6,
"sensor_threshold": 300
},
"data": {}
}
],
"links": []
}