From c4a3be149847438165388c23df9264a5ab7d67c6 Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Tue, 27 Jan 2026 12:37:09 +0800 Subject: [PATCH] feat: enhance separation_step logic with polling thread management and error handling --- unilabos/devices/separator/chinwe.py | 99 +++++++++++++++------------- 1 file changed, 55 insertions(+), 44 deletions(-) diff --git a/unilabos/devices/separator/chinwe.py b/unilabos/devices/separator/chinwe.py index 7adfdbe..b8c36a7 100644 --- a/unilabos/devices/separator/chinwe.py +++ b/unilabos/devices/separator/chinwe.py @@ -655,6 +655,12 @@ class ChinweDevice(UniversalDriver): motor = self.motors[motor_id] + # 停止轮询线程,避免与 separation_step 同时读取传感器造成串口冲突 + self.logger.info("Stopping polling thread for separation_step...") + self._stop_event.set() + if self._poll_thread and self._poll_thread.is_alive(): + self._poll_thread.join(timeout=2.0) + # 使能电机 self.logger.info(f"Enabling motor {motor_id}...") motor.enable(True) @@ -669,61 +675,66 @@ class ChinweDevice(UniversalDriver): start_time = time.time() error_count = 0 - while True: - # 检查超时 - if time.time() - start_time > timeout: - self.logger.warning(f"Separation step timeout after {timeout} seconds") - return False + try: + while True: + # 检查超时 + if time.time() - start_time > timeout: + self.logger.warning(f"Separation step timeout after {timeout} seconds") + return False - # 检查循环次数限制 - if max_cycles > 0 and cycle_count >= max_cycles: - self.logger.info(f"Separation step completed: reached max_cycles={max_cycles}") - return True + # 检查循环次数限制 + if max_cycles > 0 and cycle_count >= max_cycles: + self.logger.info(f"Separation step completed: reached max_cycles={max_cycles}") + return True - # 读取传感器数据 - data = self.sensor.read_level() + # 读取传感器数据 + data = self.sensor.read_level() - if data is None: - error_count += 1 - if error_count > 5: - self.logger.warning("Sensor read failed multiple times, retrying...") - error_count = 0 - time.sleep(0.5) - continue + if data is None: + error_count += 1 + if error_count > 5: + self.logger.warning("Sensor read failed multiple times, retrying...") + error_count = 0 + time.sleep(0.5) + continue - error_count = 0 - current_level = data['level'] - rssi = data['rssi'] + error_count = 0 + current_level = data['level'] + rssi = data['rssi'] - # 检测状态变化 - if current_level != last_level and last_level is not None: - cycle_count += 1 + # 检测状态变化 (包括首次检测) + if current_level != last_level: + cycle_count += 1 - if current_level: - # 有液 -> 电机顺时针旋转 - self.logger.info(f"[Cycle {cycle_count}] Liquid detected (RSSI={rssi}), " - f"rotating motor {motor_id} clockwise {pulses} pulses") - motor.run_position(pulses=pulses, speed_rpm=speed, direction=0, absolute=False) + if current_level: + # 有液 -> 电机顺时针旋转 + self.logger.info(f"[Cycle {cycle_count}] Liquid detected (RSSI={rssi}), " + f"rotating motor {motor_id} clockwise {pulses} pulses") + motor.run_position(pulses=pulses, speed_rpm=speed, direction=0, absolute=False) - # 等待电机完成 (预估时间) - estimated_time = 15.0 / max(1, speed) - time.sleep(estimated_time + 0.5) + # 等待电机完成 (预估时间) + estimated_time = 15.0 / max(1, speed) + time.sleep(estimated_time + 0.5) - else: - # 无液 -> 电机逆时针旋转 - self.logger.info(f"[Cycle {cycle_count}] No liquid detected (RSSI={rssi}), " - f"rotating motor {motor_id} counter-clockwise {pulses} pulses") - motor.run_position(pulses=pulses, speed_rpm=speed, direction=1, absolute=False) + else: + # 无液 -> 电机逆时针旋转 + self.logger.info(f"[Cycle {cycle_count}] No liquid detected (RSSI={rssi}), " + f"rotating motor {motor_id} counter-clockwise {pulses} pulses") + motor.run_position(pulses=pulses, speed_rpm=speed, direction=1, absolute=False) - # 等待电机完成 (预估时间) - estimated_time = 15.0 / max(1, speed) - time.sleep(estimated_time + 0.5) + # 等待电机完成 (预估时间) + estimated_time = 15.0 / max(1, speed) + time.sleep(estimated_time + 0.5) - # 更新状态 - last_level = current_level + # 更新状态 + last_level = current_level - # 轮询间隔 - time.sleep(0.1) + # 轮询间隔 + time.sleep(0.1) + finally: + # 恢复轮询线程 + self.logger.info("Restarting polling thread...") + self._start_polling() def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool: """支持标准 JSON 指令调用"""