diff --git a/unilabos/devices/motor/ZDT_X42.py b/unilabos/devices/motor/ZDT_X42.py new file mode 100644 index 0000000..0d1566c --- /dev/null +++ b/unilabos/devices/motor/ZDT_X42.py @@ -0,0 +1,376 @@ +# -*- coding: utf-8 -*- +""" +ZDT X42 Closed-Loop Stepper Motor Driver +RS485 Serial Communication via USB-Serial Converter + +- Baudrate: 115200 +""" + +import serial +import time +import threading +import struct +import logging +from typing import Optional, Any + +try: + from unilabos.device_comms.universal_driver import UniversalDriver +except ImportError: + class UniversalDriver: + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(self.__class__.__name__) + def execute_command_from_outer(self, command: Any): pass + +from serial.rs485 import RS485Settings + + +class ZDTX42Driver(UniversalDriver): + """ + ZDT X42 闭环步进电机驱动器 + + 支持功能: + - 速度模式运行 + - 位置模式运行 (相对/绝对) + - 位置读取和清零 + - 使能/禁用控制 + + 通信协议: + - 帧格式: [设备ID] [功能码] [数据...] [校验位=0x6B] + - 响应长度根据功能码决定 + """ + + def __init__( + self, + port: str, + baudrate: int = 115200, + device_id: int = 1, + timeout: float = 0.5, + debug: bool = False + ): + """ + 初始化 ZDT X42 电机驱动 + + Args: + port: 串口设备路径 + baudrate: 波特率 (默认 115200) + device_id: 设备地址 (1-255) + timeout: 通信超时时间(秒) + debug: 是否启用调试输出 + """ + super().__init__() + self.id = device_id + self.debug = debug + self.lock = threading.RLock() + self.status = "idle" # 对应注册表中的 status (str) + self.position = 0 # 对应注册表中的 position (int) + + try: + self.ser = serial.Serial( + port=port, + baudrate=baudrate, + timeout=timeout, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE + ) + + # 启用 RS485 模式 + try: + self.ser.rs485_mode = RS485Settings( + rts_level_for_tx=True, + rts_level_for_rx=False + ) + except Exception: + pass # RS485 模式是可选的 + + self.logger.info( + f"ZDT X42 Motor connected: {port} " + f"(Baud: {baudrate}, ID: {device_id})" + ) + # 自动使能电机,确保初始状态可运动 + self.enable(True) + + # 启动背景轮询线程,确保 position 实时刷新 + self._stop_event = threading.Event() + self._polling_thread = threading.Thread( + target=self._update_loop, + name=f"ZDTPolling_{port}", + daemon=True + ) + self._polling_thread.start() + except Exception as e: + self.logger.error(f"Failed to open serial port {port}: {e}") + self.ser = None + + def _update_loop(self): + """背景循环读取电机位置""" + while not self._stop_event.is_set(): + try: + self.get_position() + except Exception as e: + if self.debug: + self.logger.error(f"Polling error: {e}") + time.sleep(1.0) # 每1秒刷新一次位置数据 + + def _send(self, func_code: int, payload: list) -> bytes: + """ + 发送指令并接收响应 + + Args: + func_code: 功能码 + payload: 数据负载 (list of bytes) + + Returns: + 响应数据 (bytes) + """ + if not self.ser: + self.logger.error("Serial port not available") + return b"" + + with self.lock: + # 清空输入缓冲区 + self.ser.reset_input_buffer() + + # 构建消息: [ID] [功能码] [数据...] [校验位=0x6B] + message = bytes([self.id, func_code] + payload + [0x6B]) + + # 发送 + self.ser.write(message) + + # 根据功能码决定响应长度 + # 查询类指令返回 10 字节,控制类指令返回 4 字节 + read_len = 10 if func_code in [0x31, 0x32, 0x35, 0x24, 0x27] else 4 + response = self.ser.read(read_len) + + # 调试输出 + if self.debug: + sent_hex = message.hex().upper() + recv_hex = response.hex().upper() if response else 'TIMEOUT' + print(f"[ID {self.id}] TX: {sent_hex} → RX: {recv_hex}") + + return response + + def enable(self, on: bool = True) -> bool: + """ + 使能/禁用电机 + + Args: + on: True=使能(锁轴), False=禁用(松轴) + + Returns: + 是否成功 + """ + state = 1 if on else 0 + resp = self._send(0xF3, [0xAB, state, 0]) + return len(resp) >= 4 + + def move_speed( + self, + speed_rpm: int, + direction: str = "CW", + acceleration: int = 10 + ) -> bool: + """ + 速度模式运行 + + Args: + speed_rpm: 转速 (RPM) + direction: 方向 ("CW"=顺时针, "CCW"=逆时针) + acceleration: 加速度 (0-255) + + Returns: + 是否成功 + """ + dir_val = 0 if direction.upper() in ["CW", "顺时针"] else 1 + speed_bytes = struct.pack('>H', int(speed_rpm)) + self.status = f"moving@{speed_rpm}rpm" + resp = self._send(0xF6, [dir_val, speed_bytes[0], speed_bytes[1], acceleration, 0]) + return len(resp) >= 4 + + def move_position( + self, + pulses: int, + speed_rpm: int, + direction: str = "CW", + acceleration: int = 10, + absolute: bool = False + ) -> bool: + """ + 位置模式运行 + + Args: + pulses: 脉冲数 + speed_rpm: 转速 (RPM) + direction: 方向 ("CW"=顺时针, "CCW"=逆时针) + acceleration: 加速度 (0-255) + absolute: True=绝对位置, False=相对位置 + + Returns: + 是否成功 + """ + dir_val = 0 if direction.upper() in ["CW", "顺时针"] else 1 + speed_bytes = struct.pack('>H', int(speed_rpm)) + self.status = f"moving_to_{pulses}" + pulse_bytes = struct.pack('>I', int(pulses)) + abs_flag = 1 if absolute else 0 + + payload = [ + dir_val, + speed_bytes[0], speed_bytes[1], + acceleration, + pulse_bytes[0], pulse_bytes[1], pulse_bytes[2], pulse_bytes[3], + abs_flag, + 0 + ] + + resp = self._send(0xFD, payload) + return len(resp) >= 4 + + def stop(self) -> bool: + """ + 停止电机 + + Returns: + 是否成功 + """ + self.status = "idle" + resp = self._send(0xFE, [0x98, 0]) + return len(resp) >= 4 + + def rotate_quarter(self, speed_rpm: int = 60, direction: str = "CW") -> bool: + """ + 电机旋转 1/4 圈 (阻塞式) + 假设电机细分为 3200 脉冲/圈,1/4 圈 = 800 脉冲 + """ + pulses = 800 + success = self.move_position(pulses=pulses, speed_rpm=speed_rpm, direction=direction, absolute=False) + + if success: + # 计算预估旋转时间并进行阻塞等待 (Time = revolutions / (RPM/60)) + # 1/4 rev / (RPM/60) = 15.0 / RPM + estimated_time = 15.0 / max(1, speed_rpm) + time.sleep(estimated_time + 0.5) # 额外给 0.5 秒缓冲 + self.status = "idle" + + return success + + def wait_time(self, duration_s: float) -> bool: + """ + 等待指定时间 (秒) + """ + self.logger.info(f"Waiting for {duration_s} seconds...") + time.sleep(duration_s) + return True + + def set_zero(self) -> bool: + """ + 清零当前位置 + + Returns: + 是否成功 + """ + resp = self._send(0x0A, []) + return len(resp) >= 4 + + def get_position(self) -> Optional[int]: + """ + 读取当前位置 (脉冲数) + + Returns: + 当前位置脉冲数,失败返回 None + """ + resp = self._send(0x32, []) + + if len(resp) >= 8: + # 响应格式: [ID] [Func] [符号位] [数值4字节] [校验] + sign = resp[2] # 0=正, 1=负 + value = struct.unpack('>I', resp[3:7])[0] + self.position = -value if sign == 1 else value + + if self.debug: + print(f"[Position] Raw: {resp.hex().upper()}, Parsed: {self.position}") + + return self.position + + self.logger.warning("Failed to read position") + return None + + def close(self): + """关闭串口连接并停止线程""" + if hasattr(self, '_stop_event'): + self._stop_event.set() + + if self.ser and self.ser.is_open: + self.ser.close() + self.logger.info("Serial port closed") + + +# ============================================================ +# 测试和调试代码 +# ============================================================ + +def test_motor(): + """基础功能测试""" + logging.basicConfig(level=logging.INFO) + + print("="*60) + print("ZDT X42 电机驱动测试") + print("="*60) + + driver = ZDTX42Driver( + port="/dev/tty.usbserial-3110", + baudrate=115200, + device_id=2, + debug=True + ) + + if not driver.ser: + print("❌ 串口打开失败") + return + + try: + # 测试 1: 读取位置 + print("\n[1] 读取当前位置") + pos = driver.get_position() + print(f"✓ 当前位置: {pos} 脉冲") + + # 测试 2: 使能 + print("\n[2] 使能电机") + driver.enable(True) + time.sleep(0.3) + print("✓ 电机已锁定") + + # 测试 3: 相对位置运动 + print("\n[3] 相对位置运动 (1000脉冲)") + driver.move_position(pulses=1000, speed_rpm=60, direction="CW") + time.sleep(2) + pos = driver.get_position() + print(f"✓ 新位置: {pos}") + + # 测试 4: 速度运动 + print("\n[4] 速度模式 (30RPM, 3秒)") + driver.move_speed(speed_rpm=30, direction="CW") + time.sleep(3) + driver.stop() + pos = driver.get_position() + print(f"✓ 停止后位置: {pos}") + + # 测试 5: 禁用 + print("\n[5] 禁用电机") + driver.enable(False) + print("✓ 电机已松开") + + print("\n" + "="*60) + print("✅ 测试完成") + print("="*60) + + except Exception as e: + print(f"\n❌ 测试失败: {e}") + import traceback + traceback.print_exc() + finally: + driver.close() + + +if __name__ == "__main__": + test_motor() diff --git a/unilabos/devices/separator/xkc_sensor.py b/unilabos/devices/separator/xkc_sensor.py new file mode 100644 index 0000000..c954a2e --- /dev/null +++ b/unilabos/devices/separator/xkc_sensor.py @@ -0,0 +1,379 @@ +# -*- coding: utf-8 -*- +""" +XKC RS485 液位传感器 (Modbus RTU) + +说明: + 1. 遵循 Modbus-RTU 协议。 + 2. 数据寄存器: 0x0001 (液位状态, 1=有液, 0=无液), 0x0002 (RSSI 信号强度)。 + 3. 地址寄存器: 0x0004 (可读写, 范围 1-254)。 + 4. 波特率寄存器: 0x0005 (可写, 代码表见 change_baudrate 方法)。 +""" + +import struct +import threading +import time +import logging +import serial +from typing import Optional, Dict, Any, List + +from unilabos.device_comms.universal_driver import UniversalDriver + +class TransportManager: + """ + 统一通信管理类。 + 仅支持 串口 (Serial/有线) 连接。 + """ + def __init__(self, port: str, baudrate: int = 9600, timeout: float = 3.0, logger=None): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.logger = logger + self.lock = threading.RLock() # 线程锁,确保多设备共用一个连接时不冲突 + + self.serial = None + self._connect_serial() + + def _connect_serial(self): + try: + self.serial = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=self.timeout + ) + except Exception as e: + raise ConnectionError(f"Serial open failed: {e}") + + def close(self): + """关闭连接""" + if self.serial and self.serial.is_open: + self.serial.close() + + def clear_buffer(self): + """清空缓冲区 (Thread-safe)""" + with self.lock: + if self.serial: + self.serial.reset_input_buffer() + + def write(self, data: bytes): + """发送原始字节""" + with self.lock: + if self.serial: + self.serial.write(data) + + def read(self, size: int) -> bytes: + """读取指定长度字节""" + if self.serial: + return self.serial.read(size) + return b'' + +class XKCSensorDriver(UniversalDriver): + """XKC RS485 液位传感器 (Modbus RTU)""" + + def __init__(self, port: str, baudrate: int = 9600, device_id: int = 6, + threshold: int = 300, timeout: float = 3.0, debug: bool = False): + super().__init__() + self.port = port + self.baudrate = baudrate + self.device_id = device_id + self.threshold = threshold + self.timeout = timeout + self.debug = debug + self.level = False + self.rssi = 0 + self.status = {"level": self.level, "rssi": self.rssi} + + try: + self.transport = TransportManager(port, baudrate, timeout, logger=self.logger) + self.logger.info(f"XKCSensorDriver connected to {port} (ID: {device_id})") + except Exception as e: + self.logger.error(f"Failed to connect XKCSensorDriver: {e}") + self.transport = None + + # 启动背景轮询线程,确保 status 实时刷新 + self._stop_event = threading.Event() + self._polling_thread = threading.Thread( + target=self._update_loop, + name=f"XKCPolling_{port}", + daemon=True + ) + if self.transport: + self._polling_thread.start() + + def _update_loop(self): + """背景循环读取传感器数据""" + while not self._stop_event.is_set(): + try: + self.read_level() + except Exception as e: + if self.debug: + self.logger.error(f"Polling error: {e}") + time.sleep(2.0) # 每2秒刷新一次数据 + + def _crc(self, data: bytes) -> bytes: + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 + else: crc >>= 1 + return struct.pack(' Optional[Dict[str, Any]]: + """ + 读取液位。 + 返回: {'level': bool, 'rssi': int} + """ + if not self.transport: + return None + + with self.transport.lock: + self.transport.clear_buffer() + # Modbus Read Registers: 01 03 00 01 00 02 CRC + payload = struct.pack('>HH', 0x0001, 0x0002) + msg = struct.pack('BB', self.device_id, 0x03) + payload + msg += self._crc(msg) + + if self.debug: + self.logger.info(f"TX (ID {self.device_id}): {msg.hex().upper()}") + + self.transport.write(msg) + + # Read header + h = self.transport.read(3) # Addr, Func, Len + if self.debug: + self.logger.info(f"RX Header: {h.hex().upper()}") + + if len(h) < 3: return None + length = h[2] + + # Read body + CRC + body = self.transport.read(length + 2) + if self.debug: + self.logger.info(f"RX Body+CRC: {body.hex().upper()}") + if len(body) < length + 2: + # Firmware bug fix specific to some modules + if len(body) == 4 and length == 4: + pass + else: + return None + + data = body[:-2] + # 根据手册说明: + # 寄存器 0x0001 (data[0:2]): 液位状态 (00 01 为有液, 00 00 为无液) + # 寄存器 0x0002 (data[2:4]): 信号强度 RSSI + + hw_level = False + rssi = 0 + + if len(data) >= 4: + hw_level = ((data[0] << 8) | data[1]) == 1 + rssi = (data[2] << 8) | data[3] + elif len(data) == 2: + # 兼容模式: 某些老固件可能只返回 1 个寄存器 + rssi = (data[0] << 8) | data[1] + hw_level = rssi > self.threshold + else: + return None + + # 最终判定: 优先使用硬件层级的 level 判定,但 RSSI 阈值逻辑作为补充/校验 + # 注意: 如果用户显式设置了 THRESHOLD,我们可以在逻辑中做权衡 + self.level = hw_level or (rssi > self.threshold) + self.rssi = rssi + result = { + 'level': self.level, + 'rssi': self.rssi + } + self.status = result + return result + + def wait_level(self, target_state: bool, timeout: float = 60.0) -> bool: + """ + 等待液位达到目标状态 (阻塞式) + """ + self.logger.info(f"Waiting for level: {target_state}") + start_time = time.time() + while (time.time() - start_time) < timeout: + res = self.read_level() + if res and res.get('level') == target_state: + return True + time.sleep(0.5) + self.logger.warning(f"Wait level timeout ({timeout}s)") + return False + + def wait_for_liquid(self, target_state: bool, timeout: float = 120.0) -> bool: + """ + 实时检测电导率(RSSI)并等待用户指定的“有液”或“无液”状态。 + 一旦检测到符合目标状态,立即返回。 + + Args: + target_state: True 为“有液”, False 为“无液” + timeout: 最大等待时间(秒) + """ + state_str = "有液" if target_state else "无液" + self.logger.info(f"开始实时检测电导率,等待状态: {state_str} (超时: {timeout}s)") + + start_time = time.time() + while (time.time() - start_time) < timeout: + res = self.read_level() # 内部已更新 self.level 和 self.rssi + if res: + current_level = res.get('level') + current_rssi = res.get('rssi') + if current_level == target_state: + self.logger.info(f"✅ 检测到目标状态: {state_str} (当前电导率/RSSI: {current_rssi})") + return True + + if self.debug: + self.logger.debug(f"当前状态: {'有液' if current_level else '无液'}, RSSI: {current_rssi}") + + time.sleep(0.2) # 高频采样 + + self.logger.warning(f"❌ 等待 {state_str} 状态超时 ({timeout}s)") + return False + + def set_threshold(self, threshold: int): + """设置液位判定阈值""" + self.threshold = int(threshold) + self.logger.info(f"Threshold updated to: {self.threshold}") + + def change_device_id(self, new_id: int) -> bool: + """ + 修改设备的 Modbus 从站地址。 + 寄存器: 0x0004, 功能码: 0x06 + """ + if not (1 <= new_id <= 254): + self.logger.error(f"Invalid device ID: {new_id}. Must be 1-254.") + return False + + self.logger.info(f"Changing device ID from {self.device_id} to {new_id}") + success = self._write_single_register(0x0004, new_id) + if success: + self.device_id = new_id # 更新内存中的地址 + self.logger.info(f"Device ID update command sent successfully (target {new_id}).") + return success + + def change_baudrate(self, baud_code: int) -> bool: + """ + 更改通讯波特率 (寄存器: 0x0005)。 + 设置成功后传感器 LED 会闪烁,通常无数据返回。 + + 波特率代码对照表 (16进制): + 05: 2400 + 06: 4800 + 07: 9600 (默认) + 08: 14400 + 09: 19200 + 0A: 28800 + 0C: 57600 + 0D: 115200 + 0E: 128000 + 0F: 256000 + """ + self.logger.info(f"Sending baudrate change command (Code: {baud_code:02X})") + # 写入寄存器 0x0005 + self._write_single_register(0x0005, baud_code) + self.logger.info("Baudrate change command executed. Device LED should flash. Please update connection settings.") + return True + + def factory_reset(self) -> bool: + """ + 恢复出厂设置 (通过广播地址 FF)。 + 设置地址为 01,逻辑为向 0x0004 写入 0x0002 + """ + self.logger.info("Sending factory reset command via broadcast address FF...") + # 广播指令通常无回显 + self._write_single_register(0x0004, 0x0002, slave_id=0xFF) + self.logger.info("Factory reset command sent. Device address should be 01 now.") + return True + + def _write_single_register(self, reg_addr: int, value: int, slave_id: Optional[int] = None) -> bool: + """内部辅助函数: Modbus 功能码 06 写单个寄存器""" + if not self.transport: return False + + target_id = slave_id if slave_id is not None else self.device_id + msg = struct.pack('BBHH', target_id, 0x06, reg_addr, value) + msg += self._crc(msg) + + with self.transport.lock: + self.transport.clear_buffer() + if self.debug: + self.logger.info(f"TX Write (Reg {reg_addr:#06x}): {msg.hex().upper()}") + + self.transport.write(msg) + + # 广播地址、波特率修改或厂家特定指令可能无回显 + if target_id == 0xFF or reg_addr == 0x0005: + time.sleep(0.5) + return True + + # 等待返回 (正常应返回相同报文) + resp = self.transport.read(len(msg)) + if self.debug: + self.logger.info(f"RX Write Response: {resp.hex().upper()}") + + return resp == msg + + def close(self): + if self.transport: + self.transport.close() + +if __name__ == "__main__": + # 快速实例化测试 + import logging + # 减少冗余日志,仅显示重要信息 + logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') + + # 硬件配置 (根据实际情况修改) + TEST_PORT = "/dev/tty.usbserial-3110" + SLAVE_ID = 1 + THRESHOLD = 300 + + print("\n" + "="*50) + print(f" XKC RS485 传感器独立测试程序") + print(f" 端口: {TEST_PORT} | 地址: {SLAVE_ID} | 阈值: {THRESHOLD}") + print("="*50) + + sensor = XKCSensorDriver(port=TEST_PORT, device_id=SLAVE_ID, threshold=THRESHOLD, debug=False) + + try: + if sensor.transport: + print(f"\n开始实时连续采样测试 (持续 15 秒)...") + print(f"按 Ctrl+C 可提前停止\n") + + start_time = time.time() + duration = 15 + count = 0 + + while time.time() - start_time < duration: + count += 1 + res = sensor.read_level() + if res: + rssi = res['rssi'] + level = res['level'] + status_str = "【有液】" if level else "【无液】" + # 使用 \r 实现单行刷新显示 (或者不刷,直接打印历史) + # 为了方便查看变化,我们直接打印 + elapsed = time.time() - start_time + print(f" [{elapsed:4.1f}s] 采样 {count:<3}: 电导率/RSSI = {rssi:<5} | 判定结果: {status_str}") + else: + print(f" [{time.time()-start_time:4.1f}s] 采样 {count:<3}: 通信失败 (无响应)") + + time.sleep(0.5) # 每秒采样 2 次 + + print(f"\n--- 15 秒采样测试完成 (总计 {count} 次) ---") + + # [3] 测试动态修改阈值 + print(f"\n[3] 动态修改阈值演示...") + new_threshold = 400 + sensor.set_threshold(new_threshold) + res = sensor.read_level() + if res: + print(f" 采样 (当前阈值={new_threshold}): 电导率/RSSI = {res['rssi']:<5} | 判定结果: {'【有液】' if res['level'] else '【无液】'}") + sensor.set_threshold(THRESHOLD) # 还原 + + except KeyboardInterrupt: + print("\n[!] 用户中断测试") + except Exception as e: + print(f"\n[!] 测试运行出错: {e}") + finally: + sensor.close() + print("\n--- 测试程序已退出 ---\n") diff --git a/unilabos/registry/devices/motor.yaml b/unilabos/registry/devices/motor.yaml new file mode 100644 index 0000000..7b603ae --- /dev/null +++ b/unilabos/registry/devices/motor.yaml @@ -0,0 +1,286 @@ +motor.zdt_x42: + category: + - motor + class: + action_value_mappings: + auto-enable: + feedback: {} + goal: {} + goal_default: + 'on': true + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 使能或禁用电机。使能后电机进入锁轴状态,可接收运动指令;禁用后电机进入松轴状态。 + properties: + feedback: {} + goal: + properties: + 'on': + default: true + type: boolean + required: [] + type: object + result: {} + required: + - goal + title: enable参数 + type: object + type: UniLabJsonCommand + auto-get_position: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 获取当前电机脉冲位置。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: + properties: + position: + type: integer + type: object + required: + - goal + title: get_position参数 + type: object + type: UniLabJsonCommand + auto-move_position: + feedback: {} + goal: {} + goal_default: + absolute: false + acceleration: 10 + direction: CW + pulses: 1000 + speed_rpm: 60 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 位置模式运行。控制电机移动到指定脉冲位置或相对于当前位置移动指定脉冲数。 + properties: + feedback: {} + goal: + properties: + absolute: + default: false + type: boolean + acceleration: + default: 10 + maximum: 255 + minimum: 0 + type: integer + direction: + default: CW + enum: + - CW + - CCW + type: string + pulses: + default: 1000 + type: integer + speed_rpm: + default: 60 + minimum: 0 + type: integer + required: + - pulses + - speed_rpm + type: object + result: {} + required: + - goal + title: move_position参数 + type: object + type: UniLabJsonCommand + auto-move_speed: + feedback: {} + goal: {} + goal_default: + acceleration: 10 + direction: CW + speed_rpm: 60 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 速度模式运行。控制电机以指定转速和方向持续转动。 + properties: + feedback: {} + goal: + properties: + acceleration: + default: 10 + maximum: 255 + minimum: 0 + type: integer + direction: + default: CW + enum: + - CW + - CCW + type: string + speed_rpm: + default: 60 + minimum: 0 + type: integer + required: + - speed_rpm + type: object + result: {} + required: + - goal + title: move_speed参数 + type: object + type: UniLabJsonCommand + auto-rotate_quarter: + feedback: {} + goal: {} + goal_default: + direction: CW + speed_rpm: 60 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 电机旋转 1/4 圈 (阻塞式)。 + properties: + feedback: {} + goal: + properties: + direction: + default: CW + enum: + - CW + - CCW + type: string + speed_rpm: + default: 60 + minimum: 1 + type: integer + required: [] + type: object + result: {} + required: + - goal + title: rotate_quarter参数 + type: object + type: UniLabJsonCommand + auto-set_zero: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 将当前电机位置设为零点。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: set_zero参数 + type: object + type: UniLabJsonCommand + auto-stop: + feedback: {} + goal: {} + goal_default: {} + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 立即停止电机运动。 + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: stop参数 + type: object + type: UniLabJsonCommand + auto-wait_time: + feedback: {} + goal: {} + goal_default: + duration_s: 1.0 + handles: {} + placeholder_keys: {} + result: {} + schema: + description: 等待指定时间 (秒)。 + properties: + feedback: {} + goal: + properties: + duration_s: + default: 1.0 + minimum: 0 + type: number + required: + - duration_s + type: object + result: {} + required: + - goal + title: wait_time参数 + type: object + type: UniLabJsonCommand + module: unilabos.devices.motor.ZDT_X42:ZDTX42Driver + status_types: + position: int + status: str + type: python + config_info: [] + description: ZDT X42 闭环步进电机驱动。支持速度运行、精确位置控制、位置查询和清零功能。适用于各种需要精确运动控制的实验室自动化场景。 + handles: [] + icon: '' + init_param_schema: + config: + properties: + baudrate: + default: 115200 + type: integer + debug: + default: false + type: boolean + device_id: + default: 1 + type: integer + port: + type: string + timeout: + default: 0.5 + type: number + required: + - port + type: object + data: + properties: + position: + type: integer + status: + type: string + required: + - status + - position + type: object + version: 1.0.0 diff --git a/unilabos/registry/devices/sensor.yaml b/unilabos/registry/devices/sensor.yaml new file mode 100644 index 0000000..81d05b0 --- /dev/null +++ b/unilabos/registry/devices/sensor.yaml @@ -0,0 +1,148 @@ +sensor.xkc_rs485: + category: + - sensor + - separator + class: + action_value_mappings: + auto-change_baudrate: + goal: + baud_code: 7 + handles: {} + schema: + description: '更改通讯波特率 (设置成功后无返回,且需手动切换波特率重连)。代码表 (16进制): 05=2400, 06=4800, + 07=9600, 08=14400, 09=19200, 0A=28800, 0C=57600, 0D=115200, 0E=128000, + 0F=256000' + properties: + goal: + properties: + baud_code: + description: '波特率代码 (例如: 7 为 9600, 13 即 0x0D 为 115200)' + type: integer + required: + - baud_code + type: object + type: UniLabJsonCommand + auto-change_device_id: + goal: + new_id: 1 + handles: {} + schema: + description: 修改传感器的 Modbus 从站地址 + properties: + goal: + properties: + new_id: + description: 新的从站地址 (1-254) + maximum: 254 + minimum: 1 + type: integer + required: + - new_id + type: object + type: UniLabJsonCommand + auto-factory_reset: + goal: {} + handles: {} + schema: + description: 恢复出厂设置 (地址重置为 01) + properties: + goal: + type: object + type: UniLabJsonCommand + auto-read_level: + goal: {} + handles: {} + schema: + description: 直接读取当前液位及信号强度 + properties: + goal: + type: object + type: object + type: UniLabJsonCommand + auto-set_threshold: + goal: + threshold: 300 + handles: {} + schema: + description: 设置液位判定阈值 + properties: + goal: + properties: + threshold: + type: integer + required: + - threshold + type: object + type: UniLabJsonCommand + auto-wait_for_liquid: + goal: + target_state: true + timeout: 120 + handles: {} + schema: + description: 实时检测电导率(RSSI)并等待用户指定的状态 + properties: + goal: + properties: + target_state: + default: true + description: 目标状态 (True=有液, False=无液) + type: boolean + timeout: + default: 120 + description: 超时时间 (秒) + required: + - target_state + type: object + type: UniLabJsonCommand + auto-wait_level: + goal: + level: true + timeout: 10 + handles: {} + schema: + description: 等待液位达到目标状态 + properties: + goal: + properties: + level: + type: boolean + timeout: + type: number + required: + - level + type: object + type: UniLabJsonCommand + module: unilabos.devices.separator.xkc_sensor:XKCSensorDriver + status_types: + level: bool + rssi: int + type: python + config_info: [] + description: XKC RS485 非接触式液位传感器 (Modbus RTU) + handles: [] + icon: '' + init_param_schema: + config: + properties: + baudrate: + default: 9600 + type: integer + debug: + default: false + type: boolean + device_id: + default: 1 + type: integer + port: + type: string + threshold: + default: 300 + type: integer + timeout: + default: 3.0 + type: number + required: + - port + type: object + version: 1.0.0 diff --git a/unilabos/test/experiments/xkc_sensor_test.json b/unilabos/test/experiments/xkc_sensor_test.json new file mode 100644 index 0000000..ef50dde --- /dev/null +++ b/unilabos/test/experiments/xkc_sensor_test.json @@ -0,0 +1,29 @@ +{ + "nodes": [ + { + "id": "Liquid_Sensor_1", + "name": "XKC Sensor", + "children": [], + "parent": null, + "type": "device", + "class": "sensor.xkc_rs485", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "port": "/dev/tty.usbserial-3110", + "baudrate": 9600, + "device_id": 1, + "threshold": 300, + "timeout": 3.0 + }, + "data": { + "level": false, + "rssi": 0 + } + } + ], + "links": [] +} \ No newline at end of file diff --git a/unilabos/test/experiments/zdt_motor_test.json b/unilabos/test/experiments/zdt_motor_test.json new file mode 100644 index 0000000..692e40e --- /dev/null +++ b/unilabos/test/experiments/zdt_motor_test.json @@ -0,0 +1,28 @@ +{ + "nodes": [ + { + "id": "ZDT_Motor", + "name": "ZDT Motor", + "children": [], + "parent": null, + "type": "device", + "class": "motor.zdt_x42", + "position": { + "x": 0, + "y": 0, + "z": 0 + }, + "config": { + "port": "/dev/tty.usbserial-3110", + "baudrate": 115200, + "device_id": 1, + "debug": true + }, + "data": { + "position": 0, + "status": "idle" + } + } + ], + "links": [] +} \ No newline at end of file