From 39cc280c91fdd0b84f94134c699b4efb5f25e69b Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Fri, 19 Dec 2025 03:05:11 +0800 Subject: [PATCH] feat: Add `SyringePump` (SY-03B) driver with unified serial/TCP transport for `chinwe` device, including registry and test configurations. --- unilabos/devices/separator/chinwe.py | 868 ++++++++++++++++++-------- unilabos/registry/devices/chinwe.yaml | 323 ++++++++++ unilabos/test/experiments/chinwe.json | 34 + 3 files changed, 968 insertions(+), 257 deletions(-) create mode 100644 unilabos/registry/devices/chinwe.yaml create mode 100644 unilabos/test/experiments/chinwe.json diff --git a/unilabos/devices/separator/chinwe.py b/unilabos/devices/separator/chinwe.py index 4eeb0f47..3454a9b5 100644 --- a/unilabos/devices/separator/chinwe.py +++ b/unilabos/devices/separator/chinwe.py @@ -1,282 +1,636 @@ -import sys -import threading -import serial -import serial.tools.list_ports -import re -import time -from typing import Optional, List, Dict, Tuple +# -*- coding: utf-8 -*- +""" +Contains drivers for: +1. SyringePump: Runze Fluid SY-03B (ASCII) +2. EmmMotor: Emm V5.0 Closed-loop Stepper (Modbus-RTU variant) +3. XKCSensor: XKC Non-contact Level Sensor (Modbus-RTU) +""" -class ChinweDevice: +import socket +import serial +import time +import threading +import struct +import re +import traceback +import queue +from typing import Optional, Dict, List, Any + +try: + from unilabos.device_comms.universal_driver import UniversalDriver +except ImportError: + import logging + class UniversalDriver: + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + + def execute_command_from_outer(self, command: str): + pass + +# ============================================================================== +# 1. Transport Layer (通信层) +# ============================================================================== + +class TransportManager: """ - ChinWe设备控制类 - 提供串口通信、电机控制、传感器数据读取等功能 + 统一通信管理类。 + 自动识别 串口 (Serial) 或 网络 (TCP) 连接。 """ - - def __init__(self, port: str, baudrate: int = 115200, debug: bool = False): - """ - 初始化ChinWe设备 - - Args: - port: 串口名称,如果为None则自动检测 - baudrate: 波特率,默认115200 - """ - self.debug = debug + def __init__(self, port: str, baudrate: int = 9600, timeout: float = 3.0, logger=None): self.port = port self.baudrate = baudrate - self.serial_port: Optional[serial.Serial] = None - self._voltage: float = 0.0 - self._ec_value: float = 0.0 - self._ec_adc_value: int = 0 + self.timeout = timeout + self.logger = logger + self.lock = threading.RLock() # 线程锁,确保多设备共用一个连接时不冲突 + + self.is_tcp = False + self.serial = None + self.socket = None + + # 简单判断: 如果包含 ':' (如 192.168.1.1:8899) 或者看起来像 IP,则认为是 TCP + if ':' in self.port or (self.port.count('.') == 3 and not self.port.startswith('/')): + self.is_tcp = True + self._connect_tcp() + else: + self._connect_serial() + + def _log(self, msg): + if self.logger: + pass + # self.logger.debug(f"[Transport] {msg}") + + def _connect_tcp(self): + try: + if ':' in self.port: + host, p = self.port.split(':') + self.tcp_host = host + self.tcp_port = int(p) + else: + self.tcp_host = self.port + self.tcp_port = 8899 # 默认端口 + + # if self.logger: self.logger.info(f"Connecting TCP {self.tcp_host}:{self.tcp_port} ...") + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(self.timeout) + self.socket.connect((self.tcp_host, self.tcp_port)) + except Exception as e: + raise ConnectionError(f"TCP connection failed: {e}") + + def _connect_serial(self): + try: + # if self.logger: self.logger.info(f"Opening Serial {self.port} (Baud: {self.baudrate}) ...") + self.serial = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=self.timeout + ) + except Exception as e: + raise ConnectionError(f"Serial open failed: {e}") + + def close(self): + """关闭连接""" + if self.is_tcp and self.socket: + try: self.socket.close() + except: pass + elif not self.is_tcp and self.serial and self.serial.is_open: + self.serial.close() + + def clear_buffer(self): + """清空缓冲区 (Thread-safe)""" + with self.lock: + if self.is_tcp: + self.socket.setblocking(False) + try: + while True: + if not self.socket.recv(1024): break + except: pass + finally: self.socket.settimeout(self.timeout) + else: + self.serial.reset_input_buffer() + + def write(self, data: bytes): + """发送原始字节""" + with self.lock: + if self.is_tcp: + self.socket.sendall(data) + else: + self.serial.write(data) + + def read(self, size: int) -> bytes: + """读取指定长度字节""" + if self.is_tcp: + data = b'' + start = time.time() + while len(data) < size: + if time.time() - start > self.timeout: break + try: + chunk = self.socket.recv(size - len(data)) + if not chunk: break + data += chunk + except socket.timeout: break + return data + else: + return self.serial.read(size) + + def send_ascii_command(self, command: str) -> str: + """ + 发送 ASCII 字符串命令 (如注射泵指令),读取直到 '\r'。 + """ + with self.lock: + data = command.encode('ascii') if isinstance(command, str) else command + self.clear_buffer() + self.write(data) + + # Read until \r + if self.is_tcp: + resp = b'' + start = time.time() + while True: + if time.time() - start > self.timeout: break + try: + char = self.socket.recv(1) + if not char: break + resp += char + if char == b'\r': break + except: break + return resp.decode('ascii', errors='ignore').strip() + else: + return self.serial.read_until(b'\r').decode('ascii', errors='ignore').strip() + +# ============================================================================== +# 2. Syringe Pump Driver (注射泵) +# ============================================================================== + +class SyringePump: + """SY-03B 注射泵驱动 (ASCII协议)""" + + CMD_INITIALIZE = "Z{speed},{drain_port},{output_port}R" + CMD_SWITCH_VALVE = "I{port}R" + CMD_ASPIRATE = "P{vol}R" + CMD_DISPENSE = "D{vol}R" + CMD_DISPENSE_ALL = "A0R" + CMD_STOP = "TR" + CMD_QUERY_STATUS = "Q" + CMD_QUERY_PLUNGER = "?0" + + def __init__(self, device_id: int, transport: TransportManager): + if not 1 <= device_id <= 15: + pass # Allow all IDs for now + self.id = str(device_id) + self.transport = transport + + def _send(self, template: str, **kwargs) -> str: + cmd = f"/{self.id}" + template.format(**kwargs) + "\r" + return self.transport.send_ascii_command(cmd) + + def is_busy(self) -> bool: + """查询繁忙状态""" + resp = self._send(self.CMD_QUERY_STATUS) + # 响应如 /0` (Ready, 0x60) 或 /0@ (Busy, 0x40) + if len(resp) >= 3: + status_byte = ord(resp[2]) + # Bit 5: 1=Ready, 0=Busy + return (status_byte & 0x20) == 0 + return False + + def wait_until_idle(self, timeout=30): + """阻塞等待直到空闲""" + start = time.time() + while time.time() - start < timeout: + if not self.is_busy(): return + time.sleep(0.5) + # raise TimeoutError(f"Pump {self.id} wait idle timeout") + pass + + def initialize(self, drain_port=0, output_port=0, speed=10): + """初始化""" + self._send(self.CMD_INITIALIZE, speed=speed, drain_port=drain_port, output_port=output_port) + + def switch_valve(self, port: int): + """切换阀门 (1-8)""" + self._send(self.CMD_SWITCH_VALVE, port=port) + + def aspirate(self, steps: int): + """吸液 (相对步数)""" + self._send(self.CMD_ASPIRATE, vol=steps) + + def dispense(self, steps: int): + """排液 (相对步数)""" + self._send(self.CMD_DISPENSE, vol=steps) + + def stop(self): + """停止""" + self._send(self.CMD_STOP) + + def get_position(self) -> int: + """获取柱塞位置 (步数)""" + resp = self._send(self.CMD_QUERY_PLUNGER) + m = re.search(r'\d+', resp) + return int(m.group()) if m else -1 + +# ============================================================================== +# 3. Stepper Motor Driver (步进电机) +# ============================================================================== + +class EmmMotor: + """Emm V5.0 闭环步进电机驱动""" + + def __init__(self, device_id: int, transport: TransportManager): + self.id = device_id + self.transport = transport + + def _send(self, func_code: int, payload: list) -> bytes: + with self.transport.lock: + self.transport.clear_buffer() + # 格式: [ID] [Func] [Data...] [Check=0x6B] + body = [self.id, func_code] + payload + body.append(0x6B) # Checksum + self.transport.write(bytes(body)) + + # 根据指令不同,读取不同长度响应 + read_len = 10 if func_code in [0x31, 0x32, 0x35, 0x24, 0x27] else 4 + return self.transport.read(read_len) + + def enable(self, on=True): + """使能 (True=锁轴, False=松轴)""" + state = 1 if on else 0 + self._send(0xF3, [0xAB, state, 0]) + + def run_speed(self, speed_rpm: int, direction=0, acc=10): + """速度模式运行""" + sp = struct.pack('>H', int(speed_rpm)) + self._send(0xF6, [direction, sp[0], sp[1], acc, 0]) + + def run_position(self, pulses: int, speed_rpm: int, direction=0, acc=10, absolute=False): + """位置模式运行""" + sp = struct.pack('>H', int(speed_rpm)) + pl = struct.pack('>I', int(pulses)) + is_abs = 1 if absolute else 0 + self._send(0xFD, [direction, sp[0], sp[1], acc, pl[0], pl[1], pl[2], pl[3], is_abs, 0]) + + def stop(self): + """停止""" + self._send(0xFE, [0x98, 0]) + + def set_zero(self): + """清零位置""" + self._send(0x0A, []) + + def get_position(self) -> int: + """获取当前脉冲位置""" + resp = self._send(0x32, []) + if len(resp) >= 8: + sign = resp[2] + val = struct.unpack('>I', resp[3:7])[0] + return -val if sign == 1 else val + return 0 + +# ============================================================================== +# 4. Liquid Sensor Driver (液位传感器) +# ============================================================================== + +class XKCSensor: + """XKC RS485 液位传感器 (Modbus RTU)""" + + def __init__(self, device_id: int, transport: TransportManager, threshold: int = 300): + self.id = device_id + self.transport = transport + self.threshold = threshold + + def _crc(self, data: bytes) -> bytes: + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 + else: crc >>= 1 + return struct.pack(' Optional[Dict[str, Any]]: + """ + 读取液位。 + 返回: {'level': bool, 'rssi': int} + """ + with self.transport.lock: + self.transport.clear_buffer() + # Modbus Read Registers: 01 03 00 01 00 02 CRC + payload = struct.pack('>HH', 0x0001, 0x0002) + msg = struct.pack('BB', self.id, 0x03) + payload + msg += self._crc(msg) + self.transport.write(msg) + + # Read header + h = self.transport.read(3) # Addr, Func, Len + if len(h) < 3: return None + length = h[2] + + # Read body + CRC + body = self.transport.read(length + 2) + if len(body) < length + 2: + # Firmware bug fix specific to some modules + if len(body) == 4 and length == 4: + pass + else: + return None + + data = body[:-2] + if len(data) == 2: + rssi = data[1] + elif len(data) >= 4: + rssi = (data[2] << 8) | data[3] + else: + return None + + return { + 'level': rssi > self.threshold, + 'rssi': rssi + } + +# ============================================================================== +# 5. Main Device Class (ChinweDevice) +# ============================================================================== + +class ChinweDevice(UniversalDriver): + """ + ChinWe 工作站主驱动 + 继承自 UniversalDriver,管理所有子设备(泵、电机、传感器) + """ + + def __init__(self, port: str = "192.168.1.200:8899", baudrate: int = 9600, + pump_ids: List[int] = None, motor_ids: List[int] = None, + sensor_id: int = 6, sensor_threshold: int = 300): + """ + 初始化 ChinWe 工作站 + :param port: 串口号 或 IP:Port + :param baudrate: 串口波特率 + :param pump_ids: 注射泵 ID列表 (默认 [1, 2, 3]) + :param motor_ids: 步进电机 ID列表 (默认 [4, 5]) + :param sensor_id: 液位传感器 ID (默认 6) + """ + super().__init__() + self.port = port + self.baudrate = baudrate + self.mgr = None self._is_connected = False - self.connect() - + + # 默认配置 + if pump_ids is None: pump_ids = [1, 2, 3] + if motor_ids is None: motor_ids = [4, 5] + + # 配置信息 + self.pump_ids = pump_ids + self.motor_ids = motor_ids + self.sensor_id = sensor_id + self.sensor_threshold = sensor_threshold + + # 子设备实例容器 + self.pumps: Dict[int, SyringePump] = {} + self.motors: Dict[int, EmmMotor] = {} + self.sensor: Optional[XKCSensor] = None + + # 轮询线程控制 + self._stop_event = threading.Event() + self._poll_thread = None + + # 实时状态缓存 + self.status_cache = { + "sensor_rssi": 0, + "sensor_level": False, + "connected": False + } + + # 自动连接 + if self.port: + self.connect() + + def connect(self) -> bool: + if self._is_connected: return True + try: + self.logger.info(f"Connecting to {self.port}...") + self.mgr = TransportManager(self.port, baudrate=self.baudrate, logger=self.logger) + + # 初始化所有泵 + for pid in self.pump_ids: + self.pumps[pid] = SyringePump(pid, self.mgr) + + # 初始化所有电机 + for mid in self.motor_ids: + self.motors[mid] = EmmMotor(mid, self.mgr) + + # 初始化传感器 + self.sensor = XKCSensor(self.sensor_id, self.mgr, self.sensor_threshold) + + self._is_connected = True + self.status_cache["connected"] = True + + # 启动轮询线程 + self._start_polling() + return True + except Exception as e: + self.logger.error(f"Connection failed: {e}") + self._is_connected = False + self.status_cache["connected"] = False + return False + + def disconnect(self): + self._stop_event.set() + if self._poll_thread: + self._poll_thread.join(timeout=2.0) + + if self.mgr: + self.mgr.close() + + self._is_connected = False + self.status_cache["connected"] = False + self.logger.info("Disconnected.") + + def _start_polling(self): + """启动传感器轮询线程""" + if self._poll_thread and self._poll_thread.is_alive(): + return + + self._stop_event.clear() + self._poll_thread = threading.Thread(target=self._polling_loop, daemon=True, name="ChinwePoll") + self._poll_thread.start() + + def _polling_loop(self): + """轮询主循环""" + self.logger.info("Sensor polling started.") + error_count = 0 + while not self._stop_event.is_set(): + if not self._is_connected or not self.sensor: + time.sleep(1) + continue + + try: + # 获取传感器数据 + data = self.sensor.read_level() + if data: + self.status_cache["sensor_rssi"] = data['rssi'] + self.status_cache["sensor_level"] = data['level'] + error_count = 0 + else: + error_count += 1 + + # 降低轮询频率防止总线拥塞 + time.sleep(0.2) + + except Exception as e: + error_count += 1 + if error_count > 10: # 连续错误记录日志 + # self.logger.error(f"Polling error: {e}") + error_count = 0 + time.sleep(1) + + # --- 对外暴露属性 (Properties) --- + + @property + def sensor_level(self) -> bool: + return self.status_cache["sensor_level"] + + @property + def sensor_rssi(self) -> int: + return self.status_cache["sensor_rssi"] + @property def is_connected(self) -> bool: - """获取连接状态""" - return self._is_connected and self.serial_port and self.serial_port.is_open - - @property - def voltage(self) -> float: - """获取电源电压值""" - return self._voltage - - @property - def ec_value(self) -> float: - """获取电导率值 (ms/cm)""" - return self._ec_value + return self._is_connected - @property - def ec_adc_value(self) -> int: - """获取EC ADC原始值""" - return self._ec_adc_value - + # --- 对外功能指令 (Actions) --- - @property - def device_status(self) -> Dict[str, any]: - """ - 获取设备状态信息 - - Returns: - 包含设备状态的字典 - """ - return { - "connected": self.is_connected, - "port": self.port, - "baudrate": self.baudrate, - "voltage": self.voltage, - "ec_value": self.ec_value, - "ec_adc_value": self.ec_adc_value - } - - def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None) -> bool: - """ - 连接到串口设备 - - Args: - port: 串口名称,如果为None则使用初始化时的port或自动检测 - baudrate: 波特率,如果为None则使用初始化时的baudrate - - Returns: - 连接是否成功 - """ - if self.is_connected: + def pump_initialize(self, pump_id: int, drain_port=0, output_port=0, speed=10): + """指定泵初始化""" + pump_id = int(pump_id) + if pump_id in self.pumps: + self.pumps[pump_id].initialize(drain_port, output_port, speed) + self.pumps[pump_id].wait_until_idle() return True - - target_port = port or self.port - target_baudrate = baudrate or self.baudrate - - try: - self.serial_port = serial.Serial(target_port, target_baudrate, timeout=0.5) - self._is_connected = True - self.port = target_port - self.baudrate = target_baudrate - connect_allow_times = 5 - while not self.serial_port.is_open and connect_allow_times > 0: - time.sleep(0.5) - connect_allow_times -= 1 - print(f"尝试连接到 {target_port} @ {target_baudrate},剩余尝试次数: {connect_allow_times}", self.debug) - raise ValueError("串口未打开,请检查设备连接") - print(f"已连接到 {target_port} @ {target_baudrate}", self.debug) - threading.Thread(target=self._read_data, daemon=True).start() + return False + + def pump_aspirate(self, pump_id: int, volume: int, valve_port: int): + """ + 泵吸液 (阻塞) + :param valve_port: 阀门端口 (1-8) + """ + pump_id = int(pump_id) + valve_port = int(valve_port) + if pump_id in self.pumps: + pump = self.pumps[pump_id] + # 1. 切换阀门 + pump.switch_valve(valve_port) + pump.wait_until_idle() + # 2. 吸液 + pump.aspirate(volume) + pump.wait_until_idle() return True - except Exception as e: - print(f"ChinweDevice连接失败: {e}") - self._is_connected = False - return False - - def disconnect(self) -> bool: + return False + + def pump_dispense(self, pump_id: int, volume: int, valve_port: int): """ - 断开串口连接 - - Returns: - 断开是否成功 + 泵排液 (阻塞) + :param valve_port: 阀门端口 (1-8) """ - if self.serial_port and self.serial_port.is_open: - try: - self.serial_port.close() - self._is_connected = False - print("已断开串口连接") - return True - except Exception as e: - print(f"断开连接失败: {e}") - return False + pump_id = int(pump_id) + valve_port = int(valve_port) + if pump_id in self.pumps: + pump = self.pumps[pump_id] + # 1. 切换阀门 + pump.switch_valve(valve_port) + pump.wait_until_idle() + # 2. 排液 + pump.dispense(volume) + pump.wait_until_idle() + return True + return False + + def pump_valve(self, pump_id: int, port: int): + """泵切换阀门 (阻塞)""" + pump_id = int(pump_id) + port = int(port) + if pump_id in self.pumps: + pump = self.pumps[pump_id] + pump.switch_valve(port) + pump.wait_until_idle() + return True + return False + + def motor_run_continuous(self, motor_id: int, speed: int, direction: str = "顺时针"): + """ + 电机一直旋转 (速度模式) + :param direction: "顺时针" or "逆时针" + """ + motor_id = int(motor_id) + if motor_id not in self.motors: return False + + dir_val = 0 if direction == "顺时针" else 1 + self.motors[motor_id].run_speed(speed, dir_val) return True - - def _send_motor_command(self, command: str) -> bool: + + def motor_rotate_quarter(self, motor_id: int, speed: int = 60, direction: str = "顺时针"): """ - 发送电机控制命令 - - Args: - command: 电机命令字符串,例如 "M 1 CW 1.5" - - Returns: - 发送是否成功 + 电机旋转1/4圈 (阻塞) + 假设电机设置为 3200 脉冲/圈,1/4圈 = 800脉冲 """ - if not self.is_connected: - print("设备未连接") - return False - - try: - self.serial_port.write((command + "\n").encode('utf-8')) - print(f"发送命令: {command}") + motor_id = int(motor_id) + if motor_id not in self.motors: return False + + pulses = 800 + dir_val = 0 if direction == "顺时针" else 1 + + self.motors[motor_id].run_position(pulses, speed, dir_val, absolute=False) + + # 预估时间阻塞 (单位: 分钟 -> 秒) + # Time(s) = revs / (RPM/60). revs = 0.25. time = 15 / RPM. + estimated_time = 15.0 / max(1, speed) + time.sleep(estimated_time + 0.5) + + return True + + def motor_stop(self, motor_id: int): + """电机停止""" + motor_id = int(motor_id) + if motor_id in self.motors: + self.motors[motor_id].stop() return True - except Exception as e: - print(f"发送命令失败: {e}") - return False - - def rotate_motor(self, motor_id: int, turns: float, clockwise: bool = True) -> bool: - """ - 使电机转动指定圈数 - - Args: - motor_id: 电机ID(1, 2, 3...) - turns: 转动圈数,支持小数 - clockwise: True为顺时针,False为逆时针 - - Returns: - 命令发送是否成功 - """ - if clockwise: - command = f"M {motor_id} CW {turns}" - else: - command = f"M {motor_id} CCW {turns}" - return self._send_motor_command(command) + return False - def set_motor_speed(self, motor_id: int, speed: float) -> bool: + def wait_sensor_level(self, target_state: str = "有液", timeout: int = 30) -> bool: """ - 设置电机转速(如果设备支持) - - Args: - motor_id: 电机ID(1, 2, 3...) - speed: 转速值 - - Returns: - 命令发送是否成功 + 等待传感器达到指定电平 + :param target_state: "有液" or "无液" """ - command = f"M {motor_id} SPEED {speed}" - return self._send_motor_command(command) + target_bool = True if target_state == "有液" else False - def _read_data(self) -> List[str]: - """ - 读取串口数据并解析 - - Returns: - 读取到的数据行列表 - """ - print("开始读取串口数据...") - if not self.is_connected: - return [] - - data_lines = [] - try: - while self.serial_port.in_waiting: - time.sleep(0.1) # 等待数据稳定 - try: - line = self.serial_port.readline().decode('utf-8', errors='ignore').strip() - if line: - data_lines.append(line) - self._parse_sensor_data(line) - except Exception as ex: - print(f"解码数据错误: {ex}") - except Exception as e: - print(f"读取串口数据错误: {e}") - - return data_lines - - def _parse_sensor_data(self, line: str) -> None: - """ - 解析传感器数据 - - Args: - line: 接收到的数据行 - """ - # 解析电源电压 - if "电源电压" in line: - try: - val = float(line.split(":")[1].replace("V", "").strip()) - self._voltage = val - if self.debug: - print(f"电源电压更新: {val}V") - except Exception: - pass + self.logger.info(f"Wait sensor: {target_state} ({target_bool}), timeout: {timeout}") + start = time.time() + while time.time() - start < timeout: + if self.sensor_level == target_bool: + return True + time.sleep(0.1) + self.logger.warning("Wait sensor level timeout") + return False - # 解析电导率和ADC原始值(支持两种格式) - if "电导率" in line and "ADC原始值" in line: - try: - # 支持格式如:电导率:2.50ms/cm, ADC原始值:2052 - ec_match = re.search(r"电导率[::]\s*([\d\.]+)", line) - adc_match = re.search(r"ADC原始值[::]\s*(\d+)", line) - if ec_match: - ec_val = float(ec_match.group(1)) - self._ec_value = ec_val - if self.debug: - print(f"电导率更新: {ec_val:.2f} ms/cm") - if adc_match: - adc_val = int(adc_match.group(1)) - self._ec_adc_value = adc_val - if self.debug: - print(f"EC ADC原始值更新: {adc_val}") - except Exception: - pass - # 仅电导率,无ADC原始值 - elif "电导率" in line: - try: - val = float(line.split(":")[1].replace("ms/cm", "").strip()) - self._ec_value = val - if self.debug: - print(f"电导率更新: {val:.2f} ms/cm") - except Exception: - pass - # 仅ADC原始值(如有分开回传场景) - elif "ADC原始值" in line: - try: - adc_val = int(line.split(":")[1].strip()) - self._ec_adc_value = adc_val - if self.debug: - print(f"EC ADC原始值更新: {adc_val}") - except Exception: - pass - - def spin_when_ec_ge_0(): - pass - + def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool: + """支持标准 JSON 指令调用""" + return super().execute_command_from_outer(command_dict) -def main(): - """测试函数""" - print("=== ChinWe设备测试 ===") - - # 创建设备实例 - device = ChinweDevice("/dev/tty.usbserial-A5069RR4", debug=True) - try: - # 测试5: 发送电机命令 - print("\n5. 发送电机命令测试:") - print(" 5.3 使用通用函数控制电机20顺时针转2圈:") - device.rotate_motor(2, 20.0, clockwise=True) - time.sleep(0.5) - finally: - time.sleep(10) - # 测试7: 断开连接 - print("\n7. 断开连接:") - device.disconnect() if __name__ == "__main__": - main() + # Test + logging.basicConfig(level=logging.INFO) + dev = ChinweDevice(port="192.168.31.201:8899") + try: + if dev.is_connected: + print(f"Status: Level={dev.sensor_level}, RSSI={dev.sensor_rssi}") + + # Test pump 1 + # dev.pump_valve(1, 1) + # dev.pump_move(1, 1000, "aspirate") + + # Test motor 4 + # dev.motor_run(4, 60, 0, 2) + + for _ in range(5): + print(f"Level={dev.sensor_level}, RSSI={dev.sensor_rssi}") + time.sleep(1) + finally: + dev.disconnect() diff --git a/unilabos/registry/devices/chinwe.yaml b/unilabos/registry/devices/chinwe.yaml new file mode 100644 index 00000000..3d9a2638 --- /dev/null +++ b/unilabos/registry/devices/chinwe.yaml @@ -0,0 +1,323 @@ +separator.chinwe: + category: + - separator + - chinwe + class: + action_value_mappings: + motor_rotate_quarter: + goal: + direction: 顺时针 + motor_id: 4 + speed: 60 + handles: {} + schema: + description: 电机旋转 1/4 圈 + properties: + goal: + properties: + direction: + default: 顺时针 + description: 旋转方向 + enum: + - 顺时针 + - 逆时针 + type: string + motor_id: + default: '4' + description: 选择电机 (4:搅拌, 5:旋钮) + enum: + - '4' + - '5' + type: string + speed: + default: 60 + description: 速度 (RPM) + type: integer + required: + - motor_id + - speed + type: object + type: UniLabJsonCommand + motor_run_continuous: + goal: + direction: 顺时针 + motor_id: 4 + speed: 60 + handles: {} + schema: + description: 电机一直旋转 (速度模式) + properties: + goal: + properties: + direction: + default: 顺时针 + description: 旋转方向 + enum: + - 顺时针 + - 逆时针 + type: string + motor_id: + default: '4' + description: 选择电机 (4:搅拌, 5:旋钮) + enum: + - '4' + - '5' + type: string + speed: + default: 60 + description: 速度 (RPM) + type: integer + required: + - motor_id + - speed + type: object + type: UniLabJsonCommand + motor_stop: + goal: + motor_id: 4 + handles: {} + schema: + description: 停止指定步进电机 + properties: + goal: + properties: + motor_id: + default: '4' + description: 选择电机 + enum: + - '4' + - '5' + title: '注: 4=搅拌, 5=旋钮' + type: string + required: + - motor_id + type: object + type: UniLabJsonCommand + pump_aspirate: + goal: + pump_id: 1 + valve_port: 1 + volume: 1000 + handles: {} + schema: + description: 注射泵吸液 + properties: + goal: + properties: + pump_id: + default: '1' + description: 选择泵 + enum: + - '1' + - '2' + - '3' + type: string + valve_port: + default: '1' + description: 阀门端口 + enum: + - '1' + - '2' + - '3' + - '4' + - '5' + - '6' + - '7' + - '8' + type: string + volume: + default: 1000 + description: 吸液步数 + type: integer + required: + - pump_id + - volume + - valve_port + type: object + type: UniLabJsonCommand + pump_dispense: + goal: + pump_id: 1 + valve_port: 1 + volume: 1000 + handles: {} + schema: + description: 注射泵排液 + properties: + goal: + properties: + pump_id: + default: '1' + description: 选择泵 + enum: + - '1' + - '2' + - '3' + type: string + valve_port: + default: '1' + description: 阀门端口 + enum: + - '1' + - '2' + - '3' + - '4' + - '5' + - '6' + - '7' + - '8' + type: string + volume: + default: 1000 + description: 排液步数 + type: integer + required: + - pump_id + - volume + - valve_port + type: object + type: UniLabJsonCommand + pump_initialize: + goal: + drain_port: 0 + output_port: 0 + pump_id: 1 + speed: 10 + handles: {} + schema: + description: 初始化指定注射泵 + properties: + goal: + properties: + drain_port: + default: 0 + description: 排液口索引 + type: integer + output_port: + default: 0 + description: 输出口索引 + type: integer + pump_id: + default: '1' + description: 选择泵 + enum: + - '1' + - '2' + - '3' + title: '注: 1号泵, 2号泵, 3号泵' + type: string + speed: + default: 10 + description: 运动速度 + type: integer + required: + - pump_id + type: object + type: UniLabJsonCommand + pump_valve: + goal: + port: 1 + pump_id: 1 + handles: {} + schema: + description: 切换指定泵的阀门端口 + properties: + goal: + properties: + port: + default: '1' + description: 阀门端口号 (1-8) + enum: + - '1' + - '2' + - '3' + - '4' + - '5' + - '6' + - '7' + - '8' + type: string + pump_id: + default: '1' + description: 选择泵 + enum: + - '1' + - '2' + - '3' + type: string + required: + - pump_id + - port + type: object + type: UniLabJsonCommand + wait_sensor_level: + goal: + target_state: 有液 + timeout: 30 + handles: {} + schema: + description: 等待传感器液位条件 + properties: + goal: + properties: + target_state: + default: 有液 + description: 目标液位状态 + enum: + - 有液 + - 无液 + type: string + timeout: + default: 30 + description: 超时时间 (秒) + type: integer + required: + - target_state + type: object + type: UniLabJsonCommand + module: unilabos.devices.separator.chinwe:ChinweDevice + status_types: + is_connected: bool + sensor_level: bool + sensor_rssi: int + type: python + config_info: [] + description: ChinWe 简易工作站控制器 (3泵, 2电机, 1传感器) + handles: [] + icon: '' + init_param_schema: + goal: + baudrate: + default: 9600 + description: 串口波特率 + type: integer + motor_ids: + default: + - 4 + - 5 + description: 步进电机ID列表 + items: + type: integer + type: array + port: + default: 192.168.1.200:8899 + description: 串口号或 IP:Port + type: string + pump_ids: + default: + - 1 + - 2 + - 3 + description: 注射泵ID列表 + items: + type: integer + type: array + sensor_id: + default: 6 + description: XKC传感器ID + type: integer + sensor_threshold: + default: 300 + description: 传感器液位判定阈值 + type: integer + version: 2.1.0 diff --git a/unilabos/test/experiments/chinwe.json b/unilabos/test/experiments/chinwe.json new file mode 100644 index 00000000..26950c78 --- /dev/null +++ b/unilabos/test/experiments/chinwe.json @@ -0,0 +1,34 @@ +{ + "nodes": [ + { + "id": "ChinWeStation", + "name": "分液工作站", + "children": [], + "parent": null, + "type": "device", + "class": "separator.chinwe", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "port": "192.168.31.13:8899", + "baudrate": 9600, + "pump_ids": [ + 1, + 2, + 3 + ], + "motor_ids": [ + 4, + 5 + ], + "sensor_id": 6, + "sensor_threshold": 300 + }, + "data": {} + } + ], + "links": [] +}