diff --git a/unilabos/devices/laiyu_liquid_test/driver_enable_move_test.py b/unilabos/devices/laiyu_liquid_test/driver_enable_move_test.py new file mode 100644 index 00000000..a3c5797d --- /dev/null +++ b/unilabos/devices/laiyu_liquid_test/driver_enable_move_test.py @@ -0,0 +1,138 @@ + +import os +import time +import json +import logging +from xyz_stepper_driver import ModbusRTUTransport, ModbusClient, XYZStepperController, MotorStatus + +# ========== 日志配置 ========== +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("XYZ_Debug") + + +def create_controller(port: str = "/dev/ttyUSB1", baudrate: int = 115200) -> XYZStepperController: + """ + 初始化通信层与三轴控制器 + """ + logger.info(f"🔧 初始化控制器: {port} @ {baudrate}bps") + transport = ModbusRTUTransport(port=port, baudrate=baudrate) + transport.open() + client = ModbusClient(transport) + return XYZStepperController(client=client, port=port, baudrate=baudrate) + + +def load_existing_soft_zero(ctrl: XYZStepperController, path: str = "work_origin.json") -> bool: + """ + 如果已存在软零点文件则加载,否则返回 False + """ + if not os.path.exists(path): + logger.warning("⚠ 未找到已有软零点文件,将等待人工定义新零点。") + return False + + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + origin = data.get("work_origin_steps", {}) + ctrl.work_origin_steps = origin + ctrl.is_homed = True + logger.info(f"✔ 已加载软零点文件:{path}") + logger.info(f"当前软零点步数: {origin}") + return True + except Exception as e: + logger.error(f"读取软零点文件失败: {e}") + return False + + +def test_enable_axis(ctrl: XYZStepperController): + """ + 依次使能 X / Y / Z 三轴 + """ + logger.info("=== 测试各轴使能 ===") + for axis in ["X", "Y", "Z"]: + try: + result = ctrl.enable(axis, True) + if result: + vals = ctrl.get_status(axis) + st = MotorStatus(vals[3]) + logger.info(f"{axis} 轴使能成功,当前状态: {st.name}") + else: + logger.error(f"{axis} 轴使能失败") + except Exception as e: + logger.error(f"{axis} 轴使能异常: {e}") + time.sleep(0.5) + + +def test_status_read(ctrl: XYZStepperController): + """ + 读取各轴当前状态(调试) + """ + logger.info("=== 当前各轴状态 ===") + for axis in ["X", "Y", "Z"]: + try: + vals = ctrl.get_status(axis) + st = MotorStatus(vals[3]) + logger.info( + f"{axis}: steps={vals[0]}, speed={vals[1]}, " + f"current={vals[2]}, status={st.name}" + ) + except Exception as e: + logger.error(f"获取 {axis} 状态失败: {e}") + time.sleep(0.2) + + +def redefine_soft_zero(ctrl: XYZStepperController): + """ + 手动重新定义软零点 + """ + logger.info("=== ⚙️ 重新定义软零点 ===") + ctrl.define_current_as_zero("work_origin.json") + logger.info("✅ 新软零点已写入 work_origin.json") + + +def test_soft_zero_move(ctrl: XYZStepperController): + """ + 以软零点为基准执行三轴运动测试 + """ + logger.info("=== 测试软零点相对运动 ===") + ctrl.move_xyz_work(x=100.0, y=100.0, z=40.0, speed=100, acc=800) + + for axis in ["X", "Y", "Z"]: + ctrl.wait_complete(axis) + + test_status_read(ctrl) + logger.info("✅ 软零点运动测试完成") + + +def main(): + ctrl = create_controller(port="/dev/ttyUSB1", baudrate=115200) + + try: + test_enable_axis(ctrl) + test_status_read(ctrl) + + # === 初始化或加载软零点 === + loaded = load_existing_soft_zero(ctrl) + if not loaded: + logger.info("👣 首次运行,定义软零点并保存。") + ctrl.define_current_as_zero("work_origin.json") + + # === 软零点回归动作 === + ctrl.return_to_work_origin() + + # === 可选软零点运动测试 === + # test_soft_zero_move(ctrl) + + except KeyboardInterrupt: + logger.info("🛑 手动中断退出") + + except Exception as e: + logger.exception(f"❌ 调试出错: {e}") + + finally: + if hasattr(ctrl.client, "transport"): + ctrl.client.transport.close() + logger.info("串口已安全关闭 ✅") + + +if __name__ == "__main__": + main() diff --git a/unilabos/devices/laiyu_liquid_test/driver_status_test.py b/unilabos/devices/laiyu_liquid_test/driver_status_test.py new file mode 100644 index 00000000..f6960d5b --- /dev/null +++ b/unilabos/devices/laiyu_liquid_test/driver_status_test.py @@ -0,0 +1,58 @@ + +import logging +from xyz_stepper_driver import ( + ModbusRTUTransport, + ModbusClient, + XYZStepperController, + MotorAxis, +) + +logger = logging.getLogger("XYZStepperCommTest") +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") + + +def test_xyz_stepper_comm(): + """仅测试 Modbus 通信是否正常(并输出寄存器数据,不做电机运动)""" + port = "/dev/ttyUSB1" + baudrate = 115200 + timeout = 1.2 # 略长避免响应被截断 + + logger.info(f"尝试连接 Modbus 设备 {port} ...") + transport = ModbusRTUTransport(port, baudrate=baudrate, timeout=timeout) + transport.open() + + client = ModbusClient(transport) + ctrl = XYZStepperController(client) + + try: + logger.info("✅ 串口已打开,开始读取三个轴状态(打印寄存器内容) ...") + for axis in [MotorAxis.X, MotorAxis.Y, MotorAxis.Z]: + addr = ctrl.axis_addr[axis] + + try: + # # 在 get_status 前打印原始寄存器内容 + # regs = client.read_registers(addr, ctrl.REG_STATUS, 6) + # hex_regs = [f"0x{val:04X}" for val in regs] + # logger.info(f"[{axis.name}] 原始寄存器 ({len(regs)} 个): {regs} -> {hex_regs}") + + # 调用 get_status() 正常解析 + status = ctrl.get_status(axis) + logger.info( + f"[{axis.name}] ✅ 通信正常: steps={status.steps}, speed={status.speed}, " + f"current={status.current}, status={status.status.name}" + ) + + except Exception as e_axis: + logger.error(f"[{axis.name}] ❌ 通信失败: {e_axis}") + + + except Exception as e: + logger.error(f"❌ 通讯测试失败: {e}") + + finally: + transport.close() + logger.info("🔌 串口已关闭") + + +if __name__ == "__main__": + test_xyz_stepper_comm() diff --git a/unilabos/devices/laiyu_liquid_test/work_origin.json b/unilabos/devices/laiyu_liquid_test/work_origin.json new file mode 100644 index 00000000..935c3e3b --- /dev/null +++ b/unilabos/devices/laiyu_liquid_test/work_origin.json @@ -0,0 +1,8 @@ +{ + "work_origin_steps": { + "x": 11799, + "y": 11476, + "z": 3312 + }, + "timestamp": "2025-11-04T15:31:09.802155" +} \ No newline at end of file diff --git a/unilabos/devices/laiyu_liquid_test/xyz_stepper_driver.py b/unilabos/devices/laiyu_liquid_test/xyz_stepper_driver.py new file mode 100644 index 00000000..6ad37ed7 --- /dev/null +++ b/unilabos/devices/laiyu_liquid_test/xyz_stepper_driver.py @@ -0,0 +1,336 @@ + +""" +XYZ 三轴步进电机驱动(统一字符串参数版) +基于 Modbus RTU 协议 +Author: Xiuyu Chen (Modified by Assistant) +""" + +import serial # type: ignore +import struct +import time +import logging +from enum import Enum +from dataclasses import dataclass +from typing import Optional, List, Dict + +# ========== 日志配置 ========== +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("XYZStepper") + + +# ========== 层 1:Modbus RTU ========== +class ModbusException(Exception): + pass + + +class ModbusRTUTransport: + """底层串口通信层""" + + def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.2): + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.ser: Optional[serial.Serial] = None + + def open(self): + try: + self.ser = serial.Serial( + port=self.port, + baudrate=self.baudrate, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=0.02, + write_timeout=0.5, + ) + logger.info(f"[RTU] 串口连接成功: {self.port}") + except Exception as e: + raise ModbusException(f"无法打开串口 {self.port}: {e}") + + def close(self): + if self.ser and self.ser.is_open: + self.ser.close() + logger.info("[RTU] 串口已关闭") + + def send(self, frame: bytes): + if not self.ser or not self.ser.is_open: + raise ModbusException("串口未连接") + + self.ser.reset_input_buffer() + self.ser.write(frame) + self.ser.flush() + logger.debug(f"[TX] {frame.hex(' ').upper()}") + + def receive(self, expected_len: int) -> bytes: + if not self.ser or not self.ser.is_open: + raise ModbusException("串口未连接") + + start = time.time() + buf = bytearray() + while len(buf) < expected_len and (time.time() - start) < self.timeout: + chunk = self.ser.read(expected_len - len(buf)) + if chunk: + buf.extend(chunk) + else: + time.sleep(0.01) + return bytes(buf) + + +# ========== 层 2:Modbus 协议 ========== +class ModbusFunction(Enum): + READ_HOLDING_REGISTERS = 0x03 + WRITE_SINGLE_REGISTER = 0x06 + WRITE_MULTIPLE_REGISTERS = 0x10 + + +class ModbusClient: + """Modbus RTU 客户端""" + + def __init__(self, transport: ModbusRTUTransport): + self.transport = transport + + @staticmethod + def calc_crc(data: bytes) -> bytes: + crc = 0xFFFF + for b in data: + crc ^= b + for _ in range(8): + crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1 + return struct.pack(" bytes: + frame = bytes([addr, func]) + payload + full = frame + self.calc_crc(frame) + self.transport.send(full) + time.sleep(0.01) + resp = self.transport.ser.read(256) + if not resp: + raise ModbusException("未收到响应") + + start = resp.find(bytes([addr, func])) + if start > 0: + resp = resp[start:] + if len(resp) < 5: + raise ModbusException(f"响应长度不足: {resp.hex(' ').upper()}") + if self.calc_crc(resp[:-2]) != resp[-2:]: + raise ModbusException("CRC 校验失败") + return resp + + def read_registers(self, addr: int, start: int, count: int) -> List[int]: + payload = struct.pack(">HH", start, count) + resp = self.send_request(addr, ModbusFunction.READ_HOLDING_REGISTERS.value, payload) + byte_count = resp[2] + regs = [struct.unpack(">H", resp[3 + i:5 + i])[0] for i in range(0, byte_count, 2)] + return regs + + def write_single_register(self, addr: int, reg: int, val: int) -> bool: + payload = struct.pack(">HH", reg, val) + resp = self.send_request(addr, ModbusFunction.WRITE_SINGLE_REGISTER.value, payload) + return resp[1] == ModbusFunction.WRITE_SINGLE_REGISTER.value + + def write_multiple_registers(self, addr: int, start: int, values: List[int]) -> bool: + byte_count = len(values) * 2 + payload = struct.pack(">HHB", start, len(values), byte_count) + payload += b"".join(struct.pack(">H", v & 0xFFFF) for v in values) + resp = self.send_request(addr, ModbusFunction.WRITE_MULTIPLE_REGISTERS.value, payload) + return resp[1] == ModbusFunction.WRITE_MULTIPLE_REGISTERS.value + + +# ========== 层 3:业务逻辑 ========== +class MotorAxis(Enum): + X = 1 + Y = 2 + Z = 3 + + +class MotorStatus(Enum): + STANDBY = 0 + RUNNING = 1 + COLLISION_STOP = 2 + FORWARD_LIMIT_STOP = 3 + REVERSE_LIMIT_STOP = 4 + + +@dataclass +class MotorPosition: + steps: int + speed: int + current: int + status: MotorStatus + + +class XYZStepperController: + """XYZ 三轴步进控制器(字符串接口版)""" + + STEPS_PER_REV = 16384 + LEAD_MM_X, LEAD_MM_Y, LEAD_MM_Z = 80.0, 80.0, 5.0 + STEPS_PER_MM_X = STEPS_PER_REV / LEAD_MM_X + STEPS_PER_MM_Y = STEPS_PER_REV / LEAD_MM_Y + STEPS_PER_MM_Z = STEPS_PER_REV / LEAD_MM_Z + + REG_STATUS, REG_POS_HIGH, REG_POS_LOW = 0x00, 0x01, 0x02 + REG_ACTUAL_SPEED, REG_CURRENT, REG_ENABLE = 0x03, 0x05, 0x06 + REG_ZERO_CMD, REG_TARGET_HIGH, REG_TARGET_LOW = 0x0F, 0x10, 0x11 + REG_SPEED, REG_ACCEL, REG_PRECISION, REG_START = 0x13, 0x14, 0x15, 0x16 + REG_COMMAND = 0x60 + + def __init__(self, client: Optional[ModbusClient] = None, + port="/dev/ttyUSB0", baudrate=115200, + origin_path="unilabos/devices/laiyu_liquid_test/work_origin.json"): + if client is None: + transport = ModbusRTUTransport(port, baudrate) + transport.open() + self.client = ModbusClient(transport) + else: + self.client = client + + self.axis_addr = {MotorAxis.X: 1, MotorAxis.Y: 2, MotorAxis.Z: 3} + self.work_origin_steps = {"x": 0, "y": 0, "z": 0} + self.is_homed = False + self._load_work_origin(origin_path) + + # ========== 基础工具 ========== + @staticmethod + def s16(v: int) -> int: + return v - 0x10000 if v & 0x8000 else v + + @staticmethod + def s32(h: int, l: int) -> int: + v = (h << 16) | l + return v - 0x100000000 if v & 0x80000000 else v + + @classmethod + def mm_to_steps(cls, axis: str, mm: float = 0.0) -> int: + axis = axis.upper() + if axis == "X": + return int(mm * cls.STEPS_PER_MM_X) + elif axis == "Y": + return int(mm * cls.STEPS_PER_MM_Y) + elif axis == "Z": + return int(mm * cls.STEPS_PER_MM_Z) + raise ValueError(f"未知轴: {axis}") + + @classmethod + def steps_to_mm(cls, axis: str, steps: int) -> float: + axis = axis.upper() + if axis == "X": + return steps / cls.STEPS_PER_MM_X + elif axis == "Y": + return steps / cls.STEPS_PER_MM_Y + elif axis == "Z": + return steps / cls.STEPS_PER_MM_Z + raise ValueError(f"未知轴: {axis}") + + # ========== 状态与控制 ========== + def get_status(self, axis: str = "Z") -> list: + """返回简化数组格式: [steps, speed, current, status_value]""" + if isinstance(axis, MotorAxis): + axis_enum = axis + elif isinstance(axis, str): + axis_enum = MotorAxis[axis.upper()] + else: + raise TypeError("axis 参数必须为 str 或 MotorAxis") + + vals = self.client.read_registers(self.axis_addr[axis_enum], self.REG_STATUS, 6) + return [ + self.s32(vals[1], vals[2]), + self.s16(vals[3]), + vals[4], + int(MotorStatus(vals[0]).value) + ] + + def enable(self, axis: str, state: bool) -> bool: + a = MotorAxis[axis.upper()] + return self.client.write_single_register(self.axis_addr[a], self.REG_ENABLE, 1 if state else 0) + + def wait_complete(self, axis: str, timeout=30.0) -> bool: + a = axis.upper() + start = time.time() + while time.time() - start < timeout: + vals = self.get_status(a) + st = MotorStatus(vals[3]) # 第4个元素是状态值 + if st == MotorStatus.STANDBY: + return True + if st in (MotorStatus.COLLISION_STOP, MotorStatus.FORWARD_LIMIT_STOP, MotorStatus.REVERSE_LIMIT_STOP): + logger.warning(f"{a} 轴异常停止: {st.name}") + return False + time.sleep(0.1) + logger.warning(f"{a} 轴运动超时") + return False + + # ========== 控制命令 ========== + def move_to(self, axis: str, steps: int, speed: int = 2000, acc: int = 500, precision: int = 50) -> bool: + a = MotorAxis[axis.upper()] + addr = self.axis_addr[a] + hi, lo = (steps >> 16) & 0xFFFF, steps & 0xFFFF + values = [hi, lo, speed, acc, precision] + ok = self.client.write_multiple_registers(addr, self.REG_TARGET_HIGH, values) + if ok: + self.client.write_single_register(addr, self.REG_START, 1) + return ok + + def move_xyz_work(self, x: float = 0.0, y: float = 0.0, z: float = 0.0, speed: int = 100, acc: int = 1500): + logger.info("🧭 执行安全多轴运动:Z→XY→Z") + if z is not None: + safe_z = self._to_machine_steps("Z", 0.0) + self.move_to("Z", safe_z, speed, acc) + self.wait_complete("Z") + + if x is not None or y is not None: + if x is not None: + self.move_to("X", self._to_machine_steps("X", x), speed, acc) + if y is not None: + self.move_to("Y", self._to_machine_steps("Y", y), speed, acc) + if x is not None: + self.wait_complete("X") + if y is not None: + self.wait_complete("Y") + + if z is not None: + self.move_to("Z", self._to_machine_steps("Z", z), speed, acc) + self.wait_complete("Z") + logger.info("✅ 多轴顺序运动完成") + + # ========== 坐标与零点 ========== + def _to_machine_steps(self, axis: str, mm: float) -> int: + base = self.work_origin_steps.get(axis.lower(), 0) + return base + self.mm_to_steps(axis, mm) + + def define_current_as_zero(self, save_path="work_origin.json"): + import json + from datetime import datetime + + origin = {} + for axis in ["X", "Y", "Z"]: + vals = self.get_status(axis) + origin[axis.lower()] = int(vals[0]) # 第1个是步数 + with open(save_path, "w", encoding="utf-8") as f: + json.dump({"work_origin_steps": origin, "timestamp": datetime.now().isoformat()}, f, indent=2) + self.work_origin_steps = origin + self.is_homed = True + logger.info(f"✅ 零点已定义并保存至 {save_path}") + + def _load_work_origin(self, path: str) -> bool: + import json, os + + if not os.path.exists(path): + logger.warning("⚠️ 未找到软零点文件") + return False + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + self.work_origin_steps = data.get("work_origin_steps", {"x": 0, "y": 0, "z": 0}) + self.is_homed = True + logger.info(f"📂 软零点已加载: {self.work_origin_steps}") + return True + + def return_to_work_origin(self, speed: int = 200, acc: int = 800): + logger.info("🏁 回工件软零点") + self.move_to("Z", self._to_machine_steps("Z", 0.0), speed, acc) + self.wait_complete("Z") + self.move_to("X", self.work_origin_steps.get("x", 0), speed, acc) + self.move_to("Y", self.work_origin_steps.get("y", 0), speed, acc) + self.wait_complete("X") + self.wait_complete("Y") + self.move_to("Z", self.work_origin_steps.get("z", 0), speed, acc) + self.wait_complete("Z") + logger.info("🎯 回软零点完成 ✅") diff --git a/unilabos/registry/devices/laiyu_liquid_test.yaml b/unilabos/registry/devices/laiyu_liquid_test.yaml new file mode 100644 index 00000000..ffd46123 --- /dev/null +++ b/unilabos/registry/devices/laiyu_liquid_test.yaml @@ -0,0 +1,226 @@ +xyz_stepper_controller: + category: + - laiyu_liquid_test + class: + action_value_mappings: + auto-define_current_as_zero: + feedback: {} + goal: {} + goal_default: + save_path: work_origin.json + handles: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + save_path: + default: work_origin.json + type: string + required: [] + type: object + result: {} + required: + - goal + title: define_current_as_zero参数 + type: object + type: UniLabJsonCommand + auto-enable: + feedback: {} + goal: {} + goal_default: + axis: null + state: null + handles: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + axis: + type: string + state: + type: boolean + required: + - axis + - state + type: object + result: {} + required: + - goal + title: enable参数 + type: object + type: UniLabJsonCommand + auto-move_to: + feedback: {} + goal: {} + goal_default: + acc: 500 + axis: null + precision: 50 + speed: 2000 + steps: null + handles: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + acc: + default: 500 + type: integer + axis: + type: string + precision: + default: 50 + type: integer + speed: + default: 2000 + type: integer + steps: + type: integer + required: + - axis + - steps + type: object + result: {} + required: + - goal + title: move_to参数 + type: object + type: UniLabJsonCommand + auto-move_xyz_work: + feedback: {} + goal: {} + goal_default: + acc: 1500 + speed: 100 + x: 0.0 + y: 0.0 + z: 0.0 + handles: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + acc: + default: 1500 + type: integer + speed: + default: 100 + type: integer + x: + default: 0.0 + type: number + y: + default: 0.0 + type: number + z: + default: 0.0 + type: number + required: [] + type: object + result: {} + required: + - goal + title: move_xyz_work参数 + type: object + type: UniLabJsonCommand + auto-return_to_work_origin: + feedback: {} + goal: {} + goal_default: + acc: 800 + speed: 200 + handles: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + acc: + default: 800 + type: integer + speed: + default: 200 + type: integer + required: [] + type: object + result: {} + required: + - goal + title: return_to_work_origin参数 + type: object + type: UniLabJsonCommand + auto-wait_complete: + feedback: {} + goal: {} + goal_default: + axis: null + timeout: 30.0 + handles: {} + result: {} + schema: + description: '' + properties: + feedback: {} + goal: + properties: + axis: + type: string + timeout: + default: 30.0 + type: string + required: + - axis + type: object + result: {} + required: + - goal + title: wait_complete参数 + type: object + type: UniLabJsonCommand + module: unilabos.devices.laiyu_liquid_test.xyz_stepper_driver:XYZStepperController + status_types: + status: list + type: python + config_info: [] + description: 新XYZ控制器 + handles: [] + icon: '' + init_param_schema: + config: + properties: + baudrate: + default: 115200 + type: string + client: + type: string + origin_path: + default: unilabos/devices/laiyu_liquid_test/work_origin.json + type: string + port: + default: /dev/ttyUSB1 + type: string + required: [] + type: object + data: + properties: + status: + type: array + required: + - status + type: object + registry_type: device + version: 1.0.0