From 06e66765e76aa657fa63ec0bd4bc4a2ef9548344 Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Wed, 6 Aug 2025 18:49:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0ChinWe=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E6=8E=A7=E5=88=B6=E7=B1=BB=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E4=B8=B2=E5=8F=A3=E9=80=9A=E4=BF=A1=E5=92=8C=E7=94=B5=E6=9C=BA?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E5=8A=9F=E8=83=BD=20(#79)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/devices/ChinWe/chinwe.py | 282 ++++++++++++++++++ .../devices/pump_and_valve/runze_backbone.py | 6 + unilabos/registry/registry.py | 2 + unilabos/ros/nodes/presets/protocol_node.py | 3 + 4 files changed, 293 insertions(+) create mode 100644 unilabos/devices/ChinWe/chinwe.py diff --git a/unilabos/devices/ChinWe/chinwe.py b/unilabos/devices/ChinWe/chinwe.py new file mode 100644 index 00000000..4eeb0f47 --- /dev/null +++ b/unilabos/devices/ChinWe/chinwe.py @@ -0,0 +1,282 @@ +import sys +import threading +import serial +import serial.tools.list_ports +import re +import time +from typing import Optional, List, Dict, Tuple + +class ChinweDevice: + """ + ChinWe设备控制类 + 提供串口通信、电机控制、传感器数据读取等功能 + """ + + def __init__(self, port: str, baudrate: int = 115200, debug: bool = False): + """ + 初始化ChinWe设备 + + Args: + port: 串口名称,如果为None则自动检测 + baudrate: 波特率,默认115200 + """ + self.debug = debug + self.port = port + self.baudrate = baudrate + self.serial_port: Optional[serial.Serial] = None + self._voltage: float = 0.0 + self._ec_value: float = 0.0 + self._ec_adc_value: int = 0 + self._is_connected = False + self.connect() + + @property + def is_connected(self) -> bool: + """获取连接状态""" + return self._is_connected and self.serial_port and self.serial_port.is_open + + @property + def voltage(self) -> float: + """获取电源电压值""" + return self._voltage + + @property + def ec_value(self) -> float: + """获取电导率值 (ms/cm)""" + return self._ec_value + + @property + def ec_adc_value(self) -> int: + """获取EC ADC原始值""" + return self._ec_adc_value + + + @property + def device_status(self) -> Dict[str, any]: + """ + 获取设备状态信息 + + Returns: + 包含设备状态的字典 + """ + return { + "connected": self.is_connected, + "port": self.port, + "baudrate": self.baudrate, + "voltage": self.voltage, + "ec_value": self.ec_value, + "ec_adc_value": self.ec_adc_value + } + + def connect(self, port: Optional[str] = None, baudrate: Optional[int] = None) -> bool: + """ + 连接到串口设备 + + Args: + port: 串口名称,如果为None则使用初始化时的port或自动检测 + baudrate: 波特率,如果为None则使用初始化时的baudrate + + Returns: + 连接是否成功 + """ + if self.is_connected: + return True + + target_port = port or self.port + target_baudrate = baudrate or self.baudrate + + try: + self.serial_port = serial.Serial(target_port, target_baudrate, timeout=0.5) + self._is_connected = True + self.port = target_port + self.baudrate = target_baudrate + connect_allow_times = 5 + while not self.serial_port.is_open and connect_allow_times > 0: + time.sleep(0.5) + connect_allow_times -= 1 + print(f"尝试连接到 {target_port} @ {target_baudrate},剩余尝试次数: {connect_allow_times}", self.debug) + raise ValueError("串口未打开,请检查设备连接") + print(f"已连接到 {target_port} @ {target_baudrate}", self.debug) + threading.Thread(target=self._read_data, daemon=True).start() + return True + except Exception as e: + print(f"ChinweDevice连接失败: {e}") + self._is_connected = False + return False + + def disconnect(self) -> bool: + """ + 断开串口连接 + + Returns: + 断开是否成功 + """ + if self.serial_port and self.serial_port.is_open: + try: + self.serial_port.close() + self._is_connected = False + print("已断开串口连接") + return True + except Exception as e: + print(f"断开连接失败: {e}") + return False + return True + + def _send_motor_command(self, command: str) -> bool: + """ + 发送电机控制命令 + + Args: + command: 电机命令字符串,例如 "M 1 CW 1.5" + + Returns: + 发送是否成功 + """ + if not self.is_connected: + print("设备未连接") + return False + + try: + self.serial_port.write((command + "\n").encode('utf-8')) + print(f"发送命令: {command}") + return True + except Exception as e: + print(f"发送命令失败: {e}") + return False + + def rotate_motor(self, motor_id: int, turns: float, clockwise: bool = True) -> bool: + """ + 使电机转动指定圈数 + + Args: + motor_id: 电机ID(1, 2, 3...) + turns: 转动圈数,支持小数 + clockwise: True为顺时针,False为逆时针 + + Returns: + 命令发送是否成功 + """ + if clockwise: + command = f"M {motor_id} CW {turns}" + else: + command = f"M {motor_id} CCW {turns}" + return self._send_motor_command(command) + + def set_motor_speed(self, motor_id: int, speed: float) -> bool: + """ + 设置电机转速(如果设备支持) + + Args: + motor_id: 电机ID(1, 2, 3...) + speed: 转速值 + + Returns: + 命令发送是否成功 + """ + command = f"M {motor_id} SPEED {speed}" + return self._send_motor_command(command) + + def _read_data(self) -> List[str]: + """ + 读取串口数据并解析 + + Returns: + 读取到的数据行列表 + """ + print("开始读取串口数据...") + if not self.is_connected: + return [] + + data_lines = [] + try: + while self.serial_port.in_waiting: + time.sleep(0.1) # 等待数据稳定 + try: + line = self.serial_port.readline().decode('utf-8', errors='ignore').strip() + if line: + data_lines.append(line) + self._parse_sensor_data(line) + except Exception as ex: + print(f"解码数据错误: {ex}") + except Exception as e: + print(f"读取串口数据错误: {e}") + + return data_lines + + def _parse_sensor_data(self, line: str) -> None: + """ + 解析传感器数据 + + Args: + line: 接收到的数据行 + """ + # 解析电源电压 + if "电源电压" in line: + try: + val = float(line.split(":")[1].replace("V", "").strip()) + self._voltage = val + if self.debug: + print(f"电源电压更新: {val}V") + except Exception: + pass + + # 解析电导率和ADC原始值(支持两种格式) + if "电导率" in line and "ADC原始值" in line: + try: + # 支持格式如:电导率:2.50ms/cm, ADC原始值:2052 + ec_match = re.search(r"电导率[::]\s*([\d\.]+)", line) + adc_match = re.search(r"ADC原始值[::]\s*(\d+)", line) + if ec_match: + ec_val = float(ec_match.group(1)) + self._ec_value = ec_val + if self.debug: + print(f"电导率更新: {ec_val:.2f} ms/cm") + if adc_match: + adc_val = int(adc_match.group(1)) + self._ec_adc_value = adc_val + if self.debug: + print(f"EC ADC原始值更新: {adc_val}") + except Exception: + pass + # 仅电导率,无ADC原始值 + elif "电导率" in line: + try: + val = float(line.split(":")[1].replace("ms/cm", "").strip()) + self._ec_value = val + if self.debug: + print(f"电导率更新: {val:.2f} ms/cm") + except Exception: + pass + # 仅ADC原始值(如有分开回传场景) + elif "ADC原始值" in line: + try: + adc_val = int(line.split(":")[1].strip()) + self._ec_adc_value = adc_val + if self.debug: + print(f"EC ADC原始值更新: {adc_val}") + except Exception: + pass + + def spin_when_ec_ge_0(): + pass + + +def main(): + """测试函数""" + print("=== ChinWe设备测试 ===") + + # 创建设备实例 + device = ChinweDevice("/dev/tty.usbserial-A5069RR4", debug=True) + try: + # 测试5: 发送电机命令 + print("\n5. 发送电机命令测试:") + print(" 5.3 使用通用函数控制电机20顺时针转2圈:") + device.rotate_motor(2, 20.0, clockwise=True) + time.sleep(0.5) + finally: + time.sleep(10) + # 测试7: 断开连接 + print("\n7. 断开连接:") + device.disconnect() +if __name__ == "__main__": + main() diff --git a/unilabos/devices/pump_and_valve/runze_backbone.py b/unilabos/devices/pump_and_valve/runze_backbone.py index f6629ee0..36cb235a 100644 --- a/unilabos/devices/pump_and_valve/runze_backbone.py +++ b/unilabos/devices/pump_and_valve/runze_backbone.py @@ -3,6 +3,7 @@ from threading import Lock, Event from enum import Enum from dataclasses import dataclass import time +import traceback from typing import Any, Union, Optional, overload import serial.tools.list_ports @@ -386,3 +387,8 @@ class RunzeSyringePump: def list(): for item in serial.tools.list_ports.comports(): yield RunzeSyringePumpInfo(port=item.device) + + +if __name__ == "__main__": + r = RunzeSyringePump("/dev/tty.usbserial-D30JUGG5", "1", 25.0) + r.initialize() \ No newline at end of file diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 1e789488..588180d0 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -447,6 +447,8 @@ class Registry: if complete_registry: device_config["class"]["status_types"].clear() enhanced_info = get_enhanced_class_info(device_config["class"]["module"], use_dynamic=True) + if not enhanced_info.get("dynamic_import_success", False): + continue device_config["class"]["status_types"].update( {k: v["return_type"] for k, v in enhanced_info["status_methods"].items()} ) diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index 833e1892..76129a18 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -88,6 +88,9 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): if device_config.get("type", "device") != "device": continue # 设置硬件接口代理 + if device_id not in self.sub_devices: + self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...") + continue d = self.sub_devices[device_id] if d: hardware_interface = d.ros_node_instance._hardware_interface