From f947658bbd30bf407ea02c1ab5e6d165a2a57afc Mon Sep 17 00:00:00 2001 From: KCFeng425 <2100011801@stu.pku.edu.cn> Date: Mon, 9 Jun 2025 23:47:23 +0800 Subject: [PATCH] Match mock device with action --- unilabos/devices/mock/mock_chiller.py | 109 +++--- unilabos/devices/mock/mock_filter.py | 199 ++++++---- unilabos/devices/mock/mock_heater.py | 185 +++++---- unilabos/devices/mock/mock_pump.py | 434 ++++++++++------------ unilabos/devices/mock/mock_rotavap.py | 65 +--- unilabos/devices/mock/mock_separator.py | 281 ++++++++++++-- unilabos/devices/mock/mock_stirrer.py | 179 +-------- unilabos/devices/mock/mock_stirrer_new.py | 229 ++++++++++++ 8 files changed, 980 insertions(+), 701 deletions(-) create mode 100644 unilabos/devices/mock/mock_stirrer_new.py diff --git a/unilabos/devices/mock/mock_chiller.py b/unilabos/devices/mock/mock_chiller.py index 46bb7f0..fbb823c 100644 --- a/unilabos/devices/mock/mock_chiller.py +++ b/unilabos/devices/mock/mock_chiller.py @@ -10,11 +10,15 @@ class MockChiller: self._status: str = "Idle" self._is_cooling: bool = False self._is_heating: bool = False - self._power_on: bool = False + self._vessel = "Unknown" + self._purpose = "Unknown" # 模拟温度变化的线程 self._temperature_thread = None - self._running = False + self._running = True + self._temperature_thread = threading.Thread(target=self._temperature_control_loop) + self._temperature_thread.daemon = True + self._temperature_thread.start() @property def current_temperature(self) -> float: @@ -31,11 +35,6 @@ class MockChiller: """设备状态 - 会被自动识别的设备属性""" return self._status - @property - def power_on(self) -> bool: - """电源状态""" - return self._power_on - @property def is_cooling(self) -> bool: """是否正在冷却""" @@ -46,17 +45,22 @@ class MockChiller: """是否正在加热""" return self._is_heating - def set_temperature(self, temperature: float): - """设置目标温度 - 需要在注册表添加的设备动作""" - if not self._power_on: - self._status = "Error: Power Off" - return False + @property + def vessel(self) -> str: + """当前操作的容器名称""" + return self._vessel - # 将传入温度转换为 float,并限制在允许范围内 - temperature = float(temperature) - self._target_temperature = temperature + @property + def purpose(self) -> str: + """当前操作目的""" + return self._purpose + + def heat_chill_start(self, vessel: str, temp: float, purpose: str): + """设置目标温度并记录容器和目的""" + self._vessel = str(vessel) + self._purpose = str(purpose) + self._target_temperature = float(temp) - # 立即更新状态 diff = self._target_temperature - self._current_temperature if abs(diff) < 0.1: self._status = "At Target Temperature" @@ -71,31 +75,37 @@ class MockChiller: self._is_heating = True self._is_cooling = False - # 启动温度控制 self._start_temperature_control() return True - - def power_on_off(self, power_state: str): - """开关机控制""" - if power_state == "on": - self._power_on = True - # 不在这里直接设置状态和加热/制冷标志 - self._start_temperature_control() - else: - self._power_on = False - self._status = "Power Off" - self._stop_temperature_control() - self._is_cooling = False - self._is_heating = False + + def heat_chill_stop(self, vessel: str): + """停止加热/制冷""" + if vessel != self._vessel: + return {"success": False, "status": f"Wrong vessel: expected {self._vessel}, got {vessel}"} + + # 停止温度控制线程,锁定当前温度 + self._stop_temperature_control() + + # 更新状态 + self._status = "Stopped" + self._is_cooling = False + self._is_heating = False + + # 重新启动线程但保持温度 + self._running = True + self._temperature_thread = threading.Thread(target=self._temperature_control_loop) + self._temperature_thread.daemon = True + self._temperature_thread.start() + + return {"success": True, "status": self._status} def _start_temperature_control(self): """启动温度控制线程""" - if self._power_on: # 移除 not self._running 检查 - self._running = True - if self._temperature_thread is None or not self._temperature_thread.is_alive(): - self._temperature_thread = threading.Thread(target=self._temperature_control_loop) - self._temperature_thread.daemon = True - self._temperature_thread.start() + self._running = True + if self._temperature_thread is None or not self._temperature_thread.is_alive(): + self._temperature_thread = threading.Thread(target=self._temperature_control_loop) + self._temperature_thread.daemon = True + self._temperature_thread.start() def _stop_temperature_control(self): """停止温度控制""" @@ -105,30 +115,30 @@ class MockChiller: def _temperature_control_loop(self): """温度控制循环 - 模拟真实冷却器的温度变化""" - while self._running and self._power_on: + while self._running: + # 如果状态是 Stopped,不改变温度 + if self._status == "Stopped": + time.sleep(1.0) + continue + temp_diff = self._target_temperature - self._current_temperature - if abs(temp_diff) < 0.1: # 将判断范围从0.5改小到0.1 + if abs(temp_diff) < 0.1: self._status = "At Target Temperature" self._is_cooling = False self._is_heating = False - elif temp_diff < 0: # 需要冷却 + elif temp_diff < 0: self._status = "Cooling" self._is_cooling = True self._is_heating = False - # 模拟冷却过程,每秒降低0.5度 self._current_temperature -= 0.5 - else: # 需要加热 + else: self._status = "Heating" self._is_heating = True self._is_cooling = False - # 模拟加热过程,每秒升高0.3度 self._current_temperature += 0.3 - # 限制温度范围 - self._current_temperature = max(-20.0, min(80.0, self._current_temperature)) - - time.sleep(1.0) # 每秒更新一次 + time.sleep(1.0) def emergency_stop(self): """紧急停止""" @@ -143,9 +153,10 @@ class MockChiller: "current_temperature": self._current_temperature, "target_temperature": self._target_temperature, "status": self._status, - "power_on": self._power_on, "is_cooling": self._is_cooling, "is_heating": self._is_heating, + "vessel": self._vessel, + "purpose": self._purpose, } @@ -155,12 +166,8 @@ if __name__ == "__main__": # 测试基本功能 print("启动冷却器测试...") - chiller.power_on_off("on") print(f"初始状态: {chiller.get_status_info()}") - # 设置目标温度为5度 - chiller.set_temperature(5.0) - # 模拟运行10秒 for i in range(10): time.sleep(1) diff --git a/unilabos/devices/mock/mock_filter.py b/unilabos/devices/mock/mock_filter.py index 6581ac4..f54e41e 100644 --- a/unilabos/devices/mock/mock_filter.py +++ b/unilabos/devices/mock/mock_filter.py @@ -4,64 +4,112 @@ import threading class MockFilter: def __init__(self, port: str = "MOCK"): + # 基本参数初始化 self.port = port self._status: str = "Idle" self._is_filtering: bool = False - self._filter_efficiency: float = 95.0 # 过滤效率百分比 - self._flow_rate: float = 0.0 # 流速 L/min - self._pressure_drop: float = 0.0 # 压降 Pa - self._filter_life: float = 100.0 # 滤芯寿命百分比 - self._power_on: bool = False - - # 模拟过滤过程的线程 + + # 过滤性能参数 + self._flow_rate: float = 1.0 # 流速(L/min) + self._pressure_drop: float = 0.0 # 压降(Pa) + self._filter_life: float = 100.0 # 滤芯寿命(%) + + # 过滤操作参数 + self._vessel: str = "" # 源容器 + self._filtrate_vessel: str = "" # 目标容器 + self._stir: bool = False # 是否搅拌 + self._stir_speed: float = 0.0 # 搅拌速度 + self._temperature: float = 25.0 # 温度(℃) + self._continue_heatchill: bool = False # 是否继续加热/制冷 + self._target_volume: float = 0.0 # 目标过滤体积(L) + self._filtered_volume: float = 0.0 # 已过滤体积(L) + self._progress: float = 0.0 # 过滤进度(%) + + # 线程控制 self._filter_thread = None self._running = False @property def status(self) -> str: - """设备状态 - 会被自动识别的设备属性""" return self._status @property def is_filtering(self) -> bool: - """是否正在过滤""" return self._is_filtering - @property - def filter_efficiency(self) -> float: - """过滤效率""" - return self._filter_efficiency - @property def flow_rate(self) -> float: - """流速""" return self._flow_rate @property def pressure_drop(self) -> float: - """压降""" return self._pressure_drop @property def filter_life(self) -> float: - """滤芯寿命""" return self._filter_life + # 新增 property + @property + def vessel(self) -> str: + return self._vessel @property - def power_on(self) -> bool: - """电源状态""" - return self._power_on + def filtrate_vessel(self) -> str: + return self._filtrate_vessel - def start_filtering(self, flow_rate: float = 1.0): - """开始过滤 - 需要在注册表添加的设备动作""" - if not self._power_on: - self._status = "Error: Power Off" - return False + @property + def filtered_volume(self) -> float: + return self._filtered_volume - self._flow_rate = flow_rate + @property + def progress(self) -> float: + return self._progress + + @property + def stir(self) -> bool: + return self._stir + + @property + def stir_speed(self) -> float: + return self._stir_speed + + @property + def temperature(self) -> float: + return self._temperature + + @property + def continue_heatchill(self) -> bool: + return self._continue_heatchill + + @property + def target_volume(self) -> float: + return self._target_volume + + def filter(self, vessel: str, filtrate_vessel: str, stir: bool = False, stir_speed: float = 0.0, temp: float = 25.0, continue_heatchill: bool = False, volume: float = 0.0) -> dict: + """新的过滤操作""" + # 停止任何正在进行的过滤 + if self._is_filtering: + self.stop_filtering() + # 验证参数 + if volume <= 0: + return {"success": False, "message": "Target volume must be greater than 0"} + # 设置新的过滤参数 + self._vessel = vessel + self._filtrate_vessel = filtrate_vessel + self._stir = stir + self._stir_speed = stir_speed + self._temperature = temp + self._continue_heatchill = continue_heatchill + self._target_volume = volume + # 重置过滤状态 + self._filtered_volume = 0.0 + self._progress = 0.0 self._status = "Starting Filter" + # 启动过滤过程 + self._flow_rate = 1.0 # 设置默认流速 self._start_filter_process() - return True + + return {"success": True, "message": "Filter started"} def stop_filtering(self): """停止过滤""" @@ -69,31 +117,18 @@ class MockFilter: self._stop_filter_process() self._flow_rate = 0.0 self._is_filtering = False - self._status = "Idle" + self._status = "Stopped" return True - def power_on_off(self, power_state: str): - """开关机控制""" - if power_state == "on": - self._power_on = True - self._status = "Power On" - else: - self._power_on = False - self._status = "Power Off" - self._stop_filter_process() - self._is_filtering = False - self._flow_rate = 0.0 - def replace_filter(self): """更换滤芯""" self._filter_life = 100.0 - self._filter_efficiency = 95.0 self._status = "Filter Replaced" return True def _start_filter_process(self): """启动过滤过程线程""" - if not self._running and self._power_on: + if not self._running: self._running = True self._is_filtering = True self._filter_thread = threading.Thread(target=self._filter_loop) @@ -107,24 +142,50 @@ class MockFilter: self._filter_thread.join(timeout=1.0) def _filter_loop(self): - """过滤过程循环 - 模拟真实过滤器的工作过程""" - while self._running and self._power_on and self._is_filtering: - self._status = "Filtering" - - # 模拟滤芯磨损 - if self._filter_life > 0: - self._filter_life -= 0.1 # 每秒减少0.1%寿命 - - # 根据滤芯寿命调整效率和压降 - life_factor = self._filter_life / 100.0 - self._filter_efficiency = 95.0 * life_factor + 50.0 * (1 - life_factor) - self._pressure_drop = 100.0 + (200.0 * (1 - life_factor)) # 压降随磨损增加 - - # 检查滤芯是否需要更换 - if self._filter_life <= 10.0: - self._status = "Filter Needs Replacement" - - time.sleep(1.0) # 每秒更新一次 + """过滤进程主循环""" + update_interval = 1.0 # 更新间隔(秒) + + while self._running and self._is_filtering: + try: + self._status = "Filtering" + + # 计算这一秒过滤的体积 (L/min -> L/s) + volume_increment = (self._flow_rate / 60.0) * update_interval + + # 更新已过滤体积 + self._filtered_volume += volume_increment + + # 更新进度 (避免除零错误) + if self._target_volume > 0: + self._progress = min(100.0, (self._filtered_volume / self._target_volume) * 100.0) + + # 更新滤芯寿命 (每过滤1L减少0.5%寿命) + self._filter_life = max(0.0, self._filter_life - (volume_increment * 0.5)) + + # 更新压降 (根据滤芯寿命和流速动态计算) + life_factor = self._filter_life / 100.0 # 将寿命转换为0-1的因子 + flow_factor = self._flow_rate / 2.0 # 将流速标准化(假设2L/min是标准流速) + base_pressure = 100.0 # 基础压降 + # 压降随滤芯寿命降低而增加,随流速增加而增加 + self._pressure_drop = base_pressure * (2 - life_factor) * flow_factor + + # 检查是否完成目标体积 + if self._target_volume > 0 and self._filtered_volume >= self._target_volume: + self._status = "Completed" + self._progress = 100.0 + self.stop_filtering() + break + + # 检查滤芯寿命 + if self._filter_life <= 10.0: + self._status = "Filter Needs Replacement" + + time.sleep(update_interval) + + except Exception as e: + print(f"Error in filter loop: {e}") + self.emergency_stop() + break def emergency_stop(self): """紧急停止""" @@ -134,15 +195,21 @@ class MockFilter: self._flow_rate = 0.0 def get_status_info(self) -> dict: - """获取完整状态信息""" + """扩展的状态信息""" return { "status": self._status, "is_filtering": self._is_filtering, - "filter_efficiency": self._filter_efficiency, "flow_rate": self._flow_rate, "pressure_drop": self._pressure_drop, "filter_life": self._filter_life, - "power_on": self._power_on, + "vessel": self._vessel, + "filtrate_vessel": self._filtrate_vessel, + "filtered_volume": self._filtered_volume, + "target_volume": self._target_volume, + "progress": self._progress, + "temperature": self._temperature, + "stir": self._stir, + "stir_speed": self._stir_speed } @@ -152,17 +219,15 @@ if __name__ == "__main__": # 测试基本功能 print("启动过滤器测试...") - filter_device.power_on_off("on") print(f"初始状态: {filter_device.get_status_info()}") - # 开始过滤 - filter_device.start_filtering(2.0) + # 模拟运行10秒 for i in range(10): time.sleep(1) print( - f"第{i+1}秒: 效率={filter_device.filter_efficiency:.1f}%, " + f"第{i+1}秒: " f"寿命={filter_device.filter_life:.1f}%, 状态={filter_device.status}" ) diff --git a/unilabos/devices/mock/mock_heater.py b/unilabos/devices/mock/mock_heater.py index 6d9abff..47dd8d8 100644 --- a/unilabos/devices/mock/mock_heater.py +++ b/unilabos/devices/mock/mock_heater.py @@ -1,7 +1,6 @@ import time import threading - class MockHeater: def __init__(self, port: str = "MOCK"): self.port = port @@ -9,13 +8,21 @@ class MockHeater: self._target_temperature: float = 25.0 self._status: str = "Idle" self._is_heating: bool = False - self._power_on: bool = False self._heating_power: float = 0.0 # 加热功率百分比 0-100 self._max_temperature: float = 300.0 # 最大加热温度 + + # 新增加的属性 + self._vessel: str = "Unknown" + self._purpose: str = "Unknown" + self._stir: bool = False + self._stir_speed: float = 0.0 # 模拟加热过程的线程 self._heating_thread = None - self._running = False + self._running = True + self._heating_thread = threading.Thread(target=self._heating_control_loop) + self._heating_thread.daemon = True + self._heating_thread.start() @property def current_temperature(self) -> float: @@ -32,11 +39,6 @@ class MockHeater: """设备状态 - 会被自动识别的设备属性""" return self._status - @property - def power_on(self) -> bool: - """电源状态""" - return self._power_on - @property def is_heating(self) -> bool: """是否正在加热""" @@ -51,6 +53,79 @@ class MockHeater: def max_temperature(self) -> float: """最大加热温度""" return self._max_temperature + + @property + def vessel(self) -> str: + """当前操作的容器名称""" + return self._vessel + + @property + def purpose(self) -> str: + """操作目的""" + return self._purpose + + @property + def stir(self) -> bool: + """是否搅拌""" + return self._stir + + @property + def stir_speed(self) -> float: + """搅拌速度""" + return self._stir_speed + + def heat_chill_start(self, vessel: str, temp: float, purpose: str) -> dict: + """开始加热/制冷过程""" + self._vessel = str(vessel) + self._purpose = str(purpose) + self._target_temperature = float(temp) + + diff = self._target_temperature - self._current_temperature + if abs(diff) < 0.1: + self._status = "At Target Temperature" + self._is_heating = False + elif diff > 0: + self._status = "Heating" + self._is_heating = True + else: + self._status = "Cooling Down" + self._is_heating = False + + return {"success": True, "status": self._status} + + def heat_chill_stop(self, vessel: str) -> dict: + """停止加热/制冷""" + if vessel != self._vessel: + return {"success": False, "status": f"Wrong vessel: expected {self._vessel}, got {vessel}"} + + self._status = "Stopped" + self._is_heating = False + self._heating_power = 0.0 + + return {"success": True, "status": self._status} + + def heat_chill(self, vessel: str, temp: float, time: float, + stir: bool = False, stir_speed: float = 0.0, + purpose: str = "Unknown") -> dict: + """完整的加热/制冷控制""" + self._vessel = str(vessel) + self._target_temperature = float(temp) + self._purpose = str(purpose) + self._stir = stir + self._stir_speed = stir_speed + + diff = self._target_temperature - self._current_temperature + if abs(diff) < 0.1: + self._status = "At Target Temperature" + self._is_heating = False + elif diff > 0: + self._status = "Heating" + self._is_heating = True + else: + self._status = "Cooling Down" + self._is_heating = False + + return {"success": True, "status": self._status} def set_temperature(self, temperature: float): """设置目标温度 - 需要在注册表添加的设备动作""" @@ -60,10 +135,6 @@ class MockHeater: self._status = "Error: Invalid temperature value" return False - if not self._power_on: - self._status = "Error: Power Off" - return False - if temperature > self._max_temperature: self._status = f"Error: Temperature exceeds maximum ({self._max_temperature}°C)" return False @@ -83,33 +154,12 @@ class MockHeater: self._status = "Error: Invalid power value" return False - if not self._power_on: - self._status = "Error: Power Off" - return False - self._heating_power = max(0.0, min(100.0, power)) # 限制在0-100% return True - def power_on_off(self, power_state: str): - """开关机控制,接收字符串命令 "On" 或 "Off" """ - power_state = power_state.capitalize() - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - self._power_on = True if power_state == "On" else False - if self._power_on: - self._status = "Power On" - self._start_heating_control() - else: - self._status = "Power Off" - self._stop_heating_control() - self._is_heating = False - self._heating_power = 0.0 - return "Success" - def _start_heating_control(self): """启动加热控制线程""" - if not self._running and self._power_on: + if not self._running: self._running = True self._heating_thread = threading.Thread(target=self._heating_control_loop) self._heating_thread.daemon = True @@ -122,41 +172,31 @@ class MockHeater: self._heating_thread.join(timeout=1.0) def _heating_control_loop(self): - """加热控制循环 - 模拟真实加热器的温度变化""" - while self._running and self._power_on: + """加热控制循环""" + while self._running: + # 如果状态是 Stopped,不改变温度 + if self._status == "Stopped": + time.sleep(1.0) + continue + temp_diff = self._target_temperature - self._current_temperature - if abs(temp_diff) < 0.5: # 温度接近目标值 + if abs(temp_diff) < 0.1: self._status = "At Target Temperature" self._is_heating = False - self._heating_power = 10.0 # 维持温度的最小功率 - elif temp_diff > 0: # 需要加热 + self._heating_power = 10.0 + elif temp_diff > 0: self._status = "Heating" self._is_heating = True - # 根据温差调整加热功率 - if temp_diff > 50: - self._heating_power = 100.0 - elif temp_diff > 20: - self._heating_power = 80.0 - elif temp_diff > 10: - self._heating_power = 60.0 - else: - self._heating_power = 40.0 - - # 模拟加热过程,加热速度与功率成正比 - heating_rate = self._heating_power / 100.0 * 2.0 # 最大每秒升温2度 - self._current_temperature += heating_rate - else: # 目标温度低于当前温度,自然冷却 + self._heating_power = min(100.0, abs(temp_diff) * 2) + self._current_temperature += 0.5 + else: self._status = "Cooling Down" self._is_heating = False self._heating_power = 0.0 - # 模拟自然冷却,每秒降低0.2度 self._current_temperature -= 0.2 - # 限制温度范围 - self._current_temperature = max(20.0, min(self._max_temperature, self._current_temperature)) - - time.sleep(1.0) # 每秒更新一次 + time.sleep(1.0) def emergency_stop(self): """紧急停止""" @@ -171,32 +211,37 @@ class MockHeater: "current_temperature": self._current_temperature, "target_temperature": self._target_temperature, "status": self._status, - "power_on": self._power_on, "is_heating": self._is_heating, "heating_power": self._heating_power, "max_temperature": self._max_temperature, + "vessel": self._vessel, + "purpose": self._purpose, + "stir": self._stir, + "stir_speed": self._stir_speed } - # 用于测试的主函数 if __name__ == "__main__": heater = MockHeater() - # 测试基本功能 print("启动加热器测试...") - heater.power_on_off("On") print(f"初始状态: {heater.get_status_info()}") # 设置目标温度为80度 heater.set_temperature(80.0) # 模拟运行15秒 - for i in range(15): - time.sleep(1) - print( - f"第{i+1}秒: 当前温度={heater.current_temperature:.1f}°C, 功率={heater.heating_power:.1f}%, " - f"状态={heater.status}" - ) + try: + for i in range(15): + time.sleep(1) + status = heater.get_status_info() + print( + f"\r温度: {status['current_temperature']:.1f}°C / {status['target_temperature']:.1f}°C | " + f"功率: {status['heating_power']:.1f}% | 状态: {status['status']}", + end="" + ) + except KeyboardInterrupt: + heater.emergency_stop() + print("\n测试被手动停止") - heater.emergency_stop() - print("测试完成") + print("\n测试完成") \ No newline at end of file diff --git a/unilabos/devices/mock/mock_pump.py b/unilabos/devices/mock/mock_pump.py index 68acd85..43cbf00 100644 --- a/unilabos/devices/mock/mock_pump.py +++ b/unilabos/devices/mock/mock_pump.py @@ -1,28 +1,14 @@ import time import threading - +from datetime import datetime, timedelta class MockPump: - """ - 模拟泵设备类 - - 这个类模拟了一个实验室泵设备的行为,包括流量控制、压力监测、 - 运行状态管理等功能。所有的控制参数都使用字符串类型以提供更好的 - 可读性和扩展性。 - """ - def __init__(self, port: str = "MOCK"): - """ - 初始化MockPump实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ self.port = port # 设备基本状态属性 + self._current_device = "MockPump1" # 设备标识符 self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off self._pump_state: str = "Stopped" # 泵运行状态:Running, Stopped, Paused # 流量相关属性 @@ -35,24 +21,39 @@ class MockPump: self._pressure: float = 0.0 # 当前压力 (bar) self._max_pressure: float = 10.0 # 最大压力 (bar) - # 方向控制属性 - self._direction: str = "Forward" # 泵方向:Forward, Reverse - # 运行控制线程 self._pump_thread = None self._running = False self._thread_lock = threading.Lock() + # 新增 PumpTransfer 相关属性 + self._from_vessel: str = "" + self._to_vessel: str = "" + self._transfer_volume: float = 0.0 + self._amount: str = "" + self._transfer_time: float = 0.0 + self._is_viscous: bool = False + self._rinsing_solvent: str = "" + self._rinsing_volume: float = 0.0 + self._rinsing_repeats: int = 0 + self._is_solid: bool = False + + # 时间追踪 + self._start_time: datetime = None + self._time_spent: timedelta = timedelta() + self._time_remaining: timedelta = timedelta() + # ==================== 状态属性 ==================== # 这些属性会被Uni-Lab系统自动识别并定时对外广播 @property def status(self) -> str: return self._status - + @property - def power_state(self) -> str: - return self._power_state + def current_device(self) -> str: + """当前设备标识符""" + return self._current_device @property def pump_state(self) -> str: @@ -74,10 +75,6 @@ class MockPump: def total_volume(self) -> float: return self._total_volume - @property - def direction(self) -> str: - return self._direction - @property def max_flow_rate(self) -> float: return self._max_flow_rate @@ -85,115 +82,121 @@ class MockPump: @property def max_pressure(self) -> float: return self._max_pressure + + # 添加新的属性访问器 + @property + def from_vessel(self) -> str: + return self._from_vessel + + @property + def to_vessel(self) -> str: + return self._to_vessel + + @property + def transfer_volume(self) -> float: + return self._transfer_volume + + @property + def amount(self) -> str: + return self._amount + + @property + def transfer_time(self) -> float: + return self._transfer_time + + @property + def is_viscous(self) -> bool: + return self._is_viscous + + @property + def rinsing_solvent(self) -> str: + return self._rinsing_solvent + + @property + def rinsing_volume(self) -> float: + return self._rinsing_volume + + @property + def rinsing_repeats(self) -> int: + return self._rinsing_repeats + + @property + def is_solid(self) -> bool: + return self._is_solid + + # 修改这两个属性装饰器 + @property + def time_spent(self) -> float: + """已用时间(秒)""" + if isinstance(self._time_spent, timedelta): + return self._time_spent.total_seconds() + return float(self._time_spent) + + @property + def time_remaining(self) -> float: + """剩余时间(秒)""" + if isinstance(self._time_remaining, timedelta): + return self._time_remaining.total_seconds() + return float(self._time_remaining) # ==================== 设备控制方法 ==================== # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - else: - self._status = "Power Off" - # 关机时停止所有运行 - self.stop_pump() - - return "Success" - - def set_flow_rate(self, flow_rate: float) -> str: - """ - 设置目标流速 - - Args: - flow_rate (float): 目标流速 (mL/min) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - try: - flow_rate = float(flow_rate) - except ValueError: - self._status = "Error: Invalid flow rate" - return "Error" - - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if flow_rate < 0 or flow_rate > self._max_flow_rate: - self._status = f"Error: Flow rate out of range (0-{self._max_flow_rate})" - return "Error" - - self._target_flow_rate = flow_rate - self._status = "Setting Flow Rate" - - # 如果设置了非零流速,自动启动泵 - if flow_rate > 0: - # 自动切换泵状态为 "Running" 以触发操作循环 - self._pump_state = "Running" - self._start_pump_operation() - else: - self.stop_pump() - - return "Success" - - def start_pump(self) -> str: - """ - 启动泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if self._target_flow_rate <= 0: - self._status = "Error: No target flow rate set" - return "Error" - - self._pump_state = "Running" - self._status = "Starting Pump" - self._start_pump_operation() - - return "Success" - - def stop_pump(self) -> str: - """ - 停止泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._pump_state = "Stopped" - self._status = "Stopping Pump" + def pump_transfer(self, from_vessel: str, to_vessel: str, volume: float, + amount: str = "", time: float = 0.0, viscous: bool = False, + rinsing_solvent: str = "", rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, solid: bool = False) -> dict: + """Execute pump transfer operation""" + # Stop any existing operation first self._stop_pump_operation() - self._flow_rate = 0.0 - self._pressure = 0.0 + + # Set transfer parameters + self._from_vessel = from_vessel + self._to_vessel = to_vessel + self._transfer_volume = float(volume) + self._amount = amount + self._transfer_time = float(time) + self._is_viscous = viscous + self._rinsing_solvent = rinsing_solvent + self._rinsing_volume = float(rinsing_volume) + self._rinsing_repeats = int(rinsing_repeats) + self._is_solid = solid - return "Success" + # Calculate flow rate + if self._transfer_time > 0 and self._transfer_volume > 0: + self._target_flow_rate = (self._transfer_volume / self._transfer_time) * 60.0 + else: + self._target_flow_rate = 10.0 if not self._is_viscous else 5.0 + + # Reset timers and counters + self._start_time = datetime.now() + self._time_spent = timedelta() + self._time_remaining = timedelta(seconds=self._transfer_time) + self._total_volume = 0.0 + self._flow_rate = 0.0 + + # Start pump operation + self._pump_state = "Running" + self._status = "Starting Transfer" + self._running = True + + # Start pump operation thread + self._pump_thread = threading.Thread(target=self._pump_operation_loop) + self._pump_thread.daemon = True + self._pump_thread.start() + + # Wait briefly to ensure thread starts + time.sleep(0.1) + + return { + "success": True, + "status": self._status, + "current_device": self._current_device, + "time_spent": 0.0, + "time_remaining": float(self._transfer_time) + } def pause_pump(self) -> str: - """ - 暂停泵运行 - Returns: - str: 操作结果状态 ("Success", "Error") - """ if self._pump_state != "Running": self._status = "Error: Pump not running" return "Error" @@ -205,73 +208,23 @@ class MockPump: return "Success" def resume_pump(self) -> str: - """ - 恢复泵运行 - Returns: - str: 操作结果状态 ("Success", "Error") - """ if self._pump_state != "Paused": self._status = "Error: Pump not paused" return "Error" - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - self._pump_state = "Running" self._status = "Resuming Pump" self._start_pump_operation() return "Success" - def set_direction(self, direction: str = "Forward") -> str: - """ - 设置泵方向 - - Args: - direction (str): 泵方向,可选值:"Forward", "Reverse" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if direction not in ["Forward", "Reverse"]: - self._status = "Error: Invalid direction" - return "Error" - - # 如果泵正在运行,需要先停止 - was_running = self._pump_state == "Running" - if was_running: - self.stop_pump() - time.sleep(0.5) # 等待停止完成 - - self._direction = direction - self._status = f"Direction set to {direction}" - - # 如果之前在运行,重新启动 - if was_running: - self.start_pump() - - return "Success" - def reset_volume_counter(self) -> str: - """ - 重置累计流量计数器 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ self._total_volume = 0.0 self._status = "Volume counter reset" return "Success" def emergency_stop(self) -> str: - """ - 紧急停止 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ self._status = "Emergency Stop" self._pump_state = "Stopped" self._stop_pump_operation() @@ -284,84 +237,72 @@ class MockPump: # ==================== 内部控制方法 ==================== def _start_pump_operation(self): - """ - 启动泵运行线程 - - 这个方法启动一个后台线程来模拟泵的实际运行过程, - 包括流速控制、压力变化和累计流量计算。 - """ with self._thread_lock: - if not self._running and self._power_state == "On": + if not self._running: self._running = True self._pump_thread = threading.Thread(target=self._pump_operation_loop) self._pump_thread.daemon = True self._pump_thread.start() def _stop_pump_operation(self): - """ - 停止泵运行线程 - - 安全地停止后台运行线程并等待其完成。 - """ with self._thread_lock: self._running = False if self._pump_thread and self._pump_thread.is_alive(): self._pump_thread.join(timeout=2.0) def _pump_operation_loop(self): - """ - 泵运行主循环 - - 这个方法在后台线程中运行,模拟真实泵的工作过程: - 1. 逐步调整流速到目标值 - 2. 根据流速计算压力 - 3. 累计流量统计 - 4. 状态更新 - """ - while self._running and self._power_state == "On" and self._pump_state == "Running": + """泵运行主循环""" + print("Pump operation loop started") # Debug print + + while self._running and self._pump_state == "Running": try: - # 模拟流速调节过程(逐步接近目标流速) + # Calculate flow rate adjustment flow_diff = self._target_flow_rate - self._flow_rate - - if abs(flow_diff) < 0.1: # 流速接近目标值 - self._flow_rate = self._target_flow_rate - self._status = "At Target Flow Rate" + + # Adjust flow rate more aggressively (50% of difference) + adjustment = flow_diff * 0.5 + self._flow_rate += adjustment + + # Ensure flow rate is within bounds + self._flow_rate = max(0.1, min(self._max_flow_rate, self._flow_rate)) + + # Update status based on flow rate + if abs(flow_diff) < 0.1: + self._status = "Running at Target Flow Rate" else: - # 模拟流速调节,每秒调整10%的差值 - adjustment = flow_diff * 0.1 - self._flow_rate += adjustment self._status = "Adjusting Flow Rate" - # 确保流速在合理范围内 - self._flow_rate = max(0.0, min(self._max_flow_rate, self._flow_rate)) + # Calculate volume increment + volume_increment = (self._flow_rate / 60.0) # mL/s + self._total_volume += volume_increment - # 模拟压力变化(压力与流速成正比,加上一些随机波动) - base_pressure = (self._flow_rate / self._max_flow_rate) * self._max_pressure - pressure_variation = 0.1 * base_pressure * (time.time() % 1.0 - 0.5) # ±5%波动 - self._pressure = max(0.0, base_pressure + pressure_variation) + # Update time tracking + self._time_spent = datetime.now() - self._start_time + if self._transfer_time > 0: + remaining = self._transfer_time - self._time_spent.total_seconds() + self._time_remaining = timedelta(seconds=max(0, remaining)) - # 累计流量计算(每秒更新) - if self._flow_rate > 0: - volume_increment = self._flow_rate / 60.0 # 转换为mL/s - if self._direction == "Reverse": - volume_increment = -volume_increment - self._total_volume += volume_increment + # Check completion + if self._total_volume >= self._transfer_volume: + self._status = "Transfer Completed" + self._pump_state = "Stopped" + self._running = False + break - # 压力保护检查 - if self._pressure > self._max_pressure * 0.95: - self._status = "Warning: High Pressure" + # Update pressure + self._pressure = (self._flow_rate / self._max_flow_rate) * self._max_pressure - # 等待1秒后继续下一次循环 + print(f"Debug - Flow: {self._flow_rate:.1f}, Volume: {self._total_volume:.1f}") # Debug print + time.sleep(1.0) except Exception as e: - self._status = f"Error in pump operation: {str(e)}" + print(f"Error in pump operation: {str(e)}") + self._status = "Error in pump operation" + self._pump_state = "Stopped" + self._running = False break - # 循环结束时的清理工作 - if self._pump_state == "Running": - self._status = "Idle" - def get_status_info(self) -> dict: """ 获取完整的设备状态信息 @@ -370,16 +311,27 @@ class MockPump: dict: 包含所有设备状态的字典 """ return { - "status": self._status, - "power_state": self._power_state, - "pump_state": self._pump_state, - "flow_rate": self._flow_rate, - "target_flow_rate": self._target_flow_rate, - "pressure": self._pressure, - "total_volume": self._total_volume, - "direction": self._direction, - "max_flow_rate": self._max_flow_rate, - "max_pressure": self._max_pressure, + "status": self._status, + "pump_state": self._pump_state, + "flow_rate": self._flow_rate, + "target_flow_rate": self._target_flow_rate, + "pressure": self._pressure, + "total_volume": self._total_volume, + "max_flow_rate": self._max_flow_rate, + "max_pressure": self._max_pressure, + "current_device": self._current_device, + "from_vessel": self._from_vessel, + "to_vessel": self._to_vessel, + "transfer_volume": self._transfer_volume, + "amount": self._amount, + "transfer_time": self._transfer_time, + "is_viscous": self._is_viscous, + "rinsing_solvent": self._rinsing_solvent, + "rinsing_volume": self._rinsing_volume, + "rinsing_repeats": self._rinsing_repeats, + "is_solid": self._is_solid, + "time_spent": self._time_spent.total_seconds(), + "time_remaining": self._time_remaining.total_seconds() } @@ -389,7 +341,6 @@ if __name__ == "__main__": # 测试基本功能 print("启动泵设备测试...") - pump.power_control("On") print(f"初始状态: {pump.get_status_info()}") # 设置流速并启动 @@ -403,12 +354,7 @@ if __name__ == "__main__": # 测试方向切换 print("切换泵方向...") - pump.set_direction("Reverse") - # 继续运行5秒 - for i in range(5): - time.sleep(1) - print(f"反向第{i+1}秒: 累计流量={pump.total_volume:.1f}mL, 方向={pump.direction}") pump.emergency_stop() print("测试完成") diff --git a/unilabos/devices/mock/mock_rotavap.py b/unilabos/devices/mock/mock_rotavap.py index bd2f2f8..9b2ea91 100644 --- a/unilabos/devices/mock/mock_rotavap.py +++ b/unilabos/devices/mock/mock_rotavap.py @@ -22,7 +22,6 @@ class MockRotavap: # 设备基本状态属性 self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off # 旋转相关属性 self._rotate_state: str = "Stopped" # 旋转状态:Running, Stopped @@ -56,10 +55,6 @@ class MockRotavap: def status(self) -> str: return self._status - @property - def power_state(self) -> str: - return self._power_state - @property def rotate_state(self) -> str: return self._rotate_state @@ -95,33 +90,6 @@ class MockRotavap: # ==================== 设备控制方法 ==================== # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - self.success = "False" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - self._start_operation() - else: - self._status = "Power Off" - self.stop_all_operations() - - self.success = "True" - return "Success" - def set_timer(self, command: str) -> str: """ 设置定时器 - 兼容现有RotavapOne接口 @@ -132,10 +100,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - self.success = "False" - return "Error" try: timer = json.loads(command) @@ -165,9 +129,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" self.success = "False" self._rotate_time = max(0.0, float(time_seconds)) @@ -185,9 +146,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" self.success = "False" self._pump_time = max(0.0, float(time_seconds)) @@ -205,9 +163,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if speed < 0 or speed > self._max_rotate_speed: self._status = f"Error: Speed out of range (0-{self._max_rotate_speed})" @@ -227,9 +182,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if temperature < 0 or temperature > self._max_temperature: self._status = f"Error: Temperature out of range (0-{self._max_temperature})" @@ -237,6 +189,10 @@ class MockRotavap: self._target_temperature = temperature self._status = "Temperature set" + + # 启动操作线程以开始温度控制 + self._start_operation() + return "Success" def start_rotation(self) -> str: @@ -246,9 +202,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if self._rotate_time <= 0: self._status = "Error: No rotate time set" @@ -265,9 +218,6 @@ class MockRotavap: Returns: str: 操作结果状态 ("Success", "Error") """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if self._pump_time <= 0: self._status = "Error: No pump time set" @@ -313,7 +263,7 @@ class MockRotavap: 这个方法启动一个后台线程来模拟旋蒸的实际运行过程。 """ with self._thread_lock: - if not self._running and self._power_state == "On": + if not self._running: self._running = True self._operation_thread = threading.Thread(target=self._operation_loop) self._operation_thread.daemon = True @@ -340,7 +290,7 @@ class MockRotavap: 3. 真空度控制 4. 状态更新 """ - while self._running and self._power_state == "On": + while self._running: try: # 处理旋转时间倒计时 if self._rotate_time > 0: @@ -385,7 +335,6 @@ class MockRotavap: break # 循环结束时的清理工作 - if self._power_state == "On": self._status = "Idle" def get_status_info(self) -> dict: @@ -397,7 +346,6 @@ class MockRotavap: """ return { "status": self._status, - "power_state": self._power_state, "rotate_state": self._rotate_state, "rotate_time": self._rotate_time, "rotate_speed": self._rotate_speed, @@ -416,7 +364,6 @@ if __name__ == "__main__": # 测试基本功能 print("启动旋转蒸发器测试...") - rotavap.power_control("On") print(f"初始状态: {rotavap.get_status_info()}") # 设置定时器 diff --git a/unilabos/devices/mock/mock_separator.py b/unilabos/devices/mock/mock_separator.py index fdda4ba..222cb2e 100644 --- a/unilabos/devices/mock/mock_separator.py +++ b/unilabos/devices/mock/mock_separator.py @@ -1,13 +1,12 @@ import time import threading - +from datetime import datetime, timedelta class MockSeparator: def __init__(self, port: str = "MOCK"): self.port = port # 基本状态属性 - self._power_state: str = "Off" # 电源:On 或 Off self._status: str = "Idle" # 当前总体状态 self._valve_state: str = "Closed" # 阀门状态:Open 或 Closed self._settling_time: float = 0.0 # 静置时间(秒) @@ -19,10 +18,33 @@ class MockSeparator: # 用于后台模拟 shake 动作 self._operation_thread = None self._thread_lock = threading.Lock() + self._running = False + + # Separate action 相关属性 + self._current_device: str = "MockSeparator1" + self._purpose: str = "" # wash or extract + self._product_phase: str = "" # top or bottom + self._from_vessel: str = "" + self._separation_vessel: str = "" + self._to_vessel: str = "" + self._waste_phase_to_vessel: str = "" + self._solvent: str = "" + self._solvent_volume: float = 0.0 + self._through: str = "" + self._repeats: int = 1 + self._stir_time: float = 0.0 + self._stir_speed: float = 0.0 + self._time_spent = timedelta() + self._time_remaining = timedelta() + self._start_time = datetime.now() # 添加这一行 @property - def power_state(self) -> str: - return self._power_state + def current_device(self) -> str: + return self._current_device + + @property + def purpose(self) -> str: + return self._purpose @property def valve_state(self) -> str: @@ -45,22 +67,144 @@ class MockSeparator: def shake_status(self) -> str: with self._thread_lock: return self._shake_status + + @property + def product_phase(self) -> str: + return self._product_phase - def power_control(self, power_state: str) -> str: - """ - 电源控制:只接受 "On" 或 "Off" - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" + @property + def from_vessel(self) -> str: + return self._from_vessel - self._power_state = power_state - if power_state == "On": - self._status = "Powered On" - else: - self._status = "Powered Off" - self.stop_operations() - return "Success" + @property + def separation_vessel(self) -> str: + return self._separation_vessel + + @property + def to_vessel(self) -> str: + return self._to_vessel + + @property + def waste_phase_to_vessel(self) -> str: + return self._waste_phase_to_vessel + + @property + def solvent(self) -> str: + return self._solvent + + @property + def solvent_volume(self) -> float: + return self._solvent_volume + + @property + def through(self) -> str: + return self._through + + @property + def repeats(self) -> int: + return self._repeats + + @property + def stir_time(self) -> float: + return self._stir_time + + @property + def stir_speed(self) -> float: + return self._stir_speed + + @property + def time_spent(self) -> float: + if self._running: + self._time_spent = datetime.now() - self._start_time + return self._time_spent.total_seconds() + + @property + def time_remaining(self) -> float: + if self._running: + elapsed = (datetime.now() - self._start_time).total_seconds() + total_time = (self._stir_time + self._settling_time + 10) * self._repeats + remain = max(0, total_time - elapsed) + self._time_remaining = timedelta(seconds=remain) + return self._time_remaining.total_seconds() + + def separate(self, purpose: str, product_phase: str, from_vessel: str, + separation_vessel: str, to_vessel: str, waste_phase_to_vessel: str = "", + solvent: str = "", solvent_volume: float = 0.0, through: str = "", + repeats: int = 1, stir_time: float = 0.0, stir_speed: float = 0.0, + settling_time: float = 60.0) -> dict: + """ + 执行分离操作 + """ + with self._thread_lock: + # 检查是否已经在运行 + if self._running: + return { + "success": False, + "status": "Error: Operation already in progress" + } + # 必填参数验证 + if not all([from_vessel, separation_vessel, to_vessel]): + self._status = "Error: Missing required vessel parameters" + return {"success": False} + # 验证参数 + if purpose not in ["wash", "extract"]: + self._status = "Error: Invalid purpose" + return {"success": False} + + if product_phase not in ["top", "bottom"]: + self._status = "Error: Invalid product phase" + return {"success": False} + # 数值参数验证 + try: + solvent_volume = float(solvent_volume) + repeats = int(repeats) + stir_time = float(stir_time) + stir_speed = float(stir_speed) + settling_time = float(settling_time) + except ValueError: + self._status = "Error: Invalid numeric parameters" + return {"success": False} + + # 设置参数 + self._purpose = purpose + self._product_phase = product_phase + self._from_vessel = from_vessel + self._separation_vessel = separation_vessel + self._to_vessel = to_vessel + self._waste_phase_to_vessel = waste_phase_to_vessel + self._solvent = solvent + self._solvent_volume = float(solvent_volume) + self._through = through + self._repeats = int(repeats) + self._stir_time = float(stir_time) + self._stir_speed = float(stir_speed) + self._settling_time = float(settling_time) + + # 重置计时器 + self._start_time = datetime.now() + self._time_spent = timedelta() + total_time = (self._stir_time + self._settling_time + 10) * self._repeats + self._time_remaining = timedelta(seconds=total_time) + + # 启动分离操作 + self._status = "Starting Separation" + self._running = True + + # 在锁内创建和启动线程 + self._operation_thread = threading.Thread(target=self._operation_loop) + self._operation_thread.daemon = True + self._operation_thread.start() + + # 等待确认操作已经开始 + time.sleep(0.1) # 短暂等待确保操作线程已启动 + + return { + "success": True, + "status": self._status, + "current_device": self._current_device, + "time_spent": self._time_spent.total_seconds(), + "time_remaining": self._time_remaining.total_seconds() + } def shake(self, shake_time: float) -> str: """ @@ -75,10 +219,6 @@ class MockSeparator: self._status = "Error: Invalid shake time" return "Error" - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - with self._thread_lock: self._status = "Shaking" self._settling_time = 0.0 @@ -115,9 +255,6 @@ class MockSeparator: """ 阀门控制命令:传入 "open" 或 "close" """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" command = command.lower() if command == "open": @@ -130,30 +267,109 @@ class MockSeparator: self._status = "Error: Invalid valve command" return "Error" return "Success" + + def _operation_loop(self): + """分离操作主循环""" + try: + current_repeat = 1 + + # 立即更新状态,确保不会停留在Starting Separation + with self._thread_lock: + self._status = f"Separation Cycle {current_repeat}/{self._repeats}" + + while self._running and current_repeat <= self._repeats: + # 第一步:搅拌 + if self._stir_time > 0: + with self._thread_lock: + self._status = f"Stirring (Repeat {current_repeat}/{self._repeats})" + remaining_stir = self._stir_time + while remaining_stir > 0 and self._running: + time.sleep(1) + remaining_stir -= 1 + + # 第二步:静置 + if self._settling_time > 0: + with self._thread_lock: + self._status = f"Settling (Repeat {current_repeat}/{self._repeats})" + remaining_settle = self._settling_time + while remaining_settle > 0 and self._running: + time.sleep(1) + remaining_settle -= 1 + + # 第三步:打开阀门排出 + with self._thread_lock: + self._valve_state = "Open" + self._status = f"Draining (Repeat {current_repeat}/{self._repeats})" + + # 模拟排出时间(5秒) + time.sleep(10) + + # 关闭阀门 + with self._thread_lock: + self._valve_state = "Closed" + + # 检查是否继续下一次重复 + if current_repeat < self._repeats: + current_repeat += 1 + else: + with self._thread_lock: + self._status = "Separation Complete" + break + + except Exception as e: + with self._thread_lock: + self._status = f"Error in separation: {str(e)}" + finally: + with self._thread_lock: + self._running = False + self._valve_state = "Closed" + if self._status == "Starting Separation": + self._status = "Error: Operation failed to start" + elif self._status != "Separation Complete": + self._status = "Stopped" def stop_operations(self) -> str: - """ - 停止任何正在执行的操作 - """ + """停止任何正在执行的操作""" with self._thread_lock: + self._running = False + if self._operation_thread and self._operation_thread.is_alive(): + self._operation_thread.join(timeout=1.0) + self._operation_thread = None self._settling_time = 0.0 self._status = "Idle" self._shake_status = "Idle" self._shake_time = 0.0 + self._time_remaining = timedelta() return "Success" def get_status_info(self) -> dict: - """ - 获取当前设备状态信息 - """ + """获取当前设备状态信息""" with self._thread_lock: + current_time = datetime.now() + if self._start_time: + self._time_spent = current_time - self._start_time + return { "status": self._status, - "power_state": self._power_state, "valve_state": self._valve_state, "settling_time": self._settling_time, "shake_time": self._shake_time, "shake_status": self._shake_status, + "current_device": self._current_device, + "purpose": self._purpose, + "product_phase": self._product_phase, + "from_vessel": self._from_vessel, + "separation_vessel": self._separation_vessel, + "to_vessel": self._to_vessel, + "waste_phase_to_vessel": self._waste_phase_to_vessel, + "solvent": self._solvent, + "solvent_volume": self._solvent_volume, + "through": self._through, + "repeats": self._repeats, + "stir_time": self._stir_time, + "stir_speed": self._stir_speed, + "time_spent": self._time_spent.total_seconds(), + "time_remaining": self._time_remaining.total_seconds() } @@ -162,7 +378,6 @@ if __name__ == "__main__": separator = MockSeparator() print("启动简单版分离器测试...") - print(separator.power_control("On")) print("初始状态:", separator.get_status_info()) # 触发 shake 操作,模拟 10 秒的搅拌 diff --git a/unilabos/devices/mock/mock_stirrer.py b/unilabos/devices/mock/mock_stirrer.py index 6907f7e..a1f2c51 100644 --- a/unilabos/devices/mock/mock_stirrer.py +++ b/unilabos/devices/mock/mock_stirrer.py @@ -3,25 +3,11 @@ import threading class MockStirrer: - """ - 模拟搅拌器设备类 - - 这个类模拟了一个实验室搅拌器的行为,包括搅拌速度控制、 - 温度监测、加热控制等功能。参考了现有的 HeaterStirrer_DaLong 实现。 - """ - def __init__(self, port: str = "MOCK"): - """ - 初始化MockStirrer实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ self.port = port # 设备基本状态属性 self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off # 搅拌相关属性 self._stir_speed: float = 0.0 # 当前搅拌速度 (rpm) @@ -46,52 +32,18 @@ class MockStirrer: @property def status(self) -> str: - """ - 设备状态 - 会被自动识别的设备属性 - - Returns: - str: 当前设备状态 (Idle, Running, Error, Stopped) - """ return self._status - @property - def power_state(self) -> str: - """ - 电源状态 - - Returns: - str: 电源状态 (On, Off) - """ - return self._power_state - @property def stir_speed(self) -> float: - """ - 当前搅拌速度 - - Returns: - float: 当前搅拌速度 (rpm) - """ return self._stir_speed @property def target_stir_speed(self) -> float: - """ - 目标搅拌速度 - - Returns: - float: 目标搅拌速度 (rpm) - """ return self._target_stir_speed @property def stir_state(self) -> str: - """ - 搅拌状态 - - Returns: - str: 搅拌状态 (Running, Stopped) - """ return self._stir_state @property @@ -116,86 +68,26 @@ class MockStirrer: @property def heating_state(self) -> str: - """ - 加热状态 - - Returns: - str: 加热状态 (On, Off) - """ return self._heating_state @property def heating_power(self) -> float: - """ - 加热功率 - - Returns: - float: 加热功率百分比 (0-100) - """ return self._heating_power @property def max_stir_speed(self) -> float: - """ - 最大搅拌速度 - - Returns: - float: 最大搅拌速度 (rpm) - """ return self._max_stir_speed @property def max_temperature(self) -> float: - """ - 最大温度 - - Returns: - float: 最大温度 (°C) - """ return self._max_temperature # ==================== 设备控制方法 ==================== # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - self._start_operation() - else: - self._status = "Power Off" - self.stop_all_operations() - - return "Success" - def set_stir_speed(self, speed: float) -> str: - """ - 设置搅拌速度 - Args: - speed (float): 目标搅拌速度 (rpm) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ speed = float(speed) # 确保传入的速度是浮点数 - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if speed < 0 or speed > self._max_stir_speed: self._status = f"Error: Speed out of range (0-{self._max_stir_speed})" @@ -213,19 +105,7 @@ class MockStirrer: return "Success" def set_temperature(self, temperature: float) -> str: - """ - 设置目标温度 - - Args: - temperature (float): 目标温度 (°C) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ temperature = float(temperature) # 确保传入的温度是浮点数 - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if temperature < 0 or temperature > self._max_temperature: self._status = f"Error: Temperature out of range (0-{self._max_temperature})" @@ -237,15 +117,6 @@ class MockStirrer: return "Success" def start_stirring(self) -> str: - """ - 启动搅拌 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if self._target_stir_speed <= 0: self._status = "Error: No target speed set" @@ -256,30 +127,12 @@ class MockStirrer: return "Success" def stop_stirring(self) -> str: - """ - 停止搅拌 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ self._stir_state = "Stopped" self._target_stir_speed = 0.0 self._status = "Stirring Stopped" return "Success" def heating_control(self, heating_state: str = "On") -> str: - """ - 加热控制 - - Args: - heating_state (str): 加热状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" if heating_state not in ["On", "Off"]: self._status = "Error: Invalid heating state" @@ -296,12 +149,6 @@ class MockStirrer: return "Success" def stop_all_operations(self) -> str: - """ - 停止所有操作 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ self._stir_state = "Stopped" self._heating_state = "Off" self._stop_operation() @@ -325,13 +172,8 @@ class MockStirrer: # ==================== 内部控制方法 ==================== def _start_operation(self): - """ - 启动操作线程 - - 这个方法启动一个后台线程来模拟搅拌器的实际运行过程。 - """ with self._thread_lock: - if not self._running and self._power_state == "On": + if not self._running: self._running = True self._operation_thread = threading.Thread(target=self._operation_loop) self._operation_thread.daemon = True @@ -349,15 +191,7 @@ class MockStirrer: self._operation_thread.join(timeout=2.0) def _operation_loop(self): - """ - 操作主循环 - - 这个方法在后台线程中运行,模拟真实搅拌器的工作过程: - 1. 搅拌速度控制 - 2. 温度控制和加热 - 3. 状态更新 - """ - while self._running and self._power_state == "On": + while self._running: try: # 处理搅拌速度控制 if self._stir_state == "Running": @@ -431,19 +265,11 @@ class MockStirrer: break # 循环结束时的清理工作 - if self._power_state == "On": self._status = "Idle" def get_status_info(self) -> dict: - """ - 获取完整的设备状态信息 - - Returns: - dict: 包含所有设备状态的字典 - """ return { "status": self._status, - "power_state": self._power_state, "stir_speed": self._stir_speed, "target_stir_speed": self._target_stir_speed, "stir_state": self._stir_state, @@ -462,7 +288,6 @@ if __name__ == "__main__": # 测试基本功能 print("启动搅拌器测试...") - stirrer.power_control("On") print(f"初始状态: {stirrer.get_status_info()}") # 设置搅拌速度和温度 diff --git a/unilabos/devices/mock/mock_stirrer_new.py b/unilabos/devices/mock/mock_stirrer_new.py new file mode 100644 index 0000000..ac429db --- /dev/null +++ b/unilabos/devices/mock/mock_stirrer_new.py @@ -0,0 +1,229 @@ +import time +import threading +from datetime import datetime, timedelta + +class MockStirrer_new: + def __init__(self, port: str = "MOCK"): + self.port = port + + # 基本状态属性 + self._status: str = "Idle" + self._vessel: str = "" + self._purpose: str = "" + + # 搅拌相关属性 + self._stir_speed: float = 0.0 + self._target_stir_speed: float = 0.0 + self._max_stir_speed: float = 2000.0 + self._stir_state: str = "Stopped" + + # 计时相关 + self._stir_time: float = 0.0 + self._settling_time: float = 0.0 + self._start_time = datetime.now() + self._time_remaining = timedelta() + + # 运行控制 + self._operation_thread = None + self._running = False + self._thread_lock = threading.Lock() + + # 创建操作线程 + self._operation_thread = threading.Thread(target=self._operation_loop) + self._operation_thread.daemon = True + self._operation_thread.start() + + # ==================== 状态属性 ==================== + @property + def status(self) -> str: + return self._status + + @property + def stir_speed(self) -> float: + return self._stir_speed + + @property + def target_stir_speed(self) -> float: + return self._target_stir_speed + + @property + def stir_state(self) -> str: + return self._stir_state + + @property + def vessel(self) -> str: + return self._vessel + + @property + def purpose(self) -> str: + return self._purpose + + @property + def stir_time(self) -> float: + return self._stir_time + + @property + def settling_time(self) -> float: + return self._settling_time + + @property + def max_stir_speed(self) -> float: + return self._max_stir_speed + + @property + def progress(self) -> float: + """返回当前操作的进度(0-100)""" + if not self._running: + return 0.0 + elapsed = (datetime.now() - self._start_time).total_seconds() + total_time = self._stir_time + self._settling_time + if total_time <= 0: + return 100.0 + return min(100.0, (elapsed / total_time) * 100) + + # ==================== Action Server 方法 ==================== + def start_stir(self, vessel: str, stir_speed: float = 0.0, purpose: str = "") -> dict: + """ + StartStir.action 对应的方法 + """ + with self._thread_lock: + if self._running: + return { + "success": False, + "message": "Operation already in progress" + } + + try: + # 重置所有参数 + self._vessel = vessel + self._purpose = purpose + self._stir_time = 0.0 # 连续搅拌模式下不设置搅拌时间 + self._settling_time = 0.0 + self._start_time = datetime.now() # 重置开始时间 + + if stir_speed > 0: + self._target_stir_speed = min(stir_speed, self._max_stir_speed) + + self._stir_state = "Running" + self._status = "Stirring Started" + self._running = True + + return { + "success": True, + "message": "Stirring started successfully" + } + + except Exception as e: + return { + "success": False, + "message": f"Error: {str(e)}" + } + + def stir(self, stir_time: float, stir_speed: float, settling_time: float) -> dict: + """ + Stir.action 对应的方法 + """ + with self._thread_lock: + try: + # 如果已经在运行,先停止当前操作 + if self._running: + self._running = False + self._stir_state = "Stopped" + self._target_stir_speed = 0.0 + time.sleep(0.1) # 给一个短暂的停止时间 + + + # 重置所有参数 + self._stir_time = float(stir_time) + self._settling_time = float(settling_time) + self._target_stir_speed = min(float(stir_speed), self._max_stir_speed) + self._start_time = datetime.now() # 重置开始时间 + self._stir_state = "Running" + self._status = "Stirring" + self._running = True + + return {"success": True} + + except ValueError: + self._status = "Error: Invalid parameters" + return {"success": False} + + def stop_stir(self, vessel: str) -> dict: + """ + StopStir.action 对应的方法 + """ + with self._thread_lock: + if vessel != self._vessel: + return { + "success": False, + "message": "Vessel mismatch" + } + + self._running = False + self._stir_state = "Stopped" + self._target_stir_speed = 0.0 + self._status = "Stirring Stopped" + + return { + "success": True, + "message": "Stirring stopped successfully" + } + + # ==================== 内部控制方法 ==================== + + def _operation_loop(self): + """操作主循环""" + while True: + try: + current_time = datetime.now() + + with self._thread_lock: # 添加锁保护 + if self._stir_state == "Running": + # 实际搅拌逻辑 + speed_diff = self._target_stir_speed - self._stir_speed + if abs(speed_diff) > 0.1: + adjustment = speed_diff * 0.1 + self._stir_speed += adjustment + else: + self._stir_speed = self._target_stir_speed + + # 更新进度 + if self._running: + if self._stir_time > 0: # 定时搅拌模式 + elapsed = (current_time - self._start_time).total_seconds() + if elapsed >= self._stir_time + self._settling_time: + self._running = False + self._stir_state = "Stopped" + self._target_stir_speed = 0.0 + self._stir_speed = 0.0 + self._status = "Stirring Complete" + elif elapsed >= self._stir_time: + self._status = "Settling" + else: # 连续搅拌模式 + self._status = "Stirring" + else: + # 停止状态下慢慢降低速度 + if self._stir_speed > 0: + self._stir_speed = max(0, self._stir_speed - 20.0) + + time.sleep(0.1) + + except Exception as e: + print(f"Error in operation loop: {str(e)}") # 添加错误输出 + self._status = f"Error: {str(e)}" + time.sleep(1.0) # 错误发生时等待较长时间 + + def get_status_info(self) -> dict: + """获取设备状态信息""" + return { + "status": self._status, + "vessel": self._vessel, + "purpose": self._purpose, + "stir_speed": self._stir_speed, + "target_stir_speed": self._target_stir_speed, + "stir_state": self._stir_state, + "stir_time": self._stir_time, # 添加 + "settling_time": self._settling_time, # 添加 + "progress": self.progress, + "max_stir_speed": self._max_stir_speed + } \ No newline at end of file