mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-05 14:05:12 +00:00
feat: add separation_step with sensor-motor linkage
This commit is contained in:
@@ -623,6 +623,108 @@ class ChinweDevice(UniversalDriver):
|
|||||||
time.sleep(duration)
|
time.sleep(duration)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def separation_step(self, motor_id: int = 5, speed: int = 60, pulses: int = 700,
|
||||||
|
max_cycles: int = 0, timeout: int = 300) -> bool:
|
||||||
|
"""
|
||||||
|
分液步骤 - 液位传感器与电机联动
|
||||||
|
当液位传感器检测到"有液"时,电机顺时针旋转指定脉冲数
|
||||||
|
当液位传感器检测到"无液"时,电机逆时针旋转指定脉冲数
|
||||||
|
|
||||||
|
:param motor_id: 电机ID (必须在初始化时配置的motor_ids中)
|
||||||
|
:param speed: 电机转速 (RPM)
|
||||||
|
:param pulses: 每次旋转的脉冲数 (默认700约为1/4圈,假设3200脉冲/圈)
|
||||||
|
:param max_cycles: 最大执行循环次数 (0=无限制,默认0)
|
||||||
|
:param timeout: 整体超时时间 (秒)
|
||||||
|
:return: 成功返回True,超时或失败返回False
|
||||||
|
"""
|
||||||
|
motor_id = int(motor_id)
|
||||||
|
speed = int(speed)
|
||||||
|
pulses = int(pulses)
|
||||||
|
max_cycles = int(max_cycles)
|
||||||
|
timeout = int(timeout)
|
||||||
|
|
||||||
|
# 检查电机是否存在
|
||||||
|
if motor_id not in self.motors:
|
||||||
|
self.logger.error(f"Motor {motor_id} not found in configured motors: {list(self.motors.keys())}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 检查传感器是否可用
|
||||||
|
if not self.sensor:
|
||||||
|
self.logger.error("Sensor not initialized")
|
||||||
|
return False
|
||||||
|
|
||||||
|
motor = self.motors[motor_id]
|
||||||
|
|
||||||
|
# 使能电机
|
||||||
|
self.logger.info(f"Enabling motor {motor_id}...")
|
||||||
|
motor.enable(True)
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
self.logger.info(f"Starting separation step: motor_id={motor_id}, speed={speed} RPM, "
|
||||||
|
f"pulses={pulses}, max_cycles={max_cycles}, timeout={timeout}s")
|
||||||
|
|
||||||
|
# 记录上一次的液位状态
|
||||||
|
last_level = None
|
||||||
|
cycle_count = 0
|
||||||
|
start_time = time.time()
|
||||||
|
error_count = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# 检查超时
|
||||||
|
if time.time() - start_time > timeout:
|
||||||
|
self.logger.warning(f"Separation step timeout after {timeout} seconds")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 检查循环次数限制
|
||||||
|
if max_cycles > 0 and cycle_count >= max_cycles:
|
||||||
|
self.logger.info(f"Separation step completed: reached max_cycles={max_cycles}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 读取传感器数据
|
||||||
|
data = self.sensor.read_level()
|
||||||
|
|
||||||
|
if data is None:
|
||||||
|
error_count += 1
|
||||||
|
if error_count > 5:
|
||||||
|
self.logger.warning("Sensor read failed multiple times, retrying...")
|
||||||
|
error_count = 0
|
||||||
|
time.sleep(0.5)
|
||||||
|
continue
|
||||||
|
|
||||||
|
error_count = 0
|
||||||
|
current_level = data['level']
|
||||||
|
rssi = data['rssi']
|
||||||
|
|
||||||
|
# 检测状态变化
|
||||||
|
if current_level != last_level and last_level is not None:
|
||||||
|
cycle_count += 1
|
||||||
|
|
||||||
|
if current_level:
|
||||||
|
# 有液 -> 电机顺时针旋转
|
||||||
|
self.logger.info(f"[Cycle {cycle_count}] Liquid detected (RSSI={rssi}), "
|
||||||
|
f"rotating motor {motor_id} clockwise {pulses} pulses")
|
||||||
|
motor.run_position(pulses=pulses, speed_rpm=speed, direction=0, absolute=False)
|
||||||
|
|
||||||
|
# 等待电机完成 (预估时间)
|
||||||
|
estimated_time = 15.0 / max(1, speed)
|
||||||
|
time.sleep(estimated_time + 0.5)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 无液 -> 电机逆时针旋转
|
||||||
|
self.logger.info(f"[Cycle {cycle_count}] No liquid detected (RSSI={rssi}), "
|
||||||
|
f"rotating motor {motor_id} counter-clockwise {pulses} pulses")
|
||||||
|
motor.run_position(pulses=pulses, speed_rpm=speed, direction=1, absolute=False)
|
||||||
|
|
||||||
|
# 等待电机完成 (预估时间)
|
||||||
|
estimated_time = 15.0 / max(1, speed)
|
||||||
|
time.sleep(estimated_time + 0.5)
|
||||||
|
|
||||||
|
# 更新状态
|
||||||
|
last_level = current_level
|
||||||
|
|
||||||
|
# 轮询间隔
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool:
|
def execute_command_from_outer(self, command_dict: Dict[str, Any]) -> bool:
|
||||||
"""支持标准 JSON 指令调用"""
|
"""支持标准 JSON 指令调用"""
|
||||||
return super().execute_command_from_outer(command_dict)
|
return super().execute_command_from_outer(command_dict)
|
||||||
|
|||||||
@@ -317,6 +317,47 @@ separator.chinwe:
|
|||||||
- port
|
- port
|
||||||
type: object
|
type: object
|
||||||
type: UniLabJsonCommand
|
type: UniLabJsonCommand
|
||||||
|
separation_step:
|
||||||
|
goal:
|
||||||
|
max_cycles: 0
|
||||||
|
motor_id: 5
|
||||||
|
pulses: 700
|
||||||
|
speed: 60
|
||||||
|
timeout: 300
|
||||||
|
handles: {}
|
||||||
|
schema:
|
||||||
|
description: 分液步骤 - 液位传感器与电机联动 (有液→顺时针, 无液→逆时针)
|
||||||
|
properties:
|
||||||
|
goal:
|
||||||
|
properties:
|
||||||
|
max_cycles:
|
||||||
|
default: 0
|
||||||
|
description: 最大循环次数 (0=无限制)
|
||||||
|
type: integer
|
||||||
|
motor_id:
|
||||||
|
default: '5'
|
||||||
|
description: 选择电机
|
||||||
|
enum:
|
||||||
|
- '4'
|
||||||
|
- '5'
|
||||||
|
title: '注: 4=搅拌, 5=旋钮'
|
||||||
|
type: string
|
||||||
|
pulses:
|
||||||
|
default: 700
|
||||||
|
description: 每次旋转脉冲数 (约1/4圈)
|
||||||
|
type: integer
|
||||||
|
speed:
|
||||||
|
default: 60
|
||||||
|
description: 电机转速 (RPM)
|
||||||
|
type: integer
|
||||||
|
timeout:
|
||||||
|
default: 300
|
||||||
|
description: 超时时间 (秒)
|
||||||
|
type: integer
|
||||||
|
required:
|
||||||
|
- motor_id
|
||||||
|
type: object
|
||||||
|
type: UniLabJsonCommand
|
||||||
wait_sensor_level:
|
wait_sensor_level:
|
||||||
goal:
|
goal:
|
||||||
target_state: 有液
|
target_state: 有液
|
||||||
|
|||||||
Reference in New Issue
Block a user