mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 13:01:12 +00:00
feat: 添加ChinWe设备控制类,支持串口通信和电机控制功能 (#79)
This commit is contained in:
282
unilabos/devices/ChinWe/chinwe.py
Normal file
282
unilabos/devices/ChinWe/chinwe.py
Normal file
@@ -0,0 +1,282 @@
|
|||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import serial
|
||||||
|
import serial.tools.list_ports
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from typing import Optional, List, Dict, Tuple
|
||||||
|
|
||||||
|
class ChinweDevice:
|
||||||
|
"""
|
||||||
|
ChinWe设备控制类
|
||||||
|
提供串口通信、电机控制、传感器数据读取等功能
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, port: str, baudrate: int = 115200, debug: bool = False):
|
||||||
|
"""
|
||||||
|
初始化ChinWe设备
|
||||||
|
|
||||||
|
Args:
|
||||||
|
port: 串口名称,如果为None则自动检测
|
||||||
|
baudrate: 波特率,默认115200
|
||||||
|
"""
|
||||||
|
self.debug = debug
|
||||||
|
self.port = port
|
||||||
|
self.baudrate = baudrate
|
||||||
|
self.serial_port: Optional[serial.Serial] = None
|
||||||
|
self._voltage: float = 0.0
|
||||||
|
self._ec_value: float = 0.0
|
||||||
|
self._ec_adc_value: int = 0
|
||||||
|
self._is_connected = False
|
||||||
|
self.connect()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
"""获取连接状态"""
|
||||||
|
return self._is_connected and self.serial_port and self.serial_port.is_open
|
||||||
|
|
||||||
|
@property
|
||||||
|
def voltage(self) -> float:
|
||||||
|
"""获取电源电压值"""
|
||||||
|
return self._voltage
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ec_value(self) -> float:
|
||||||
|
"""获取电导率值 (ms/cm)"""
|
||||||
|
return self._ec_value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ec_adc_value(self) -> int:
|
||||||
|
"""获取EC ADC原始值"""
|
||||||
|
return self._ec_adc_value
|
||||||
|
|
||||||
|
|
||||||
|
@property
|
||||||
|
def device_status(self) -> Dict[str, any]:
|
||||||
|
"""
|
||||||
|
获取设备状态信息
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含设备状态的字典
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"connected": self.is_connected,
|
||||||
|
"port": self.port,
|
||||||
|
"baudrate": self.baudrate,
|
||||||
|
"voltage": self.voltage,
|
||||||
|
"ec_value": self.ec_value,
|
||||||
|
"ec_adc_value": self.ec_adc_value
|
||||||
|
}
|
||||||
|
|
||||||
|
def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None) -> bool:
|
||||||
|
"""
|
||||||
|
连接到串口设备
|
||||||
|
|
||||||
|
Args:
|
||||||
|
port: 串口名称,如果为None则使用初始化时的port或自动检测
|
||||||
|
baudrate: 波特率,如果为None则使用初始化时的baudrate
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
连接是否成功
|
||||||
|
"""
|
||||||
|
if self.is_connected:
|
||||||
|
return True
|
||||||
|
|
||||||
|
target_port = port or self.port
|
||||||
|
target_baudrate = baudrate or self.baudrate
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.serial_port = serial.Serial(target_port, target_baudrate, timeout=0.5)
|
||||||
|
self._is_connected = True
|
||||||
|
self.port = target_port
|
||||||
|
self.baudrate = target_baudrate
|
||||||
|
connect_allow_times = 5
|
||||||
|
while not self.serial_port.is_open and connect_allow_times > 0:
|
||||||
|
time.sleep(0.5)
|
||||||
|
connect_allow_times -= 1
|
||||||
|
print(f"尝试连接到 {target_port} @ {target_baudrate},剩余尝试次数: {connect_allow_times}", self.debug)
|
||||||
|
raise ValueError("串口未打开,请检查设备连接")
|
||||||
|
print(f"已连接到 {target_port} @ {target_baudrate}", self.debug)
|
||||||
|
threading.Thread(target=self._read_data, daemon=True).start()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"ChinweDevice连接失败: {e}")
|
||||||
|
self._is_connected = False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def disconnect(self) -> bool:
|
||||||
|
"""
|
||||||
|
断开串口连接
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
断开是否成功
|
||||||
|
"""
|
||||||
|
if self.serial_port and self.serial_port.is_open:
|
||||||
|
try:
|
||||||
|
self.serial_port.close()
|
||||||
|
self._is_connected = False
|
||||||
|
print("已断开串口连接")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"断开连接失败: {e}")
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _send_motor_command(self, command: str) -> bool:
|
||||||
|
"""
|
||||||
|
发送电机控制命令
|
||||||
|
|
||||||
|
Args:
|
||||||
|
command: 电机命令字符串,例如 "M 1 CW 1.5"
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
发送是否成功
|
||||||
|
"""
|
||||||
|
if not self.is_connected:
|
||||||
|
print("设备未连接")
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.serial_port.write((command + "\n").encode('utf-8'))
|
||||||
|
print(f"发送命令: {command}")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"发送命令失败: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def rotate_motor(self, motor_id: int, turns: float, clockwise: bool = True) -> bool:
|
||||||
|
"""
|
||||||
|
使电机转动指定圈数
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motor_id: 电机ID(1, 2, 3...)
|
||||||
|
turns: 转动圈数,支持小数
|
||||||
|
clockwise: True为顺时针,False为逆时针
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
命令发送是否成功
|
||||||
|
"""
|
||||||
|
if clockwise:
|
||||||
|
command = f"M {motor_id} CW {turns}"
|
||||||
|
else:
|
||||||
|
command = f"M {motor_id} CCW {turns}"
|
||||||
|
return self._send_motor_command(command)
|
||||||
|
|
||||||
|
def set_motor_speed(self, motor_id: int, speed: float) -> bool:
|
||||||
|
"""
|
||||||
|
设置电机转速(如果设备支持)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
motor_id: 电机ID(1, 2, 3...)
|
||||||
|
speed: 转速值
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
命令发送是否成功
|
||||||
|
"""
|
||||||
|
command = f"M {motor_id} SPEED {speed}"
|
||||||
|
return self._send_motor_command(command)
|
||||||
|
|
||||||
|
def _read_data(self) -> List[str]:
|
||||||
|
"""
|
||||||
|
读取串口数据并解析
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
读取到的数据行列表
|
||||||
|
"""
|
||||||
|
print("开始读取串口数据...")
|
||||||
|
if not self.is_connected:
|
||||||
|
return []
|
||||||
|
|
||||||
|
data_lines = []
|
||||||
|
try:
|
||||||
|
while self.serial_port.in_waiting:
|
||||||
|
time.sleep(0.1) # 等待数据稳定
|
||||||
|
try:
|
||||||
|
line = self.serial_port.readline().decode('utf-8', errors='ignore').strip()
|
||||||
|
if line:
|
||||||
|
data_lines.append(line)
|
||||||
|
self._parse_sensor_data(line)
|
||||||
|
except Exception as ex:
|
||||||
|
print(f"解码数据错误: {ex}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"读取串口数据错误: {e}")
|
||||||
|
|
||||||
|
return data_lines
|
||||||
|
|
||||||
|
def _parse_sensor_data(self, line: str) -> None:
|
||||||
|
"""
|
||||||
|
解析传感器数据
|
||||||
|
|
||||||
|
Args:
|
||||||
|
line: 接收到的数据行
|
||||||
|
"""
|
||||||
|
# 解析电源电压
|
||||||
|
if "电源电压" in line:
|
||||||
|
try:
|
||||||
|
val = float(line.split(":")[1].replace("V", "").strip())
|
||||||
|
self._voltage = val
|
||||||
|
if self.debug:
|
||||||
|
print(f"电源电压更新: {val}V")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 解析电导率和ADC原始值(支持两种格式)
|
||||||
|
if "电导率" in line and "ADC原始值" in line:
|
||||||
|
try:
|
||||||
|
# 支持格式如:电导率:2.50ms/cm, ADC原始值:2052
|
||||||
|
ec_match = re.search(r"电导率[::]\s*([\d\.]+)", line)
|
||||||
|
adc_match = re.search(r"ADC原始值[::]\s*(\d+)", line)
|
||||||
|
if ec_match:
|
||||||
|
ec_val = float(ec_match.group(1))
|
||||||
|
self._ec_value = ec_val
|
||||||
|
if self.debug:
|
||||||
|
print(f"电导率更新: {ec_val:.2f} ms/cm")
|
||||||
|
if adc_match:
|
||||||
|
adc_val = int(adc_match.group(1))
|
||||||
|
self._ec_adc_value = adc_val
|
||||||
|
if self.debug:
|
||||||
|
print(f"EC ADC原始值更新: {adc_val}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# 仅电导率,无ADC原始值
|
||||||
|
elif "电导率" in line:
|
||||||
|
try:
|
||||||
|
val = float(line.split(":")[1].replace("ms/cm", "").strip())
|
||||||
|
self._ec_value = val
|
||||||
|
if self.debug:
|
||||||
|
print(f"电导率更新: {val:.2f} ms/cm")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# 仅ADC原始值(如有分开回传场景)
|
||||||
|
elif "ADC原始值" in line:
|
||||||
|
try:
|
||||||
|
adc_val = int(line.split(":")[1].strip())
|
||||||
|
self._ec_adc_value = adc_val
|
||||||
|
if self.debug:
|
||||||
|
print(f"EC ADC原始值更新: {adc_val}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def spin_when_ec_ge_0():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""测试函数"""
|
||||||
|
print("=== ChinWe设备测试 ===")
|
||||||
|
|
||||||
|
# 创建设备实例
|
||||||
|
device = ChinweDevice("/dev/tty.usbserial-A5069RR4", debug=True)
|
||||||
|
try:
|
||||||
|
# 测试5: 发送电机命令
|
||||||
|
print("\n5. 发送电机命令测试:")
|
||||||
|
print(" 5.3 使用通用函数控制电机20顺时针转2圈:")
|
||||||
|
device.rotate_motor(2, 20.0, clockwise=True)
|
||||||
|
time.sleep(0.5)
|
||||||
|
finally:
|
||||||
|
time.sleep(10)
|
||||||
|
# 测试7: 断开连接
|
||||||
|
print("\n7. 断开连接:")
|
||||||
|
device.disconnect()
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -3,6 +3,7 @@ from threading import Lock, Event
|
|||||||
from enum import Enum
|
from enum import Enum
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import time
|
import time
|
||||||
|
import traceback
|
||||||
from typing import Any, Union, Optional, overload
|
from typing import Any, Union, Optional, overload
|
||||||
|
|
||||||
import serial.tools.list_ports
|
import serial.tools.list_ports
|
||||||
@@ -386,3 +387,8 @@ class RunzeSyringePump:
|
|||||||
def list():
|
def list():
|
||||||
for item in serial.tools.list_ports.comports():
|
for item in serial.tools.list_ports.comports():
|
||||||
yield RunzeSyringePumpInfo(port=item.device)
|
yield RunzeSyringePumpInfo(port=item.device)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0)
|
||||||
|
r.initialize()
|
||||||
@@ -447,6 +447,8 @@ class Registry:
|
|||||||
if complete_registry:
|
if complete_registry:
|
||||||
device_config["class"]["status_types"].clear()
|
device_config["class"]["status_types"].clear()
|
||||||
enhanced_info = get_enhanced_class_info(device_config["class"]["module"], use_dynamic=True)
|
enhanced_info = get_enhanced_class_info(device_config["class"]["module"], use_dynamic=True)
|
||||||
|
if not enhanced_info.get("dynamic_import_success", False):
|
||||||
|
continue
|
||||||
device_config["class"]["status_types"].update(
|
device_config["class"]["status_types"].update(
|
||||||
{k: v["return_type"] for k, v in enhanced_info["status_methods"].items()}
|
{k: v["return_type"] for k, v in enhanced_info["status_methods"].items()}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -88,6 +88,9 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
|
|||||||
if device_config.get("type", "device") != "device":
|
if device_config.get("type", "device") != "device":
|
||||||
continue
|
continue
|
||||||
# 设置硬件接口代理
|
# 设置硬件接口代理
|
||||||
|
if device_id not in self.sub_devices:
|
||||||
|
self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...")
|
||||||
|
continue
|
||||||
d = self.sub_devices[device_id]
|
d = self.sub_devices[device_id]
|
||||||
if d:
|
if d:
|
||||||
hardware_interface = d.ros_node_instance._hardware_interface
|
hardware_interface = d.ros_node_instance._hardware_interface
|
||||||
|
|||||||
Reference in New Issue
Block a user