feat: enhance separation_step logic with polling thread management and error handling

This commit is contained in:
ZiWei
2026-01-27 12:37:09 +08:00
parent e11070315d
commit c4a3be1498

View File

@@ -655,6 +655,12 @@ class ChinweDevice(UniversalDriver):
motor = self.motors[motor_id]
# 停止轮询线程,避免与 separation_step 同时读取传感器造成串口冲突
self.logger.info("Stopping polling thread for separation_step...")
self._stop_event.set()
if self._poll_thread and self._poll_thread.is_alive():
self._poll_thread.join(timeout=2.0)
# 使能电机
self.logger.info(f"Enabling motor {motor_id}...")
motor.enable(True)
@@ -669,6 +675,7 @@ class ChinweDevice(UniversalDriver):
start_time = time.time()
error_count = 0
try:
while True:
# 检查超时
if time.time() - start_time > timeout:
@@ -695,8 +702,8 @@ class ChinweDevice(UniversalDriver):
current_level = data['level']
rssi = data['rssi']
# 检测状态变化
if current_level != last_level and last_level is not None:
# 检测状态变化 (包括首次检测)
if current_level != last_level:
cycle_count += 1
if current_level:
@@ -724,6 +731,10 @@ class ChinweDevice(UniversalDriver):
# 轮询间隔
time.sleep(0.1)
finally:
# 恢复轮询线程
self.logger.info("Restarting polling thread...")
self._start_polling()
def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool:
"""支持标准 JSON 指令调用"""