mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-04 05:15:10 +00:00
377 lines
11 KiB
Python
377 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
ZDT X42 Closed-Loop Stepper Motor Driver
|
||
RS485 Serial Communication via USB-Serial Converter
|
||
|
||
- Baudrate: 115200
|
||
"""
|
||
|
||
import serial
|
||
import time
|
||
import threading
|
||
import struct
|
||
import logging
|
||
from typing import Optional, Any
|
||
|
||
try:
|
||
from unilabos.device_comms.universal_driver import UniversalDriver
|
||
except ImportError:
|
||
class UniversalDriver:
|
||
def __init__(self, *args, **kwargs):
|
||
self.logger = logging.getLogger(self.__class__.__name__)
|
||
def execute_command_from_outer(self, command: Any): pass
|
||
|
||
from serial.rs485 import RS485Settings
|
||
|
||
|
||
class ZDTX42Driver(UniversalDriver):
|
||
"""
|
||
ZDT X42 闭环步进电机驱动器
|
||
|
||
支持功能:
|
||
- 速度模式运行
|
||
- 位置模式运行 (相对/绝对)
|
||
- 位置读取和清零
|
||
- 使能/禁用控制
|
||
|
||
通信协议:
|
||
- 帧格式: [设备ID] [功能码] [数据...] [校验位=0x6B]
|
||
- 响应长度根据功能码决定
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
port: str,
|
||
baudrate: int = 115200,
|
||
device_id: int = 1,
|
||
timeout: float = 0.5,
|
||
debug: bool = False
|
||
):
|
||
"""
|
||
初始化 ZDT X42 电机驱动
|
||
|
||
Args:
|
||
port: 串口设备路径
|
||
baudrate: 波特率 (默认 115200)
|
||
device_id: 设备地址 (1-255)
|
||
timeout: 通信超时时间(秒)
|
||
debug: 是否启用调试输出
|
||
"""
|
||
super().__init__()
|
||
self.id = device_id
|
||
self.debug = debug
|
||
self.lock = threading.RLock()
|
||
self.status = "idle" # 对应注册表中的 status (str)
|
||
self.position = 0 # 对应注册表中的 position (int)
|
||
|
||
try:
|
||
self.ser = serial.Serial(
|
||
port=port,
|
||
baudrate=baudrate,
|
||
timeout=timeout,
|
||
bytesize=serial.EIGHTBITS,
|
||
parity=serial.PARITY_NONE,
|
||
stopbits=serial.STOPBITS_ONE
|
||
)
|
||
|
||
# 启用 RS485 模式
|
||
try:
|
||
self.ser.rs485_mode = RS485Settings(
|
||
rts_level_for_tx=True,
|
||
rts_level_for_rx=False
|
||
)
|
||
except Exception:
|
||
pass # RS485 模式是可选的
|
||
|
||
self.logger.info(
|
||
f"ZDT X42 Motor connected: {port} "
|
||
f"(Baud: {baudrate}, ID: {device_id})"
|
||
)
|
||
# 自动使能电机,确保初始状态可运动
|
||
self.enable(True)
|
||
|
||
# 启动背景轮询线程,确保 position 实时刷新
|
||
self._stop_event = threading.Event()
|
||
self._polling_thread = threading.Thread(
|
||
target=self._update_loop,
|
||
name=f"ZDTPolling_{port}",
|
||
daemon=True
|
||
)
|
||
self._polling_thread.start()
|
||
except Exception as e:
|
||
self.logger.error(f"Failed to open serial port {port}: {e}")
|
||
self.ser = None
|
||
|
||
def _update_loop(self):
|
||
"""背景循环读取电机位置"""
|
||
while not self._stop_event.is_set():
|
||
try:
|
||
self.get_position()
|
||
except Exception as e:
|
||
if self.debug:
|
||
self.logger.error(f"Polling error: {e}")
|
||
time.sleep(1.0) # 每1秒刷新一次位置数据
|
||
|
||
def _send(self, func_code: int, payload: list) -> bytes:
|
||
"""
|
||
发送指令并接收响应
|
||
|
||
Args:
|
||
func_code: 功能码
|
||
payload: 数据负载 (list of bytes)
|
||
|
||
Returns:
|
||
响应数据 (bytes)
|
||
"""
|
||
if not self.ser:
|
||
self.logger.error("Serial port not available")
|
||
return b""
|
||
|
||
with self.lock:
|
||
# 清空输入缓冲区
|
||
self.ser.reset_input_buffer()
|
||
|
||
# 构建消息: [ID] [功能码] [数据...] [校验位=0x6B]
|
||
message = bytes([self.id, func_code] + payload + [0x6B])
|
||
|
||
# 发送
|
||
self.ser.write(message)
|
||
|
||
# 根据功能码决定响应长度
|
||
# 查询类指令返回 10 字节,控制类指令返回 4 字节
|
||
read_len = 10 if func_code in [0x31, 0x32, 0x35, 0x24, 0x27] else 4
|
||
response = self.ser.read(read_len)
|
||
|
||
# 调试输出
|
||
if self.debug:
|
||
sent_hex = message.hex().upper()
|
||
recv_hex = response.hex().upper() if response else 'TIMEOUT'
|
||
print(f"[ID {self.id}] TX: {sent_hex} → RX: {recv_hex}")
|
||
|
||
return response
|
||
|
||
def enable(self, on: bool = True) -> bool:
|
||
"""
|
||
使能/禁用电机
|
||
|
||
Args:
|
||
on: True=使能(锁轴), False=禁用(松轴)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
state = 1 if on else 0
|
||
resp = self._send(0xF3, [0xAB, state, 0])
|
||
return len(resp) >= 4
|
||
|
||
def move_speed(
|
||
self,
|
||
speed_rpm: int,
|
||
direction: str = "CW",
|
||
acceleration: int = 10
|
||
) -> bool:
|
||
"""
|
||
速度模式运行
|
||
|
||
Args:
|
||
speed_rpm: 转速 (RPM)
|
||
direction: 方向 ("CW"=顺时针, "CCW"=逆时针)
|
||
acceleration: 加速度 (0-255)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
dir_val = 0 if direction.upper() in ["CW", "顺时针"] else 1
|
||
speed_bytes = struct.pack('>H', int(speed_rpm))
|
||
self.status = f"moving@{speed_rpm}rpm"
|
||
resp = self._send(0xF6, [dir_val, speed_bytes[0], speed_bytes[1], acceleration, 0])
|
||
return len(resp) >= 4
|
||
|
||
def move_position(
|
||
self,
|
||
pulses: int,
|
||
speed_rpm: int,
|
||
direction: str = "CW",
|
||
acceleration: int = 10,
|
||
absolute: bool = False
|
||
) -> bool:
|
||
"""
|
||
位置模式运行
|
||
|
||
Args:
|
||
pulses: 脉冲数
|
||
speed_rpm: 转速 (RPM)
|
||
direction: 方向 ("CW"=顺时针, "CCW"=逆时针)
|
||
acceleration: 加速度 (0-255)
|
||
absolute: True=绝对位置, False=相对位置
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
dir_val = 0 if direction.upper() in ["CW", "顺时针"] else 1
|
||
speed_bytes = struct.pack('>H', int(speed_rpm))
|
||
self.status = f"moving_to_{pulses}"
|
||
pulse_bytes = struct.pack('>I', int(pulses))
|
||
abs_flag = 1 if absolute else 0
|
||
|
||
payload = [
|
||
dir_val,
|
||
speed_bytes[0], speed_bytes[1],
|
||
acceleration,
|
||
pulse_bytes[0], pulse_bytes[1], pulse_bytes[2], pulse_bytes[3],
|
||
abs_flag,
|
||
0
|
||
]
|
||
|
||
resp = self._send(0xFD, payload)
|
||
return len(resp) >= 4
|
||
|
||
def stop(self) -> bool:
|
||
"""
|
||
停止电机
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
self.status = "idle"
|
||
resp = self._send(0xFE, [0x98, 0])
|
||
return len(resp) >= 4
|
||
|
||
def rotate_quarter(self, speed_rpm: int = 60, direction: str = "CW") -> bool:
|
||
"""
|
||
电机旋转 1/4 圈 (阻塞式)
|
||
假设电机细分为 3200 脉冲/圈,1/4 圈 = 800 脉冲
|
||
"""
|
||
pulses = 800
|
||
success = self.move_position(pulses=pulses, speed_rpm=speed_rpm, direction=direction, absolute=False)
|
||
|
||
if success:
|
||
# 计算预估旋转时间并进行阻塞等待 (Time = revolutions / (RPM/60))
|
||
# 1/4 rev / (RPM/60) = 15.0 / RPM
|
||
estimated_time = 15.0 / max(1, speed_rpm)
|
||
time.sleep(estimated_time + 0.5) # 额外给 0.5 秒缓冲
|
||
self.status = "idle"
|
||
|
||
return success
|
||
|
||
def wait_time(self, duration_s: float) -> bool:
|
||
"""
|
||
等待指定时间 (秒)
|
||
"""
|
||
self.logger.info(f"Waiting for {duration_s} seconds...")
|
||
time.sleep(duration_s)
|
||
return True
|
||
|
||
def set_zero(self) -> bool:
|
||
"""
|
||
清零当前位置
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
resp = self._send(0x0A, [])
|
||
return len(resp) >= 4
|
||
|
||
def get_position(self) -> Optional[int]:
|
||
"""
|
||
读取当前位置 (脉冲数)
|
||
|
||
Returns:
|
||
当前位置脉冲数,失败返回 None
|
||
"""
|
||
resp = self._send(0x32, [])
|
||
|
||
if len(resp) >= 8:
|
||
# 响应格式: [ID] [Func] [符号位] [数值4字节] [校验]
|
||
sign = resp[2] # 0=正, 1=负
|
||
value = struct.unpack('>I', resp[3:7])[0]
|
||
self.position = -value if sign == 1 else value
|
||
|
||
if self.debug:
|
||
print(f"[Position] Raw: {resp.hex().upper()}, Parsed: {self.position}")
|
||
|
||
return self.position
|
||
|
||
self.logger.warning("Failed to read position")
|
||
return None
|
||
|
||
def close(self):
|
||
"""关闭串口连接并停止线程"""
|
||
if hasattr(self, '_stop_event'):
|
||
self._stop_event.set()
|
||
|
||
if self.ser and self.ser.is_open:
|
||
self.ser.close()
|
||
self.logger.info("Serial port closed")
|
||
|
||
|
||
# ============================================================
|
||
# 测试和调试代码
|
||
# ============================================================
|
||
|
||
def test_motor():
|
||
"""基础功能测试"""
|
||
logging.basicConfig(level=logging.INFO)
|
||
|
||
print("="*60)
|
||
print("ZDT X42 电机驱动测试")
|
||
print("="*60)
|
||
|
||
driver = ZDTX42Driver(
|
||
port="/dev/tty.usbserial-3110",
|
||
baudrate=115200,
|
||
device_id=2,
|
||
debug=True
|
||
)
|
||
|
||
if not driver.ser:
|
||
print("❌ 串口打开失败")
|
||
return
|
||
|
||
try:
|
||
# 测试 1: 读取位置
|
||
print("\n[1] 读取当前位置")
|
||
pos = driver.get_position()
|
||
print(f"✓ 当前位置: {pos} 脉冲")
|
||
|
||
# 测试 2: 使能
|
||
print("\n[2] 使能电机")
|
||
driver.enable(True)
|
||
time.sleep(0.3)
|
||
print("✓ 电机已锁定")
|
||
|
||
# 测试 3: 相对位置运动
|
||
print("\n[3] 相对位置运动 (1000脉冲)")
|
||
driver.move_position(pulses=1000, speed_rpm=60, direction="CW")
|
||
time.sleep(2)
|
||
pos = driver.get_position()
|
||
print(f"✓ 新位置: {pos}")
|
||
|
||
# 测试 4: 速度运动
|
||
print("\n[4] 速度模式 (30RPM, 3秒)")
|
||
driver.move_speed(speed_rpm=30, direction="CW")
|
||
time.sleep(3)
|
||
driver.stop()
|
||
pos = driver.get_position()
|
||
print(f"✓ 停止后位置: {pos}")
|
||
|
||
# 测试 5: 禁用
|
||
print("\n[5] 禁用电机")
|
||
driver.enable(False)
|
||
print("✓ 电机已松开")
|
||
|
||
print("\n" + "="*60)
|
||
print("✅ 测试完成")
|
||
print("="*60)
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 测试失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
finally:
|
||
driver.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
test_motor()
|